In [2]:
import datetime as dt
from typing import Union

import datarobot as dr
from logzero import logger
import pandas as pd
import yaml

from query_youtube import get_videos, compile_data, convert_to_dataframe

# Locations of the YAML files read below: pipeline settings and API secrets.
CONFIG_FILE = "config.yaml"
CREDENTIALS_FILE = "credentials.yaml"
# Timestamp of this run truncated to the hour. It is compared against the
# AS_OF_DATE column of the stored statistics and handed to compile_data().
CURRENT_HOUR = pd.Timestamp.now().floor("h")

# NOTE(review): both files are parsed with PyYAML's FullLoader. If either file
# could ever come from an untrusted source, switch to yaml.safe_load.
with open(CONFIG_FILE) as f:
    config = yaml.full_load(f)

with open(CREDENTIALS_FILE) as f:
    credentials = yaml.full_load(f)

# Secrets are taken from the credentials file rather than hard-coded here.
YOUTUBE_API_KEY = credentials["youtube_api_key"]
CLIENT = dr.Client(
    endpoint=credentials["datarobot"]["endpoint"],
    token=credentials["datarobot"]["api_token"],
)


In [3]:
def check_if_dataset_exists(name: str) -> Union[str, None]:
    """
    Look up an AI Catalog dataset by name.

    Iterates over the datasets returned by ``dr.Dataset.list()`` and returns
    the id of the first one whose ``name`` equals ``name``. Returns ``None``
    when no dataset matches.
    """
    for candidate in dr.Dataset.list():
        if candidate.name == name:
            return candidate.id
    return None


def write_new_dataset_to_catalog(df: pd.DataFrame, dataset_name: str) -> str:
    """
    Upload a dataframe to the AI Catalog as a brand-new dataset.

    The dataframe is sent with ``dr.Dataset.create_from_in_memory_data`` using
    ``dataset_name`` as the ``fname`` argument, and a link built from the
    client endpoint and the new dataset id is logged.

    Returns the id of the created dataset.
    """
    # Everything before "/api" in the client endpoint is used as the link prefix.
    base_url = CLIENT.endpoint.split("/api")[0]
    created = dr.Dataset.create_from_in_memory_data(df, fname=dataset_name)
    new_id = created.id
    dataset_link = base_url + "/" + new_id
    logger.info(f"Dataset {dataset_name} created: {dataset_link}")
    return new_id


def write_new_version_to_catalog(stats_df: pd.DataFrame, dataset_id: str) -> None:
    """
    Append ``stats_df`` to an existing AI Catalog dataset as a new version.

    The dataset's current contents are downloaded and their latest
    ``AS_OF_DATE`` is compared with ``CURRENT_HOUR``. A new version (current
    rows followed by ``stats_df``, with a fresh row index) is written only when
    the stored data is older than ``CURRENT_HOUR`` or holds no usable
    timestamp; otherwise nothing is written.

    Raises ``KeyError`` if the stored dataset has no ``AS_OF_DATE`` column.
    """
    dr_url = CLIENT.endpoint.split("/api")[0]
    current_stats = dr.Dataset.get(dataset_id).get_as_dataframe()

    # Series.max() skips NaT and yields NaT for an empty column. The built-in
    # max() raises on an empty column and gives an order-dependent answer
    # when NaT values are present.
    latest_stored = pd.to_datetime(current_stats["AS_OF_DATE"]).max()

    # Comparing NaT with "<" is always False, so a dataset without any valid
    # timestamp must be treated as stale explicitly.
    if pd.isna(latest_stored) or latest_stored < CURRENT_HOUR:
        full_stats = pd.concat([current_stats, stats_df], ignore_index=True)
        dr.Dataset.create_version_from_in_memory_data(dataset_id, full_stats)
        logger.info(f"New version of dataset created: {dr_url + '/' + dataset_id}")
    else:
        logger.info("Dataset already up to date. Not writing new version.")


