In [1]:
import time
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

spark = SparkSession \
    .builder \
    .appName("UniDbMongoSpark") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/university_db_new") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/university_db_new") \
    .config("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:3.0.1") \
    .getOrCreate()

spark.sparkContext.setLogLevel("FATAL")

students_df = spark.read.format("mongo").option("uri", "mongodb://localhost:27017/university_db_new.students").load()
courses_df = spark.read.format("mongo").option("uri", "mongodb://localhost:27017/university_db_new.courses").load()

departments_df = spark.read.format("mongo").option("uri", "mongodb://localhost:27017/university_db_new.departments").load()
instructors_df = spark.read.format("mongo").option("uri", "mongodb://localhost:27017/university_db_new.instructors").load()

repartition = True
n_partitions = 24

if repartition:
    students_df = students_df.repartition(n_partitions, "student_id")
    departments_df = departments_df.repartition(n_partitions, "department_id")
    courses_df = courses_df.repartition(n_partitions, "course_id")
    instructors_df = instructors_df.repartition(n_partitions, "instructor_id")

    
def measure_query(query_function):
    start_time = time.time()
    result_df = query_function()
    duration = time.time() - start_time
    print(f"Execution Time: {duration:.4f} seconds")
    return result_df




24/09/25 18:43:00 WARN Utils: Your hostname, mayank resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
24/09/25 18:43:00 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/mayank/.local/lib/python3.10/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/mayank/.ivy2/cache
The jars for the packages stored in: /home/mayank/.ivy2/jars
org.mongodb.spark#mongo-spark-connector_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-fa31dd79-2aae-45f4-a966-1f922ae47323;1.0
	confs: [default]
	found org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 in central
	found org.mongodb#mongodb-driver-sync;4.0.5 in central
	found org.mongodb#bson;4.0.5 in central
	found org.mongodb#mongodb-driver-core;4.0.5 in central
:: resolution report :: resolve 546ms :: artifacts dl 11ms
	:: modules in use:
	org.mongodb#bson;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-core;4.0.5 from central in [default]
	org.mongodb#mongodb-driver-sync;4.0.5 from central in [default]
	org.mongodb.spark#mongo-spark-connector_2.12;3.0.1 from central in [default]
	---------------------------------------------------------------------
	|                  |            modules            ||   artifacts

In [2]:
# Query 1: Students enrolled in course_id 10
def query_students_with_course_10():
    exploded_courses = students_df.select("student_id", "first_name", "last_name", F.explode("courses").alias("course"))
    
    return exploded_courses.filter(exploded_courses.course.course_id == 10) \
        .select("student_id", "first_name", "last_name")

print("\nQuery 1: Students with Course ID 10")
students_with_course_10 = measure_query(query_students_with_course_10)
students_with_course_10.show()



Query 1: Students with Course ID 10
Execution Time: 0.3370 seconds


                                                                                

+----------+----------+---------+
|student_id|first_name|last_name|
+----------+----------+---------+
|       104|    Deanna|     Wolf|
|       104|    Deanna|     Wolf|
|       483|    Melvin|  Wallace|
|       483|    Melvin|  Wallace|
|       174|   Abigail|   Walker|
|       174|   Abigail|   Walker|
|       277| Alejandro|     Gill|
|       277| Alejandro|     Gill|
|       386|   Caitlin|     Hull|
|       386|   Caitlin|     Hull|
+----------+----------+---------+



In [3]:
# Query 2: Average Student Count per Course
def query_average_student_count_per_course():
    return courses_df.select("course_id", F.size("students").alias("student_count")) \
        .agg(F.avg("student_count").alias("avg_students")) \
        .select("avg_students")

print("\nQuery 2SS: Average Student Count per Course")
average_student_count = measure_query(query_average_student_count_per_course)
average_student_count.show()



Query 2SS: Average Student Count per Course
Execution Time: 0.2067 seconds
+------------------+
|      avg_students|
+------------------+
|3.3333333333333335|
+------------------+



In [4]:
# Query 3: Courses in department_id 3
def query_courses_by_department():
    return courses_df.filter(courses_df.department_id == 3) \
        .select("course_id", "course_name")

