In [1]:
from typing import Iterator
from typing import List 
import glob
import difflib
import sys
import os 


import json

from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build

In [2]:
def get_authenticated_service(client_secret_file: str, scopes: List[str],
                              api_service_name: str, api_version: str):
    flow = InstalledAppFlow.from_client_secrets_file(client_secret_file, scopes)
    credentials = flow.run_local_server()
    return build(api_service_name, api_version, credentials=credentials)


current_directory = os.getcwd()
client_secret_filename = "client_secret_file.json"
client_secret_path = os.path.join(current_directory, client_secret_filename)
scopes = ['https://www.googleapis.com/auth/youtube.readonly']
api_service_name = 'youtube'
api_version = 'v3'

service = get_authenticated_service(client_secret_path, scopes, api_service_name, api_version)

Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=324687515765-ied91lfmrsgkqb39cjmecdubjn03tovk.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8080%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fyoutube.readonly&state=1FQr3eDB6XkgmQlAJQbHjwgrVwvowz&access_type=offline


In [3]:
def get_authenticated_readonly_service(client_secret_file: str):
    scopes = ['https://www.googleapis.com/auth/youtube.readonly']
    api_service_name = 'youtube'
    api_version = 'v3'
    return get_authenticated_service(client_secret_file, scopes, api_service_name, api_version)

In [4]:
def get_authenticated_readwrite_service(client_secret_file: str):
    scopes = ['https://www.googleapis.com/auth/youtube']
    api_service_name = 'youtube'
    api_version = 'v3'
    return get_authenticated_service(client_secret_file, scopes, api_service_name, api_version)

In [5]:

def paginated_results(youtube_listable_resource, list_request, limit_requests=10) -> Iterator:
    remaining = -1 if limit_requests is None else limit_requests
    while list_request and remaining != 0:
        list_response = list_request.execute()
        yield list_response
        # see googleapiclient/discovery.py createNextMethod for *_next methods
        list_request = youtube_listable_resource.list_next(list_request, list_response)
        remaining -= 1

In [6]:
def dump_json_to_file(obj, filename):
    with open(filename, 'w', encoding='utf-8') as f:
        json.dump(obj, f)

In [7]:
def get_my_uploads_playlist_id(youtube):
    # see https://developers.google.com/youtube/v3/docs/channels/list
    channels_response = youtube.channels().list(
        mine=True,
        part='contentDetails'
    ).execute()
    
    for channel in channels_response['items']:
        return channel['contentDetails']['relatedPlaylists']['uploads']

    return None


In [8]:
def download_playlist_video_snippets(youtube, playlist_id, file_prefix):
    # see https://developers.google.com/youtube/v3/docs/playlistItems/list
    playlistitems_list_request = youtube.playlistItems().list(
        playlistId=playlist_id,
        part='snippet',
        maxResults=50
    )

    results = paginated_results(youtube.playlistItems(), playlistitems_list_request)
    for page_no, playlistitems_list_response in enumerate(results):
        dump_json_to_file(playlistitems_list_response, f'{file_prefix}{page_no}.json')

In [9]:
os.makedirs('data', exist_ok=True)

def main():
    client_secret_file = client_secret_path
    youtube = get_authenticated_readonly_service(client_secret_file)
    uploads_playlist_id = get_my_uploads_playlist_id(youtube)
    file_prefix = 'data/my_videos_page_'
    if uploads_playlist_id is not None:
        download_playlist_video_snippets(youtube, uploads_playlist_id, file_prefix)
    else:
        print('There is no uploaded videos playlist for this user.')

if __name__ == "__main__":
    main()


Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=324687515765-ied91lfmrsgkqb39cjmecdubjn03tovk.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8080%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fyoutube.readonly&state=lZZmy4MA8ibDNfLys4JDgOwpXcKdP3&access_type=offline


In [10]:
def format_json_file(input_file, output_file):
    with open(input_file, 'r', encoding='utf-8') as f:
        data = json.load(f)

    with open(output_file, 'w', encoding='utf-8') as f:
        json.dump(data, f, indent=4)

# Example usage with your input file path
input_file = r"data\my_videos_page_0.json"
output_file = 'formatted_output.json'  # Replace with the desired output file path

format_json_file(input_file, output_file)


In [11]:
def get_videos_from_json_files(files):
    videos = []
    for filename in files:
        with open(filename, 'r', encoding='utf-8') as f:
            playlist_list_response = json.load(f)
            videos.extend(playlist_list_response['items'])
    return videos



