## Importing necessary libraries

In [1]:
from Google import Create_Service
import re
from pprint import pprint
from googleapiclient import errors

import pandas as pd
import numpy as np

In [2]:
from googleapiclient.discovery import build

import os
import dotenv

# Load variables from a local .env file into the process environment
dotenv.load_dotenv()

key = os.environ.get('API_KEY')

# Do not echo `key` as a cell result: notebook outputs are stored in the file,
# so displaying it writes the secret to disk (and to version control).
# Only report whether a key was found.
# NOTE(review): a key was previously visible in this cell's saved output;
# treat it as compromised and rotate it.
print('API key loaded:', bool(key))

In [3]:
# Build a YouTube Data API v3 client authenticated with the API key
youtube = build('youtube', 'v3', developerKey=key)

In [4]:
# Prepare (but do not yet send) a channels.list request for a single channel,
# asking only for the contentDetails part
channels_api = youtube.channels()
request = channels_api.list(part='contentDetails', id='UCSJBJ3sP5GRUJMON12v28ew')

In [5]:
response = request.execute()

In [6]:
response.keys()

dict_keys(['kind', 'etag', 'pageInfo', 'items'])

In [7]:
response

{'kind': 'youtube#channelListResponse',
 'etag': 'AJuRQ6tC7FIp3_uIqV5KCeJ2_9U',
 'pageInfo': {'totalResults': 1, 'resultsPerPage': 5},
 'items': [{'kind': 'youtube#channel',
   'etag': '0y8oy_fHW9alHHHykLsG4rWxtbM',
   'id': 'UCSJBJ3sP5GRUJMON12v28ew',
   'contentDetails': {'relatedPlaylists': {'likes': '',
     'uploads': 'UUSJBJ3sP5GRUJMON12v28ew'}}}]}

In [31]:
# Creating YouTube class to communicate with YouTube API
class YouTube:
    """
    Communicate with YouTube API

    Attributes:
        key: str
            API key used to build the service and authenticate requests
        scopes: list or None
            Stored as given; no method of this class reads it

    Methods:
        construct_service():
            Build the API service object from the stored key
        upload_response():
            Retrieve the ID of a channel's "uploads" playlist
        get_playlist_items():
            Retrieve details of every video in a playlist
        convert_duration_to_seconds():
            Convert an API duration string to a number of seconds
        create_csv():
            Print the title of every video in a list of video resources
    """

    # Seconds per duration unit letter, largest unit first
    _UNIT_SECONDS = (('D', 24 * 60 ** 2), ('H', 60 ** 2), ('M', 60), ('S', 1))

    # Largest page / batch size used for list requests
    _BATCH_SIZE = 50

    def __init__(self, key, scopes: list = None):
        self.key = key
        self.scopes = scopes

    def construct_service(self):
        """
        Create the service object with googleapiclient's build(),
        authenticating with self.key.

        Returns:
            The 'youtube' v3 service object returned by build()
        """
        API_SERVICE = 'youtube'
        API_VERSION = 'v3'
        return build(
            API_SERVICE,
            API_VERSION,
            developerKey=self.key
        )

    @staticmethod
    def upload_response(service, channel_id: str) -> str:
        """
        Send request to retrieve the channel's uploaded-videos playlist ID.

        Parameters:
            service:
                service object created using construct_service()
            channel_id: str
                Channel's id required for request

        Returns:
            str: playlist_id

        Raises:
            KeyError, IndexError: when the response holds no channel item
        """
        request = service.channels().list(
            part='contentDetails',
            id=channel_id
        )

        response = request.execute()  # Send request and receive response

        # The uploads playlist id sits under the first (only) channel item
        return response['items'][0]['contentDetails']['relatedPlaylists']['uploads']

    @staticmethod
    def get_playlist_items(service, playlist_Id: str):
        """
        Retrieve details of every video in a playlist.

        The playlist is paged through to collect the video ids, then the
        videos are looked up in batches.

        Parameters:
            service:
                service object created using construct_service()
            playlist_Id: str
                Id of the playlist from which to retrieve data

        Returns:
            list: one video resource dict per video, requested with the
            contentDetails, snippet and statistics parts
        """
        playlist_items = []
        page_token = None

        # Page through the playlist. A missing 'nextPageToken' marks the last
        # page, so it is read with .get() instead of relying on a KeyError.
        while True:
            request_args = {
                'part': 'contentDetails',
                'playlistId': playlist_Id,
                'maxResults': YouTube._BATCH_SIZE,
            }
            if page_token:
                request_args['pageToken'] = page_token

            response = service.playlistItems().list(**request_args).execute()
            playlist_items.extend(response.get('items', []))

            page_token = response.get('nextPageToken')
            if not page_token:
                break

        print(f'Total Videos found: {len(playlist_items)}')

        # Only the contentDetails part is requested above, so that is where
        # each item's video id is read from
        videos_id = [item['contentDetails']['videoId'] for item in playlist_items]

        videos_info = []  # Holds info about all available videos

        # Request video details one batch of ids at a time
        for start in range(0, len(videos_id), YouTube._BATCH_SIZE):
            videos_batch = videos_id[start:start + YouTube._BATCH_SIZE]

            response_videos = service.videos().list(
                # video details to be retrieved for each video
                part='contentDetails,snippet,statistics',
                # ids are sent as one comma-separated string
                id=','.join(videos_batch),
                maxResults=YouTube._BATCH_SIZE
            ).execute()

            videos_info.extend(response_videos.get('items', []))

        return videos_info

    @staticmethod
    def convert_duration_to_seconds(duration: str) -> int:
        """
        Converts video duration to seconds

        Each of the D, H, M and S components is optional; the first
        '<digits><letter>' occurrence of each unit is used and missing units
        count as zero.

        Parameters:
            duration: str
                time duration such as 'PT42M39S', 'P1DT2H3M4S' or '00H00M00S'

        Returns:
            int: total number of seconds
        """
        total = 0
        for unit, factor in YouTube._UNIT_SECONDS:
            # Raw pattern: '\d' is not a valid escape in a normal string literal
            found = re.search(rf'(\d+){unit}', duration)
            if found:
                total += int(found.group(1)) * factor
        return total

    @staticmethod
    def create_csv(data):
        """
        Print the title of each video in data.

        NOTE(review): despite its name this method writes no file; it only
        prints titles.

        Parameters:
            data: iterable of video resource dicts that contain
                ['snippet']['title']
        """
        for item in data:
            print(item['snippet']['title'])

