## Initialization

In [1]:
import os
import json
import shutil
from pprint import pprint
from datetime import datetime
from tqdm.notebook import tqdm
import redis
import ray
from cineplex.db import get_db
from cineplex.config import Settings
from cineplex.logger import Logger
import cineplex.youtube as yt

# Build the project settings object and the shared logger used by later cells.
settings = Settings()
logger = Logger()

# Display the effective settings (paths, Mongo URL, channel id) for reference.
pprint(settings)

Settings(log_name='cineplex', log_level='DEBUG', log_dir='./logs', log_to_console=True, mongo_url='mongodb://localhost:27017', mongo_db='cineplex', tmp_dir='./tmp', bkp_dir='./bkp', data_dir='./data', my_youtube_channel_id='UCqsUJL5xIWuidR7sIrPLhAw', youtube_channels_dir='/Volumes/Cineplex00/YouTube/channels')


In [None]:
# Shut down any Ray session left over in this kernel, then start a new one.
ray.shutdown()
ray.init()

## Discover Videos

In [None]:
def get_all_files(dir_path):
    """Return the path of every file found under ``dir_path``, recursively.

    Each path is the directory yielded by ``os.walk`` joined with a file name
    from that directory, in the order ``os.walk`` produces them.
    """
    return [
        os.path.join(parent, name)
        for parent, _subdirs, names in os.walk(dir_path)
        for name in names
    ]

# Snapshot every file under the channels directory and cache the listing as
# channel_files.json inside settings.data_dir — the same location the next
# cell reloads it from (previously a literal 'data/' path was used here).
# files = os.listdir(settings.youtube_channels_dir)
files = get_all_files(settings.youtube_channels_dir)
print(f'found {len(files)} files in {settings.youtube_channels_dir}')

# Make sure the cache directory exists before writing into it.
os.makedirs(settings.data_dir, exist_ok=True)
with open(os.path.join(settings.data_dir, 'channel_files.json'), 'w') as outfile:
    json.dump(files, outfile)

In [None]:
# Reload the cached channel file listing from the data directory.
channel_files_path = os.path.join(settings.data_dir, 'channel_files.json')
with open(channel_files_path, 'r') as infile:
    channel_files = json.load(infile)


In [None]:
# Add a 'yt_' prefix to every file under <data_dir>/yt_playlist_items that
# does not already have one.
# Loop variables are deliberately NOT called `files`/`file`: `files` holds the
# channel file listing built earlier in the notebook and must not be clobbered.
playlist_items_dir = os.path.join(settings.data_dir, 'yt_playlist_items')
for item_root, _, item_names in os.walk(playlist_items_dir):
    for item_name in item_names:
        if item_name.startswith('yt_'):
            continue
        os.rename(
            os.path.join(item_root, item_name),
            os.path.join(item_root, f'yt_{item_name}'),
        )


In [None]:
# Read bad_metadata.json from the data directory into `bad_metadata`.
bad_metadata_path = os.path.join(settings.data_dir, 'bad_metadata.json')
with open(bad_metadata_path, 'r') as infile:
    bad_metadata = json.load(infile)

In [None]:
# Drop falsy entries (None, empty strings, ...) and show how many remain.
new_bad_metadata = list(filter(None, bad_metadata))
len(new_bad_metadata)



In [None]:
# Overwrite bad_metadata.json with the filtered list.
bad_metadata_path = os.path.join(settings.data_dir, 'bad_metadata.json')
with open(bad_metadata_path, 'w') as outfile:
    json.dump(new_bad_metadata, outfile)

### Move files from channel ID dirs to uploader dirs

In [None]:
import glob

# Root holding one directory per channel. Taken from settings (the same root
# the cached file listing was built from) instead of a second hard-coded path.
channels_dir = settings.youtube_channels_dir

