In [None]:
#!pip install --upgrade google-api-python-client
#!pip install pysentimiento
#!pip install --upgrade pandas
#!pip install --upgrade matplotlib

In [2]:
import os
import requests
import re
import json
import time # to call the .sleep() method to include a pause that respects potential respect API limits
import pandas as pd
import matplotlib.pyplot as plt

from googleapiclient.discovery import build
from youtube_api import YouTubeDataAPI
from youtube_api import parsers as P
from datetime import datetime

YT_KEY = os.environ.get('YOUTUBE_API_KEY') # Key is saved in ~/.zshrc
api_key = YT_KEY
yt = YouTubeDataAPI(YT_KEY)

In [3]:
from requests.exceptions import HTTPError, ReadTimeout

In [4]:
class DateTimeEncoder(json.JSONEncoder):
    """Custom encoder for datetime objects."""
    def default(self, obj):
        if isinstance(obj, datetime):
            return obj.isoformat()  # Convert datetime to ISO format string
        # Let the base class default method raise the TypeError
        return super().default(obj)


In [11]:
directory = '../data/other_media'

In [None]:
'''searches = yt.search(q='alexandria ocasio-cortez',
                     max_results=5)
print(searches[0])'''

In [None]:
'''
vid_mtdata = yt.get_video_metadata(video_id="N8TCILfoxqk", parser=P.parse_video_metadata, part=['statistics','snippet'])
print(vid_mtdata)
'''

In [None]:
'''
vid_comments = yt.get_video_comments(video_id="N8TCILfoxqk", parser=P.parse_comment_metadata, get_replies=True,
                                     max_results=None, next_page_token=False,
                                     part=['snippet'])
'''

In [None]:
''' 
# Method get playlist from channel

ch_playlists = yt.get_playlists(channel_id, next_page_token=False, parser=P.parse_playlist_metadata, part=['id', 'snippet', 'contentDetails'])

'''

In [None]:
'''
df_prsd_comments = pd.DataFrame(vid_comments)
df_prsd_comments.head(5)
'''

In [None]:
''' Function to get the ID of a YouTube channel using the channel's display_name'''

def find_channel_id_by_display_name(display_name, api_key):
    # Define the endpoint and parameters for the request
    search_url = 'https://www.googleapis.com/youtube/v3/search'
    params = {
        'part': 'snippet',
        'q': display_name,
        'type': 'channel',
        'key': api_key
    }
    
    # Make the GET request
    response = requests.get(search_url, params=params)
    
    # Check if the request was successful
    if response.status_code == 200:
        # Parse the response
        search_results = response.json()
        
        # Loop through the search results and find the matching channel
        for item in search_results.get('items', []):
            # Check if the snippet's channel title closely matches the search query (display name)
            if item['snippet']['channelTitle'].lower() == display_name.lower():
                return item['snippet']['channelId']
        
        # If no exact match is found, return a message
        return "No exact match found for the display name."
    else:
        # If the request was not successful, return an error message with the status code
        return f"Failed to retrieve data: HTTP Status Code {response.status_code}"


In [None]:
''' Function to get the ID of a YouTube channel using the Channel's username'''

def get_channel_id(channel_username, api_key):
    try:
        # Make a GET request to the YouTube Data API's channels.list method
        response = requests.get(
            'https://www.googleapis.com/youtube/v3/channels',
            params={
                'part': 'id',
                'forUsername': channel_username,
                'key': api_key
            }
        )
        
        # Check if the request was successful
        if response.status_code == 200: # status code of 200 indicates successful request (API endpoint was reached without any errors)
            response_json = response.json() # parse the JSON content returned by the API into a Python dictionary
            items = response_json.get('items', []) # retrieves 'items' list from JSON object
            
            if items:
                # Assuming the first item is the correct channel
                channel_id = items[0]['id']
                return channel_id
            else:
                return "No channel found for the given username."
        else:
            return "Failed to retrieve data: HTTP Status Code {}".format(response.status_code)
    except Exception as e:
        # Handle potential exceptions, such as a network error
        return "An error occurred: {}".format(e)


In [None]:
# Create variables for channel user name or display name

display_name_list = ['América Noticias', 'Diario El Comercio', 'Latina Noticias',
                     'Latina Televisión', 'Minsa Peru', 'Peru21 TV Channel', 'TVPerú Noticias'] # List of channel usernames