print("\nQuery 2: Courses by Department ID 3")
courses_by_department = measure_query(query_courses_by_department)
courses_by_department.show()


Query 2: Courses by Department ID 3
Execution Time: 0.0714 seconds
+---------+--------------------+
|course_id|         course_name|
+---------+--------------------+
|        3|            Someone.|
|       27|                Run.|
|       41|               Home.|
|        3|            Someone.|
|       27|                Run.|
|       41|               Home.|
|       71|War short since a...|
|       84|Modern speak smal...|
|       71|War short since a...|
|       84|Modern speak smal...|
|      124|       Seat tonight.|
|      124|       Seat tonight.|
|       30|      Person behind.|
|       30|      Person behind.|
|       36|          Sign kind.|
|       36|          Sign kind.|
|      139|Beautiful design ...|
|      139|Beautiful design ...|
|       13| Value fine subject.|
|       13| Value fine subject.|
+---------+--------------------+
only showing top 20 rows



In [5]:
# Query 4: Total Students per Department
def query_total_students_per_department():
    joined_df = students_df.join(departments_df, "department_id", "left")
    
    return joined_df.groupBy("department_name") \
        .agg(F.count("*").alias("total_students")) \
        .select(
            F.col("department_name"), 
            "total_students"
        )

print("\nQuery 4: Total Students per Department")
total_students_per_department = measure_query(query_total_students_per_department)
total_students_per_department.show()



Query 4: Total Students per Department
Execution Time: 0.1833 seconds




+--------------------+--------------+
|     department_name|total_students|
+--------------------+--------------+
|           Brown LLC|           232|
|        Schwartz Inc|           172|
|Aguilar, Taylor a...|           228|
|        Campbell Ltd|           152|
|       Munoz-Johnson|           240|
|          Carter LLC|           168|
|        Young-Obrien|           180|
|     Clayton-Johnson|           212|
|         Green Group|           224|
|          Santos Ltd|           192|
+--------------------+--------------+



                                                                                

In [6]:
# Query 5: Instructors with Course Count
def query_instructors_with_course_count():
    instructor_courses_df = instructors_df.join(courses_df, "instructor_id", "left") \
        .filter((F.col("course_id") >= 101) & (F.col("course_id") <= 125)) \
        .groupBy("instructor_id", "first_name", "last_name") \
        .agg(F.collect_list("course_id").alias("course_count"))  # Collect course IDs

    return instructor_courses_df.filter(F.size("course_count") < 25) \
        .select("instructor_id", "first_name", "last_name")

print("\nQuery 5: Instructors with Course Count")
instructors_with_course_count = measure_query(query_instructors_with_course_count)
instructors_with_course_count.show()


Query 5: Instructors with Course Count
Execution Time: 0.1489 seconds


                                                                                

+-------------+----------+---------+
|instructor_id|first_name|last_name|
+-------------+----------+---------+
|            2|    Edward|    Hurst|
|            5|      Noah|Patterson|
|            1|    Pamela|   Ashley|
+-------------+----------+---------+



In [7]:
# Query 6: Courses with Enrollment Information
def query_courses_with_enrollment():
    
    return courses_df.select(
            "course_id", 
            "course_name", 
            F.size("students").alias("enrollments")  
        ) \
        .orderBy(F.desc("enrollments")) \
        .limit(10)

print("\nQuery 1: Courses with Enrollment Information")
courses_with_enrollment = measure_query(query_courses_with_enrollment)
courses_with_enrollment.show()



Query 1: Courses with Enrollment Information
Execution Time: 0.0533 seconds
+---------+--------------------+-----------+
|course_id|         course_name|enrollments|
+---------+--------------------+-----------+
|      104|     State although.|          7|
|      136|        Become even.|          7|
|       93|Less dream image ...|          7|
|      136|        Become even.|          7|
|      135|Discussion interv...|          7|
|       93|Less dream image ...|          7|
|      104|     State although.|          7|
|      135|Discussion interv...|          7|
|      128|  Science bill line.|          6|
|       28|         Why career.|          6|
+---------+--------------------+-----------+