# Iterate the cached listing in `channel_files`. The name `files` is reused as
# an os.walk loop variable by other cells, so by the time this cell runs it may
# no longer hold the full channel listing.
for file in channel_files:
    if not file.endswith('.json'):
        continue

    # Name of the directory that contains this metadata file.
    dir_name = os.path.basename(os.path.dirname(file))
    full_dir_name = os.path.join(channels_dir, dir_name)

    # Earlier iterations may already have moved this file or removed its dir.
    if not os.path.exists(full_dir_name) or not os.path.exists(file):
        continue

    with open(file) as json_file:
        data = json.load(json_file)

    channel_id = data.get('channel_id') if isinstance(data, dict) else None
    if not channel_id:
        print(f'{file} does not have a channel_id')
        continue

    # Only directories still named after the channel ID need re-homing.
    if dir_name != channel_id:
        continue

    uploader = data.get('uploader')
    if not uploader:
        print(f'{file} does not have an uploader')
        continue

    dst_dir = os.path.join(channels_dir, uploader)
    os.makedirs(dst_dir, exist_ok=True)

    # If the uploader name resolves to the source directory itself, every file
    # would look like a "duplicate" of itself and be deleted — skip instead.
    if os.path.samefile(dst_dir, full_dir_name):
        continue

    # All files belonging to one video share the metadata file's name up to
    # the trailing "info.json" / "json"; keep the final dot so that other
    # videos whose names merely start with the same text are not matched.
    if file.endswith('.info.json'):
        stem = file[:-len('info.json')]
    else:
        stem = file[:-len('json')]

    # Escape the stem: names may contain glob metacharacters such as [ ] * ?.
    for f in glob.glob(f'{glob.escape(stem)}*'):
        filename = os.path.basename(f)
        dst_file = os.path.join(dst_dir, filename)
        if os.path.exists(dst_file):
            print(f'🗑️ Deleting duplicate {f}')
            os.remove(f)
        else:
            print(f'🗂️ Moving {f} to {dst_dir}')
            shutil.move(f, dst_dir)

    # Remove the channel-ID directory once it is empty. os.rmdir refuses to
    # delete a non-empty directory, so nothing can be lost here.
    if not os.listdir(full_dir_name):
        print(f'🗑️ Removing {full_dir_name}')
        os.rmdir(full_dir_name)

In [None]:
# Load missing_channels.json from settings.data_dir (rather than a literal
# 'data/' path) so this cell follows the same configuration as the others.
with open(os.path.join(settings.data_dir, 'missing_channels.json')) as json_file:
    missing_channels = json.load(json_file)

In [None]:
# NOTE(review): `ytch` is not bound by any import visible in this notebook
# (only `cineplex.youtube as yt` is) — confirm the intended module alias.
# Presumably fetches metadata for `missing_channels` from YouTube and saves
# the resulting batch to the DB — verify against the helper implementations.
channel_with_meta_batch = ytch.get_channel_from_youtube_batch(missing_channels)
print(f'channel_with_meta_batch: {len(channel_with_meta_batch)}')
ytch.save_channel_to_db_batch(channel_with_meta_batch)

## Dedupe Videos

In [None]:
# Map each base name (path without its final extension) to the list of
# extensions it appears with in the video file listing.
filename_idx = {}

with open(os.path.join(settings.data_dir, 'file_list_videos.json')) as json_file:
    data = json.load(json_file)
print(len(data))

for file in data:
    filename, ext = os.path.splitext(file)
    filename_idx.setdefault(filename, []).append(ext)

# A base name listed with more than one extension is treated as a duplicate.
dupes = [filename for filename, extensions in filename_idx.items() if len(extensions) > 1]

remove = []
print(f'found {len(dupes)} duplicate filenames')
for dup in dupes:
    sizes = {
        ext: os.path.getsize(os.path.join(settings.youtube_videos_dir, f'{dup}{ext}'))
        for ext in filename_idx[dup]
    }

    # NOTE(review): the SMALLEST copy is kept and every larger one is removed —
    # confirm this is the intended policy before running; deletion is permanent.
    smallest = min(sizes, key=sizes.get)
    remove.extend(f'{dup}{ext}' for ext in sizes if ext != smallest)

print(f'found {len(remove)} files to remove')

for file in remove:
    os.remove(os.path.join(settings.youtube_videos_dir, file))

# Drop the deleted files from the listing in one pass; a set lookup avoids the
# quadratic list.remove() and also clears repeated entries of a deleted file.
removed = set(remove)
data = [file for file in data if file not in removed]

print(len(data))

# Write the deduped listing to the data directory, where the next section
# reads it from.
with open(os.path.join(settings.data_dir, 'file_list_videos_deduped.json'), 'w') as outfile:
    json.dump(data, outfile)

## Missing Ids

