In [1]:
from pyspark import SparkConf, SparkContext
# Build the Spark configuration for this notebook, then attach to an already
# running SparkContext if there is one (otherwise start a new one), so the
# cell can be re-run without raising "multiple contexts" errors.
conf = SparkConf()
conf.setAppName('reduceByKey')
sc = SparkContext.getOrCreate(conf=conf)

In [17]:
# Location of the raw student CSV. NOTE(review): this is an absolute path on
# one machine; point it at your own copy of StudentData.csv before running.
STUDENT_CSV_PATH = 'C:/Users/Sivasai.M/Music/files/StudentData.csv'

# One RDD element per line of the file (the header line is still included here).
rdd = sc.textFile(STUDENT_CSV_PATH)

# Preview with take(): it fetches only the first rows, whereas collect()
# would ship the entire file to the driver just to slice off four lines.
rdd.take(4)

['age,gender,name,course,roll,marks,email',
 '28,Female,Hubert Oliveras,DB,02984,59,Annika Hoffman_Naoma Fritts@OOP.com',
 '29,Female,Toshiko Hillyard,Cloud,12899,62,Margene Moores_Marylee Capasso@DB.com',
 '28,Male,Celeste Lollis,PF,21267,45,Jeannetta Golden_Jenna Montague@DSA.com']

In [20]:
# Drop the CSV header: the first line of the file holds the column names,
# so keep every line that is not identical to it.
header = rdd.first()
rdd2 = rdd.filter(lambda line: line != header)

# Preview with take() so only the first rows are brought to the driver
# instead of collecting the whole RDD and slicing it locally.
rdd2.take(4)

['28,Female,Hubert Oliveras,DB,02984,59,Annika Hoffman_Naoma Fritts@OOP.com',
 '29,Female,Toshiko Hillyard,Cloud,12899,62,Margene Moores_Marylee Capasso@DB.com',
 '28,Male,Celeste Lollis,PF,21267,45,Jeannetta Golden_Jenna Montague@DSA.com',
 '29,Female,Elenore Choy,DB,32877,29,Billi Clore_Mitzi Seldon@DB.com']

# 1. Total Students

In [21]:
# Number of student rows: rdd2 is the CSV with lines equal to the header removed.
rdd2.count()

1000

# 2. Total Marks of Male and Female Students

In [29]:
# Build (gender, marks) pairs.
# Each line is split once and the resulting field list is reused, rather than
# re-splitting the same string for every column that is read.
# Column 1 is the gender; marks is the second-to-last column.
rdd3 = (
    rdd2
    .map(lambda line: line.split(','))
    .map(lambda fields: (fields[1], int(fields[-2])))
)

# take() previews the first rows without collecting the whole RDD.
rdd3.take(4)

[('Female', 59), ('Female', 62), ('Male', 45), ('Female', 29)]

In [30]:
# Sum the marks of all rows sharing the same gender key.
(
    rdd3
    .reduceByKey(lambda running_total, marks: running_total + marks)
    .collect()
)

[('Female', 29636), ('Male', 30461)]

# 3. Total Passed and Failed Students

In [31]:
# Quick reminder of the row layout; take() fetches just the first two rows
# instead of collecting every row to the driver and slicing.
rdd2.take(2)

['28,Female,Hubert Oliveras,DB,02984,59,Annika Hoffman_Naoma Fritts@OOP.com',
 '29,Female,Toshiko Hillyard,Cloud,12899,62,Margene Moores_Marylee Capasso@DB.com']

In [38]:
# Tag every student row as ('PASS', 1) or ('FAIL', 1) so the ones can be summed
# per label. Marks must be strictly greater than 50 to pass; exactly 50 is a FAIL.
rdd3 = rdd2.map(
    lambda line: ('PASS', 1) if int(line.split(',')[-2]) > 50 else ('FAIL', 1)
)

In [39]:
# Add up the per-row ones to get the number of students under each label.
(
    rdd3
    .reduceByKey(lambda count_so_far, one: count_so_far + one)
    .collect()
)

[('FAIL', 370), ('PASS', 630)]

# 4. Total Enrollments per Course

In [42]:
# Emit (course, 1) for every student row; the course is column 3 of the CSV.
rdd_enr = rdd2.map(lambda line: (line.split(',')[3], 1))

# Summing the ones per course gives the enrolment count for that course.
(
    rdd_enr
    .reduceByKey(lambda count_so_far, one: count_so_far + one)
    .collect()
)

[('DB', 157),
 ('Cloud', 192),
 ('PF', 166),
 ('MVC', 157),
 ('OOP', 152),
 ('DSA', 176)]

# 5. Total Marks per Course

In [44]:
# Build (course, marks) pairs.
# Each line is split once and the field list reused for both columns:
# column 3 is the course, marks is the second-to-last column.
rdd_cour_marks = (
    rdd2
    .map(lambda line: line.split(','))
    .map(lambda fields: (fields[3], int(fields[-2])))
)