chan_names_list = ['24horasperu', 'afpes', 'AméricaNoticiasOficial', 'BBCMundo', 'cnnee',
                     'DiarioElComercio', 'LaRepublica', 'latinanoticias', 'Latinape',
                     'MinsaPeruSalud', 'DiarioPeru21', 'PresidenciaPeru', 'tvperunoticias', 'Wayka']  # The display name of the channel I am searching for


In [None]:
# Dictionary to store channel usernames and their IDs
channel_ids = {}

unmatched_list = []

# Loop through each channel username in the list
for channel_username in chan_names_list:
    # Call get_channel_id for each username
    channel_id = get_channel_id(channel_username, api_key)
    if channel_id == "No channel found for the given username.":
        unmatched_list.append(channel_username)

        
        channel_id = find_channel_id_by_display_name(channel_username, api_key) # Using the find_channel_id_by_display_name
    
    # Store the result in the dictionary
    channel_ids[channel_username] = channel_id

#Debug: Print resulting dictionary
print(channel_ids)
print(unmatched_list)


In [None]:
channel_ids = {}

unmatched_list = []

# Loop through each channel username in the list
for channel_disp_name in display_name_list:
    # Call function find_channel_id_by_display_name to get the channel id
    channel_id = find_channel_id_by_display_name(channel_disp_name, api_key)
    if channel_id == "No exact match found for the display name.":
        unmatched_list.append(channel_username)       
    
    # Store the result in the dictionary
    channel_ids[channel_disp_name] = channel_id

#Debug: Print resulting dictionary
print(channel_ids)
print(unmatched_list)


In [None]:
"Get Channel ID using the channel handle (`@channel_handle`) or the username (e.g. `AméricaNoticiasOficial`)


filename = "VIDS_TO_GET_disinfo_Sinopharm.csv"
filepath = os.path.join(directory, filename)

# Build the YouTube service object
youtube = build('youtube', 'v3', developerKey=api_key)

try:
    df1 = pd.read_csv(filepath)
except FileNotFoundError as e:
    print(f"File not found: {e}")
except pd.errors.ParserError as e:
    print(f"Error parsing CSV file: {e}")

# Convert the 'Ch. Username' column to string and handle NaN values
df1["Ch. Username"] = df1["Ch. Username"].astype(str)


ch_username = df1["Ch. Username"]

seen_channel_handlename = set()
seen_unmatchd_hndl = set()

for handle in ch_username:
    if handle == 'nan':
        pass
    elif handle.startswith('@') and handle not in seen_channel_handlename:
        seen_channel_handlename.add(handle)
    elif not handle.startswith('@') and handle not in seen_channel_handlename:
        seen_unmatchd_hndl.add(handle)

#print(seen_channel_handlename)
#print(seen_unmatchd_hndl)

ch_id_dict = {}
try:
    for i in seen_channel_handlename:
        print(f"Retrieving id for {i}")
        # Execute the search request
        request = youtube.channels().list(
                part="contentDetails,id,snippet",
                forHandle=i,
                forUsername=None
            )
        response = request.execute()
        ch_id_dict[i] = response
except Exception as e:
        # Handle potential exceptions, such as a network error or API error
        print("An error occurred for channel {}: {}".format(i, e))

try:
    for j in seen_unmatchd_hndl:
        print(f"Retrieving id for {j}")
        # Execute the search request
        request = youtube.channels().list(
                part="contentDetails,id,snippet",
                forHandle=None,
                forUsername=j
            )
        response = request.execute()
        ch_id_dict[j] = response
        #print(response)
except Exception as e:
        # Handle potential exceptions, such as a network error or API error
        print("An error occurred for channel {}: {}".format(j, e))

In [None]:
# Save search result in json file
chID_file = "media_chIDs.json"
file_path = os.path.join(directory, chID_file)

with open(file_path, 'w') as file:
    json.dump(ch_id_dict, file, cls=DateTimeEncoder, indent=4)

print(f"Data saved to {file_path}")

In [None]:
# Function to extract video ID from YouTube video URLs
def extract_video_id(url):
    match = re.search(r'v=([a-zA-Z0-9_-]{11})', url)
    return match.group(1) if match else None

In [None]:
# Get video IDs from YouTube URLs in CSV
filename = "VIDS_TO_GET_disinfo_Sinopharm.csv"
file_path = os.path.join(directory, filename)
df2 = pd.read_csv(file_path)

# Convert the 'URL' column to string
df2["URL"] = df2["URL"].astype(str)

# Apply the function to the URL column
df2['video_id'] = df2['URL'].apply(extract_video_id)