In [None]:
# Classify the deduped video listing by the tail of each file name:
#   * tail contains '.'         -> download fragment; its id is recorded
#   * tail does not start '-'   -> unexpected name; printed for inspection
#   * otherwise                 -> kept in the clean listing
fragments = set()

file_list_videos_clean = []

with open(os.path.join(settings.data_dir, 'file_list_videos_deduped.json')) as json_file:
    data = json.load(json_file)

for file in data:
    filename, ext = os.path.splitext(file)

    # extract the youtube id from the filename: the last 12 characters are
    # expected to be '-' followed by the 11-character id
    video_id = filename[-12:]

    # handle fragments: strip one more extension and take the 11-char id
    if '.' in video_id:
        filename, ext = os.path.splitext(filename)
        fragments.add(filename[-11:])

    # startswith() is also safe for an empty name, where [0] would raise
    elif not video_id.startswith('-'):
        print(f'{video_id}|{filename}')

    else:
        file_list_videos_clean.append(file)

print(f'found {len(fragments)} fragments')
print(fragments)

# Outputs go to settings.data_dir, matching where the input was read from and
# where the next section looks for file_list_videos_clean.json.
# Sorted so the file content is stable between runs.
with open(os.path.join(settings.data_dir, 'video_fragments.json'), 'w') as outfile:
    json.dump(sorted(fragments), outfile)

with open(os.path.join(settings.data_dir, 'file_list_videos_clean.json'), 'w') as outfile:
    json.dump(file_list_videos_clean, outfile)

## File Indices

### Video File Index

In [None]:
# Build {key: {'id': key, 'filename': file}} for every clean video file, where
# key is the last 11 characters of the name without its extension
# (presumably the YouTube video id — verify against the naming scheme).
videos_list_path = os.path.join(settings.data_dir, 'file_list_videos_clean.json')
with open(videos_list_path) as json_file:
    data = json.load(json_file)

video_file_index = {}
for file in data:
    key = os.path.splitext(file)[0][-11:]
    video_file_index[key] = dict(id=key, filename=file)

videos_index_path = os.path.join(settings.data_dir, 'file_index_videos.json')
with open(videos_index_path, 'w') as outfile:
    json.dump(video_file_index, outfile)

In [None]:
# Reload the video file index from the data directory.
videos_index_path = os.path.join(settings.data_dir, 'file_index_videos.json')
with open(videos_index_path) as json_file:
    video_file_index = json.load(json_file)

### Thumbnail File Index

In [None]:
# Build {key: {'id': key, 'filename': file}} for every thumbnail file, where
# key is the last 11 characters of the name without its extension
# (presumably the YouTube video id — verify against the naming scheme).
thumbnails_list_path = os.path.join(settings.data_dir, 'file_list_thumbnails.json')
with open(thumbnails_list_path) as json_file:
    data = json.load(json_file)

thumbnail_file_index = {}
for file in data:
    key = os.path.splitext(file)[0][-11:]
    thumbnail_file_index[key] = dict(id=key, filename=file)

thumbnails_index_path = os.path.join(settings.data_dir, 'file_index_thumbnails.json')
with open(thumbnails_index_path, 'w') as outfile:
    json.dump(thumbnail_file_index, outfile)

In [None]:
# Reload the thumbnail file index from the data directory.
thumbnails_index_path = os.path.join(settings.data_dir, 'file_index_thumbnails.json')
with open(thumbnails_index_path) as json_file:
    thumbnail_file_index = json.load(json_file)

### Metadata File Index

In [None]:
# Build {key: {'id': key, 'filename': file}} for every metadata file. Metadata
# files carry two extensions, so both are stripped before taking the last 11
# characters as the key (presumably the YouTube video id — verify).
metadata_list_path = os.path.join(settings.data_dir, 'file_list_metadata.json')
with open(metadata_list_path) as json_file:
    data = json.load(json_file)

metadata_file_index = {}
for file in data:
    without_last_ext = os.path.splitext(file)[0]
    key = os.path.splitext(without_last_ext)[0][-11:]
    metadata_file_index[key] = dict(id=key, filename=file)

metadata_index_path = os.path.join(settings.data_dir, 'file_index_metadata.json')
with open(metadata_index_path, 'w') as outfile:
    json.dump(metadata_file_index, outfile)

In [None]:
# Reload the metadata file index from the data directory.
metadata_index_path = os.path.join(settings.data_dir, 'file_index_metadata.json')
with open(metadata_index_path) as json_file:
    metadata_file_index = json.load(json_file)

