In [1]:
# pip install pyspark

In [None]:
from pyspark import SparkContext,SparkConf

conf = SparkConf().set("spark.jars.packages", "org.mongodb.spark:mongo-spark-connector_2.12:10.4.0")
sc = SparkContext(conf=conf)

In [11]:
from pyspark.sql import SparkSession
spark = SparkSession.builder \
    .appName("MongoDBQueryExample") \
    .config("spark.mongodb.input.uri", "mongodb://localhost:27017/univresitydb") \
    .config("spark.mongodb.output.uri", "mongodb://localhost:27017/universitydb") \
    .getOrCreate()

In [12]:
students_df = spark.read.format("mongodb") \
    .option("database", "universitydb") \
    .option("collection", "students") \
    .load()
courses_df = spark.read.format("mongodb") \
    .option("database", "universitydb") \
    .option("collection", "courses") \
    .load()
instructors_df = spark.read.format("mongodb") \
    .option("database", "universitydb") \
    .option("collection", "instructors") \
    .load()

In [13]:
# students_df = students_df.repartition(10)

# courses_df = courses_df.repartition(10)
# instructors_df = instructors_df.repartition(10)

courses_partitioned_df = courses_df.repartition(10,"_id")  
students_df = students_df.repartition(10,"_id")
instructors_partitioned_df = instructors_df.repartition(10,"_id") 

In [21]:
from pyspark.sql import functions as F
import time

course_id = 1
start_time = time.time()

students_enrolled = students_df.withColumn("enrollment", F.explode("enrollments")) \
    .filter(F.col("enrollment.course_id") == course_id) \
    .select("_id","first_name", "last_name")

students_enrolled.show()

end_time = time.time()
print(f"Query Time: {end_time - start_time} seconds")


+---+----------+---------+
|_id|first_name|last_name|
+---+----------+---------+
| 10|       Dre|  Kersley|
|  1|    Sharyl|   Jaouen|
| 11|      Gabi|  Crystal|
| 61|    Victor|   Sleany|
| 56|      Tome|    Maven|
| 13|    Jo-ann|Heindrich|
| 57|    Nikkie|Eadmeades|
| 16|  Collette| Brenneke|
+---+----------+---------+

Query Time: 0.2509124279022217 seconds


In [15]:
from pyspark.sql import functions as F
import time

instructor_id = 1  
start_time = time.time()

courses_by_instructor = courses_df.filter(F.col("instructor.instructor_id") == instructor_id) \
    .withColumnRenamed("_id", "course_id")

students_with_enrollments = students_df.withColumn("enrollment", F.explode("enrollments"))

avg_students_per_course = students_with_enrollments \
    .join(courses_by_instructor, F.col("enrollment.course_id") == F.col("course_id"), "inner") \
    .groupBy("course_id") \
    .agg(F.count("enrollment.course_id").alias("student_count")) \
    .agg(F.avg("student_count").alias("average_students"))

avg_students_per_course.show()

end_time = time.time()
print(f"Query Time: {end_time - start_time} seconds")


+----------------+
|average_students|
+----------------+
|             3.0|
+----------------+

Query Time: 0.48116493225097656 seconds


In [22]:
department_id = 2  

start_time = time.time()

courses_by_department = courses_df.filter(F.col("department.department_id") == department_id)

courses_by_department.select("_id","course_name", "course_code").show()

end_time = time.time()
print(f"Query Time: {end_time - start_time} seconds")


+---+--------------------+-----------+
|_id|         course_name|course_code|
+---+--------------------+-----------+
|  8|            Calculus|    MATH101|
|  2|      Linear Algebra|    MATH201|
| 14|          Statistics|    MATH301|
| 20|Discrete Mathematics|    MATH401|
| 26|Differential Equa...|    MATH501|
| 32|       Number Theory|    MATH601|
| 38|            Topology|    MATH701|
| 44|    Abstract Algebra|    MATH801|
| 50|       Real Analysis|    MATH901|
| 56|       Number Theory|   MATH1001|
+---+--------------------+-----------+

Query Time: 0.11560344696044922 seconds


In [17]:
start_time = time.time()

students_per_department = students_df.groupBy("department.department_name") \
    .agg(F.count("*").alias("total_students"))

students_per_department.show()

end_time = time.time()
print(f"Query Time: {end_time - start_time} seconds")


+----------------+--------------+
| department_name|total_students|
+----------------+--------------+
|             Art|            51|
|       Chemistry|            53|
|         English|            49|
|         History|            42|
|           Music|            49|
|     Mathematics|            48|
|         Physics|            47|
|Computer Science|            65|
|       Economics|            39|
|         Biology|            39|
+----------------+--------------+

Query Time: 0.3096766471862793 seconds


In [30]:
from pyspark.sql import functions as F
import time

start_time = time.time()

btech_cse_core_courses = courses_df.filter((F.col("department.department_name") == "Computer Science") & (F.col("is_core") == True))

instructor_taught_all = btech_cse_core_courses.groupBy("instructor.instructor_id", "instructor.instructor_name")\
    .agg(F.collect_set("_id").alias("courses_taught")) \
    .filter(F.size("courses_taught") == btech_cse_core_courses.count())

instructor_taught_all_without_courses = instructor_taught_all.drop("courses_taught")

instructor_taught_all_without_courses.show()

end_time = time.time()
print(f"Query Time: {end_time - start_time} seconds")


+-------------+---------------+
|instructor_id|instructor_name|
+-------------+---------------+
|            1|Cariotta Worboy|
+-------------+---------------+

Query Time: 0.2633509635925293 seconds


In [19]:
start_time = time.time()

top_10_courses = students_df.withColumn("enrollment", F.explode("enrollments")) \
    .groupBy("enrollment.course_id", "enrollment.course_name") \
    .agg(F.count("enrollment.course_id").alias("total_enrollments")) \
    .orderBy(F.desc("total_enrollments")) \
    .limit(10)

top_10_courses.show()

end_time = time.time()
print(f"Query Time: {end_time - start_time} seconds")


+---------+--------------------+-----------------+
|course_id|         course_name|total_enrollments|
+---------+--------------------+-----------------+
|        3| Classical Mechanics|               17|
|        8|            Calculus|                9|
|        5|            Genetics|                8|
|        1|Introduction to C...|                8|
|        9|   Quantum Mechanics|                6|
|       40|Environmental Che...|                6|
|        6|     Data Structures|                5|
|        7|Artificial Intell...|                4|
|        2|      Linear Algebra|                4|
|        4|   Organic Chemistry|                3|
+---------+--------------------+-----------------+

Query Time: 0.35017919540405273 seconds
