In [1]:
# Set configuration parameters for the Glue job
%idle_timeout 2880     # Set the idle timeout for the Glue session to 2880 minutes (48 hours)
%glue_version 4.0      # Specify the version of AWS Glue to use (version 4.0)
%worker_type G.1X      # Define the worker type for the Glue job as G.1X
%number_of_workers 5   # Specify the number of workers to use for the Glue job as 5

# Import necessary libraries
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from datetime import datetime
from awsglue.dynamicframe import DynamicFrame
import boto3
from pyspark.sql.functions import *
# Bootstrap the session objects that the rest of the notebook relies on.
# Reuse the active SparkContext if there is one; otherwise create it.
sc = SparkContext.getOrCreate()
# Wrap the SparkContext in a GlueContext (used later to build and write DynamicFrames).
glueContext = GlueContext(sc)
# Take the SparkSession exposed by the GlueContext; it is used to read the raw JSON.
spark = glueContext.spark_session
# Instantiate a Glue Job handle bound to this GlueContext.
# NOTE(review): `job` is not referenced again in the visible cells — confirm it is still needed.
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.4 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Session ID: a48c823a-e033-454e-b1fe-f9dd60d27407
Applying the following default arguments:
--glue_kernel_version 1.0.4
--enable-glue-datacatalog true
Waiting for session a48c823a-e033-454e-b1fe-f9dd60d27407 to get into ready status...
Session a48c823a-e033-454e-b1fe-f9dd60d27407 has been created.



In [2]:
# S3 locations of the raw JSON dumps that are waiting to be processed.
# The bucket and stage prefix are declared once and shared by the three dataset paths.
RAW_TO_PROCESS_PREFIX = "s3://youtube-etl-njk/raw_data/to_processed"

channel_data_path = f"{RAW_TO_PROCESS_PREFIX}/channel_data/"    # raw channel data
video_data_path = f"{RAW_TO_PROCESS_PREFIX}/video_data/"        # raw video data
comments_data_path = f"{RAW_TO_PROCESS_PREFIX}/comments_data/"  # raw comments data




In [3]:
# Load the three raw JSON datasets from S3 into Spark DataFrames.
# The recorded run of this cell failed on the first read with
# "AnalysisException: Path does not exist" for the channel_data prefix.
# NOTE(review): likely because the last cell of this notebook moves every object out of
# raw_data/to_processed/ after a run — confirm new raw files have landed before re-running.
channel_df = spark.read.json(channel_data_path)    # Raw channel JSON -> DataFrame
video_df = spark.read.json(video_data_path)        # Raw video JSON -> DataFrame
comments_df = spark.read.json(comments_data_path)  # Raw comments JSON -> DataFrame

AnalysisException: Path does not exist: s3://youtube-etl-njk/raw_data/to_processed/channel_data


In [4]:
def process_channel_data(channel_df):
    """Flatten raw channel JSON into one de-duplicated row per channel.

    Args:
        channel_df: Spark DataFrame holding an ``items`` array column whose
            elements expose ``kind``, ``id``, ``contentDetails``, ``snippet``
            and ``statistics`` fields.

    Returns:
        Spark DataFrame with the columns ``kind``, ``channel_id``,
        ``playlist_id``, ``channel_title``, ``channel_description``,
        ``channel_publishedAt`` (date), ``channel_country``,
        ``channel_viewCount`` (long), ``channel_subscriberCount`` (int) and
        ``channel_videoCount`` (int), with one row kept per ``channel_id``.
    """
    def _clean_text(field):
        # Swap commas for semicolons, then squeeze every whitespace run into a
        # single space. The pattern is a raw string so the backslash reaches
        # the regex engine instead of being an invalid Python escape.
        return regexp_replace(regexp_replace(col(field), ",", ";"), r"\s+", " ")

    # One output row per element of the 'items' array
    exploded = channel_df.withColumn("items", explode("items"))

    # Project the nested fields of interest into flat, renamed columns
    channels = exploded.select(
        col("items.kind").alias("kind"),
        col("items.id").alias("channel_id"),
        col("items.contentDetails.relatedPlaylists.uploads").alias("playlist_id"),
        col("items.snippet.title").alias("channel_title"),
        _clean_text("items.snippet.description").alias("channel_description"),
        col("items.snippet.publishedAt").alias("channel_publishedAt"),
        col("items.snippet.country").alias("channel_country"),
        col("items.statistics.viewCount").alias("channel_viewCount"),
        col("items.statistics.subscriberCount").alias("channel_subscriberCount"),
        col("items.statistics.videoCount").alias("channel_videoCount"),
    ).drop_duplicates(["channel_id"])  # keep a single row per channel

    # Convert the publication value to a date
    channels = channels.withColumn("channel_publishedAt", to_date(col("channel_publishedAt")))

    # Lifetime channel views can exceed the 32-bit limit (2,147,483,647); a cast
    # to int would null or truncate such values, so the view count is a 64-bit long.
    numeric_types = {
        "channel_viewCount": "long",
        "channel_subscriberCount": "int",
        "channel_videoCount": "int",
    }
    for name, spark_type in numeric_types.items():
        channels = channels.withColumn(name, col(name).cast(spark_type))

    return channels