## Missing Thumbnails and Metadata

In [None]:
# Keys present in the video index but absent from the thumbnail / metadata
# indices, in video-index order.
missing_thumbnails = [key for key in video_file_index if key not in thumbnail_file_index]
missing_metadata = [key for key in video_file_index if key not in metadata_file_index]

print(f'found {len(missing_thumbnails)} missing thumbnails')
missing_thumbnails_path = os.path.join(settings.data_dir, 'missing_thumbnails.json')
with open(missing_thumbnails_path, 'w') as outfile:
    json.dump(missing_thumbnails, outfile)

print(f'found {len(missing_metadata)} missing metadata')
missing_metadata_path = os.path.join(settings.data_dir, 'missing_metadata.json')
with open(missing_metadata_path, 'w') as outfile:
    json.dump(missing_metadata, outfile)

## Extract metadata and save to DB

In [None]:
%%time

@ray.remote
def extract_metadata(info_file):
    """Ray task: parse one info file and save the result to the DB.

    Returns ``None`` on success, or ``info_file`` itself when
    ``ytv.extract_video_info_from_file`` yields ``None`` so the caller can
    collect the files that could not be processed.

    NOTE(review): ``ytv`` is not imported in the visible cells — confirm.
    """
    video_with_meta = ytv.extract_video_info_from_file(info_file)
    if video_with_meta is not None:
        ytv.save_video_to_db(video_with_meta, False)
        return None
    return info_file

# Only the .json entries of the channel listing are info files.
info_files = [path for path in channel_files if os.path.splitext(path)[1] == '.json']

print(f'Found {len(info_files)} info files')

# Submit one Ray task per info file, then gather the results; each task
# returns the file path on failure and None on success.
futures = [extract_metadata.remote(path) for path in tqdm(info_files)]
bad_metadata = list(filter(None, tqdm(ray.get(futures))))

bad_metadata_path = os.path.join(settings.data_dir, 'bad_metadata.json')
with open(bad_metadata_path, 'w') as outfile:
    json.dump(bad_metadata, outfile)

print(f'Found {len(bad_metadata)} bad metadata')


## Move Files to Channel Dirs

In [None]:
import shutil

def move_file(src, dst):
    """Move ``src`` to ``dst`` unless ``dst`` already exists.

    The parent directory of ``dst`` is created when needed. Returns True when
    the move succeeded or was skipped because ``dst`` is already present;
    returns False, after logging the error, if anything raised.
    """
    try:
        if os.path.exists(dst):
            # Destination already there: leave src untouched, report success.
            return True
        os.makedirs(os.path.dirname(dst), exist_ok=True)
        shutil.move(src, dst)
    except Exception as e:
        logger.error(f'Failed to move {src} to {dst}: {e}')
        return False
    return True

@ray.remote
def move_files(id):
    """Ray task: move one video's files into its channel directory.

    Looks up ``video#<id>`` through ``get_db()``, then moves the video,
    thumbnail and metadata files named in that record from their per-type
    source directories into ``<youtube_channels_dir>/<channel_id>/``
    (``__unknown__`` when the record has no channel id).

    Returns ``id`` when no record is found; otherwise the list of source
    paths for which ``move_file`` reported failure (empty on full success).
    """
    info = get_db().get(f'video#{id}')
    if not info:
        logger.error(f'Failed to find {id} in db')
        return id
    info = json.loads(info)

    channel_id = info.get('channel_id') or '__unknown__'
    dst_dir = os.path.join(settings.youtube_channels_dir, channel_id)

    # (source, destination) pairs for the three files that make up a video.
    moves = [
        (os.path.join(settings.youtube_videos_dir, info['video_file']),
         os.path.join(dst_dir, info['video_file'])),
        (os.path.join(settings.youtube_thumbnails_dir, info['thumbnail_file']),
         os.path.join(dst_dir, info['thumbnail_file'])),
        (os.path.join(settings.youtube_metadata_dir, info['metadata_file']),
         os.path.join(dst_dir, info['metadata_file'])),
    ]

    # Report every source that could NOT be moved. The metadata file was
    # previously reported when its move succeeded (missing `not`).
    return [src for src, dst in moves if not move_file(src, dst)]

