# **Prototyping Data Pipeline**
In this notebook, I'm going to be writing a bunch of functions that prototype a data pipeline for this app. Once I write the functions, I'll move them out of this notebook and into a utility file that the main pipeline script can also access. 

# Setup
The cells below will set up the rest of the notebook.

I'll start by configuring the kernel: 

In [None]:
# Move the working directory up one level
# NOTE(review): presumably so that the `utils` package imported later resolves from the project root — confirm
%cd ..

# Enable the autoreload extension, which will automatically load in new code as it's written
%load_ext autoreload
%autoreload 2

Now I'll import some necessary modules:

In [None]:
# General import statements
import pandas as pd
from pytubefix import YouTube, Channel
from google.cloud import bigquery
import traceback
import time
import random
from tqdm import tqdm
import pandas_gbq
import datetime
import uuid
from datetime import timedelta
from pathlib import Path
from google.cloud import storage
from google.cloud.exceptions import NotFound

# Importing custom utility functions
import utils.gbq as gbq_utils
import utils.youtube as youtube_utils
import utils.gcs as gcs_utils

# Project-wide BigQuery identifiers used by the queries and helper calls in this notebook
GBQ_PROJECT_ID, GBQ_DATASET_ID = "neural-needledrop", "backend_data"

# Toggle for the tqdm progress bars shown by the long-running loops
tqdm_enabled = True

# Optional: set the pandas_gbq context to the project ID (currently disabled)
# pandas_gbq.context.project = GBQ_PROJECT_ID

# Checking GBQ Table
The **very** first thing I need to do: check the actual `video_metadata` GBQ table to determine the most recent video I've downloaded. 

In [None]:
# Define the query that'll grab the most recent video url. The table path is built
# from the project constants so it stays in sync with the rest of the notebook.
most_recent_video_url_query = f"""
SELECT
  metadata.url
FROM
  `{GBQ_PROJECT_ID}.{GBQ_DATASET_ID}.video_metadata` metadata
ORDER BY
  publish_date DESC, scrape_date DESC
LIMIT 1
"""

# Run the query through pandas_gbq directly (pd.read_gbq is a deprecated wrapper around it)
most_recent_video_url_df = pandas_gbq.read_gbq(
    most_recent_video_url_query, project_id=GBQ_PROJECT_ID
)

# An empty result means no videos have been stored yet, so there is no "most recent" url
if most_recent_video_url_df.empty:
    most_recent_video_url = None

# Otherwise, the single returned row holds the url
else:
    most_recent_video_url = most_recent_video_url_df.iloc[0]["url"]

# Identifying New Videos
The first portion of the pipeline: determining if there are any videos to work with in the first place! 

I'll start by parameterizing the method: 

In [None]:
# The channel whose videos we want to identify
channel_url = "https://www.youtube.com/c/theneedledrop"

# Cap on the number of videos to identify; None means "go through all of the videos"
video_limit = 250

# How many video URLs to pull from the channel per step
video_parse_step_size = 100

# NOTE(review): this resets whatever `most_recent_video_url` was fetched from GBQ earlier in
# the notebook, so the scan is no longer bounded by it — confirm the override is intentional
most_recent_video_url = None

Now that I've got the method scoped out, I'm going to write it. I'll identify the first couple of videos. 