# take() previews the first rows without collecting the whole RDD.
rdd_cour_marks.take(4)

[('DB', 59), ('Cloud', 62), ('PF', 45), ('DB', 29)]

In [45]:
# Total marks per course: add together all marks that share a course key.
(
    rdd_cour_marks
    .reduceByKey(lambda running_total, marks: running_total + marks)
    .collect()
)

[('DB', 9270),
 ('Cloud', 11443),
 ('PF', 9933),
 ('MVC', 9585),
 ('OOP', 8916),
 ('DSA', 10950)]

# 6. Average marks per Course

In [48]:
# Build (course, (marks, 1)) pairs. Carrying a 1 alongside the marks lets a
# single reduceByKey accumulate both the sum and the row count needed for a mean.
# Each line is split once and the field list reused for both columns.
rdd_avg = (
    rdd2
    .map(lambda line: line.split(','))
    .map(lambda fields: (fields[3], (int(fields[-2]), 1)))
)

# take() previews the first rows without collecting the whole RDD.
rdd_avg.take(4)

[('DB', (59, 1)), ('Cloud', (62, 1)), ('PF', (45, 1)), ('DB', (29, 1))]

In [53]:
# Element-wise sum of the (marks, 1) tuples per course, producing
# (course, (total marks, number of students)).
rdd_avg1 = rdd_avg.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)
rdd_avg1.collect()

[('DB', (9270, 157)),
 ('Cloud', (11443, 192)),
 ('PF', (9933, 166)),
 ('MVC', (9585, 157)),
 ('OOP', (8916, 152)),
 ('DSA', (10950, 176))]

In [57]:
# Method 1: plain map() that rebuilds each pair as (course, total / count).
(
    rdd_avg1
    .map(lambda course_totals: (
        course_totals[0],
        course_totals[1][0] / course_totals[1][1],
    ))
    .collect()
)

[('DB', 59.044585987261144),
 ('Cloud', 59.598958333333336),
 ('PF', 59.83734939759036),
 ('MVC', 61.05095541401274),
 ('OOP', 58.6578947368421),
 ('DSA', 62.21590909090909)]

In [58]:
# Method 2: mapValues() leaves the course key alone and only transforms the
# (total, count) value into the average.
(
    rdd_avg1
    .mapValues(lambda total_and_count: total_and_count[0] / total_and_count[1])
    .collect()
)

[('DB', 59.044585987261144),
 ('Cloud', 59.598958333333336),
 ('PF', 59.83734939759036),
 ('MVC', 61.05095541401274),
 ('OOP', 58.6578947368421),
 ('DSA', 62.21590909090909)]

# 7. Finding Minimum and Maximum marks per Course

In [59]:
# Build (course, marks) pairs as input for the per-course min/max reductions.
# Each line is split once and the field list reused for both columns.
rdd_min_max = (
    rdd2
    .map(lambda line: line.split(','))
    .map(lambda fields: (fields[3], int(fields[-2])))
)

# take() previews the first rows without collecting the whole RDD.
rdd_min_max.take(4)

[('DB', 59), ('Cloud', 62), ('PF', 45), ('DB', 29)]

In [60]:
# Lowest marks per course: keep the smaller of each pair of values being reduced.
rdd_min_max.reduceByKey(min).collect()

[('DB', 20), ('Cloud', 20), ('PF', 20), ('MVC', 22), ('OOP', 20), ('DSA', 20)]

In [62]:
# Highest marks per course: keep the larger of each pair of values being reduced.
rdd_min_max.reduceByKey(max).collect()

[('DB', 98), ('Cloud', 99), ('PF', 99), ('MVC', 99), ('OOP', 99), ('DSA', 99)]

# 8. Average Age of Male and Female Students

In [66]:
# Build (gender, (age, 1)) pairs; the trailing 1 is summed later to obtain the
# number of students per gender for the average.
# Each line is split once and the field list reused: column 0 is the age,
# column 1 is the gender.
rdd_avg_age = (
    rdd2
    .map(lambda line: line.split(','))
    .map(lambda fields: (fields[1], (int(fields[0]), 1)))
)

# take() previews the first rows without collecting the whole RDD.
rdd_avg_age.take(4)

[('Female', (28, 1)),
 ('Female', (29, 1)),
 ('Male', (28, 1)),
 ('Female', (29, 1))]

In [68]:
# Element-wise sum of the (age, 1) tuples per gender, producing
# (gender, (sum of ages, number of students)).
rdd_avg_age1 = rdd_avg_age.reduceByKey(
    lambda left, right: (left[0] + right[0], left[1] + right[1])
)
rdd_avg_age1.collect()

[('Female', (14273, 501)), ('Male', (14233, 499))]

In [70]:
# Average age per gender: divide the summed ages by the student count.
(
    rdd_avg_age1
    .map(lambda gender_totals: (
        gender_totals[0],
        gender_totals[1][0] / gender_totals[1][1],
    ))
    .collect()
)

[('Female', 28.489021956087825), ('Male', 28.52304609218437)]