# Submit one move task per indexed video, then keep only the non-empty
# results (ids not found in the DB, or lists of files that failed to move).
video_ids = list(video_file_index)
pending_moves = [move_files.remote(x) for x in tqdm(video_ids)]
not_moved = [x for x in tqdm(ray.get(pending_moves)) if len(x) > 0]

not_moved_path = os.path.join(settings.data_dir, 'not_moved.json')
with open(not_moved_path, 'w') as outfile:
    json.dump(not_moved, outfile)

print(f'{len(not_moved)} files not moved')

## Channels

### Save cached channel info to DB

In [None]:
# Collect every file under <data_dir>/channels. A comprehension is used so the
# walk does not rebind the notebook-level names `files` / `file`, which other
# cells use for the channel file listing.
all_files = [
    os.path.join(cache_root, cache_name)
    for cache_root, _, cache_names in os.walk(os.path.join(settings.data_dir, 'channels'))
    for cache_name in cache_names
]
print(f'Found {len(all_files)} files')

In [None]:
# Load each cached channel file, rename its 'channel_id' key to '_id', and
# save it to the DB only; files that end up without '_id' are logged and skipped.
# NOTE(review): `save_channel` is not imported in the visible cells — confirm.
for file in all_files:
    with open(file, 'r') as infile:
        data = json.load(infile)

    if 'channel_id' in data:
        data['_id'] = data.pop('channel_id')

    if '_id' in data:
        save_channel(data, to_disk=False)
    else:
        logger.error(f'{file} has no channel_id: {data.keys()}')

### Channel Playlists

In [None]:
# Single-channel run. NOTE(review): this id equals `my_youtube_channel_id` in
# the Settings repr printed by the first cell; `get_channel_from_youtube_batch`
# is not imported in the visible cells — confirm where it comes from.
channel_ids = ['UCqsUJL5xIWuidR7sIrPLhAw']
channel_with_meta_batch = get_channel_from_youtube_batch(channel_ids)
print(channel_with_meta_batch)

In [None]:
# Persist the batch fetched in the previous cell.
# NOTE(review): `save_channel_to_db_batch` is not imported in the visible cells.
save_channel_to_db_batch(channel_with_meta_batch)

In [None]:
# Look up the yt_ch_playlists document whose _id is this channel id and show it.
res = get_db().yt_ch_playlists.find_one({'_id': 'UCqsUJL5xIWuidR7sIrPLhAw'})
pprint(res)

### Resolve Channel Ids

#### Scan the channel directory for all subdirs

In [None]:
# Every entry of the channels directory except the '__unknown__' bucket is
# treated as a channel id.
channel_ids = [
    entry
    for entry in os.listdir(settings.youtube_channels_dir)
    if entry != '__unknown__'
]
logger.info(f'Found {len(channel_ids)} channels')

In [None]:
# Cache the scanned channel ids in the data directory.
channel_ids_path = os.path.join(settings.data_dir, 'channel_ids.json')
with open(channel_ids_path, 'w') as outfile:
    json.dump(channel_ids, outfile)

In [None]:
# Reload the cached channel ids from the data directory.
channel_ids_path = os.path.join(settings.data_dir, 'channel_ids.json')
with open(channel_ids_path, 'r') as infile:
    channel_ids = json.load(infile)

### Request channel info from YouTube for a collection of channel ids

In [None]:
logger.info(f'Requesting info for {len(channel_ids)} channels')

# Request channel info in slices of 50 ids and concatenate the results.
batch_size = 50
channels_with_meta = []
for start in range(0, len(channel_ids), batch_size):
    batch = channel_ids[start:start + batch_size]
    channels_with_meta.extend(get_channel_from_youtube_batch(batch))

logger.info(f'Retrieved info for {len(channels_with_meta)} channels')

In [None]:
# Cache the fetched channel info (pretty-printed) in the data directory.
channels_with_meta_path = os.path.join(settings.data_dir, 'channels_with_meta.json')
with open(channels_with_meta_path, 'w') as outfile:
    json.dump(channels_with_meta, outfile, indent=2)

In [None]:
# Reload the cached channel info and hand it to save_channels.
# NOTE(review): `save_channels` is not imported in the visible cells — confirm.
channels_with_meta_path = os.path.join(settings.data_dir, 'channels_with_meta.json')
with open(channels_with_meta_path, 'r') as infile:
    channels_with_meta = json.load(infile)

save_channels(channels_with_meta)

