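# Transform YouTube Video Data with Partitions - Notebook

Reads the day's raw video extract (`video_data.parquet`) from S3, derives typed and calculated columns, and writes the result back to S3 as parquet partitioned by `channelId` / `extractYear` / `extractMonth` / `extractDay`, updating the `video_data` table in the Glue Data Catalog.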

#### Set up and start the Glue session


In [1]:
%idle_timeout 2880
%glue_version 5.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
from pyspark.sql.functions import col, to_timestamp, date_format, from_utc_timestamp, to_utc_timestamp,to_date, date_format, regexp_extract, coalesce, lit, size, when, year, month, dayofmonth
from pyspark.sql.types import IntegerType
  
# Create (or reuse) the SparkContext and wrap it in a GlueContext.
# `spark` is the underlying SparkSession; `job` is the Glue Job handle.
# NOTE(review): job.init()/job.commit() are not called in the visible cells, so job
# bookmarks are presumably not in effect — confirm whether that is intended.
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 5.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: ef157b2b-7773-4275-a251-6a970b44891c
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session ef157b2b-7773-4275-a251-6a970b44891c to get into ready status...
Session ef157b2b-7773-4275-a251-6a970b44891c ha

In [2]:
def _iso8601_duration_seconds(duration_col):
    """
    Build a Spark Column holding the total seconds of an ISO 8601 duration string.

    Covers the forms the YouTube Data API emits: 'PT6M50S', 'PT1H2M3S' and, for
    videos at least one day long, 'P1DT2H3M4S'. Any missing component counts as 0,
    so a NULL or unparseable duration yields 0 (the result column is non-nullable).
    """
    def component(pattern):
        # regexp_extract returns '' when the pattern does not match; casting '' to int
        # gives NULL (ANSI mode off), which coalesce turns into 0.
        return coalesce(regexp_extract(duration_col, pattern, 1).cast('int'), lit(0))

    days = component(r'P(\d+)D')
    hours = component(r'T(\d+)H')
    # Anchor minutes to the time part (after 'T') so a date-part 'M' (months) is never
    # mistaken for minutes.
    minutes = component(r'T(?:\d+H)?(\d+)M')
    seconds = component(r'(\d+)S')
    return (days * 86400) + (hours * 3600) + (minutes * 60) + seconds


def format_video_df(base_df, count_type=None):
    """
    Format the table containing details of YouTube videos.

    Parameters
    ----------
    base_df : pyspark.sql.DataFrame
        Raw video rows. Must contain viewCount, likeCount, commentCount, publishedAt,
        duration, tags and extractDate.
    count_type : pyspark.sql.types.DataType, optional
        Type the count columns are cast to. Defaults to IntegerType(), which keeps the
        existing output schema. Counts above 2**31 - 1 (about 2.1 billion) do not fit an
        IntegerType (the cast yields NULL with ANSI mode off); pass LongType() if that can
        happen, bearing in mind it changes the column type in the catalog table.

    Returns
    -------
    pyspark.sql.DataFrame
        base_df with typed counts, UTC timestamp/date/time columns derived from
        publishedAt, duration_seconds_total, tagsCount, per-1000-view likeRatio and
        commentRatio, and extractYear/extractMonth/extractDay partition columns.
        The original publishedAt column is dropped.
    """
    if count_type is None:
        count_type = IntegerType()

    numeric_cols = ['viewCount', 'likeCount', 'commentCount']
    time_cols = ['duration']
    date_cols = ['publishedAt']

    # Counts arrive as strings; cast them so the ratios below are numeric.
    for col_name in numeric_cols:
        base_df = base_df.withColumn(col_name, col(col_name).cast(count_type))

    for col_name in date_cols:
        utc_col = f'{col_name}_in_utc'
        # to_utc_timestamp(..., 'UTC') treats the value as already in UTC, so this only
        # parses the string (e.g. '2020-11-17T14:11:52Z') into a timestamp; no shift.
        base_df = base_df.withColumn(utc_col, to_utc_timestamp(col(col_name), 'UTC'))

        # Date and time-of-day of the same instant, reusing the parsed column.
        # These render in the Spark session time zone, presumably UTC (the Glue
        # default) — TODO confirm spark.sql.session.timeZone if the labels must hold.
        base_df = base_df.withColumn(f'{col_name}_utc_date', to_date(col(utc_col)))
        base_df = base_df.withColumn(f'{col_name}_utc_time', date_format(col(utc_col), 'HH:mm:ss'))

        # NOTE: the string is cast to a timestamp before formatting, so 'z' renders the
        # session time zone's abbreviation, not an offset carried in the source string.
        # Kept as-is to preserve the existing column values.
        base_df = base_df.withColumn(f'{col_name}_timezone', date_format(col(col_name), 'z'))

    for col_name in time_cols:
        base_df = base_df.withColumn(f'{col_name}_seconds_total',
                                     _iso8601_duration_seconds(col(col_name)))

    # size() returns -1 (not 0) for a NULL array under Spark's default settings
    # (spark.sql.legacy.sizeOfNull=true, ANSI off), so NULL tags are mapped to 0 explicitly.
    base_df = base_df.withColumn('tagsCount',
                                 when(col('tags').isNull(), lit(0)).otherwise(size(col('tags'))))

    # Likes and comments per 1000 views; a zero or NULL viewCount yields NULL (ANSI off).
    for ratio_col, count_col in (('likeRatio', 'likeCount'), ('commentRatio', 'commentCount')):
        base_df = base_df.withColumn(ratio_col,
                                     (col(count_col) / col('viewCount') * 1000).cast('float'))

    # Separate year, month and day of the extraction date so they can be partition keys.
    base_df = base_df.withColumn('extractYear', year(col('extractDate')))
    base_df = base_df.withColumn('extractMonth', month(col('extractDate')))
    base_df = base_df.withColumn('extractDay', dayofmonth(col('extractDate')))

    # The raw timestamp strings are replaced by the derived columns above.
    return base_df.drop(*date_cols)




#### Configure the date used to locate the raw data in S3

In [5]:
from datetime import datetime, timezone

# Set to a 'YYYY/MM/DD' string (e.g. "2025/03/09") to re-process a specific day;
# leave as None to process today's raw data.
RUN_DATE_OVERRIDE = None

# Run date formatted to match the raw/YYYY/MM/DD/ prefix used when reading from S3.
# Taken in UTC so the prefix does not depend on the driver's local time zone
# (presumably the extractor also partitions by UTC date — TODO confirm).
current_date = RUN_DATE_OVERRIDE or datetime.now(timezone.utc).strftime("%Y/%m/%d")
current_date

'2025/03/09'


In [4]:
# Location of the selected day's raw extract: raw/YYYY/MM/DD/video_data.parquet
s3_path_raw = "s3://youtube-channel-data-v1-02032025/raw/{}/video_data.parquet".format(current_date)
s3_path_raw

's3://youtube-channel-data-v1-02032025/raw/2025/03/09/video_data.parquet'


#### Read the raw data from the S3 bucket

In [6]:
# Load the day's raw parquet extract from S3 into a Glue DynamicFrame.
raw_source_options = {"paths": [s3_path_raw]}
dynamic_frame = glueContext.create_dynamic_frame.from_options(
    connection_type="s3",
    connection_options=raw_source_options,
    format="parquet",
)




In [7]:
# Convert the DynamicFrame to a Spark DataFrame; format_video_df works with DataFrame
# column expressions (withColumn / col).
df = dynamic_frame.toDF()



In [8]:
# Preview the raw rows (first 20, long cell values truncated).
df.show(n=20, truncate=True)

+-----------+--------------------+--------------------+--------------------+--------------------+---------+---------+-------------+------------+--------+----------+-------+-----------+--------------------+
|    videoId|               title|         description|                tags|         publishedAt|viewCount|likeCount|favoriteCount|commentCount|duration|definition|caption|extractDate|           channelId|
+-----------+--------------------+--------------------+--------------------+--------------------+---------+---------+-------------+------------+--------+----------+-------+-----------+--------------------+
|7uH7_DThtX0|Top 3 Tips on Usi...|Best Tips on how ...|[Data Analyst, Ho...|2020-11-17T14:11:52Z|   331086|    11809|            0|         247| PT6M50S|        hd|  false| 2025-03-09|UC7cs8q-gJRlGwj4A...|
|lVj0RlSxTXk|Best Online Cours...|According to a re...|[big data, data a...|2021-04-15T14:00:28Z|    67210|     2714|            0|         131|PT13M23S|        hd|  false| 202

In [34]:
# Inspect the raw schema: the count columns arrive as strings and extractDate as a date.
df.printSchema()

root
 |-- videoId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- publishedAt: string (nullable = true)
 |-- viewCount: string (nullable = true)
 |-- likeCount: string (nullable = true)
 |-- favoriteCount: string (nullable = true)
 |-- commentCount: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- definition: string (nullable = true)
 |-- caption: string (nullable = true)
 |-- extractDate: date (nullable = true)
 |-- channelId: string (nullable = true)


#### Transform the data

In [9]:
# Apply the formatting pipeline and preview the result.
df_transformed = df.transform(format_video_df)
df_transformed.show()

+-----------+--------------------+--------------------+--------------------+---------+---------+-------------+------------+--------+----------+-------+-----------+--------------------+-------------------+--------------------+--------------------+--------------------+----------------------+---------+---------+------------+-----------+------------+----------+
|    videoId|               title|         description|                tags|viewCount|likeCount|favoriteCount|commentCount|duration|definition|caption|extractDate|           channelId| publishedAt_in_utc|publishedAt_utc_date|publishedAt_utc_time|publishedAt_timezone|duration_seconds_total|tagsCount|likeRatio|commentRatio|extractYear|extractMonth|extractDay|
+-----------+--------------------+--------------------+--------------------+---------+---------+-------------+------------+--------+----------+-------+-----------+--------------------+-------------------+--------------------+--------------------+--------------------+-------------

In [10]:
# Confirm the casts (counts are now integers) and that the derived columns are present.
df_transformed.printSchema()

root
 |-- videoId: string (nullable = true)
 |-- title: string (nullable = true)
 |-- description: string (nullable = true)
 |-- tags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- viewCount: integer (nullable = true)
 |-- likeCount: integer (nullable = true)
 |-- favoriteCount: string (nullable = true)
 |-- commentCount: integer (nullable = true)
 |-- duration: string (nullable = true)
 |-- definition: string (nullable = true)
 |-- caption: string (nullable = true)
 |-- extractDate: date (nullable = true)
 |-- channelId: string (nullable = true)
 |-- publishedAt_in_utc: timestamp (nullable = true)
 |-- publishedAt_utc_date: date (nullable = true)
 |-- publishedAt_utc_time: string (nullable = true)
 |-- publishedAt_timezone: string (nullable = true)
 |-- duration_seconds_total: integer (nullable = false)
 |-- tagsCount: integer (nullable = false)
 |-- likeRatio: float (nullable = true)
 |-- commentRatio: float (nullable = true)
 |-- extractYear: integer (

#### Write the partitioned output to S3 and update the Glue Data Catalog

In [11]:
# Fixed output prefix for the analysis layer; the sink adds one sub-directory level per
# partition key at write time.
s3_path_analysis = "s3://youtube-channel-data-v1-02032025/analysis/video_data/"
s3_path_analysis

's3://youtube-channel-data-v1-02032025/analysis/video_data/'


In [12]:
# Wrap the transformed DataFrame back into a DynamicFrame so the Glue sink can write it.
dynamic_frame_transformed_for_analysis = DynamicFrame.fromDF(
    df_transformed,
    glueContext,
    "dynamic_frame_transformed",
)




In [13]:
# Write the transformed frame to S3 as Glue parquet (snappy compression requested),
# partitioned by channel and extract date, and create/update the `video_data` table
# in the `glue_metadata_db_analysis_files` catalog database.
# NOTE(review): S3 sinks add new files rather than replacing existing ones, so re-running
# this cell for the same extract date presumably duplicates rows in those partitions —
# TODO confirm, and purge the target partitions first if reruns are expected.
s3output = glueContext.getSink(
    path=s3_path_analysis,
    connection_type="s3",
    updateBehavior="UPDATE_IN_DATABASE",  # let schema changes overwrite the catalog table definition
    partitionKeys=['channelId','extractYear','extractMonth','extractDay'],
    compression="snappy",
    enableUpdateCatalog=True  # register new partitions / schema in the Data Catalog on write
)
s3output.setCatalogInfo(catalogDatabase="glue_metadata_db_analysis_files", catalogTableName="video_data")
s3output.setFormat("glueparquet")
s3output.writeFrame(dynamic_frame_transformed_for_analysis)

<awsglue.dynamicframe.DynamicFrame object at 0x7f4a2890bd90>
