## Step 2. Data Cleaning
### Read Raw Data in Spark

In [43]:
# Prerequisites (install once into the kernel's environment):
# pip install pyspark boto3 snowflake-connector-python snowflake-sqlalchemy

In [28]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lower
import boto3
import os

In [29]:
# Start (or reuse) the Spark session that the rest of the notebook works through.
spark = (
    SparkSession.builder
    .appName("DataCleaning&Loading")
    .getOrCreate()
)

In [30]:
# Load the four raw CSV exports. The first row of each file supplies the
# column names, and Spark infers every column's type from the data.
df_courses = spark.read.options(header=True, inferSchema=True).csv(
    "./datasets/raw_courses.csv"
)
df_enrollments = spark.read.options(header=True, inferSchema=True).csv(
    "./datasets/raw_enrollments.csv"
)
df_instructors = spark.read.options(header=True, inferSchema=True).csv(
    "./datasets/raw_instructors.csv"
)
df_skills = spark.read.options(header=True, inferSchema=True).csv(
    "./datasets/raw_skills.csv"
)

In [42]:
# Preview the first five rows of the raw skills data.
df_skills.show(n=5)

+--------------------+----------------+
|           course_id|      skill_name|
+--------------------+----------------+
|f9e421bc-1120-453...|Machine Learning|
|08252b39-1947-4a4...|          Python|
|085e0518-2d75-4dd...|      JavaScript|
|d7131c2e-2001-465...|Machine Learning|
|dafc3805-7f64-45d...|Machine Learning|
+--------------------+----------------+
only showing top 5 rows



In [31]:
# Preview the first five rows of the raw courses data.
df_courses.show(n=5)

+--------------------+--------------------+--------------------+------------+---------------+------------+--------+------+--------------+-------------+----------------+--------------------+----------+
|           course_id|         course_name|       instructor_id|    platform|       category|       level|language| price|duration_hours|lecture_count|certificate_type|          course_url|date_added|
+--------------------+--------------------+--------------------+------------+---------------+------------+--------+------+--------------+-------------+----------------+--------------------+----------+
|7a768555-a987-421...|Heavy what least ...|db52ca58-0500-4bc...|Khan Academy|Web Development|    Beginner|  French|439.55|          52.9|           52|  No Certificate|https://bernard.net/|2022-09-11|
|fb756923-f910-4bb...|    Side relate real|fb2ca025-adf4-462...|  Skillshare|Cloud Computing|    Advanced|  French|305.23|          59.2|           88|            NULL|https://bailey-pe...|2022-08

In [32]:
# Print the column names and the types Spark inferred for the courses data.
df_courses.printSchema()

root
 |-- course_id: string (nullable = true)
 |-- course_name: string (nullable = true)
 |-- instructor_id: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- category: string (nullable = true)
 |-- level: string (nullable = true)
 |-- language: string (nullable = true)
 |-- price: double (nullable = true)
 |-- duration_hours: double (nullable = true)
 |-- lecture_count: integer (nullable = true)
 |-- certificate_type: string (nullable = true)
 |-- course_url: string (nullable = true)
 |-- date_added: date (nullable = true)



In [33]:
# Clean the courses table in a single chained pass:
#   * a missing certificate_type becomes 'No Certificate'
#   * category text is lower-cased
#   * price is cast to float
df_courses = (
    df_courses
    .fillna({'certificate_type': 'No Certificate'})
    .withColumn("category", lower(col("category")))
    .withColumn("price", col("price").cast("float"))
)

# Preview the cleaned rows.
df_courses.show(n=5)

+--------------------+--------------------+--------------------+------------+---------------+------------+--------+------+--------------+-------------+----------------+--------------------+----------+
|           course_id|         course_name|       instructor_id|    platform|       category|       level|language| price|duration_hours|lecture_count|certificate_type|          course_url|date_added|
+--------------------+--------------------+--------------------+------------+---------------+------------+--------+------+--------------+-------------+----------------+--------------------+----------+
|7a768555-a987-421...|Heavy what least ...|db52ca58-0500-4bc...|Khan Academy|web development|    Beginner|  French|439.55|          52.9|           52|  No Certificate|https://bernard.net/|2022-09-11|
|fb756923-f910-4bb...|    Side relate real|fb2ca025-adf4-462...|  Skillshare|cloud computing|    Advanced|  French|305.23|          59.2|           88|  No Certificate|https://bailey-pe...|2022-08

In [34]:
# Preview the first five rows of the raw enrollments data.
df_enrollments.show(n=5)

