In [None]:
import re
import os
from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from googleapiclient.discovery import build
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.formatters import SRTFormatter

In [None]:
# Build the YouTube Data API v3 client used by the search helper below.
# The key is read from the environment so it never lives in the notebook;
# if YOUTUBE_API_KEY is unset, api_key is None and is passed through as-is.
api_key = os.environ.get("YOUTUBE_API_KEY")
youtube = build('youtube', 'v3', developerKey=api_key)

In [None]:
def get_video_ids_with_captions(query, max_results=50):
    """Search YouTube for captioned videos and return their video IDs.

    Uses the module-level ``youtube`` client. Results are fetched page by
    page so that ``max_results`` may exceed the per-request limit.

    Args:
        query: Free-text search query.
        max_results: Maximum number of video IDs to return. Values above 50
            are served by following ``nextPageToken`` across several requests.

    Returns:
        A list of at most ``max_results`` video ID strings, in the order the
        API returned them. Shorter (possibly empty) if the search runs out of
        results.
    """
    # search.list accepts at most 50 items per request.
    page_limit = 50

    video_ids = []
    page_token = None

    while len(video_ids) < max_results:
        request_kwargs = {
            'q': query,
            'part': 'id',
            'maxResults': min(page_limit, max_results - len(video_ids)),
            'type': 'video',
            'videoCaption': 'closedCaption',  # Only retrieve videos with captions
        }
        if page_token:
            request_kwargs['pageToken'] = page_token

        search_response = youtube.search().list(**request_kwargs).execute()

        items = search_response.get('items', [])
        video_ids.extend(item['id']['videoId'] for item in items)

        # Stop when the API has no further page, or returned an empty page
        # (guards against looping forever on a token that yields nothing).
        page_token = search_response.get('nextPageToken')
        if not page_token or not items:
            break

    return video_ids[:max_results]

def sanitize_filename(filename):
    """Turn arbitrary text into a lowercase, hyphen-separated slug.

    Characters other than word characters, whitespace and hyphens are
    dropped; the result is trimmed and lowercased, and every run of
    hyphens/whitespace collapses into a single hyphen.
    """
    disallowed = re.compile(r'[^\w\s-]')
    separators = re.compile(r'[-\s]+')

    cleaned = disallowed.sub('', filename)
    normalized = cleaned.strip().lower()
    return separators.sub('-', normalized)

def main():
    """Search YouTube for captioned videos and save their IDs to HDFS as CSV.

    Runs a fixed search query through ``get_video_ids_with_captions``, loads
    the resulting IDs into a single-column Spark DataFrame (``video_id``),
    prints the count, and writes the DataFrame under
    ``hdfs://namenode:9000/video_ids/<query-slug>.csv``.
    """
    # Initialize Spark Session
    spark = SparkSession.builder \
        .appName("YouTubeDataAnalysis") \
        .getOrCreate()

    query = "hindi old comedy movies"
    video_ids = get_video_ids_with_captions(query)

    # Give Spark an explicit schema: with inference, an empty search result
    # would raise "can not infer schema from empty dataset".
    df = spark.createDataFrame(
        [(video_id,) for video_id in video_ids],
        schema="video_id: string",
    )

    # Counting the number of video IDs
    print(f"Total video IDs: {df.count()}")

    # Save the DataFrame to HDFS. The "/" keeps each query's output inside the
    # video_ids/ directory (the transcript cell reads from
    # hdfs://namenode:9000/video_ids). Spark writes a directory of part files
    # at this path, not a single CSV file.
    hdfs_path = f"hdfs://namenode:9000/video_ids/{sanitize_filename(query)}.csv"
    # Overwrite so the cell can be re-run; the default mode fails if the path exists.
    df.write.mode("overwrite").csv(hdfs_path)

    print(f"Saved {len(video_ids)} video IDs to {hdfs_path}.")

# Run the video-ID search job when this cell/script is executed directly
# (not when the code is imported as a module).
if __name__ == "__main__":
    main()

In [None]:
def download_transcript(video_id, languages=("en",)):
    """
    Fetches the transcript for a given video ID and returns it in SRT format.
    This function is intended to be used as a UDF in Spark.

    Args:
        video_id: YouTube video ID to fetch the transcript for.
        languages: Language code(s) forwarded to
            ``YouTubeTranscriptApi.get_transcript``. Accepts a single code or
            an iterable of codes; defaults to English only, which is what a
            one-argument call (e.g. from the Spark UDF) gets.

    Returns:
        The transcript as an SRT-formatted string. On any failure the function
        does not raise; it returns the string ``"Error: <message>"`` instead,
        so callers must check for that prefix to tell failures from data.
    """
    # A bare string would otherwise be split into single characters by list().
    if isinstance(languages, str):
        languages = [languages]

    try:
        # Fetching the transcript in the requested language(s)
        transcript = YouTubeTranscriptApi.get_transcript(
            video_id, languages=list(languages)
        )

        # Formatting the transcript as SRT
        return SRTFormatter().format_transcript(transcript)
    except Exception as e:
        # Deliberately broad: inside a Spark UDF one failing video should not
        # abort the whole job, so the error is reported in-band.
        return f"Error: {e}"

def main():
    """Download transcripts for the stored video IDs and write them to HDFS.

    Reads one video ID per line from ``hdfs://namenode:9000/video_ids``,
    applies ``download_transcript`` to each ID through a Spark UDF, and writes
    the transcripts as text under ``hdfs://namenode:9000/subtitles/``, in one
    ``video_id=<id>`` sub-directory per video.

    NOTE(review): this notebook defines ``main`` in an earlier cell as well;
    this definition replaces that one in the kernel namespace.
    """
    spark = SparkSession.builder \
        .appName("YouTubeTranscripts") \
        .getOrCreate()

    # Define the UDF for downloading transcripts
    download_transcript_udf = udf(download_transcript, StringType())

    # Reading a list of video IDs from HDFS. The text reader exposes each line
    # as a column named "value"; rename it so the output layout is readable.
    hdfs_input_path = "hdfs://namenode:9000/video_ids"
    df_video_ids = spark.read.text(hdfs_input_path) \
        .withColumnRenamed("value", "video_id")

    # Applying the UDF to download transcripts
    df_transcripts = df_video_ids.withColumn(
        "transcript", download_transcript_udf("video_id")
    )

    # Saving transcripts to HDFS. The text sink accepts exactly one string
    # column, so writing both columns directly fails. Partitioning by
    # video_id moves the ID into the directory name, leaving "transcript" as
    # the only data column while keeping each transcript tied to its video.
    hdfs_output_path = "hdfs://namenode:9000/subtitles/"
    df_transcripts.write.partitionBy("video_id").text(hdfs_output_path)

# Run the transcript download job when this cell/script is executed directly
# (not when the code is imported as a module).
if __name__ == "__main__":
    main()