def remove_old_dataset_versions(
    client: "dr.client.RESTClientObject",
    dataset_id: str,
    max_versions: int = 75,
    keep_versions: int = 50,
):
    """
    Prune the oldest versions of a dataset once it has accumulated too many.

    Parameters
    ----------
    client
        REST client used for the ``get`` and ``delete`` calls.
    dataset_id
        Id of the dataset whose versions are inspected.
    max_versions
        Pruning only happens when the reported version count is strictly
        greater than this value.
    keep_versions
        Number of most recent versions (by ``creationDate``) to keep when
        pruning. Must not be negative.

    Raises
    ------
    ValueError
        If ``keep_versions`` is negative.
    """
    if keep_versions < 0:
        raise ValueError("keep_versions must be >= 0")

    url = f"datasets/{dataset_id}/versions/"
    dataset_versions = client.get(url).json()
    version_count = dataset_versions['count']
    logger.info(f"Found {version_count} versions of {dataset_id}")
    if version_count <= max_versions:
        return

    # NOTE(review): only the versions present in this response's `data` list
    # are considered; if the endpoint paginates, later pages are not fetched.
    sorted_versions = sorted(
        dataset_versions['data'],
        key=lambda version: pd.to_datetime(version['creationDate']),
    )

    # Compute the cut explicitly: a `[:-keep_versions]` slice would select
    # nothing when keep_versions is 0.
    n_stale = max(len(sorted_versions) - keep_versions, 0)
    stale_versions = sorted_versions[:n_stale]
    for version in stale_versions:
        # Trailing slash kept consistent with the listing URL above.
        client.delete(f"datasets/{dataset_id}/versions/{version['versionId']}/")

    # Report what was actually deleted rather than a figure derived from the
    # reported count.
    logger.info(f"Deleted {len(stale_versions)} versions of {dataset_id}")

In [5]:
# Names of the two AI Catalog datasets, read from the `storage` section of the config.
metadata_dataset_name = config['storage']['metadata']
stats_dataset_name = config['storage']['statistics']

# Determine the most recent AS_OF_DATE already stored for the statistics dataset.
stats_id = check_if_dataset_exists(stats_dataset_name)
if stats_id is not None:
    current_stats = dr.Dataset.get(stats_id).get_as_dataframe()
    # Series.max() skips NaT and returns NaT for an empty column; NaT then
    # fails the freshness check below, so data is pulled.
    last_updated = pd.to_datetime(current_stats["AS_OF_DATE"]).max()
else:
    # No dataset yet: use a date far in the past so the freshness check fails.
    last_updated = dt.datetime.now() - dt.timedelta(weeks=1000)

# ">=" mirrors the "<" test in write_new_version_to_catalog: if the stored data
# is at or past the current hour, no version would be written, so skip the pull.
if last_updated >= CURRENT_HOUR:
    logger.info("Data already up to date. Not pulling new data.")
else:
    # Collect per-playlist frames and concatenate once after the loop instead
    # of re-copying an ever-growing frame on every iteration.
    stats_frames, metadata_frames = [], []
    for playlist in config["playlists"]:
        logger.info(f"Pulling data from playlist {playlist['name']}")
        videos = get_videos(playlist['id'], YOUTUBE_API_KEY)
        stats, metadata = compile_data(videos, YOUTUBE_API_KEY, CURRENT_HOUR)
        stats_df, metadata_df = convert_to_dataframe(stats, metadata)
        metadata_df["PLAYLIST_NAME"] = playlist["name"]
        stats_frames.append(stats_df)
        metadata_frames.append(metadata_df)

    # pd.concat([]) raises, so fall back to an empty frame when no playlist
    # is configured.
    stats_all_df = pd.concat(stats_frames) if stats_frames else pd.DataFrame()
    metadata_all_df = pd.concat(metadata_frames) if metadata_frames else pd.DataFrame()

    # The metadata dataset is only created once; it is not versioned afterwards.
    metadata_id = check_if_dataset_exists(metadata_dataset_name)
    if metadata_id is None:
        write_new_dataset_to_catalog(metadata_all_df, metadata_dataset_name)

    # Statistics are either created or appended as a new version, after which
    # old versions are pruned.
    if stats_id is None:
        write_new_dataset_to_catalog(stats_all_df, stats_dataset_name)
    else:
        write_new_version_to_catalog(stats_all_df, stats_id)
        remove_old_dataset_versions(CLIENT, stats_id)


[I 240520 14:56:43 1618267384:16] Pulling data from playlist Endless Summer Vacation
[I 240520 14:56:43 query_youtube:81] Pulled Youtube Metadata on Flowers
[I 240520 14:56:43 query_youtube:81] Pulled Youtube Metadata on Jaded
[I 240520 14:56:43 query_youtube:81] Pulled Youtube Metadata on Rose Colored Lenses
[I 240520 14:56:43 query_youtube:81] Pulled Youtube Metadata on Used To Be Young
[I 240520 14:56:43 query_youtube:81] Pulled Youtube Metadata on Thousand Miles
[I 240520 14:56:43 query_youtube:81] Pulled Youtube Metadata on You
[I 240520 14:56:43 query_youtube:81] Pulled Youtube Metadata on Handstand
[I 240520 14:56:44 query_youtube:81] Pulled Youtube Metadata on River
[I 240520 14:56:44 query_youtube:81] Pulled Youtube Metadata on Violet Chemistry
[I 240520 14:56:44 query_youtube:81] Pulled Youtube Metadata on Muddy Feet
[I 240520 14:56:44 query_youtube:81] Pulled Youtube Metadata on Wildcard
[I 240520 14:56:44 query_youtube:81] Pulled Youtube Metadata on Island
[I 240520 14:56:4