In [None]:
def get_video_urls_from_channel(channel, most_recent_video_url=None, video_limit=None, video_parse_step_size=10):
    """
    Helper method to identify video URLs from a channel, in the order the channel lists them.

    URLs are fetched in batches of `video_parse_step_size` by slicing `channel.video_urls`.
    Fetching stops as soon as one of the following happens:
      - a batch contains `most_recent_video_url` (that whole batch is still returned),
      - `video_limit` URLs have been collected (never more than that are returned),
      - the channel has no more URLs to offer.

    If both `most_recent_video_url` and `video_limit` are None, every URL the channel
    exposes is returned.

    Args:
        channel: Object exposing a sliceable `video_urls` sequence (e.g. a pytubefix `Channel`).
        most_recent_video_url: URL at which to stop scanning, or None to ignore.
        video_limit: Maximum number of URLs to return, or None for no cap.
        video_parse_step_size: Number of URLs to fetch per batch; must be at least 1.

    Returns:
        A list of video URLs.

    Raises:
        ValueError: If `video_parse_step_size` is smaller than 1.
    """

    if video_parse_step_size < 1:
        raise ValueError("video_parse_step_size must be at least 1")

    # A non-positive limit leaves nothing to collect
    if video_limit is not None and video_limit <= 0:
        return []

    # Initialize the video URLs and the index of the next URL to fetch
    video_urls = []
    next_index = 0

    while True:

        # Never request more than the limit allows, so the result can't overshoot it
        batch_end = next_index + video_parse_step_size
        if video_limit is not None:
            batch_end = min(batch_end, video_limit)

        # Fetch the next batch of video URLs
        new_video_urls = channel.video_urls[next_index:batch_end]

        # Break out if no new video URLs were found
        if len(new_video_urls) == 0:
            break

        video_urls.extend(new_video_urls)
        next_index += len(new_video_urls)

        # Only the new batch needs checking; earlier batches were checked when they arrived
        if most_recent_video_url is not None and most_recent_video_url in new_video_urls:
            break

        # If we've reached the video limit, then break
        if video_limit is not None and len(video_urls) >= video_limit:
            break

    # Return the video URLs
    return video_urls

This function should do what I want. Let's test it:

In [None]:
# This is the list of video URLs we're going to parse. Passing `most_recent_video_url`
# lets the helper stop scanning once it reaches that video (None scans up to the limit).
video_urls_to_parse = get_video_urls_from_channel(
    channel=Channel(channel_url),
    most_recent_video_url=most_recent_video_url,
    video_limit=video_limit,
    video_parse_step_size=video_parse_step_size,
)

# If the most_recent_video_url is not None, keep only the URLs listed before it
if most_recent_video_url is not None:
    try:
        cutoff_index = video_urls_to_parse.index(most_recent_video_url)
    except ValueError:
        # The URL wasn't among the ones we fetched, so there's nothing to trim
        print(
            f"Warning: {most_recent_video_url} was not found among the fetched video URLs. Keeping all of them."
        )
    else:
        video_urls_to_parse = video_urls_to_parse[:cutoff_index]

# Print some information about the video URLs we're going to parse
print(f"Identified {len(video_urls_to_parse)} videos to parse.")

# Removing Already Parsed Videos
We're going to upload a temporary table with all of the video URLs to GBQ. 

In [None]:
# Wrap the identified URLs in a single-column ("url") DataFrame
video_urls_to_parse_df = pd.DataFrame(data=video_urls_to_parse, columns=["url"])

# Push that DataFrame to GBQ as a temporary table, replacing any existing one,
# and keep the name the helper hands back for the follow-up query
temporary_table_name = gbq_utils.create_temporary_table_in_gbq(
    table_name="temporary_video_urls_to_parse",
    dataset_name=GBQ_DATASET_ID,
    project_id=GBQ_PROJECT_ID,
    dataframe=video_urls_to_parse_df,
    if_exists="replace",
)

With this temporary table in hand, we'll query GBQ to figure out the *actual* videos to download. 

In [None]:
# Create the query to identify the videos that we need to parse: an anti-join that keeps
# only the temporary-table URLs with no matching row in `video_metadata`. The metadata
# table is fully qualified from the project constants, like the other queries here.
actual_videos_to_parse_query = f"""
SELECT
  temp_urls.url
FROM
  `{temporary_table_name}` temp_urls
LEFT JOIN
  `{GBQ_PROJECT_ID}.{GBQ_DATASET_ID}.video_metadata` metadata
ON
  metadata.url = temp_urls.url
WHERE
  metadata.id IS NULL
"""

# Execute the query through pandas_gbq directly (pd.read_gbq is a deprecated wrapper around it)
actual_videos_to_parse_df = pandas_gbq.read_gbq(
    actual_videos_to_parse_query, project_id=GBQ_PROJECT_ID
)

Finally, some cleanup: overwriting `video_urls_to_parse` with the contents of `actual_videos_to_parse_df`, and deleting the temporary table.

