<a href="https://colab.research.google.com/github/maverick98/CDS/blob/main/M5_Programming.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


== Programming Question ==

You are given a CSV file of the form "studentID,courseID,marks" for students in a university. Provide the Spark transformations and actions using RDDs to

    (i) calculate the average marks per course,
    (ii) average marks per student and
    (iii) the number of students in the university.



In [1]:
!pip -qq install pyspark

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m4.0 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m17.1 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [2]:


import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import count, avg
from pyspark.sql.functions import *



In [3]:


# Create a dummy students csv file

df = pd.DataFrame({"studentID": np.random.randint(low = 101, high = 130, size = 50, ),
                   "courseID": np.random.randint(low = 1001, high = 1009, size = 50),
                   "marks": np.random.randint(low = 60, high = 95, size = 50)})
df['studentID'] = df['studentID'].apply(lambda x: "S"+str(x))
df['courseID'] = df['courseID'].apply(lambda x: "C"+str(x))

# Drop combined duplicate entries for 'studentID', 'courseID'
df = df.iloc[df[['studentID', 'courseID']].drop_duplicates().index]

df.head()



Unnamed: 0,studentID,courseID,marks
0,S123,C1001,84
1,S118,C1006,75
2,S121,C1002,93
3,S128,C1006,78
4,S107,C1004,82


In [4]:


df.shape



(44, 3)

In [5]:


# Save to csv file
df.to_csv("students_dataset.csv", index=False)



**Using RDDs**

In [6]:


# Create a spark session
spark = (SparkSession.builder.appName("StudentsData").getOrCreate())
spark



In [7]:


# Accessing sparkContext from sparkSession instance
sc = spark.sparkContext



# Read data and create rdd
dataset = sc.textFile("students_dataset.csv")
print(type(dataset))



# Filter out header row
header = dataset.first()      # extract header
rdd_data = dataset.filter(lambda x: x != header)      
print(type(rdd_data))



<class 'pyspark.rdd.RDD'>
<class 'pyspark.rdd.PipelinedRDD'>


In [8]:
rdd_data.count()

44

In [9]:


# we are creating a new RDD called “rows” by splitting every row in the rdd_data  
rows = rdd_data.map(lambda line: line.split(","))
type(rows)



pyspark.rdd.PipelinedRDD

In [10]:


rows.count()



44

In [None]:


# Print Data values
for row in rows.take(rows.count()):
  print(row)



In [26]:
course_marks= rows.map(lambda x:(x[1],x[2]))


# Calculate total marks per course
course_marks_sum = course_marks.reduceByKey(lambda x,y: float(x) + float(y))
course_marks_sum.collect()



[('C1006', 447.0),
 ('C1002', 471.0),
 ('C1001', 410.0),
 ('C1004', 555.0),
 ('C1005', 479.0),
 ('C1003', 583.0),
 ('C1008', 301.0),
 ('C1007', 145.0)]

In [27]:


# Store the count of marks associated with each courseID
course_count = course_marks.countByKey()
course_count



defaultdict(int,
            {'C1001': 5,
             'C1006': 6,
             'C1002': 6,
             'C1004': 7,
             'C1005': 6,
             'C1003': 8,
             'C1008': 4,
             'C1007': 2})

In [28]:
# Calculate average marks per course
avg_mrks_per_course_rdd = course_marks_sum.map(lambda x: (x[0], float(x[1])/course_count[x[0]]))
sorted(avg_mrks_per_course_rdd.collect())

[('C1001', 82.0),
 ('C1002', 78.5),
 ('C1003', 72.875),
 ('C1004', 79.28571428571429),
 ('C1005', 79.83333333333333),
 ('C1006', 74.5),
 ('C1007', 72.5),
 ('C1008', 75.25)]

In [29]:


# Extract studentID and marks from rdd
stdnt_marks = rows.map(lambda x: (x[0], x[2]))
type(stdnt_marks)



pyspark.rdd.PipelinedRDD

In [30]:


# Store the count of marks associated with each studentID
stdnt_count = stdnt_marks.countByKey()
stdnt_count