In [None]:
%%time

# Compare the requested channel ids with the '_id' values returned by
# get_channels_from_db and list the ids that have no record.
channels_with_meta_db = get_channels_from_db(channel_ids)

channel_meta_ids = {doc['_id'] for doc in channels_with_meta_db}

missing_ids = [cid for cid in channel_ids if cid not in channel_meta_ids]

print(f'{len(missing_ids)} channels missing meta')
print(missing_ids)

print(len(channels_with_meta))

In [None]:
# Request info for the ids found missing in the previous cell.
# NOTE(review): `get_channels_from_youtube` is not imported in the visible cells.
missing_meta = get_channels_from_youtube(missing_ids)

In [None]:
# Index the channel records by their 'channel_id' value (last record wins).
# NOTE(review): other cells key channel documents by '_id' — confirm that the
# records in `channels_with_meta` still carry 'channel_id'.
channels_with_meta_index = {x['channel_id']: x for x in channels_with_meta}

### Rename channel id dirs

In [None]:
def safe_name(name):
    """Return ``name`` with every '/' replaced by '_'.

    Used to turn a title into a single path component.
    """
    return '_'.join(name.split('/'))

In [None]:
# Rename each channel-ID directory to the channel's title. When a directory
# with that title already exists, merge the ID directory into it instead.
channel_dirs = os.listdir(settings.youtube_channels_dir)
for channel_dir in channel_dirs:
    if channel_dir not in channels_with_meta_index:
        continue

    meta = channels_with_meta_index[channel_dir]
    title = safe_name(meta["channel"]["snippet"]["title"])
    if not title:
        # An empty title would resolve to the channels root itself.
        print(f'{channel_dir} has an empty title; skipping')
        continue

    print(f'{channel_dir} => {title}')
    src_dir = os.path.join(settings.youtube_channels_dir, channel_dir)
    dst_dir = os.path.join(settings.youtube_channels_dir, title)

    if not os.path.exists(dst_dir):
        os.rename(src_dir, dst_dir)
        continue

    # Title and id name the same directory: nothing to do (copying a tree
    # onto itself would fail).
    if os.path.samefile(src_dir, dst_dir):
        continue

    # Merge into the existing title directory, then remove the source so the
    # outcome matches the rename branch. rmtree only runs when copytree
    # completed without raising.
    shutil.copytree(src_dir, dst_dir, dirs_exist_ok=True)
    shutil.rmtree(src_dir)

## Download Video

In [None]:
# Download one video and print the fields returned by download_video.
# NOTE(review): `download_video` is not imported in the visible cells — confirm.
try:
    res = download_video('https://www.youtube.com/watch?v=BaW_jenozKc')
    info = res['info']

    # Pull the displayed fields out in a fixed order.
    id, title, channel, channel_id = (
        info[key] for key in ('id', 'title', 'channel', 'channel_id')
    )
    video_filename, thumbnail_filename, info_filename = (
        res[key] for key in ('video_filename', 'thumbnail_filename', 'info_filename')
    )

    print(f'{id=}\n{title=}\n{channel=}\n{channel_id=}\n{video_filename=}\n{thumbnail_filename=}\n{info_filename=}')

except Exception as e:
    # Logging is already being performed in the download_video function
    print(e)


In [None]:
    # # Get channel details
    # request = youtube.channels().list(
    #     part="snippet,contentDetails,statistics",
    #     mine=True
    #     # id="UCqsUJL5xIWuidR7sIrPLhAw",
    # )
    # request = youtube.search().list(
    #     channelId = CHANNEL_ID,
    #     part = 'id,snippet',
    #     type = 'video',
    #     publishedAfter = '2018-12-31T23:59:59Z',
    #     publishedBefore = '2020-01-01T00:00:00Z',
    #     order = 'date',
    #     fields = 'nextPageToken,items(id,snippet)',
    #     maxResults = 50
    # )
    # video_data = {}


## Mongo

### Migrate from Redis

In [None]:
# The Settings repr printed by the first cell exposes `my_youtube_channel_id`,
# not `youtube_my_channel_id`; accept either so the cell works with both.
my_channel_id = getattr(settings, 'youtube_my_channel_id', None) or settings.my_youtube_channel_id

playlists_path = os.path.join(settings.data_dir, f'playlists_{my_channel_id}.json')
with open(playlists_path, 'r') as infile:
    playlists = json.load(infile)