In [None]:
def process_video_data(video_df):
    """Flatten raw video JSON into one de-duplicated row per video.

    Args:
        video_df: Spark DataFrame holding an ``items`` array column whose
            elements expose ``id``, ``snippet`` and ``statistics`` fields.

    Returns:
        Spark DataFrame with the columns ``channelId``, ``video_id``,
        ``publishedAt`` (date), ``title``, ``description``, ``categoryId``
        (int), ``viewCount`` (long), ``likeCount`` (int) and ``commentCount``
        (int), with one row kept per ``video_id``.
    """
    def _clean_text(field):
        # Swap commas for semicolons, then squeeze every whitespace run into a
        # single space. The pattern is a raw string so the backslash reaches
        # the regex engine instead of being an invalid Python escape.
        return regexp_replace(regexp_replace(col(field), ",", ";"), r"\s+", " ")

    # One output row per element of the 'items' array
    exploded = video_df.withColumn("items", explode("items"))

    # Project the nested fields of interest into flat, renamed columns
    videos = exploded.select(
        col("items.snippet.channelId").alias("channelId"),
        col("items.id").alias("video_id"),
        col("items.snippet.publishedAt").alias("publishedAt"),
        _clean_text("items.snippet.title").alias("title"),
        _clean_text("items.snippet.description").alias("description"),
        col("items.snippet.categoryId").alias("categoryId"),
        col("items.statistics.viewCount").alias("viewCount"),
        col("items.statistics.likeCount").alias("likeCount"),
        col("items.statistics.commentCount").alias("commentCount"),
    ).drop_duplicates(["video_id"])  # keep a single row per video

    # Convert the publication value to a date
    videos = videos.withColumn("publishedAt", to_date(col("publishedAt")))

    # A single video's views can exceed the 32-bit limit (2,147,483,647); a cast
    # to int would null or truncate such values, so the view count is a 64-bit long.
    numeric_types = {
        "categoryId": "int",
        "viewCount": "long",
        "likeCount": "int",
        "commentCount": "int",
    }
    for name, spark_type in numeric_types.items():
        videos = videos.withColumn(name, col(name).cast(spark_type))

    return videos

In [None]:
def process_comments_data(comments_df):
    """Collapse raw comment JSON into one row of joined comment text per video.

    Args:
        comments_df: Spark DataFrame holding an ``items`` array column whose
            elements expose ``snippet.channelId``, ``snippet.videoId`` and
            ``snippet.topLevelComment.snippet.textDisplay``.

    Returns:
        Spark DataFrame with the columns ``channelId``, ``videoId`` and
        ``Comments``, where ``Comments`` is the cleaned comment texts of the
        group joined with ``", "``; one row is kept per ``videoId``.
    """
    def _clean_text(field):
        # Swap commas for semicolons, then squeeze every whitespace run into a
        # single space. The pattern is a raw string so the backslash reaches
        # the regex engine instead of being an invalid Python escape.
        return regexp_replace(regexp_replace(col(field), ",", ";"), r"\s+", " ")

    # One row per element of the 'items' array, reduced to the three fields used
    threads = comments_df.withColumn("items", explode("items")).select(
        col("items.snippet.channelId").alias("channelId"),
        col("items.snippet.videoId").alias("videoId"),
        _clean_text("items.snippet.topLevelComment.snippet.textDisplay").alias("textDisplay"),
    )

    # Gather the comment texts of each (channel, video) pair into a list,
    # then keep a single row per video
    per_video = (
        threads.groupBy("channelId", "videoId")
        .agg(collect_list("textDisplay").alias("Comments"))
        .drop_duplicates(["videoId"])
    )

    # Join the collected texts into one string.
    # NOTE(review): the separator is ", " although commas inside each comment
    # were replaced with ";" above — confirm this separator is intended.
    return per_video.withColumn("Comments", concat_ws(", ", per_video["Comments"]))

