In [21]:
import findspark
findspark.init()
findspark.find()
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from datetime import datetime
from pyspark.sql.functions import col, from_json, coalesce, regexp_replace
from pyspark.sql.types import StructType, StructField, StringType, DateType

In [22]:
# Creating a Spark session
spark = SparkSession.builder.appName("EdtechDataProcessingWithHadoop") \
        .config("spark.hadoop.fs.defaultFS", "hdfs://localhost:9820") \
        .enableHiveSupport().getOrCreate()

In [23]:
# Establishing PostgreSQL connectionn
postgres_url = "jdbc:postgresql://localhost:5432/edtech"
properties = {
    "user": "postgres",
    "password": "admin",
    "driver": "org.postgresql.Driver"
}

In [24]:
# Table containing JSON structure column to parse
user_registrations = "user_registrations"

In [25]:
# Reading data from PostgreSQL
df = spark.read.format("jdbc").option("url", postgres_url).option("dbtable", user_registrations).option("user", properties["user"]).option("password", 
     properties["password"]).option("driver", properties["driver"]).load()

In [26]:
# Defining schema for JSON data
json_schema = StructType([
    StructField("address", StructType([
        StructField("city", StringType()),
        StructField("state", StringType()),
        StructField("country", StringType())
    ])),
    StructField("education_info", StructType([
        StructField("highest_degree", StringType()),
        StructField("cgpa", StringType())
    ])),
    StructField("profile", StructType([
        StructField("gender", StringType()),
        StructField("dob", DateType())
    ])),
    StructField("dob", DateType())
])

In [27]:
# Parsing the JSON structure column
df_parsed = df.withColumn("user_info_struct", from_json(col("user_info"), json_schema))

In [28]:
# Transforming json structure column into separate individual columns
user_registrations_parsed = df_parsed.select(
    col("user_id"),
    col("registration_date"),
    col("user_info_struct.address.city").alias("city"),
    col("user_info_struct.address.state").alias("state"),
    col("user_info_struct.address.country").alias("country"),
    col("user_info_struct.education_info.highest_degree").alias("highest_degree"),
    col("user_info_struct.education_info.cgpa").alias("cgpa"),
    col("user_info_struct.profile.gender").alias("gender"),
    coalesce(col("user_info_struct.profile.dob"), col("user_info_struct.dob")).alias("dob")
)

In [29]:
# Show the resulting DataFrame
user_registrations_parsed.show(5)

+-----------+-----------------+-------------+--------------+-------+--------------+----+------+----------+
|    user_id|registration_date|         city|         state|country|highest_degree|cgpa|gender|       dob|
+-----------+-----------------+-------------+--------------+-------+--------------+----+------+----------+
|CCBP1001USR|       2021-12-12|Visakhapatnam|Andhra Pradesh|  India|        B.Tech| 7.2|  male|1997-01-01|
|CCBP1002USR|       2021-01-11|     KAKINADA|Andhra Pradesh|  India|          10th| 8.2|  male|2001-01-01|
|CCBP1003USR|       2021-02-11|Visakhapatnam|Andhra Pradesh|  India|        Degree| 7.2|female|1997-10-01|
|CCBP1004USR|       2021-12-11|    Hyderabad|     Telangana|  India|          10th| 8.2|female|2002-10-01|
|CCBP1007USR|       2021-01-10|      Kurnool|Andhra Pradesh|  India|          10th| 8.2|  male|1998-01-01|
+-----------+-----------------+-------------+--------------+-------+--------------+----+------+----------+
only showing top 5 rows



In [30]:
# Reading data from PostgreSQL tables into PySpark DataFrames
source = ["tracks", "courses", "topics", "lessons", "user_progress_logs", "user_feedbacks"]
dfs = {}

for table_name in source:
    dfs[table_name] = spark.read \
        .format("jdbc") \
        .option("url", postgres_url) \
        .option("dbtable", table_name) \
        .option("user", properties["user"]) \
        .option("password", properties["password"]) \
        .option("driver", properties["driver"]) \
        .load()

In [31]:
# Consolidating data with joins
transformed_df = (dfs["tracks"].alias("tr")
    .join(dfs["courses"].alias("c"), col("tr.track_id") == col("c.track_id"), "full")
    .join(dfs["topics"].alias("tp"), col("c.course_id") == col("tp.course_id"), "full")
    .join(dfs["lessons"].alias("l"), col("tp.topic_id") == col("l.topic_id"), "full")
    .join(dfs["user_progress_logs"].alias("upl"), col("l.lesson_id") == col("upl.lesson_id"), "full")
    .join(user_registrations_parsed.alias("urp"), col("upl.user_id") == col("urp.user_id"), "full")
    .join(dfs["user_feedbacks"].alias("uf"), (col("urp.user_id") == col("uf.user_id")) & (col("l.lesson_id") == col("uf.lesson_id")), "full")
    .select(
        col("urp.user_id"),
        col("urp.registration_date"),
        col("l.lesson_title"),
        col("l.lesson_type"),
        col("l.duration_in_sec"),
        col("tp.topic_title"),
        col("c.course_title"),
        col("tr.track_title"),
        col("upl.overall_completion_percentage"),
        col("upl.activity_recorded_on"),
        col("uf.indicator"),
        col("uf.response"),
        col("urp.city"),
        col("urp.highest_degree"),
        col("urp.cgpa"),
        col("urp.gender"),
        col("urp.dob"),
    )
)