In [32]:
yt = YouTube(key)

In [10]:
service = yt.construct_service()

In [11]:
service

<googleapiclient.discovery.Resource at 0x22749787fa0>

In [12]:
response = yt.upload_response(service, 'UCSJBJ3sP5GRUJMON12v28ew')

In [13]:
response

'UUSJBJ3sP5GRUJMON12v28ew'

In [33]:
items = yt.get_playlist_items(service, 'UUSJBJ3sP5GRUJMON12v28ew')

Total Videos found: 47


In [29]:
type(items)

list

In [30]:
from pprint import pprint

pprint(items[0])

{'contentDetails': {'caption': 'false',
                    'contentRating': {},
                    'definition': 'hd',
                    'dimension': '2d',
                    'duration': 'PT42M39S',
                    'licensedContent': False,
                    'projection': 'rectangular'},
 'etag': 'gKtTkegC9DBQQNnMJxEkLAaoebU',
 'id': 'c6Y9VC70AVU',
 'kind': 'youtube#video',
 'snippet': {'categoryId': '27',
             'channelId': 'UCSJBJ3sP5GRUJMON12v28ew',
             'channelTitle': 'Financial Education',
             'defaultAudioLanguage': 'ur',
             'description': 'Analysis of financial statements of NETSOL '
                            'Technologies Limited Pakistan - know before '
                            'investing in Pakistan Stock Exchange (PSX)',
             'liveBroadcastContent': 'none',
             'localized': {'description': 'Analysis of financial statements of '
                                          'NETSOL Technologies Limited '
        

In [None]:
items[0]['id']

In [None]:
title = items[0]['snippet']['title']
title

In [None]:
date = items[0]['snippet']['publishedAt']
date[:10]

In [None]:
duration = items[0]['contentDetails']['duration']
duration

In [None]:
views = items[0]['statistics']['viewCount']
views

In [None]:
yt.convert_duration_to_seconds(duration)

In [None]:
import re

In [None]:
# Seconds component; raw string so '\d' is not parsed as a string escape
int(re.search(r'\d+S', 'PT42M39S')[0][:-1]) if re.search(r'\d+S', 'PT42M39S') else 0

In [None]:
# Minutes component converted to seconds; raw string so '\d' is not parsed as a string escape
int(re.search(r'\d+M', 'PT42M39S')[0][:-1]) * 60 if re.search(r'\d+M', 'PT42M39S') else 0

In [None]:
# Hours component converted to seconds (0 here: the sample has no 'H' part);
# raw string so '\d' is not parsed as a string escape
int(re.search(r'\d+H', 'PT42M39S')[0][:-1]) * 60**2 if re.search(r'\d+H', 'PT42M39S') else 0

In [None]:
yt.create_csv(items)

In [None]:
# Parallel column lists, one entry per video
titles, dates, views, durations = [], [], [], []

for item in items:
    snippet = item['snippet']

    titles.append(snippet['title'])
    # Keep only the first 10 characters of publishedAt (the date portion)
    dates.append(snippet['publishedAt'][:10])
    # Cast the view count to int so the Views column is numeric rather than text
    views.append(int(item['statistics']['viewCount']))
    durations.append(
        YouTube.convert_duration_to_seconds(item['contentDetails']['duration'])
    )

# Sanity check: all four columns must have the same length
len(titles), len(dates), len(views), len(durations)

In [None]:
# Combine the four per-video lists into one table, one row per video
yt_data = pd.DataFrame(
    dict(Title=titles, Upload_Date=dates, Views=views, Duration=durations)
)

In [None]:
yt_data.head()

In [None]:
yt_data.to_csv('data.csv', index=False)