In [12]:
def load_text_to_prepend(file) -> str:
    with open(file, encoding='utf-8') as f:
        return ''.join(f.readlines()).strip() + '\n\n'

In [13]:
def confirm_diff(old: str, new: str) -> bool:
    print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< OLD")
    print(old)
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> NEW")
    print(new)
    print("<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< DIFF")
    differ = difflib.unified_diff(old.splitlines(keepends=True), new.splitlines(keepends=True))
    sys.stdout.writelines(differ)
    print(">>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DONE")
    while True:
        decision = input("Does this look correct [Y/n]? ")
        decision = decision.lower()
        if decision == '' or 'y' in decision:
            return True
        elif 'n' in decision:
            return False
        print("Invalid input, try again")

In [14]:
def update_description_on_youtube(youtube, video_id, new_description):
    if '<' in new_description or '>' in new_description:
        raise ValueError('new_description cannot contain < or >')

    # see https://developers.google.com/youtube/v3/docs/videos/list
    videos_list_response = youtube.videos().list(
        id=video_id,
        part='snippet'
    ).execute()

    if not videos_list_response['items']:
        raise KeyError(f'Video {video_id} was not found.')

    # Since the request specified a video ID, the response only contains one
    # video resource. This code extracts the snippet from that resource.
    videos_list_snippet = videos_list_response['items'][0]['snippet']

    if videos_list_snippet['description'] == new_description:
        print(f'Video {video_id}: new description and old description are the same, skipping...')
        return

    if not confirm_diff(old=videos_list_snippet['description'], new=new_description):
        print('diff rejected, aborting program...')
        sys.exit(0)

    # user has confirmed the new version looks good, go ahead
    videos_list_snippet['description'] = new_description

    # it seems like a bug in the youtube api that this needs to be done
    # a downloaded video may have no tags,
    # but omitting tags during upload results in 400 error
    if 'tags' not in videos_list_snippet:
        videos_list_snippet['tags'] = []

    # see https://developers.google.com/youtube/v3/docs/videos/update
    videos_update_response = youtube.videos().update(
        part='snippet',
        body=dict(
            snippet=videos_list_snippet,
            id=video_id
        )).execute()

    if videos_update_response['snippet']['description'] != new_description:
        raise RuntimeError('update failed')

In [15]:
def get_videos_from_json_files(files):
    videos = []
    for filename in files:
        with open(filename, 'r', encoding='utf-8') as f:
            playlist_list_response = json.load(f)
            videos.extend(playlist_list_response['items'])
    return videos


def load_text_to_prepend(file) -> str:
    with open(file, encoding='utf-8') as f:
        return ''.join(f.readlines()).strip() + '\n\n'


def main():
    # Create the 'videos' variable inside the main function
    files = glob.glob('data/my_videos_page_*.json')
    videos = get_videos_from_json_files(files)
    print(f'loaded {len(videos)=}')

    # Filter videos based on description
    videos = [v for v in videos if 'Visit us online at www.southphillyfridge.com !' not in v['snippet']['description'].lower()]

    # Load the new text to prepend
    new_text = load_text_to_prepend(r"fridge_donate.txt")
    
    # Create new descriptions
    new_descriptions = {
        v['snippet']['resourceId']['videoId']: new_text + v['snippet']['description']
        for v in videos
    }

    if input("Start update process (must type YES)? ") != "YES":
        return

    client_secret_file = client_secret_path
    youtube = get_authenticated_readwrite_service(client_secret_file)

    for video_id, new_description in new_descriptions.items():
        update_description_on_youtube(youtube, video_id, new_description)

main()

loaded len(videos)=4


Start update process (must type YES)?  YES


Please visit this URL to authorize this application: https://accounts.google.com/o/oauth2/auth?response_type=code&client_id=324687515765-ied91lfmrsgkqb39cjmecdubjn03tovk.apps.googleusercontent.com&redirect_uri=http%3A%2F%2Flocalhost%3A8080%2F&scope=https%3A%2F%2Fwww.googleapis.com%2Fauth%2Fyoutube&state=xqiSnFRPVGiSnwF5LcSFNOTuZLvKlH&access_type=offline
<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< OLD

>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> NEW
Visit us online at www.southphillyfridge.com !


<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<<< DIFF
--- 
+++ 
@@ -0,0 +1,2 @@
+Visit us online at www.southphillyfridge.com !
+
>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>>> DONE


Does this look correct [Y/n]?  Y


RuntimeError: update failed