# AWS Glue Studio Notebook
##### You are now running an AWS Glue Studio notebook. To start using it, you need to start an AWS Glue Interactive Session.


#### Optional: Run this cell to see available notebook commands ("magics").


In [2]:
%help

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 



# Available Magic Commands

## Sessions Magic

----
    %help                             Return a list of descriptions and input types for all magic commands. 
    %profile            String        Specify a profile in your aws configuration to use as the credentials provider.
    %region             String        Specify the AWS region in which to initialize a session. 
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\ USERNAME \.aws\config" on Windows.
    %idle_timeout       Int           The number of minutes of inactivity after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %timeout            Int           The number of minutes after which a session will timeout. 
                                      Default: 2880 minutes (48 hours).
    %session_id_prefix  String        Define a String that will precede all session IDs in the format 
                                      [session_id_prefix]-[session_id]. If a session ID is not provided,
                                      a random UUID will be generated.
    %status                           Returns the status of the current Glue session including its duration, 
                                      configuration and executing user / role.
    %session_id                       Returns the session ID for the running session.
    %list_sessions                    Lists all currently running sessions by ID.
    %stop_session                     Stops the current session.
    %glue_version       String        The version of Glue to be used by this session. 
                                      Currently, the only valid options are 2.0, 3.0 and 4.0. 
                                      Default: 2.0.
    %reconnect          String        Specify a live session ID to switch/reconnect to the sessions.
----

## Selecting Session Types

----
    %streaming          String        Sets the session type to Glue Streaming.
    %etl                String        Sets the session type to Glue ETL.
    %session_type       String        Specify a session_type to be used. Supported values: streaming and etl.
----

## Glue Config Magic 
*(common across all session types)*

----

    %%configure         Dictionary    A json-formatted dictionary consisting of all configuration parameters for 
                                      a session. Each parameter can be specified here or through individual magics.
    %iam_role           String        Specify an IAM role ARN to execute your session with.
                                      Default from ~/.aws/config on Linux or macOS, 
                                      or C:\Users\%USERNAME%\.aws\config` on Windows.
    %number_of_workers  int           The number of workers of a defined worker_type that are allocated 
                                      when a session runs.
                                      Default: 5.
    %additional_python_modules  List  Comma separated list of additional Python modules to include in your cluster 
                                      (can be from Pypi or S3).
    %%tags        Dictionary          Specify a json-formatted dictionary consisting of tags to use in the session.
    
    %%assume_role Dictionary, String  Specify a json-formatted dictionary or an IAM role ARN string to create a session 
                                      for cross account access.
                                      E.g. {valid arn}
                                      %%assume_role 
                                      'arn:aws:iam::XXXXXXXXXXXX:role/AWSGlueServiceRole' 
                                      E.g. {credentials}
                                      %%assume_role
                                      {
                                            "aws_access_key_id" : "XXXXXXXXXXXX",
                                            "aws_secret_access_key" : "XXXXXXXXXXXX",
                                            "aws_session_token" : "XXXXXXXXXXXX"
                                       }
----

                                      
## Magic for Spark Sessions (ETL & Streaming)

----
    %worker_type        String        Set the type of instances the session will use as workers. 
    %connections        List          Specify a comma separated list of connections to use in the session.
    %extra_py_files     List          Comma separated list of additional Python files From S3.
    %extra_jars         List          Comma separated list of additional Jars to include in the cluster.
    %spark_conf         String        Specify custom spark configurations for your session. 
                                      E.g. %spark_conf spark.serializer=org.apache.spark.serializer.KryoSerializer
----

## Action Magic

----

    %%sql               String        Run SQL code. All lines after the initial %%sql magic will be passed
                                      as part of the SQL code.  
    %matplot      Matplotlib figure   Visualize your data using the matplotlib library.
                                      E.g. 
                                      import matplotlib.pyplot as plt
                                      # Set X-axis and Y-axis values
                                      x = [5, 2, 8, 4, 9]
                                      y = [10, 4, 8, 5, 2]
                                      # Create a bar chart 
                                      plt.bar(x, y) 
                                      # Show the plot
                                      %matplot plt    
    %plotly            Plotly figure  Visualize your data using the plotly library.
                                      E.g.
                                      import plotly.express as px
                                      #Create a graphical figure
                                      fig = px.line(x=["a","b","c"], y=[1,3,2], title="sample figure")
                                      #Show the figure
                                      %plotly fig

  
                
----



####  Run this cell to set up and start your interactive session.


In [1]:
%idle_timeout 2880
%glue_version 4.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from pyspark.sql.functions import col, when, count, lit, trim, avg, sum, countDistinct
  
sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

Welcome to the Glue Interactive Sessions Kernel
For more information on available magic commands, please type %help in any new cell.

Please view our Getting Started page to access the most up-to-date information on the Interactive Sessions kernel: https://docs.aws.amazon.com/glue/latest/dg/interactive-sessions.html
Installed kernel version: 1.0.7 
Current idle_timeout is None minutes.
idle_timeout has been set to 2880 minutes.
Setting Glue version to: 4.0
Previous worker type: None
Setting new worker type to: G.1X
Previous number of workers: None
Setting new number of workers to: 5
Trying to create a Glue session for the kernel.
Session Type: glueetl
Worker Type: G.1X
Number of Workers: 5
Idle Timeout: 2880
Session ID: 6c80da70-1b8e-4e5f-9820-e5684486f8e9
Applying the following default arguments:
--glue_kernel_version 1.0.7
--enable-glue-datacatalog true
Waiting for session 6c80da70-1b8e-4e5f-9820-e5684486f8e9 to get into ready status...
Session 6c80da70-1b8e-4e5f-9820-e5684486f8e9 ha

#### Load the courses and comments tables from the AWS Glue Data Catalog and convert them to DataFrames


In [2]:

# Load courses data
courses_dynamic_df = glueContext.create_dynamic_frame.from_catalog(
    database="udemy_database", 
    table_name="courses"
)

# Load comments data
comments_dynamic_df = glueContext.create_dynamic_frame.from_catalog(
    database="udemy_database", 
    table_name="comments"
)

# Convert DynamicFrames to DataFrames for easier transformations
courses_df = courses_dynamic_df.toDF()
comments_df = comments_dynamic_df.toDF()



In [3]:
# Show first 5 rows to verify data is loaded
print("Courses Data:")
courses_df.show(5)

Courses Data:
+------+--------------------+-------+------+--------------------+---------------+----------+-----------+------------+------------+------------------+--------------------+----------------+---------+-------------------+-------------+--------+--------------------+---------------+------------------+
|    id|               title|is_paid| price|            headline|num_subscribers|avg_rating|num_reviews|num_comments|num_lectures|content_length_min|      published_time|last_update_date| category|        subcategory|        topic|language|          course_url|instructor_name|    instructor_url|
+------+--------------------+-------+------+--------------------+---------------+----------+-----------+------------+------------+------------------+--------------------+----------------+---------+-------------------+-------------+--------+--------------------+---------------+------------------+
|4715.0|Online Vegan Vege...|   true| 24.99|Learn to cook del...|         2231.0|      3.75|   

In [4]:
print("Comments Data:")
comments_df.show(5)

Comments Data:
+---------+---------+----+--------------------+-------------+--------------------+
|       id|course_id|rate|                date| display_name|             comment|
+---------+---------+----+--------------------+-------------+--------------------+
| 88962892|  3173036| 1.0|2021-06-29T18:54:...|        Rahul|I think a beginne...|
|125535470|  4913148| 5.0|2022-10-07T11:17:...|        Marlo|Aviva is such a n...|
| 68767147|  3178386| 3.5|2020-10-19T06:35:...|Yamila Andrea|Muy buena la intr...|
|125029758|  3175814| 5.0|2022-09-30T21:13:...|   Jacqueline|This course is th...|
| 76584052|  3174896| 4.5|2021-01-30T08:45:...|      Anthony|I found this cour...|
+---------+---------+----+--------------------+-------------+--------------------+
only showing top 5 rows


In [6]:
# Replace empty strings in headline and last_update_date with real nulls
# (col and when are already imported in the setup cell)
courses_df = courses_df.withColumn(
    "headline", when(col("headline") == "", None).otherwise(col("headline"))
).withColumn(
    "last_update_date", when(col("last_update_date") == "", None).otherwise(col("last_update_date"))
)





In [7]:
# Treat a price of 0.0 as a placeholder and convert it to null
# (the headline step repeats the previous cell and changes nothing)
courses_df = courses_df.withColumn(
    "price", when(col("price") == 0.0, None).otherwise(col("price"))
).withColumn(
    "headline", when(col("headline") == "", None).otherwise(col("headline"))
)





In [8]:
# Inspect schema of the DataFrame
courses_df.printSchema()


root
 |-- id: double (nullable = true)
 |-- title: string (nullable = true)
 |-- is_paid: boolean (nullable = true)
 |-- price: double (nullable = true)
 |-- headline: string (nullable = true)
 |-- num_subscribers: double (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- num_reviews: double (nullable = true)
 |-- num_comments: double (nullable = true)
 |-- num_lectures: double (nullable = true)
 |-- content_length_min: double (nullable = true)
 |-- published_time: string (nullable = true)
 |-- last_update_date: string (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- language: string (nullable = true)
 |-- course_url: string (nullable = true)
 |-- instructor_name: string (nullable = true)
 |-- instructor_url: string (nullable = true)


In [9]:
# Check nulls again after cleaning
courses_df.select([count(when(col(c).isNull(), c)).alias(c) for c in courses_df.columns]).show()


+---+-----+-------+-----+--------+---------------+----------+-----------+------------+------------+------------------+--------------+----------------+--------+-----------+-----+--------+----------+---------------+--------------+
| id|title|is_paid|price|headline|num_subscribers|avg_rating|num_reviews|num_comments|num_lectures|content_length_min|published_time|last_update_date|category|subcategory|topic|language|course_url|instructor_name|instructor_url|
+---+-----+-------+-----+--------+---------------+----------+-----------+------------+------------+------------------+--------------+----------------+--------+-----------+-----+--------+----------+---------------+--------------+
|  0|    0|      0|21738|      27|              0|         0|          0|           0|           0|                 0|             0|             137|       0|          0|    0|       0|         0|              0|             0|
+---+-----+-------+-----+--------+---------------+----------+-----------+-----------

In [10]:
# Count null-like values
null_like_counts = courses_df.select(
    count(when((col("instructor_name").isNull()) | (col("instructor_name") == "") | (col("instructor_name").rlike(r"^\s*$")), "instructor_name")).alias("instructor_name_nulls"),
    count(when((col("instructor_url").isNull()) | (col("instructor_url") == "") | (col("instructor_url").rlike(r"^\s*$")), "instructor_url")).alias("instructor_url_nulls"),
    count(when((col("topic").isNull()) | (col("topic") == "") | (col("topic").rlike(r"^\s*$")), "topic")).alias("topic_nulls")
)

# Show the counts
null_like_counts.show()


+---------------------+--------------------+-----------+
|instructor_name_nulls|instructor_url_nulls|topic_nulls|
+---------------------+--------------------+-----------+
|                    5|                 427|        958|
+---------------------+--------------------+-----------+
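
The null-like test above combines three conditions: a real null, an empty string, and a whitespace-only string. The following plain-Python sketch mirrors that check outside Spark. The helper name `is_null_like` and the sample values are hypothetical, and `re.ASCII` is used on the assumption that Spark's `rlike` follows Java's default `\s`, which matches ASCII whitespace only.

```python
import re

# Raw string so that \s reaches the regex engine unchanged.
# re.ASCII keeps \s limited to ASCII whitespace (assumed to match Java's default).
BLANK = re.compile(r"^\s*$", re.ASCII)

def is_null_like(value):
    """Return True for None, the empty string, or a whitespace-only string."""
    return value is None or BLANK.search(value) is not None

# Hypothetical sample values
samples = [None, "", "   ", "\t\n", "Python", " data "]
flags = [is_null_like(s) for s in samples]
print(flags)
```

Because `^\s*$` also matches the empty string, the separate `== ""` comparison in the cell above is redundant, although harmless.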


In [11]:
# Drop rows where any of the specified columns have null or null-like values
courses_df = courses_df.filter(
    (col("instructor_name").isNotNull()) & (col("instructor_name") != "") & (~col("instructor_name").rlike(r"^\s*$"))
).filter(
    (col("instructor_url").isNotNull()) & (col("instructor_url") != "") & (~col("instructor_url").rlike(r"^\s*$"))
).filter(
    (col("topic").isNotNull()) & (col("topic") != "") & (~col("topic").rlike(r"^\s*$"))
).filter(
    (col("headline").isNotNull()) & (col("headline") != "") & (~col("headline").rlike(r"^\s*$"))
)

# Verify the dataset after removal
courses_df.show(5)

# Recheck for nulls in the specified columns
courses_df.select([count(when(col(c).isNull(), c)).alias(c) for c in ["instructor_name", "instructor_url", "topic", "headline"]]).show()


+------+--------------------+-------+------+--------------------+---------------+----------+-----------+------------+------------+------------------+--------------------+----------------+---------+-------------------+-------------+--------+--------------------+---------------+------------------+
|    id|               title|is_paid| price|            headline|num_subscribers|avg_rating|num_reviews|num_comments|num_lectures|content_length_min|      published_time|last_update_date| category|        subcategory|        topic|language|          course_url|instructor_name|    instructor_url|
+------+--------------------+-------+------+--------------------+---------------+----------+-----------+------------+------------+------------------+--------------------+----------------+---------+-------------------+-------------+--------+--------------------+---------------+------------------+
|4715.0|Online Vegan Vege...|   true| 24.99|Learn to cook del...|         2231.0|      3.75|      134.0|     

In [12]:
# Convert the 'id' column in courses_df from double to long (64-bit integer)
courses_df = courses_df.withColumn("id", col("id").cast("long"))

# Verify the schema to confirm the change
courses_df.printSchema()

# Show a few rows to verify the change
courses_df.select("id").show(5, truncate=False)


root
 |-- id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- is_paid: boolean (nullable = true)
 |-- price: double (nullable = true)
 |-- headline: string (nullable = true)
 |-- num_subscribers: double (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- num_reviews: double (nullable = true)
 |-- num_comments: double (nullable = true)
 |-- num_lectures: double (nullable = true)
 |-- content_length_min: double (nullable = true)
 |-- published_time: string (nullable = true)
 |-- last_update_date: string (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- language: string (nullable = true)
 |-- course_url: string (nullable = true)
 |-- instructor_name: string (nullable = true)
 |-- instructor_url: string (nullable = true)

+----+
|id  |
+----+
|4715|
|1769|
|5664|
|7723|
|8157|
+----+
only showing top 5 rows


In [13]:
# Count null values for each column in the comments table
comments_null_counts = comments_df.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in comments_df.columns]
)

# Show the null counts
comments_null_counts.show()

+---+---------+----+----+------------+-------+
| id|course_id|rate|date|display_name|comment|
+---+---------+----+----+------------+-------+
|  0|        0|   0|   0|           0|      0|
+---+---------+----+----+------------+-------+


In [15]:
# Count null-like values (empty strings, whitespaces) in specific columns
comments_null_like_counts = comments_df.select(
    count(when((col("display_name").isNull()) | (col("display_name") == "") | (col("display_name").rlike(r"^\s*$")), "display_name")).alias("display_name_nulls"),
    count(when((col("comment").isNull()) | (col("comment") == "") | (col("comment").rlike(r"^\s*$")), "comment")).alias("comment_nulls"),
    count(when((col("course_id").isNull()), "course_id")).alias("course_id_nulls"),
    count(when((col("rate").isNull()), "rate")).alias("rate_nulls")
)

# Show the null-like counts
comments_null_like_counts.show()


+------------------+-------------+---------------+----------+
|display_name_nulls|comment_nulls|course_id_nulls|rate_nulls|
+------------------+-------------+---------------+----------+
|                10|           19|              0|         0|
+------------------+-------------+---------------+----------+


In [16]:
comments_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- course_id: long (nullable = true)
 |-- rate: double (nullable = true)
 |-- date: string (nullable = true)
 |-- display_name: string (nullable = true)
 |-- comment: string (nullable = true)


In [17]:
from pyspark.sql.functions import count, when, col

# Check for nulls or null-like values in every column
comments_df.select(
    [count(when(col(c).isNull() | (col(c) == "") | (col(c).rlike(r"^\s*$")), c)).alias(c) for c in comments_df.columns]
).show()


+---+---------+----+----+------------+-------+
| id|course_id|rate|date|display_name|comment|
+---+---------+----+----+------------+-------+
|  0|        0|   0|   0|          10|     19|
+---+---------+----+----+------------+-------+


In [18]:
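# Drop rows with nulls in critical columns
comments_df = comments_df.dropna(subset=["id", "course_id", "rate"])

# fillna only replaces real nulls, so first turn empty or whitespace-only
# strings into nulls, then fill the non-critical columns with defaults
from pyspark.sql.functions import col, when

for c in ["display_name", "comment"]:
    comments_df = comments_df.withColumn(
        c, when(col(c).rlike(r"^\s*$"), None).otherwise(col(c))
    )

comments_df = comments_df.fillna({
    "display_name": "Anonymous",
    "comment": "No Comment"
})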





In [19]:
from pyspark.sql.functions import trim, lower, col, to_date

# Clean and standardize strings
comments_df = comments_df.withColumn("display_name", trim(lower(col("display_name")))) \
                         .withColumn("comment", trim(col("comment")))

# Parse the date string into a DateType column (the time part is discarded)
comments_df = comments_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))





In [20]:
comments_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- course_id: long (nullable = true)
 |-- rate: double (nullable = true)
 |-- date: date (nullable = true)
 |-- display_name: string (nullable = false)
 |-- comment: string (nullable = false)


In [21]:
# Cast id to double
comments_df = comments_df.withColumn("id", col("id").cast("double"))

# Verify the schema to ensure the change
comments_df.printSchema()




root
 |-- id: double (nullable = true)
 |-- course_id: long (nullable = true)
 |-- rate: double (nullable = true)
 |-- date: date (nullable = true)
 |-- display_name: string (nullable = false)
 |-- comment: string (nullable = false)


In [22]:
# Cast id and course_id to double
comments_df = comments_df.withColumn("id", col("id").cast("double")) \
                         .withColumn("course_id", col("course_id").cast("double"))

# Verify the schema to ensure the changes
comments_df.printSchema()



root
 |-- id: double (nullable = true)
 |-- course_id: double (nullable = true)
 |-- rate: double (nullable = true)
 |-- date: date (nullable = true)
 |-- display_name: string (nullable = false)
 |-- comment: string (nullable = false)
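
The casts above move `id` and `course_id` from long to double, and a later cell casts them back to long. That round trip is lossless only while the values stay below 2**53, the largest range in which a double represents every integer exactly. A minimal plain-Python sketch of that limit, using one ID that appears in the output above and one hypothetical large ID:

```python
# An ID from the comments table: far below 2**53, so a double holds it exactly.
sample_id = 125535470
small_round_trip = int(float(sample_id))

# A hypothetical ID just above 2**53: the double cannot represent it,
# so the round trip silently returns a neighbouring integer.
big_id = 2**53 + 1
big_round_trip = int(float(big_id))

print(small_round_trip == sample_id, big_round_trip == big_id)
```

Keeping identifier columns as long throughout avoids both this risk and the scientific notation (for example `8.8962892E7`) that `show()` prints for doubles.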


In [23]:
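# Switch to the legacy (Spark 2.x) date parser; this appears to be needed so that
# the 'yyyy-MM-dd' pattern accepts strings that carry a trailing time part
spark.conf.set("spark.sql.legacy.timeParserPolicy", "LEGACY")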





In [24]:
from pyspark.sql.functions import to_date

# Re-apply to_date under the legacy parser policy; the column is already
# DateType from the earlier cell, so the values should be unchanged
comments_df = comments_df.withColumn("date", to_date(col("date"), "yyyy-MM-dd"))

# Show the transformed DataFrame
comments_df.show(5)

# Verify the schema
comments_df.printSchema()


+------------+---------+----+----------+-------------+--------------------+
|          id|course_id|rate|      date| display_name|             comment|
+------------+---------+----+----------+-------------+--------------------+
| 8.8962892E7|3173036.0| 1.0|2021-06-29|        rahul|I think a beginne...|
| 1.2553547E8|4913148.0| 5.0|2022-10-07|        marlo|Aviva is such a n...|
| 6.8767147E7|3178386.0| 3.5|2020-10-19|yamila andrea|Muy buena la intr...|
|1.25029758E8|3175814.0| 5.0|2022-09-30|   jacqueline|This course is th...|
| 7.6584052E7|3174896.0| 4.5|2021-01-30|      anthony|I found this cour...|
+------------+---------+----+----------+-------------+--------------------+
only showing top 5 rows

root
 |-- id: double (nullable = true)
 |-- course_id: double (nullable = true)
 |-- rate: double (nullable = true)
 |-- date: date (nullable = true)
 |-- display_name: string (nullable = false)
 |-- comment: string (nullable = false)
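
The output above shows `2021-06-29` for an input that began `2021-06-29T18:54:...`, so the parse keeps only the leading date portion. A plain-Python sketch of the same idea follows, assuming the first ten characters always hold the date. The helper `leading_date` and the full timestamp strings are hypothetical, since the source values are truncated in the output.

```python
from datetime import date, datetime

def leading_date(text):
    """Parse the first 10 characters as yyyy-MM-dd; return None if that fails."""
    try:
        return datetime.strptime(text[:10], "%Y-%m-%d").date()
    except (TypeError, ValueError):
        # None input or unparsable text maps to None, similar to a null date
        return None

# Hypothetical inputs: a timestamp with a time part, a bare date, blank, and missing
inputs = ["2021-06-29T18:54:00Z", "2022-10-07", "", None]
parsed = [leading_date(s) for s in inputs]
print(parsed)
```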


In [25]:
from pyspark.sql.functions import when

# Cast id and course_id back to long, using -1 as a sentinel if the cast yields null
comments_df = comments_df.withColumn(
    "id",
    when(col("id").cast("long").isNotNull(), col("id").cast("long")).otherwise(-1)
).withColumn(
    "course_id",
    when(col("course_id").cast("long").isNotNull(), col("course_id").cast("long")).otherwise(-1)
)





In [26]:
# Check the first few rows
comments_df.select("id", "course_id").show(5, truncate=False)

# Confirm the schema
comments_df.printSchema()


+---------+---------+
|id       |course_id|
+---------+---------+
|88962892 |3173036  |
|125535470|4913148  |
|68767147 |3178386  |
|125029758|3175814  |
|76584052 |3174896  |
+---------+---------+
only showing top 5 rows

root
 |-- id: long (nullable = true)
 |-- course_id: long (nullable = true)
 |-- rate: double (nullable = true)
 |-- date: date (nullable = true)
 |-- display_name: string (nullable = false)
 |-- comment: string (nullable = false)


In [27]:
from pyspark.sql.functions import count, when, col

# Count null values for each column
comments_df.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in comments_df.columns]
).show()


+---+---------+----+----+------------+-------+
| id|course_id|rate|date|display_name|comment|
+---+---------+----+----+------------+-------+
|  0|        0|   0|   0|           0|      0|
+---+---------+----+----+------------+-------+


In [28]:
# Check for duplicate rows in courses table
duplicates_in_courses = courses_df.groupBy(courses_df.columns).count().filter(col("count") > 1)

# Show duplicate rows
duplicates_in_courses.show(truncate=False)


+---+-----+-------+-----+--------+---------------+----------+-----------+------------+------------+------------------+--------------+----------------+--------+-----------+-----+--------+----------+---------------+--------------+-----+
|id |title|is_paid|price|headline|num_subscribers|avg_rating|num_reviews|num_comments|num_lectures|content_length_min|published_time|last_update_date|category|subcategory|topic|language|course_url|instructor_name|instructor_url|count|
+---+-----+-------+-----+--------+---------------+----------+-----------+------------+------------+------------------+--------------+----------------+--------+-----------+-----+--------+----------+---------------+--------------+-----+
+---+-----+-------+-----+--------+---------------+----------+-----------+------------+------------+------------------+--------------+----------------+--------+-----------+-----+--------+----------+---------------+--------------+-----+


In [29]:
# Count the distinct rows that occur more than once (duplicated groups, not extra copies)
total_duplicates_courses = courses_df.groupBy(courses_df.columns).count().filter(col("count") > 1).count()

print(f"Total duplicate rows in courses table: {total_duplicates_courses}")


Total duplicate rows in courses table: 0
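
The count above is the number of distinct rows that occur more than once, which is not the same as the number of surplus copies. Both are zero here, but they diverge as soon as duplicates exist. A small plain-Python sketch with hypothetical rows shows the difference:

```python
from collections import Counter

# Hypothetical rows standing in for a DataFrame
rows = [
    (1, "a"), (1, "a"), (1, "a"),   # one row repeated three times
    (2, "b"), (2, "b"),             # one row repeated twice
    (3, "c"),                       # unique row
]

counts = Counter(rows)

# Equivalent of groupBy(all columns).count().filter(count > 1).count()
duplicated_groups = sum(1 for n in counts.values() if n > 1)

# Rows that dropDuplicates() would remove
extra_copies = sum(n - 1 for n in counts.values() if n > 1)

print(duplicated_groups, extra_copies)
```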


In [30]:
# Check for duplicate rows in comments table
from pyspark.sql.functions import count

duplicates_in_comments = comments_df.groupBy(comments_df.columns).count().filter(col("count") > 1)

# Show duplicate rows
duplicates_in_comments.show(truncate=False)


+---+---------+----+----+------------+-------+-----+
|id |course_id|rate|date|display_name|comment|count|
+---+---------+----+----+------------+-------+-----+
+---+---------+----+----+------------+-------+-----+


In [31]:
# Count the distinct rows that occur more than once (duplicated groups, not extra copies)
total_duplicates_comments = comments_df.groupBy(comments_df.columns).count().filter(col("count") > 1).count()

print(f"Total duplicate rows in comments table: {total_duplicates_comments}")


Total duplicate rows in comments table: 0


In [32]:
comments_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- course_id: long (nullable = true)
 |-- rate: double (nullable = true)
 |-- date: date (nullable = true)
 |-- display_name: string (nullable = false)
 |-- comment: string (nullable = false)


In [33]:
courses_df.printSchema()

root
 |-- id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- is_paid: boolean (nullable = true)
 |-- price: double (nullable = true)
 |-- headline: string (nullable = true)
 |-- num_subscribers: double (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- num_reviews: double (nullable = true)
 |-- num_comments: double (nullable = true)
 |-- num_lectures: double (nullable = true)
 |-- content_length_min: double (nullable = true)
 |-- published_time: string (nullable = true)
 |-- last_update_date: string (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- language: string (nullable = true)
 |-- course_url: string (nullable = true)
 |-- instructor_name: string (nullable = true)
 |-- instructor_url: string (nullable = true)


In [34]:
from pyspark.sql.functions import to_date

# Parse published_time and last_update_date (strings) into DateType columns
courses_df = courses_df.withColumn("published_time", to_date(col("published_time"), "yyyy-MM-dd")) \
                       .withColumn("last_update_date", to_date(col("last_update_date"), "yyyy-MM-dd"))

# Verify the schema and data
courses_df.printSchema()
courses_df.select("published_time", "last_update_date").show(5, truncate=False)


root
 |-- id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- is_paid: boolean (nullable = true)
 |-- price: double (nullable = true)
 |-- headline: string (nullable = true)
 |-- num_subscribers: double (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- num_reviews: double (nullable = true)
 |-- num_comments: double (nullable = true)
 |-- num_lectures: double (nullable = true)
 |-- content_length_min: double (nullable = true)
 |-- published_time: date (nullable = true)
 |-- last_update_date: date (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- language: string (nullable = true)
 |-- course_url: string (nullable = true)
 |-- instructor_name: string (nullable = true)
 |-- instructor_url: string (nullable = true)

+--------------+----------------+
|published_time|last_update_date|
+--------------+----------------+
|2010-08-05    |2020-11-06      |
|2010-01-12    |n

In [35]:
# Filter out invalid rates
comments_df = comments_df.filter((col("rate") >= 1) & (col("rate") <= 5))

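# Filter out invalid prices. Rows whose price is null (0.0 was converted to null
# in an earlier cell) are dropped too, because a comparison with null is never true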
courses_df = courses_df.filter((col("price") >= 0) & (col("price") <= 1000))





In [36]:
# Count rows in comments table
comments_row_count = comments_df.count()
print(f"Total rows in comments table: {comments_row_count}")
# Count rows in courses table
courses_row_count = courses_df.count()
print(f"Total rows in courses table: {courses_row_count}")


Total rows in comments table: 99855
Total rows in courses table: 186788
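
The range filters applied before these counts also remove rows whose value is null. In Spark SQL a comparison with null evaluates to null, and `filter` keeps only rows where the predicate is true. Since an earlier cell replaced a price of 0.0 with null, those courses are likely missing from the count above. The following plain-Python sketch imitates that three-valued logic; `sql_between` and the prices are hypothetical.

```python
def sql_between(value, low, high):
    """Three-valued range check: returns None (unknown) when value is None."""
    if value is None:
        return None
    return low <= value <= high

# Hypothetical prices: two valid, one null, one out of range
prices = [24.99, None, 199.99, 1500.0]

# A filter keeps a row only when the predicate is exactly True
kept = [p for p in prices if sql_between(p, 0, 1000) is True]
print(kept)
```

If free courses should be retained, one option is to add an explicit `isNull()` branch to the price filter or to skip the 0.0-to-null conversion.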


In [37]:
from pyspark.sql.functions import col

# Rename id to course_id in courses_df
courses_df = courses_df.withColumnRenamed("id", "course_id")

# Ensure course_id data types match
comments_df = comments_df.withColumn("course_id", col("course_id").cast("long"))
courses_df = courses_df.withColumn("course_id", col("course_id").cast("long"))

# Perform the join
enriched_data = comments_df.join(courses_df, on="course_id", how="inner")




In [38]:
row_count = enriched_data.count()
print(f"Total number of rows in enriched_data: {row_count}")

Total number of rows in enriched_data: 92870
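
The join returns fewer rows than the comments table (92870 versus 99855) because an inner join keeps only comments whose `course_id` still exists in `courses_df` after cleaning. A plain-Python sketch of that behaviour follows, assuming `course_id` is unique on the courses side. The course titles and the unmatched comment are hypothetical.

```python
# course_id -> title (titles are hypothetical)
courses = {3173036: "Course A", 4913148: "Course B"}

# (comment id, course_id); the last comment points at a course that is not present
comments = [
    (88962892, 3173036),
    (125535470, 4913148),
    (1, 9999999),
]

# Inner join: keep a comment only when its course_id has a match
enriched = [
    (comment_id, course_id, courses[course_id])
    for comment_id, course_id in comments
    if course_id in courses
]
unmatched = len(comments) - len(enriched)
print(len(enriched), unmatched)
```

A left join with a null check on a course column would expose the unmatched comments instead of dropping them silently.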


In [39]:
courses_df.printSchema()

root
 |-- course_id: long (nullable = true)
 |-- title: string (nullable = true)
 |-- is_paid: boolean (nullable = true)
 |-- price: double (nullable = true)
 |-- headline: string (nullable = true)
 |-- num_subscribers: double (nullable = true)
 |-- avg_rating: double (nullable = true)
 |-- num_reviews: double (nullable = true)
 |-- num_comments: double (nullable = true)
 |-- num_lectures: double (nullable = true)
 |-- content_length_min: double (nullable = true)
 |-- published_time: date (nullable = true)
 |-- last_update_date: date (nullable = true)
 |-- category: string (nullable = true)
 |-- subcategory: string (nullable = true)
 |-- topic: string (nullable = true)
 |-- language: string (nullable = true)
 |-- course_url: string (nullable = true)
 |-- instructor_name: string (nullable = true)
 |-- instructor_url: string (nullable = true)


In [40]:
# Find instructors who appear on more than one course
duplicates_instructors = courses_df.groupBy("instructor_name").count().filter("count > 1")
duplicates_instructors.show()



+--------------------+-----+
|     instructor_name|count|
+--------------------+-----+
|         Angela Poch|    8|
|Bplans School of ...|    5|
|           Joe Saenz|    3|
|      Rexcel Cariaga|    3|
|      Daniel McCarty|    5|
|      Kenney Mencher|    2|
|           Mary Buck|    2|
|        Lissa Coffey|    7|
|Jef Gazley, M.S.,...|    8|
|Ricardo Párraga Z...|    9|
|        Peter Janzen|    3|
|       Amani Channel|    2|
|  Christelle Donaghy|    3|
| Aspiratech Training|    7|
|       Steve Reifman|    2|
|  Intellezy Trainers|  245|
|JMG Virtual Consu...|    8|
|Lee Jones BSc. MS...|    3|
|         Caleb Curry|    8|
|    Cristian Carrera|    3|
+--------------------+-----+
only showing top 20 rows


In [41]:
from pyspark.sql.functions import col, row_number
from pyspark.sql.window import Window

# Create Instructor Dimension using instructor_name and instructor_url
instructor_dim = courses_df.select(
    col("instructor_name"),
    col("instructor_url")
).dropDuplicates()

# Generate sequential IDs (a window without partitionBy sorts all rows in a single partition)
window_spec_instructor = Window.orderBy("instructor_name", "instructor_url")
instructor_dim = instructor_dim.withColumn(
    "instructor_dim_id", row_number().over(window_spec_instructor)
)

instructor_dim.show()


+-------------------------+--------------------+-----------------+
|          instructor_name|      instructor_url|instructor_dim_id|
+-------------------------+--------------------+-----------------+
|          " NetworkHelp "|/user/parmod-dhim...|                1|
|     "Mr. Casual" Char...|/user/charlie-dec...|                2|
|     "Real Teacher" ∼F...|      /user/mak-fav/|                3|
|       #Bora Certificar ?|/user/isabella-so...|                4|
|     #Digitalizzando A...|      /user/syrus-4/|                5|
|     'Regen Ray' Milidoni|  /user/raymilidoni/|                6|
|     (Bo)Siripa Aruenpong|/user/siripa-arue...|                7|
|     (International Di...|/user/nicktoussaint/|                8|
|        (Mila) Mengyun Yi|    /user/mengyun-8/|                9|
|(あくしょん) Murakami ...|/user/murakami-yo...|               10|
|  (株) ケン・ミュージック|/user/kenmiyuzits...|               11|
|  (株)笑い総研 大久保信克|/user/da-jiu-bao-...|               12|
|          + DoctorA

In [42]:
# Fill null instructor_url with a placeholder
instructor_dim = instructor_dim.fillna({"instructor_url": "Unknown"})





In [43]:
# Create Course Dimension
course_dim = courses_df.select(
    col("course_id"),
    col("title"),
    col("price"),
    col("is_paid"),
    col("course_url")
).dropDuplicates()

# Generate sequential IDs
window_spec_course = Window.orderBy("course_id")
course_dim = course_dim.withColumn(
    "course_dim_id", row_number().over(window_spec_course)
)

course_dim.show()


+---------+--------------------+------+-------+--------------------+-------------+
|course_id|               title| price|is_paid|          course_url|course_dim_id|
+---------+--------------------+------+-------+--------------------+-------------+
|     2762|Simple Strategy f...| 39.99|   true|/course/swing-tra...|            1|
|     4715|Online Vegan Vege...| 24.99|   true|/course/vegan-veg...|            2|
|     5664|How To Become a V...| 19.99|   true|/course/see-my-pe...|            3|
|     7723|How to Train a Puppy|199.99|   true|/course/complete-...|            4|
|     8069|    Curso SEO Online| 99.99|   true|/course/curso-de-...|            5|
|     8075|How to Create an ...|149.99|   true|/course/how-to-cr...|            6|
|     8082|Ruby Programming ...| 74.99|   true|/course/learn-rub...|            7|
|     8139|14-Day Yoga Detox...| 29.99|   true|/course/yoga-for-...|            8|
|     8157|Web Design from t...|159.99|   true|/course/web-desig...|            9|
|   

In [44]:
# Create Category Dimension
category_dim = courses_df.select(
    col("category"),
    col("subcategory"),
    col("topic")
).dropDuplicates()

# Generate sequential IDs
window_spec_category = Window.orderBy("category", "subcategory", "topic")
category_dim = category_dim.withColumn(
    "category_dim_id", row_number().over(window_spec_category)
)

category_dim.show()


+--------+--------------------+--------------------+---------------+
|category|         subcategory|               topic|category_dim_id|
+--------+--------------------+--------------------+---------------+
|Business|Business Analytic...|         A/B Testing|              1|
|Business|Business Analytic...|AWS Certified Mac...|              2|
|Business|Business Analytic...|          Accounting|              3|
|Business|Business Analytic...|               Agile|              4|
|Business|Business Analytic...|          Algorithms|              5|
|Business|Business Analytic...|             Alteryx|              6|
|Business|Business Analytic...|          Amazon AWS|              7|
|Business|Business Analytic...|   Amazon QuickSight|              8|
|Business|Business Analytic...|           AngularJS|              9|
|Business|Business Analytic...|            AnyLogic|             10|
|Business|Business Analytic...|        Apache Kafka|             11|
|Business|Business Analytic...|   

In [45]:
# Create Language Dimension
language_dim = courses_df.select(
    col("language")
).dropDuplicates()

# Generate sequential IDs for language dimension
window_spec_language = Window.orderBy("language")
language_dim = language_dim.withColumn(
    "language_dim_id", row_number().over(window_spec_language))
language_dim.show()


+---------+---------------+
| language|language_dim_id|
+---------+---------------+
|Afrikaans|              1|
| Albanian|              2|
|   Arabic|              3|
| Armenian|              4|
|   Aymara|              5|
|    Azeri|              6|
|   Basque|              7|
|  Bengali|              8|
|Bulgarian|              9|
|  Burmese|             10|
|  Catalan|             11|
| Croatian|             12|
|    Czech|             13|
|   Danish|             14|
|    Dutch|             15|
|  English|             16|
| Estonian|             17|
|  Faroese|             18|
| Filipino|             19|
|  Finnish|             20|
+---------+---------------+
only showing top 20 rows


In [46]:
# Collect all unique dates from the courses and comments tables
from pyspark.sql.functions import to_date, col, year, month, dayofmonth, row_number
unique_dates = (
    courses_df.select(to_date(col("published_time")).alias("date"))
    .union(courses_df.select(to_date(col("last_update_date")).alias("date")))
    .union(comments_df.select(to_date(col("date")).alias("date")))
    .dropDuplicates()
)

# Add year, month, and day columns
date_dim = unique_dates.withColumn("year", year(col("date"))) \
                       .withColumn("month", month(col("date"))) \
                       .withColumn("day", dayofmonth(col("date")))

# Generate sequential IDs
window_spec_date = Window.orderBy("date")
date_dim = date_dim.withColumn(
    "date_dim_id", row_number().over(window_spec_date)
)

date_dim.show()

+----------+----+-----+----+-----------+
|      date|year|month| day|date_dim_id|
+----------+----+-----+----+-----------+
|      null|null| null|null|          1|
|2010-04-14|2010|    4|  14|          2|
|2010-08-05|2010|    8|   5|          3|
|2010-10-13|2010|   10|  13|          4|
|2011-06-20|2011|    6|  20|          5|
|2011-06-23|2011|    6|  23|          6|
|2011-07-06|2011|    7|   6|          7|
|2011-07-08|2011|    7|   8|          8|
|2011-07-09|2011|    7|   9|          9|
|2011-07-11|2011|    7|  11|         10|
|2011-07-12|2011|    7|  12|         11|
|2011-07-15|2011|    7|  15|         12|
|2011-07-18|2011|    7|  18|         13|
|2011-07-23|2011|    7|  23|         14|
|2011-07-28|2011|    7|  28|         15|
|2011-07-29|2011|    7|  29|         16|
|2011-08-03|2011|    8|   3|         17|
|2011-08-08|2011|    8|   8|         18|
|2011-08-13|2011|    8|  13|         19|
|2011-08-22|2011|    8|  22|         20|
+----------+----+-----+----+-----------+
only showing top 20 rows
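
The output above shows a null date receiving `date_dim_id` 1. An equality join never matches a null key, so that row cannot be referenced by either fact table. A minimal sketch of separating null keys before the IDs are assigned follows; `split_null_keys` is a hypothetical helper, not part of the notebook, and it relies only on `DataFrame.filter` with SQL expression strings.

```python
def split_null_keys(df, key_col):
    """Return (DataFrame without null keys, number of rows with a null key).

    `df` is expected to be a Spark DataFrame; `key_col` is a column name.
    """
    # Count the rows that would otherwise become an unreachable dimension row
    null_count = df.filter(f"{key_col} IS NULL").count()
    # Keep only rows whose key can actually be matched by an equality join
    non_null_df = df.filter(f"{key_col} IS NOT NULL")
    return non_null_df, null_count
```

In this notebook it could be called as `unique_dates, n_null = split_null_keys(unique_dates, "date")` before the `row_number()` step. Doing so would shift every `date_dim_id` down by one relative to the output shown here.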

In [47]:
from pyspark.sql.functions import col, to_date

# Join the dimensions with the courses_df to create the CoursePerformance Fact Table
course_performance_fact = courses_df.alias("c") \
    .join(course_dim.alias("cd"), col("c.course_id") == col("cd.course_id"), "inner") \
    .join(instructor_dim.alias("id"), (col("c.instructor_name") == col("id.instructor_name")) & 
         (col("c.instructor_url") == col("id.instructor_url")), "inner") \
    .join(category_dim.alias("catd"), (col("c.category") == col("catd.category")) & 
         (col("c.subcategory") == col("catd.subcategory")) & 
         (col("c.topic") == col("catd.topic")), "inner") \
    .join(language_dim.alias("ld"), col("c.language") == col("ld.language"), "inner") \
    .join(date_dim.alias("dd"), to_date(col("c.published_time")) == col("dd.date"), "inner") \
    .select(
        col("cd.course_dim_id"),
        col("ld.language_dim_id"),
        col("catd.category_dim_id"),
        col("id.instructor_dim_id"),
        col("dd.date_dim_id").alias("published_date_dim_id"),
        col("c.num_subscribers"),
        col("c.avg_rating"),
        col("c.num_reviews"),
        col("c.num_comments"),
        col("c.num_lectures"),
        col("c.content_length_min")
    )

# No performance_fact_id is generated here; rows are keyed by the dimension IDs selected above

# Display the CoursePerformance Fact Table
course_performance_fact.show()



+-------------+---------------+---------------+-----------------+---------------------+---------------+----------+-----------+------------+------------+------------------+
|course_dim_id|language_dim_id|category_dim_id|instructor_dim_id|published_date_dim_id|num_subscribers|avg_rating|num_reviews|num_comments|num_lectures|content_length_min|
+-------------+---------------+---------------+-----------------+---------------------+---------------+----------+-----------+------------+------------+------------------+
|            6|             16|           2619|            40607|                    7|        10761.0|       3.9|      349.0|       101.0|        87.0|             526.0|
|           15|             16|          11085|            11359|                   14|         4454.0|      4.35|      829.0|       147.0|       230.0|            1813.0|
|           53|             16|           3723|             8924|                   40|          743.0|       4.3|       87.0|        22.0| 

In [48]:
# Unique courses in original dataset
unique_courses = courses_df.select("course_id").distinct().count()
print(f"Total Unique Courses in Original Dataset: {unique_courses}")

# Total records in CoursePerformance Fact Table
performance_fact_count = course_performance_fact.count()
print(f"Total Records in CoursePerformance Fact Table: {performance_fact_count}")

Total Unique Courses in Original Dataset: 186788
Total Records in CoursePerformance Fact Table: 186788
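
Equal totals do not by themselves show that the fact table holds exactly one row per course, since dropped rows and duplicated rows could cancel out. A small sketch of a grain check is given below; `check_grain` is a hypothetical helper that uses only `count`, `select` and `distinct` on a Spark DataFrame.

```python
def check_grain(fact_df, key_cols):
    """Return (total_rows, distinct_keys, is_unique) for the given key columns."""
    total_rows = fact_df.count()
    # distinct() on the key columns collapses repeated keys into one row each
    distinct_keys = fact_df.select(*key_cols).distinct().count()
    return total_rows, distinct_keys, total_rows == distinct_keys
```

For the table above, `check_grain(course_performance_fact, ["course_dim_id"])` would be the natural call; the third element is `True` only when no `course_dim_id` repeats.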


In [49]:
courses_df.select("course_id").distinct().show(10, truncate=False)


+---------+
|course_id|
+---------+
|8075     |
|8420     |
|10971    |
|12248    |
|14706    |
|15305    |
|15392    |
|16082    |
|17158    |
|17261    |
+---------+
only showing top 10 rows


In [50]:
filtered_data = courses_df.filter(courses_df["course_id"] == 8075)
filtered_data.show()


+---------+--------------------+-------+------+--------------------+---------------+----------+-----------+------------+------------+------------------+--------------+----------------+--------+-----------+----------+--------+--------------------+----------------+--------------------+
|course_id|               title|is_paid| price|            headline|num_subscribers|avg_rating|num_reviews|num_comments|num_lectures|content_length_min|published_time|last_update_date|category|subcategory|     topic|language|          course_url| instructor_name|      instructor_url|
+---------+--------------------+-------+------+--------------------+---------------+----------+-----------+------------+------------+------------------+--------------+----------------+--------+-----------+----------+--------+--------------------+----------------+--------------------+
|     8075|How to Create an ...|   true|149.99|You don't need to...|        10761.0|       3.9|      349.0|       101.0|        87.0|            

In [51]:
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number

# Create User Dimension table
user_dim = comments_df.select(
    col("id").alias("user_id"),
    col("display_name")
).dropDuplicates()

# Generate surrogate key for users
window_spec_user = Window.orderBy("user_id")
user_dim = user_dim.withColumn(
    "user_dim_id", row_number().over(window_spec_user)
)

# Display the User Dimension table
user_dim.show()



+-------+------------+-----------+
|user_id|display_name|user_dim_id|
+-------+------------+-----------+
|   2591|         ian|          1|
|   3791|      louise|          2|
|   3833|       josef|          3|
|   3841|      pamela|          4|
|   3970|        adam|          5|
|   4012|    abhishek|          6|
|   4409|         jae|          7|
|   4504|       david|          8|
|   4664|       sarah|          9|
|   4842|      trisha|         10|
|   5253|       zenko|         11|
|   5954|      samuel|         12|
|   6164|       emily|         13|
|   6302|       linda|         14|
|   6735|         syd|         15|
|   6741|       steve|         16|
|   6802|     sultana|         17|
|   6831|        mike|         18|
|   6848|        jeff|         19|
|   6959|        sean|         20|
+-------+------------+-----------+
only showing top 20 rows


In [53]:
# Count the total number of unique user_dim_ids
total_user_dim_ids = user_dim.select("user_dim_id").distinct().count()

print(f"Total number of unique user_dim_ids: {total_user_dim_ids}")


Total number of unique user_dim_ids: 99855


In [54]:
from pyspark.sql.functions import col, to_date

# Ensure feedback date is in the correct format
comments_df = comments_df.withColumn("feedback_date", to_date(col("date")))

# Join comments_df with course_dim, user_dim, and date_dim
course_feedback_fact = comments_df.alias("cf") \
    .join(course_dim.alias("cd"), col("cf.course_id") == col("cd.course_id"), "inner") \
    .join(user_dim.alias("ud"), col("cf.id") == col("ud.user_id"), "inner") \
    .join(date_dim.alias("dd"), col("cf.feedback_date") == col("dd.date"), "inner") \
    .select(
        col("cd.course_dim_id"),
        col("ud.user_dim_id"),
        col("dd.date_dim_id").alias("feedback_date_dim_id"),
        col("cf.rate"),
        col("cf.comment")
    )


# Display the CourseFeedback Fact Table
course_feedback_fact.show()


+-------------+-----------+--------------------+----+-------------------------------------+
|course_dim_id|user_dim_id|feedback_date_dim_id|rate|                              comment|
+-------------+-----------+--------------------+----+-------------------------------------+
|       173948|      92586|                3805| 1.0|                 nothing informati...|
|       139093|      51003|                3497| 5.0|                 I would love to t...|
|        88364|      51516|                3504| 5.0|小太郎シリーズ３コース受講しました...|
|        88487|      68820|                3658| 5.0|                        Great course!|
|        90472|      23876|                3077| 1.0|                 There was very li...|
|       174323|      75156|                3706| 5.0|                 I got hooked on t...|
|        90043|      24656|                3092| 3.0|                 السلام عليكم\nاول...|
|       139628|      47672|                3451| 5.0|                 Iniciar es la par...|
|    

In [55]:
fact_count = course_feedback_fact.count()
print(f"Total rows in fact table: {fact_count}")

Total rows in fact table: 92870


In [56]:
# Find unmatched course_ids in comments_df
unmatched_courses = comments_df.select("course_id").subtract(course_dim.select("course_id"))
print(f"Unmatched course_id count: {unmatched_courses.count()}")

# Find unmatched user_ids in comments_df
unmatched_users = comments_df.select("id").subtract(user_dim.select("user_id"))
print(f"Unmatched user_id count: {unmatched_users.count()}")

# Find unmatched feedback dates in comments_df
unmatched_dates = comments_df.select(to_date(col("date")).alias("feedback_date")) \
    .subtract(date_dim.select("date"))
print(f"Unmatched feedback_date count: {unmatched_dates.count()}")


Unmatched course_id count: 3500
Unmatched user_id count: 0
Unmatched feedback_date count: 0
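
`subtract` returns distinct rows, so the 3500 above is a count of distinct `course_id` values, not of comment rows. To see how many comment rows the inner join leaves out, a `left_anti` join can be used. The sketch below assumes a hypothetical helper name, `count_orphan_rows`, and uses only standard DataFrame methods.

```python
def count_orphan_rows(child_df, parent_df, key):
    """Count rows of child_df whose `key` value has no match in parent_df."""
    # Only the key column is needed on the parent side
    parent_keys = parent_df.select(key)
    # left_anti keeps the child rows that find no matching key in parent_keys
    return child_df.join(parent_keys, on=key, how="left_anti").count()
```

Here the call would be `count_orphan_rows(comments_df, course_dim, "course_id")`. Adding the result to the fact table row count should give back `comments_df.count()` if the course join is the only one that drops rows.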


In [57]:
unmatched_course_ids = comments_df.select("course_id").subtract(course_dim.select("course_id"))
print(f"Number of unmatched course_id: {unmatched_course_ids.count()}")
unmatched_course_ids.show()


Number of unmatched course_id: 3500
+---------+
|course_id|
+---------+
|  3894456|
|  4106512|
|  4813950|
|  4457246|
|  4175888|
|  2957632|
|  4160760|
|  3885136|
|  3992348|
|  3808988|
|  4341190|
|  3688594|
|  4122310|
|  4500434|
|  3955796|
|  4206800|
|  4389016|
|  4503836|
|  4170294|
|  4540588|
+---------+
only showing top 20 rows


In [58]:
original_course_id = course_dim.filter(col("course_dim_id") == 173948).select("course_id").collect()[0][0]
print(f"Original Course ID: {original_course_id}")


Original Course ID: 4693438


In [59]:
original_user_id = user_dim.filter(col("user_dim_id") == 92586).select("user_id").collect()[0][0]
print(f"Original User ID: {original_user_id}")


Original User ID: 124129784


In [60]:
original_date = date_dim.filter(col("date_dim_id") == 3805).select("date").collect()[0][0]
print(f"Original Date: {original_date}")


Original Date: 2022-09-20


In [61]:
comments_df.filter(
    (col("course_id") == original_course_id) &
    (col("id") == original_user_id) &
    (to_date(col("date")) == original_date)
).show(truncate=False)


+---------+---------+----+----------+------------+--------------------------+-------------+
|id       |course_id|rate|date      |display_name|comment                   |feedback_date|
+---------+---------+----+----------+------------+--------------------------+-------------+
|124129784|4693438  |1.0 |2022-09-20|jiang       |nothing information there,|2022-09-20   |
+---------+---------+----+----------+------------+--------------------------+-------------+


In [62]:
# Course Dimension
course_dim.write.mode("overwrite").parquet("s3://udemy-data-project/processed/course_dim/")

# User Dimension
user_dim.write.mode("overwrite").parquet("s3://udemy-data-project/processed/user_dim/")

# Category Dimension
category_dim.write.mode("overwrite").parquet("s3://udemy-data-project/processed/category_dim/")

# Language Dimension
language_dim.write.mode("overwrite").parquet("s3://udemy-data-project/processed/language_dim/")

# Date Dimension
date_dim.write.mode("overwrite").parquet("s3://udemy-data-project/processed/date_dim/")

# Instructor Dimension
instructor_dim.write.mode("overwrite").parquet("s3://udemy-data-project/processed/instructor_dim/")

# CourseFeedback Fact Table
course_feedback_fact.write.mode("overwrite").parquet("s3://udemy-data-project/processed/course_feedback_fact/")

# CoursePerformance Fact Table
course_performance_fact.write.mode("overwrite").parquet("s3://udemy-data-project/processed/course_performance_fact/")
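
The eight writes above differ only in the table name, so they could be driven from a dictionary. This is a sketch under that assumption; `write_tables` is a hypothetical helper and it issues the same `write.mode(...).parquet(...)` call used in the cell above.

```python
def write_tables(tables, base_path, mode="overwrite"):
    """Write each DataFrame in `tables` to <base_path>/<name>/ as Parquet.

    `tables` maps a table name to a Spark DataFrame. Returns the paths written.
    """
    base = base_path.rstrip("/")
    written = []
    for name, df in tables.items():
        path = f"{base}/{name}/"
        # Same call as the individual writes, with the path derived from the name
        df.write.mode(mode).parquet(path)
        written.append(path)
    return written
```

A call such as `write_tables({"course_dim": course_dim, "user_dim": user_dim}, "s3://udemy-data-project/processed")` would reproduce the first two writes; the remaining tables can be added to the dictionary in the same way.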







In [None]:
s3output = glueContext.getSink(
  path="s3://bucket_name/folder_name",
  connection_type="s3",
  updateBehavior="UPDATE_IN_DATABASE",
  partitionKeys=[],
  compression="snappy",
  enableUpdateCatalog=True,
  transformation_ctx="s3output",
)
s3output.setCatalogInfo(
  catalogDatabase="demo", catalogTableName="populations"
)
s3output.setFormat("glueparquet")
s3output.writeFrame(DyF)