# Use the channel id as the document's primary key and drop the original key.
playlists['_id'] = playlists.pop('channel_id')

# Insert after the file has been closed; nothing below needs it open.
get_db().yt_ch_playlists.insert_one(playlists)

In [None]:
@ray.remote
def migrate_video_info(id):
    """Ray task: copy one video record from Redis into the yt_videos collection.

    Reads ``video#<id>`` from Redis, renames the record's ``id`` key to
    ``_id`` and inserts it via ``get_db().yt_videos``. Returns ``None`` on
    success, or ``id`` when the record is missing or any step raises (the
    failure is logged in both cases).
    """
    try:
        source = redis.Redis(host=settings.db_host, port=settings.db_port, db=settings.db)
        raw = source.get(f'video#{id}')
        if raw:
            info = json.loads(raw)
            info['_id'] = info.pop('id')
            get_db().yt_videos.insert_one(info)
            return None

        logger.error(f'Failed to find {id} in db')
        return id

    except Exception as e:
        logger.error(f'Failed to migrate video info for {id}: {e}')
        return id

# from ray.util import inspect_serializability
# inspect_serializability(migrate_video_info, name='migrate_video_info')

# Take the video ids from the keys of the video file index.
video_index_path = os.path.join(settings.data_dir, 'file_index_videos.json')
with open(video_index_path, 'r') as infile:
    video_ids = list(json.load(infile).keys())

# One Ray task per id; tasks return the id on failure and None on success.
pending_migrations = [migrate_video_info.remote(id) for id in tqdm(video_ids)]
not_migrated = [x for x in ray.get(pending_migrations) if x is not None]

not_migrated_path = os.path.join(settings.data_dir, 'not_migrated.json')
with open(not_migrated_path, 'w') as outfile:
    json.dump(not_migrated, outfile)

print(f'{len(not_migrated)} files not migrated')


In [None]:
# Re-shape flat video documents into {'_id', 'as_of', 'channel_id', 'info'}.
# The original cell printed the first document and unconditionally skipped
# the rewrite, leaving it unreachable; that preview is now an explicit switch.
DRY_RUN = True    # True: only print the selected documents, write nothing
MAX_VIDEOS = 1    # number of documents to look at; None for all of them

cursor = get_db().yt_videos.find({})

videos = list(cursor)

skipped = 0
for video in videos[:MAX_VIDEOS]:
    if DRY_RUN:
        print(video)
        continue

    # Documents that already have an 'info' key are left alone.
    if 'info' in video:
        skipped += 1
        continue

    video_with_meta = {}
    video_with_meta['_id'] = video['_id']
    video_with_meta['as_of'] = str(datetime.now())
    video_with_meta['channel_id'] = video['channel_id']

    # The original document, re-keyed from '_id' to 'id', becomes 'info'.
    video['id'] = video['_id']
    del video['_id']
    video_with_meta['info'] = video

    get_db().yt_videos.update_one({'_id': video['id']}, {'$set': video_with_meta}, upsert=True)

print(f'{skipped} videos skipped')

In [None]:
# Scratch check: a dict key named '_id' is read back like any other key.
foo = dict(_id='bar')

print(foo['_id'])

## Offline

In [None]:
# Load the offline playlist and channel lists from the data directory.
offline_playlists_path = os.path.join(settings.data_dir, 'offline_playlists.json')
with open(offline_playlists_path, 'r') as infile:
    offline_playlists = json.load(infile)

offline_channels_path = os.path.join(settings.data_dir, 'offline_channels.json')
with open(offline_channels_path, 'r') as infile:
    offline_channels = json.load(infile)


In [None]:
# Print id, title and offline timestamp for each offline playlist record.
# NOTE(review): `ytpl` is not imported in the visible cells — confirm the alias.
x = list(ytpl.get_offline_playlists_from_db())
for y in x:
    print(f"{y['_id']} {y['playlist']['snippet']['title']} @ {y['offline_as_of']}")

In [None]:
# Print id, title and offline timestamp for each offline channel record.
# NOTE(review): `ytch` is not imported in the visible cells — confirm the alias.
x = ytch.get_offline_channels_from_db()
for y in x:
    print(f"{y['_id']} {y['channel']['snippet']['title']} @ {y['offline_as_of']}")

## Fix Video Channel Titles