# Convert the 'video_id' column to string
df2['video_id'] = df2['video_id'].astype(str)
#df2

# Save the dataframe with the new column
df2.to_csv(filepath, index=False)


In [19]:
# Code for youtube.videos.list (multiple video IDs)

filename = "VIDS_TO_GET_disinfo_Sinopharm.csv"
file_path = os.path.join(directory, filename)
df2 = pd.read_csv(file_path)

v_ids = df2['video_id']
v_id_list = []
for id in v_ids:
    v_id_list.append(id)


# Build the YouTube service object
youtube = build('youtube', 'v3', developerKey=api_key)

video_list = []
try:
    for id in v_id_list:
        request = youtube.videos().list(
                part="snippet,contentDetails,statistics",
                id=id
        )
        response = request.execute()
        video_list.append(response)

except Exception as e:
        # Handle potential exceptions, such as a network error or API error
        print("An error occurred for channel {}: {}".format(id, e))

print(video_list)

[{'kind': 'youtube#videoListResponse', 'etag': 'ASLAbWloRUZ6rAtEkStLThLzOhc', 'items': [{'kind': 'youtube#video', 'etag': 'GbDrnPZ55zJXIZpuR6MWR_8fzlU', 'id': 'VTEQQsm0ME8', 'snippet': {'publishedAt': '2021-08-05T02:17:55Z', 'channelId': 'UCsakKsRww3J7pM6DavkrAIQ', 'title': 'Minsa: vacuna Sinopharm muestra 94% de efectividad', 'description': 'Varias personas programadas para recibir su segunda dosis de Pfizer en el vacunatorio de Plaza Norte, no pudieron recibirla ya que estas se habían agotado.\r\n\r\nEsto se da en medio de los cuestionamientos y rechazos de un grupo de la población tras el uso de la vacuna Sinopharm, representantes del Instituto Nacional de Salud brindaron una conferencia para asegurar la efectividad de la vacuna.\r\n\r\n"Hay una alta probabilidad de tener una tercera ola\xa0(…) y la vacuna de Sinopharm es altamente efectiva para prevenir la muerte", señaló desde la sede ministerial de la avenida Salaverry, junto al director del INS, Víctor Suárez Moreno.\r\n\r\nDe a

In [7]:
video_file = "other_media_videos.json"
file_path = os.path.join(directory, video_file)

with open(file_path, 'w') as file:
    json.dump(video_list, file, cls=DateTimeEncoder, indent=4)

print(f"Data saved to {file_path}")

NameError: name 'video_list' is not defined

In [12]:
video_file = "other_media_videos.json"
file_path = os.path.join(directory, video_file)

In [13]:


with open(file_path, 'r') as file:
    vids = json.load(file)

videos = []
for i in vids:
    items = i["items"]
    for item in items:
        new_video_dict = {}
        snippet = item["snippet"]
        stats = item["statistics"]
        
        new_video_dict['channel_id'] = snippet["channelId"]
        new_video_dict['channel_title'] = snippet["channelTitle"]
        new_video_dict['publish_date'] = snippet["publishedAt"]
        new_video_dict['video_id'] = item['id']
        new_video_dict['video_title'] = snippet["title"]
        new_video_dict['video_description'] = snippet["description"]
        new_video_dict['view_count'] = stats["viewCount"]
        new_video_dict['like_count'] = stats["likeCount"]
        new_video_dict['favorite_count'] = stats["favoriteCount"]
        new_video_dict['comment_count'] = stats["commentCount"]
        
        videos.append(new_video_dict)

name = "other_media_videos.csv"
path = os.path.join(directory, name)

df3 = pd.DataFrame(videos)
df3.to_csv(path, index=False)

In [39]:
#df_willax_sinoph.dtypes
df3.info

<bound method DataFrame.info of                   channel_id                         channel_title  \
0   UCsakKsRww3J7pM6DavkrAIQ                              24 Horas   
1   UCsakKsRww3J7pM6DavkrAIQ                              24 Horas   
2   UCsakKsRww3J7pM6DavkrAIQ                              24 Horas   
3   UCY8d83kIW6sK4au73qeZZTw                           AFP Español   
4   UCPhm2I2wk4vqjENwhn3px8A                      América Noticias   
5   UCPhm2I2wk4vqjENwhn3px8A                      América Noticias   
6   UCPhm2I2wk4vqjENwhn3px8A                      América Noticias   
7   UCPhm2I2wk4vqjENwhn3px8A                      América Noticias   
8   UCPhm2I2wk4vqjENwhn3px8A                      América Noticias   
9   UCUBIrDsIVzRpKsClMlSlTpQ                        BBC News Mundo   
10  UC_lEiu6917IJz03TnntWUaQ                        CNN en Español   
11  UCLtGUPjKLqa3zgdmhKCZONg             Diario El Comercio Videos   
12  UCLtGUPjKLqa3zgdmhKCZONg             Diario El Comerci

In [40]:
# Get comments from csv containing videos

filename = "other_media_videos.csv"
filepath = os.path.join(directory, filename)


video_list = []

try:
    df3 = pd.read_csv(filepath)
    #print(df.head())  # Display the first 5 rows of the dataframe
except FileNotFoundError as e:
    print(f"File not found: {e}")
except pd.errors.ParserError as e:
    print(f"Error parsing CSV file: {e}")

video_ids = df3["video_id"]

# Dictionary to store the comments for each video
video_comments_dict = {}

for video_id in video_ids: 
    try:
        vid_comments = yt.get_video_comments(video_id=video_id, parser=P.parse_comment_metadata, get_replies=True,
                                             max_results=None, next_page_token=False,
                                             part=['snippet'])
        
        # Assuming yt.get_video_comments raises an exception on failure and does not return a response object
        # Store the comments in a dictionary using the video_id as the key
        video_comments_dict[video_id] = vid_comments
        
    except Exception as e:
        # Handle potential exceptions, such as a network error or API error
        print("An error occurred for video ID {}: {}".format(video_id, e))
        
    time.sleep(5)  # Delay of 5 seconds between requests'''


In [41]:
video_comments_dict

{'VTEQQsm0ME8': [{'video_id': 'VTEQQsm0ME8',
   'commenter_channel_url': 'http://www.youtube.com/@Ochins36',
   'commenter_channel_id': 'UCQhG1PAsM_e7_sBE3oAHvIw',
   'commenter_channel_display_name': '@Ochins36',
   'comment_id': 'Ugw2p3oTYJNcyBYpopp4AaABAg',
   'comment_like_count': 0,
   'comment_publish_date': 1643076082.0,
   'text': 'Fui a un centro de salud a vacunarme con Pfizer y no quisieron, solo Sinopharm, no sé a dónde ir para vacunarme con Pfizer.',
   'commenter_rating': 'none',
   'comment_parent_id': None,
   'collection_date': datetime.datetime(2024, 5, 21, 20, 31, 11, 573510),
   'reply_count': 0},
  {'video_id': 'VTEQQsm0ME8',
   'commenter_channel_url': 'http://www.youtube.com/@adrianrodriguez9405',
   'commenter_channel_id': 'UCwP2esdQCm9hMmzOU_jyWCw',
   'commenter_channel_display_name': '@adrianrodriguez9405',
   'comment_id': 'UgwS1ltXsf0kc164tt14AaABAg',
   'comment_like_count': 0,
   'comment_publish_date': 1628512894.0,
   'text': 'Estos medios de comunicaci

In [43]:

comments_file = "comments_other_media_videos.json"
file_path = os.path.join(directory, comments_file)

with open(file_path, 'w') as file:
    json.dump(video_comments_dict, file, cls=DateTimeEncoder, indent=4)

print(f"Data saved to {file_path}")

Data saved to ../data/comments_other_media_videos.json


In [None]:
'''# Convert UNIX timestamp to human-readable date (the unit='s' specifies that the timestamp is in seconds)
df_willax_sinoph["video_publish_date"] = pd.to_datetime(df_willax_sinoph["video_publish_date"], unit='s', origin='unix')
df_willax_sinoph["video_publish_date"] = df_willax_sinoph["video_publish_date"].dt.date
df_willax_sinoph = df_willax_sinoph.sort_values(by="video_publish_date", ascending=True)
#print(df_willax_sinoph)'''

In [None]:
'''willax_sinoph_sub = df_willax_sinoph[['video_publish_date', 'video_id', 'video_title', 
                                      'video_description', 'video_view_count', 'video_comment_count', 
                                      'channel_title', 'channel_id']].rename(columns = {
        'video_publish_date': 'publish_date',
        'video_title': 'title', 
        'video_description': 'description',
        'video_view_count': 'views', 
        'video_comment_count': 'comment_count',
        'channel_title': 'channel' 
    }
)
willax_sinoph_sub'''

In [None]:
df_prsd_comments = pd.DataFrame(video_comments_dict['9K9Vpk2N38M'])
df_prsd_comments