In [None]:
# Report how many videos are left once the already-parsed ones have been filtered out
print(f"After filtering out videos that have already been parsed, we have {len(actual_videos_to_parse_df)} videos to parse.")

# From here on, only the URLs returned by the filtering query are of interest
video_urls_to_parse = actual_videos_to_parse_df["url"].tolist()

# The temporary table name is split on "." into its project, dataset and table parts,
# which are then handed to the gbq_utils helper that deletes the table
temp_table_project_id, temp_table_dataset_id, temp_table_name = temporary_table_name.split(".")
gbq_utils.delete_table(
    table_id=temp_table_name,
    dataset_id=temp_table_dataset_id,
    project_id=temp_table_project_id,
)

# Downloading Video Metadata
Below, I'm going to define a method that'll download a video's metadata. 

In [None]:
def parse_metadata_from_video(video_url):
    """
    This method will parse a dictionary containing metadata from a video, given its URL.

    Most fields come from the `videoDetails` section of the video's `vid_info`. The publish
    date and the full description are read from the `YouTube` object itself, and fall back
    to None if they can't be retrieved. Thumbnail URLs are None when no thumbnails are listed.

    Args:
        video_url: URL of the video to parse.

    Returns:
        A dictionary with the keys `id`, `title`, `length`, `channel_id`, `channel_name`,
        `short_description`, `view_ct`, `url`, `small_thumbnail_url`, `large_thumbnail_url`,
        `publish_date`, `description` and `scrape_date`.

    Raises:
        Exception: If `vid_info` can't be fetched, or if it contains no `videoDetails`.
    """

    # Create a video object
    video = YouTube(video_url)

    # We'll wrap this in a try/except block so that we can catch any errors that occur
    try:
        # Parse the `videoDetails` from the video; this contains a lot of the metadata we're interested in
        video_info_dict = video.vid_info.get("videoDetails")

    # If we run into an Exception this early on, we'll raise an Exception (chained to the original)
    except Exception as e:
        raise Exception(
            f"Error parsing video metadata for video {video_url}: '{e}'\nTraceback is as follows:\n{traceback.format_exc()}"
        ) from e

    # Without `videoDetails` there is nothing to extract; fail with a clear message
    # instead of an AttributeError on the first lookup below
    if not video_info_dict:
        raise Exception(
            f"Error parsing video metadata for video {video_url}: no 'videoDetails' were returned"
        )

    # The thumbnails are listed under `thumbnail` -> `thumbnails`; we keep the first and last entries
    thumbnails = (video_info_dict.get("thumbnail") or {}).get("thumbnails") or []

    # Extract different pieces of the video metadata
    video_metadata_dict = {
        "id": video_info_dict.get("videoId"),
        "title": video_info_dict.get("title"),
        "length": video_info_dict.get("lengthSeconds"),
        "channel_id": video_info_dict.get("channelId"),
        "channel_name": video_info_dict.get("author"),
        "short_description": video_info_dict.get("shortDescription"),
        "view_ct": video_info_dict.get("viewCount"),
        # The url that was passed in is stored as-is
        "url": video_url,
        "small_thumbnail_url": thumbnails[0].get("url") if thumbnails else None,
        "large_thumbnail_url": thumbnails[-1].get("url") if thumbnails else None,
    }

    # Try and extract the publish_date
    try:
        video_metadata_dict["publish_date"] = video.publish_date
    except Exception:
        video_metadata_dict["publish_date"] = None

    # Try and extract the full description
    try:
        video_metadata_dict["description"] = video.description
    except Exception:
        video_metadata_dict["description"] = None

    # Use datetime to get the scrape_date (the current datetime)
    video_metadata_dict["scrape_date"] = datetime.datetime.now()

    # Finally, return the video metadata dictionary
    return video_metadata_dict

Now: we'll need to iterate through each of the videos and download their metadata. 

In [None]:
# Parameterize the video metadata parsing
time_to_sleep_between_parsing = 2
sleep_randomization_factor = 2.5

# The longest pause we'll take: the base time plus the randomized extra
max_time_to_sleep_between_parsing = time_to_sleep_between_parsing * (1 + sleep_randomization_factor)