In [None]:
# Run each raw DataFrame through its transformation function.
# Channel items flattened to one row per channel_id
Channel_Data_Transformed = process_channel_data(channel_df)

# Video items flattened to one row per video_id
Video_Data_Transformed = process_video_data(video_df)

# Comment items grouped into one joined-comments row per videoId
Comments_Data_Transformed = process_comments_data(comments_df)

In [None]:
def write_to_s3(df, path_suffix, format_type="csv", base_path="s3://youtube-etl-njk/transformed_data"):
    """Write a Spark DataFrame to S3 through the Glue DynamicFrame writer.

    Args:
        df: Spark DataFrame to persist.
        path_suffix: Location below ``base_path`` to write to. Leading and
            trailing slashes are ignored, so no empty ``//`` key segment is
            produced.
        format_type: Output format handed to the Glue writer. Defaults to
            ``"csv"``.
        base_path: S3 URI that ``path_suffix`` is appended to. Defaults to the
            pipeline's transformed-data location.

    Returns:
        None. The data is written as a side effect.
    """
    # Build "<base>/<suffix>/" with exactly one slash at each join
    target_path = f"{str(base_path).rstrip('/')}/{str(path_suffix).strip('/')}/"

    # The Glue writer consumes DynamicFrames, so convert the DataFrame first
    dynamic_frame = DynamicFrame.fromDF(df, glueContext, "dynamic_frame")

    glueContext.write_dynamic_frame.from_options(
        frame=dynamic_frame,
        connection_type="s3",
        connection_options={"path": target_path},
        format=format_type,
    )

In [None]:
# Persist each transformed dataset as CSV under a date-stamped prefix.
# The date is taken once so all three outputs carry the same suffix,
# even when the run crosses midnight between writes.
run_date = datetime.now().strftime("%Y-%m-%d")

# Transformed channel data
write_to_s3(Channel_Data_Transformed, f"channel_data/channel_transformed_{run_date}", 'csv')

# Transformed video data
write_to_s3(Video_Data_Transformed, f"video_data/video_transformed_{run_date}", 'csv')

# Transformed comments data
write_to_s3(Comments_Data_Transformed, f"comments_data/comments_transformed_{run_date}", 'csv')


In [None]:
def move_files_to_processed(bucket_name, from_folders, to_folders):
    """Move every object under each source prefix to the paired destination prefix.

    Source and destination folders are paired by position. Each object is
    copied to its new key and then deleted from its old key, all within
    ``bucket_name``.

    Args:
        bucket_name: Name of the S3 bucket that holds both source and
            destination prefixes.
        from_folders: Iterable of source prefixes, without a trailing slash.
        to_folders: Iterable of destination prefixes, paired by position with
            ``from_folders``.

    Returns:
        None. One line is printed per moved object.
    """
    s3 = boto3.client('s3')
    # A single list_objects_v2 call returns only one page of keys; the
    # paginator follows continuation tokens so nothing is left behind.
    paginator = s3.get_paginator('list_objects_v2')

    for from_folder, to_folder in zip(from_folders, to_folders):
        # Collect the full listing first so objects are not deleted while the
        # listing is still being paged through.
        source_keys = [
            obj['Key']
            for page in paginator.paginate(Bucket=bucket_name, Prefix=f"{from_folder}/")
            for obj in page.get('Contents', [])
        ]

        for key in source_keys:
            # Every listed key starts with from_folder, so swap only that
            # leading prefix; later occurrences of the same text in the key
            # are left untouched.
            new_key = to_folder + key[len(from_folder):]

            # Copy to the destination, then remove the source object
            s3.copy_object(CopySource={'Bucket': bucket_name, 'Key': key}, Bucket=bucket_name, Key=new_key)
            s3.delete_object(Bucket=bucket_name, Key=key)

            print(f'Moved file from {key} to {new_key}')

# Relocate this run's raw inputs from the "to_processed" area to "processed".
bucket_name = 'youtube-etl-njk'

# (source folder, destination folder) pairs, one per dataset
folder_moves = [
    ('raw_data/to_processed/channel_data', 'raw_data/processed/channel_data'),
    ('raw_data/to_processed/video_data', 'raw_data/processed/video_data'),
    ('raw_data/to_processed/comments_data', 'raw_data/processed/comments_data'),
]
from_folders = [source for source, _ in folder_moves]
to_folders = [destination for _, destination in folder_moves]

move_files_to_processed(bucket_name, from_folders, to_folders)