+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+---------+------+--------------------+
|       enrollment_id|           course_id|          student_id|     student_name|       student_email|enrolled_date|completed|rating|         review_text|
+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+---------+------+--------------------+
|9d3cc9ca-db24-4ea...|cd9b79fd-a04c-47f...|5ae654b7-9e3b-47e...|      Alice Weber|davidbrown@exampl...|   2022-09-05|    false|  4.99|                NULL|
|d5bbe3a4-40b8-4dc...|6ce763e1-708f-45a...|8e487a8c-e3e4-494...|      Nancy Green|suzanne28@example...|   2024-04-11|    false|  2.61|Middle her shake ...|
|ac49b6e2-27a4-474...|21419a14-9acc-400...|666bd0ee-b257-4bf...|Elizabeth Jackson|rickey20@example.org|   2023-12-11|    false|  NULL|                NULL|
|5e460f1c-2169-40d...|cdcc2d6a-1a16-498...|87400c48-d854-43e...|

In [35]:
# Print the column names and the types Spark inferred for the enrollments data.
df_enrollments.printSchema()

root
 |-- enrollment_id: string (nullable = true)
 |-- course_id: string (nullable = true)
 |-- student_id: string (nullable = true)
 |-- student_name: string (nullable = true)
 |-- student_email: string (nullable = true)
 |-- enrolled_date: date (nullable = true)
 |-- completed: boolean (nullable = true)
 |-- rating: double (nullable = true)
 |-- review_text: string (nullable = true)



In [39]:
# Fill missing ratings with the average of the ratings that are present.
# avg() skips nulls and yields None when no row has a rating at all; fillna()
# rejects a None replacement, so the rating fill is skipped in that case and
# the nulls are left as they are.
avg_rating = df_enrollments.selectExpr("avg(rating) as avg_rating").collect()[0]["avg_rating"]
if avg_rating is not None:
    df_enrollments = df_enrollments.fillna({'rating': avg_rating})

# A missing completion flag is treated as "not completed".
df_enrollments = df_enrollments.fillna({"completed": False})

# Normalize column types: enrolled_date as a date, rating as a float.
df_enrollments = (
    df_enrollments
    .withColumn("enrolled_date", col("enrolled_date").cast("date"))
    .withColumn("rating", col("rating").cast("float"))
)
df_enrollments.show(5)

+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+---------+--------+--------------------+
|       enrollment_id|           course_id|          student_id|     student_name|       student_email|enrolled_date|completed|  rating|         review_text|
+--------------------+--------------------+--------------------+-----------------+--------------------+-------------+---------+--------+--------------------+
|9d3cc9ca-db24-4ea...|cd9b79fd-a04c-47f...|5ae654b7-9e3b-47e...|      Alice Weber|davidbrown@exampl...|   2022-09-05|    false|    4.99|                NULL|
|d5bbe3a4-40b8-4dc...|6ce763e1-708f-45a...|8e487a8c-e3e4-494...|      Nancy Green|suzanne28@example...|   2024-04-11|    false|    2.61|Middle her shake ...|
|ac49b6e2-27a4-474...|21419a14-9acc-400...|666bd0ee-b257-4bf...|Elizabeth Jackson|rickey20@example.org|   2023-12-11|    false|3.768775|                NULL|
|5e460f1c-2169-40d...|cdcc2d6a-1a16-498...|87400c48-

### Save Cleaned Data: CSV Upload to S3 and Optimized Parquet Format

In [None]:
# Export every DataFrame as a single CSV file and upload it to S3.
s3_client = boto3.client("s3", region_name="us-west-2")
bucket_name = "learning-platform-cleaned-data"

# Dataset name -> DataFrame. The name determines both the local output
# directory (cleaned_<name>) and the S3 object key (<name>.csv).
dataframes = {
    "courses": df_courses,
    "enrollments": df_enrollments,
    "instructors": df_instructors,
    "skills": df_skills
}

for name, df in dataframes.items():
    output_dir = f"cleaned_{name}"
    # coalesce(1) collapses the data to one partition so Spark writes a
    # single CSV part file into output_dir.
    df.coalesce(1).write.csv(output_dir, header=True, mode="overwrite")

    # Spark chooses the part file's name, so look it up. Fail with a clear
    # message instead of a bare IndexError when no CSV is found (for example
    # when output_dir is not on the local filesystem).
    csv_files = [f for f in os.listdir(output_dir) if f.endswith(".csv")]
    if not csv_files:
        raise FileNotFoundError(
            f"No CSV part file found in '{output_dir}' after writing '{name}'"
        )
    file_name = csv_files[0]
    output_file = os.path.join(output_dir, file_name)

    # Upload to S3
    s3_object_key = f"{name}.csv"
    s3_client.upload_file(output_file, bucket_name, s3_object_key)

    print(f"Uploaded {s3_object_key} to S3 successfully!")

print("All files uploaded successfully! 🚀")

Uploaded enrollments.csv to S3 successfully!
All files uploaded successfully! 🚀


In [None]:
# Persist each DataFrame as Parquet, replacing any output left by a previous run.
parquet_targets = [
    (df_courses, "processed_courses"),
    (df_enrollments, "processed_enrollments"),
    (df_instructors, "processed_instructors"),
    (df_skills, "processed_skills"),
]
for frame, target_dir in parquet_targets:
    frame.write.parquet(target_dir, mode="overwrite")

                                                                                