# We'll iterate through each of the videos in the list and parse their metadata
video_metadata_dicts_by_video_url = {}
for video_url in tqdm(video_urls_to_parse, disable=not tqdm_enabled):

    # We'll wrap this in a try/except block so that one failing video doesn't stop the run
    try:
        # Parse the metadata from the video and file it under its url
        video_metadata_dicts_by_video_url[video_url] = parse_metadata_from_video(video_url)

    # If we run into an Exception, then we'll print out the traceback and move on
    except Exception:
        traceback.print_exc()

    # Sleep for a random amount of time whether or not parsing succeeded, so that a run
    # of failures doesn't send requests back-to-back
    time.sleep(
        random.uniform(time_to_sleep_between_parsing, max_time_to_sleep_between_parsing)
    )

## Storing the Metadata
Now that I've downloaded some metadata about different videos, I need to store it. 

In [None]:
# Every parsed metadata dictionary becomes one row to insert
rows_to_add = list(video_metadata_dicts_by_video_url.values())

# Hand the rows to the gbq_utils helper, targeting the `video_metadata` table
gbq_utils.add_rows_to_table(
    rows=rows_to_add,
    table_id="video_metadata",
    dataset_id=GBQ_DATASET_ID,
    project_id=GBQ_PROJECT_ID,
)

# Downloading Video Audio
Next up, I need to download some video audio. This one probably needs to go a lot slower than the metadata fetching 😅

### Determining Audio to Download
I need to check with GBQ to see if there are any videos that I need to download. 

In [None]:
# Parameterize the query
n_max_video_urls = 2

# The query below will determine which videos we need to download audio for: videos in
# `video_metadata` with no matching row in `audio`. Both tables are fully qualified from
# the project constants, like the deduplication query later in the notebook.
videos_for_audio_parsing_query = f"""
SELECT
  video.url
FROM
  `{GBQ_PROJECT_ID}.{GBQ_DATASET_ID}.video_metadata` video
LEFT JOIN
  `{GBQ_PROJECT_ID}.{GBQ_DATASET_ID}.audio` audio
ON
  audio.video_url = video.url
WHERE
  audio.audio_gcs_uri IS NULL
LIMIT {n_max_video_urls}
"""

# Execute the query through pandas_gbq directly (pd.read_gbq is a deprecated wrapper around it)
videos_for_audio_parsing_df = pandas_gbq.read_gbq(
    videos_for_audio_parsing_query, project_id=GBQ_PROJECT_ID
)

### Downloading Audio
Next, I'm going to use `pytube` to download the audio of these videos.

In [None]:
# Parameterize the download
time_to_sleep_between_downloads = 10
sleep_randomization_factor = 2.5
download_directory = Path("temp_data/")

# The longest pause we'll take between downloads: the base time plus the randomized extra
max_time_to_sleep_between_downloads = time_to_sleep_between_downloads * (1 + sleep_randomization_factor)

# Iterate through the videos and download their audio
for video_url in tqdm(videos_for_audio_parsing_df["url"], disable=not tqdm_enabled):
    # We'll wrap this in a try/except block so that one failing download doesn't stop the run
    try:
        # Download the audio from the video
        youtube_utils.download_audio_from_video(
            video_url=video_url, data_folder_path=download_directory
        )

    # If we run into an Exception, then we'll print out the traceback and move on
    except Exception:
        traceback.print_exc()

    # Pause for a random amount of time between downloads, whether or not this one succeeded
    time.sleep(
        random.uniform(time_to_sleep_between_downloads, max_time_to_sleep_between_downloads)
    )

### Uploading Audio to GCS
Now that I've downloaded the audio, I need to upload it to GCS. 

I'll start by creating the bucket if it doesn't exist:

In [None]:
# Make sure that the neural-needledrop-audio bucket exists.
# `delete_if_exists=False` so that re-running this cell keeps audio uploaded by earlier runs
# (rows in the `audio` table store URIs that point into this bucket).
# NOTE(review): assumes `create_bucket` leaves an existing bucket untouched in this mode — confirm
gcs_utils.create_bucket(
    "neural-needledrop-audio", project_id=GBQ_PROJECT_ID, delete_if_exists=False
)

Next: upload all of the audio. 