defaultdict(int,
            {'S123': 2,
             'S118': 1,
             'S121': 4,
             'S128': 1,
             'S107': 1,
             'S115': 2,
             'S116': 1,
             'S103': 2,
             'S125': 2,
             'S101': 1,
             'S108': 1,
             'S113': 2,
             'S112': 3,
             'S114': 2,
             'S111': 3,
             'S106': 1,
             'S119': 2,
             'S117': 1,
             'S102': 2,
             'S126': 1,
             'S120': 2,
             'S129': 2,
             'S109': 2,
             'S105': 1,
             'S110': 1,
             'S124': 1})

In [31]:
# Calculate total marks per student
stdnt_marks_sum = stdnt_marks.reduceByKey(lambda x,y: float(x) + float(y))
stdnt_marks_sum.collect()

[('S123', 145.0),
 ('S128', '78'),
 ('S125', 135.0),
 ('S101', '80'),
 ('S108', '73'),
 ('S111', 239.0),
 ('S106', '91'),
 ('S119', 151.0),
 ('S102', 159.0),
 ('S129', 146.0),
 ('S105', '73'),
 ('S110', '87'),
 ('S124', '71'),
 ('S118', '75'),
 ('S121', 334.0),
 ('S107', '82'),
 ('S115', 164.0),
 ('S116', '76'),
 ('S103', 136.0),
 ('S113', 138.0),
 ('S112', 216.0),
 ('S114', 167.0),
 ('S117', '92'),
 ('S126', '83'),
 ('S120', 140.0),
 ('S109', 160.0)]

In [32]:


# Calculate average marks per student
avg_mrks_per_stdnt_rdd = stdnt_marks_sum.map(lambda x: (x[0], float(x[1])/stdnt_count[x[0]]))
sorted(avg_mrks_per_stdnt_rdd.collect())



[('S101', 80.0),
 ('S102', 79.5),
 ('S103', 68.0),
 ('S105', 73.0),
 ('S106', 91.0),
 ('S107', 82.0),
 ('S108', 73.0),
 ('S109', 80.0),
 ('S110', 87.0),
 ('S111', 79.66666666666667),
 ('S112', 72.0),
 ('S113', 69.0),
 ('S114', 83.5),
 ('S115', 82.0),
 ('S116', 76.0),
 ('S117', 92.0),
 ('S118', 75.0),
 ('S119', 75.5),
 ('S120', 70.0),
 ('S121', 83.5),
 ('S123', 72.5),
 ('S124', 71.0),
 ('S125', 67.5),
 ('S126', 83.0),
 ('S128', 78.0),
 ('S129', 73.0)]

In [33]:


# Store the count of marks associated with each studentID
stdnt_count = stdnt_marks.countByKey()
n_students = len(stdnt_count.keys())

print("Total number of students in the university = ", n_students)



Total number of students in the university =  26


#(i) calculate the average marks per course

##Using SQL

#(i) calculate the average marks per course

In [13]:
df = spark.read.csv("/content/students_dataset.csv", sep = ",", header=True, inferSchema = True)   

In [15]:
df.toPandas().head()

Unnamed: 0,studentID,courseID,marks
0,S123,C1001,84
1,S118,C1006,75
2,S121,C1002,93
3,S128,C1006,78
4,S107,C1004,82


In [19]:
df.groupby('courseId').avg().show()

+--------+-----------------+
|courseId|       avg(marks)|
+--------+-----------------+
|   C1001|             82.0|
|   C1008|            75.25|
|   C1004|79.28571428571429|
|   C1007|             72.5|
|   C1005|79.83333333333333|
|   C1003|           72.875|
|   C1002|             78.5|
|   C1006|             74.5|
+--------+-----------------+



In [17]:


# (i) calculate the average marks per course
avg_mrks_per_course = (df.select("courseID", "Marks").groupBy("courseID").agg(avg("Marks").alias("Avg_Marks")).orderBy("Avg_Marks", ascending=False))
avg_mrks_per_course.show(truncate=False)



+--------+-----------------+
|courseID|Avg_Marks        |
+--------+-----------------+
|C1001   |82.0             |
|C1005   |79.83333333333333|
|C1004   |79.28571428571429|
|C1002   |78.5             |
|C1008   |75.25            |
|C1006   |74.5             |
|C1003   |72.875           |
|C1007   |72.5             |
+--------+-----------------+