In [32]:
# Define the new HDFS base path
hdfs_path = "hdfs://localhost:9820/edtech/"

# Generate file name with the current date
current_date = datetime.now().strftime("%Y%m%d_%H%M%S")
file_name = f"transformed_data_{current_date}.parquet"

#Full hdfs path
full_hdfs_path = f"{hdfs_path}/{file_name}"

# Write DataFrame to Parquet File in the new HDFS path
transformed_df.write.mode("overwrite").parquet(full_hdfs_path)

# Create a temporary view named "temp_table"
loaded_df = spark.read.parquet(full_hdfs_path)
loaded_df.createOrReplaceTempView("temp_table")

In [33]:
# Hive Target Load for loading transformed data
hive_database = 'db_edtech'
hive_table_01 = 'edtech_analysis_01'
hive_table_02 = 'edtech_analysis_02'

# Creating Target Load
spark.sql(f"CREATE DATABASE IF NOT EXISTS {hive_database}")
spark.sql(f"USE {hive_database}")

DataFrame[]

#### Analysis based on Users
How do completion rates differ among users based on their highest degree? Is there a significant difference in completion percentages for users with different degrees?

In [34]:
analysis01 = spark.sql(f"""
SELECT 
count(user_id) as user_count, highest_degree, AVG(overall_completion_percentage) AS avg_completion_percentage
FROM temp_table
WHERE highest_degree IS NOT NULL
GROUP BY highest_degree
ORDER BY avg_completion_percentage DESC
""")

# Save the analysis results into a Hive table
analysis01.write.mode("overwrite").saveAsTable(hive_table_01)

# Show the results
spark.sql(f"SELECT * FROM {hive_table_01}").show()

+----------+--------------+-------------------------+
|user_count|highest_degree|avg_completion_percentage|
+----------+--------------+-------------------------+
|       161|          10th|        52.84516129032258|
|        78|        B.Tech|        51.76712328767123|
|        62|         Inter|                    49.52|
|        62|        Degree|        48.60655737704918|
|         5|          Ph.d|                     NULL|
+----------+--------------+-------------------------+



#### Analysis based on Lessons and Courses
How does the average lesson duration impact the overall completion percentage across different topics and courses?

In [35]:
analysis02 = spark.sql(f"""
SELECT
topic_title,
course_title,
AVG(duration_in_sec) AS avg_lesson_duration,
AVG(overall_completion_percentage) AS avg_completion_percentage
FROM temp_table
GROUP BY topic_title, course_title
ORDER BY avg_lesson_duration, avg_completion_percentage DESC;
""")

# Save the analysis results into a Hive table
analysis02.write.mode("overwrite").saveAsTable(hive_table_02)

# Show the results
spark.sql(f"SELECT * FROM {hive_table_02}").show()

+--------------------+--------------------+-------------------+-------------------------+
|         topic_title|        course_title|avg_lesson_duration|avg_completion_percentage|
+--------------------+--------------------+-------------------+-------------------------+
|           Jobby App|    Practical Python|               NULL|                     NULL|
|Prototypes and Cl...|JavaScript Essent...|               NULL|                     NULL|
|Collaborating wit...|Developer Foundat...|  2914.285714285714|        49.47142857142857|
|Third Party Packages|    Practical Python|  3080.769230769231|       50.705882352941174|
|Working with Comm...|Developer Foundat...| 3208.1632653061224|        53.89795918367347|
| Binary Search Trees|     Data Structures|           3271.875|                53.765625|
|Priority Queues &...|     Data Structures|             3600.0|                     63.4|
|                NULL|                NULL|             3600.0|                     61.7|
|Introduct

In [36]:
try:
    # Clean up data
    spark.sql(f"DROP DATABASE IF EXISTS {hive_database} CASCADE")

    # Try to switch to Hive database
    spark.sql(f"USE {hive_database}")

    # If the database exists, execute SHOW TABLES
    spark.sql(f"SHOW TABLES IN {hive_database}").show()

except Exception as e:
    # Catch the exception if the database does not exist
    #print(f"Error: {e}")
    print(f"The Hive database '{hive_database}' does not exist or an error occurred.")

finally:
    spark.stop()

The Hive database 'db_edtech' does not exist or an error occurred.