In [None]:
# Parameterize the audio upload process
delete_files_after_upload = False

# Despite its name, this holds the name of the destination bucket (not a gs:// URI);
# it's the same for every file, so it's set once up front
gcs_uri = "neural-needledrop-audio"

# Iterate through all of the video urls in the videos_for_audio_parsing_df
# (the rows are materialized into a list so that tqdm knows the total)
for row in tqdm(
    list(videos_for_audio_parsing_df.itertuples()), disable=not tqdm_enabled
):
    # We'll wrap this in a try/except block so that one failing upload doesn't stop the run
    try:
        # Get the video url
        video_url = row.url

        # Get the video id: everything after "watch?v=" in the url
        video_id = video_url.split("watch?v=")[-1]

        # Get the path to the audio file
        audio_file_path = download_directory / f"{video_id}.m4a"

        # Check to see if this file exists
        if not audio_file_path.exists():
            # If it doesn't exist, then we'll continue. Print out a warning
            print(f"Warning: {audio_file_path} does not exist. Skipping...")
            continue

        # Upload the audio file to GCS
        gcs_utils.upload_file_to_bucket(
            file_path=str(audio_file_path),
            bucket_name=gcs_uri,
            project_id=GBQ_PROJECT_ID,
        )

        # Create a dictionary to store the audio metadata
        audio_metadata_dict = {
            "video_url": video_url,
            "audio_gcs_uri": f"gs://{gcs_uri}/{video_id}.m4a",
            "scrape_date": datetime.datetime.now(),
        }

        # Add the audio metadata to the table; if the table is reported as missing,
        # generate it first and then retry the insert once
        try:
            gbq_utils.add_rows_to_table(
                project_id=GBQ_PROJECT_ID,
                dataset_id=GBQ_DATASET_ID,
                table_id="audio",
                rows=[audio_metadata_dict],
            )
        except NotFound:
            gbq_utils.generate_audio_table(
                project_id=GBQ_PROJECT_ID,
                dataset_id=GBQ_DATASET_ID,
                delete_if_exists=False,
            )
            gbq_utils.add_rows_to_table(
                project_id=GBQ_PROJECT_ID,
                dataset_id=GBQ_DATASET_ID,
                table_id="audio",
                rows=[audio_metadata_dict],
            )

        # Delete the audio file if delete_files_after_upload
        if delete_files_after_upload:
            audio_file_path.unlink()

    # If we run into an Exception, then we'll print out the traceback and move on
    except Exception:
        traceback.print_exc()

# If we're deleting the files after upload, then we'll delete the download_directory too.
# rmdir only removes empty directories, so files left behind by skipped or failed uploads
# would make it raise; in that case we warn and keep the directory.
if delete_files_after_upload:
    try:
        download_directory.rmdir()
    except OSError:
        print(
            f"Warning: could not remove {download_directory}; it may be missing or still contain files."
        )

Now, a super quick fix that I ought to handle: I'm going to deduplicate the `backend_data.audio` table.

In [None]:
# This query will deduplicate the audio table: rows are numbered within each `video_url`
# from the newest `scrape_date` to the oldest, and only the first (newest) row per
# `video_url` is kept. CREATE OR REPLACE writes the result back over the same table.
deduplicate_audio_table_query = f"""
CREATE OR REPLACE TABLE `{GBQ_PROJECT_ID}.{GBQ_DATASET_ID}.audio` AS (
    SELECT
        video_url,
        audio_gcs_uri,
        scrape_date
    FROM (
        SELECT
            *,
            ROW_NUMBER() OVER (PARTITION BY video_url ORDER BY scrape_date DESC) AS row_num
        FROM
            `{GBQ_PROJECT_ID}.{GBQ_DATASET_ID}.audio`
    ) ordered_table
    WHERE
        ordered_table.row_num = 1
)
"""

# Execute the query; read_gbq is used here only to run the statement
# NOTE(review): the returned DataFrame is presumably empty for a CREATE statement — confirm
pandas_gbq.read_gbq(deduplicate_audio_table_query, project_id=GBQ_PROJECT_ID)

# Transcribing Audio with Whisper
Now that I've downloaded some audio, I need to figure out what needs to be transcribed. 