# (ii) average marks per student

In [20]:
df.toPandas().head()

Unnamed: 0,studentID,courseID,marks
0,S123,C1001,84
1,S118,C1006,75
2,S121,C1002,93
3,S128,C1006,78
4,S107,C1004,82


In [23]:
avg_mrks_per_student = (df.select("studentID", "marks").groupBy("studentID").agg(avg("Marks").alias("Avg_Marks")).orderBy("Avg_Marks", ascending=False))
avg_mrks_per_student.show(truncate=False)

+---------+-----------------+
|studentID|Avg_Marks        |
+---------+-----------------+
|S117     |92.0             |
|S106     |91.0             |
|S110     |87.0             |
|S121     |83.5             |
|S114     |83.5             |
|S126     |83.0             |
|S115     |82.0             |
|S107     |82.0             |
|S101     |80.0             |
|S109     |80.0             |
|S111     |79.66666666666667|
|S102     |79.5             |
|S128     |78.0             |
|S116     |76.0             |
|S119     |75.5             |
|S118     |75.0             |
|S108     |73.0             |
|S105     |73.0             |
|S129     |73.0             |
|S123     |72.5             |
+---------+-----------------+
only showing top 20 rows



In [24]:
df.groupby('studentID').avg().show()

+---------+----------+
|studentID|avg(marks)|
+---------+----------+
|     S105|      73.0|
|     S106|      91.0|
|     S102|      79.5|
|     S120|      70.0|
|     S119|      75.5|
|     S115|      82.0|
|     S107|      82.0|
|     S110|      87.0|
|     S118|      75.0|
|     S101|      80.0|
|     S129|      73.0|
|     S126|      83.0|
|     S125|      67.5|
|     S108|      73.0|
|     S109|      80.0|
|     S112|      72.0|
|     S103|      68.0|
|     S113|      69.0|
|     S123|      72.5|
|     S117|      92.0|
+---------+----------+
only showing top 20 rows





#  (iii) the number of students in the university

In [25]:
n_stdnts = df.select("studentID").distinct().count()
print("Total number of students in the university = ", n_stdnts)

Total number of students in the university =  26


# Using temp table..Easiest one

In [35]:
df.registerTempTable("student")
df2 = spark.sql("select * from student")
df2.show(5)

+---------+--------+-----+
|studentID|courseID|marks|
+---------+--------+-----+
|     S123|   C1001|   84|
|     S118|   C1006|   75|
|     S121|   C1002|   93|
|     S128|   C1006|   78|
|     S107|   C1004|   82|
+---------+--------+-----+
only showing top 5 rows



#(i) calculate the average marks per course, 

In [38]:
df3=spark.sql("select courseID,avg(marks) from student group by courseID")
df3.show()

+--------+-----------------+
|courseID|       avg(marks)|
+--------+-----------------+
|   C1001|             82.0|
|   C1008|            75.25|
|   C1004|79.28571428571429|
|   C1007|             72.5|
|   C1005|79.83333333333333|
|   C1003|           72.875|
|   C1002|             78.5|
|   C1006|             74.5|
+--------+-----------------+



#(ii) average marks per student and 

In [39]:
df3=spark.sql("select studentID,avg(marks) from student group by studentID")
df3.show()

+---------+----------+
|studentID|avg(marks)|
+---------+----------+
|     S105|      73.0|
|     S106|      91.0|
|     S102|      79.5|
|     S120|      70.0|
|     S119|      75.5|
|     S115|      82.0|
|     S107|      82.0|
|     S110|      87.0|
|     S118|      75.0|
|     S101|      80.0|
|     S129|      73.0|
|     S126|      83.0|
|     S125|      67.5|
|     S108|      73.0|
|     S109|      80.0|
|     S112|      72.0|
|     S103|      68.0|
|     S113|      69.0|
|     S123|      72.5|
|     S117|      92.0|
+---------+----------+
only showing top 20 rows



In [47]:
df5=spark.sql("select count(distinct(studentID)) from student")
df5.show()

+-------------------------+
|count(DISTINCT studentID)|
+-------------------------+
|                       26|
+-------------------------+

