# Spark RDD - Resilient Distributed Dataset

1. An RDD is the basic building block of Spark: its fundamental data structure.
2. An RDD is an immutable, distributed collection of objects.
3. Spark splits the data of an RDD into partitions and distributes them across the nodes of the cluster, so they can be processed in parallel.


# Transformations and Actions

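1. Transformations create a new RDD from an existing one (for example `map`, `flatMap`, `filter`).
2. Actions return a value to the driver program after running a computation on the RDD (for example `collect`, `count`).
3. All transformations in Spark are lazy: they only record how the new RDD should be computed.
4. Spark triggers the actual computation only when an action is called.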
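To make the split between lazy transformations and eager actions concrete, here is a toy plain-Python model. The `ToyRDD` class is hypothetical and is not how Spark is implemented. Its transformations only append a step to a list, and `collect()` is the first point where any user function runs. Like an uncached RDD, it recomputes the whole chain on every action.

```python
class ToyRDD:
    """Hypothetical toy model of lazy evaluation; not how Spark is implemented."""

    def __init__(self, data, steps=None):
        self._data = data            # source elements
        self._steps = steps or []    # recorded transformations; nothing has run yet

    def map(self, f):
        # Transformation: return a NEW object with one more recorded step
        return ToyRDD(self._data, self._steps + [("map", f)])

    def filter(self, pred):
        # Transformation: also just recorded, not executed
        return ToyRDD(self._data, self._steps + [("filter", pred)])

    def collect(self):
        # Action: only now are the recorded steps applied, in order
        result = list(self._data)
        for kind, f in self._steps:
            if kind == "map":
                result = [f(x) for x in result]
            else:
                result = [x for x in result if f(x)]
        return result


calls = []

def double(x):
    calls.append(x)   # record when the function actually runs
    return x * 2

toy = ToyRDD([1, 2, 3])
doubled = toy.map(double).filter(lambda x: x > 2)
print(calls)              # []
print(doubled.collect())  # [4, 6]
print(calls)              # [1, 2, 3]
```

The original `toy` object is never modified, which mirrors the immutability of RDDs: each transformation returns a new object.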

In [5]:
from pyspark import SparkConf, SparkContext

## SparkConf

1. SparkConf (Spark configuration) lets you specify the configuration of your Spark application.
2. It is used to set various Spark parameters as key-value pairs.
3. Example: to read data from an RDS instance or from external databases or file systems, we provide the settings needed to access those systems inside the SparkConf.

## SparkContext

1. SparkContext is the entry point to Spark functionality.
2. When you run a Spark application, a driver program starts. The driver runs your main function, and the SparkContext is initialized there.
3. A SparkContext represents the connection to a Spark cluster, and can be used to create RDDs, accumulators and broadcast variables on that cluster. Only one SparkContext should be active per JVM.

In [6]:
conf  = SparkConf().setAppName("RDD Basics")

## SparkConf().setAppName

1. Sets the application name, which identifies your application in the Spark web UI and to the cluster manager.

In [7]:
sc = SparkContext.getOrCreate(conf=conf)

## SparkContext.getOrCreate()

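1. Returns the currently active SparkContext if there is one; otherwise, it creates a new one from the given `conf`.
2. In Databricks Community Edition, only one SparkContext can be created.
3. On a local machine, we can create the SparkContext ourselves.
4. It is good practice to reuse the existing context if one is available and to create one only otherwise.
5. Do not keep creating new SparkContexts. Each one holds its own resources, and PySpark raises an error if you construct a second one while another is still active.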
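The get-or-create behavior described above is essentially a singleton pattern. Below is a hypothetical plain-Python sketch that mimics it without Spark. `ToyContext` and `get_or_create` are invented names, not PySpark API. In PySpark, a `conf` passed to `getOrCreate()` is ignored when a context already exists. In the same way, the second call here returns the first context unchanged.

```python
class ToyContext:
    """Hypothetical stand-in for SparkContext, used only to illustrate get-or-create."""

    _active = None  # the single active instance, shared by the whole class

    def __init__(self, app_name):
        if ToyContext._active is not None:
            # Mirrors PySpark refusing to start a second SparkContext
            raise ValueError("Cannot run multiple contexts at once")
        self.app_name = app_name
        ToyContext._active = self

    @classmethod
    def get_or_create(cls, app_name):
        # Reuse the active context if one exists; otherwise create a new one
        if cls._active is None:
            return cls(app_name)
        return cls._active


sc1 = ToyContext.get_or_create("RDD Basics")
sc2 = ToyContext.get_or_create("Another App")  # name ignored: a context already exists
print(sc1 is sc2)    # True
print(sc2.app_name)  # RDD Basics
```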

### Read data from text file

1. Specify the path of the file to read (on Databricks, a DBFS FileStore path; the cell below uses a local relative path).
2. Spark does not read the data until an action is called.
3. All transformations execute only when an action is called.

In [13]:
rdd = sc.textFile('data/sample.txt')
rdd

data/sample.txt MapPartitionsRDD[13] at textFile at NativeMethodAccessorImpl.java:0

In [15]:
rdd.collect()

['1 2 3', '4 5 6', '7 8', '9 10 11', '12 13 14 15', '16 17 18 19']

### rdd.collect()

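1. `collect()` is an action, so it triggers reading the file and returns every element of the RDD to the driver as a Python list. Because all of the data is brought into the driver's memory, use it only on small RDDs.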

# MAP

### Map: Split

In [16]:
rdd_map = rdd.map(lambda x:x.split(' '))
rdd_map.collect()

[['1', '2', '3'],
 ['4', '5', '6'],
 ['7', '8'],
 ['9', '10', '11'],
 ['12', '13', '14', '15'],
 ['16', '17', '18', '19']]

### Map: Append

In [27]:
rdd_append = rdd.map(lambda x: x+" Pyspark")
rdd_append.collect()

['1 2 3 Pyspark',
 '4 5 6 Pyspark',
 '7 8 Pyspark',
 '9 10 11 Pyspark',
 '12 13 14 15 Pyspark',
 '16 17 18 19 Pyspark']

### Map: Function

In [None]:
def split(x):
    return x.split(' ')

rdd_split = rdd.map(split)
rdd_split.collect()

### Add 2 to each number in the line

In [32]:
def add(x):
    l = x.split(' ')
    l2 = []
    for s in l:
        l2.append(int(s)+2)
    return l2

rdd_add = rdd.map(add)
rdd_add.collect()

[[3, 4, 5],
 [6, 7, 8],
 [9, 10],
 [11, 12, 13],
 [14, 15, 16, 17],
 [18, 19, 20, 21]]

In [34]:
rdd_quiz = sc.textFile('/Users/manideep/Learning/PySpark/sample1.txt')
rdd_quiz.collect()

['Hi How are you?', 'Hope you are doing', 'Great']

In [47]:
#rdd_word = rdd_quiz.map(lambda x: x.split(' '))
#rdd_word.collect()

def word_lengths(x):
    # Return the length of every word in the line (not a count of words)
    l = x.split(' ')
    l2 = []
    for s in l:
        l2.append(len(s))
    return l2

rdd_count = rdd_quiz.map(word_lengths)
rdd_count.collect()

[[2, 3, 3, 4], [4, 3, 3, 5], [5]]

In [49]:
rdd2 = rdd_quiz.map(lambda x: [len(s) for s in x.split(' ')])
rdd2.collect()

[[2, 3, 3, 4], [4, 3, 3, 5], [5]]

# FLATMAP

In [50]:
rdd_flatmap = rdd.flatMap(lambda x: x.split(' '))
rdd_flatmap.collect()

['1',
 '2',
 '3',
 '4',
 '5',
 '6',
 '7',
 '8',
 '9',
 '10',
 '11',
 '12',
 '13',
 '14',
 '15',
 '16',
 '17',
 '18',
 '19']

In [51]:
rdd_quiz.flatMap(lambda x :[len(s) for s in x.split(' ')]).collect()

[2, 3, 3, 4, 4, 3, 3, 5, 5]
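The difference between `map` and `flatMap` can be reproduced in plain Python on the three lines of `sample1.txt` shown earlier. This sketch uses a list comprehension for the `map` case and `itertools.chain.from_iterable` to flatten the results for the `flatMap` case. It produces the same lists as the two Spark cells.

```python
from itertools import chain

lines = ['Hi How are you?', 'Hope you are doing', 'Great']

def word_lengths(line):
    return [len(w) for w in line.split(' ')]

# map: exactly one output element per input element (here, one list per line)
mapped = [word_lengths(line) for line in lines]

# flatMap: each call returns an iterable, and the results are concatenated
flat = list(chain.from_iterable(word_lengths(line) for line in lines))

print(mapped)  # [[2, 3, 3, 4], [4, 3, 3, 5], [5]]
print(flat)    # [2, 3, 3, 4, 4, 3, 3, 5, 5]
```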

# Filter

In [57]:
rdd_filter = rdd.filter(lambda x: x !='1 2 3')
rdd_filter.collect()

['4 5 6', '7 8', '9 10 11', '12 13 14 15', '16 17 18 19']

In [77]:
def foo(x):
    # filter keeps x when this returns a truthy value; falling through returns None, which drops it
    if x != '1 2 3':
        return x

rdd.filter(foo).collect()

['4 5 6', '7 8', '9 10 11', '12 13 14 15', '16 17 18 19']
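PySpark's `filter` keeps an element when the function returns a truthy value. That is why `foo` works even though it returns the string itself (or `None`) rather than a boolean. The plain-Python sketch below compares `foo` with an explicit boolean predicate, `keep_row`, which is a hypothetical name. The two only disagree on falsy input, such as an empty line.

```python
lines = ['1 2 3', '4 5 6', '7 8', '9 10 11', '12 13 14 15', '16 17 18 19']

def foo(x):
    if x != '1 2 3':
        return x          # otherwise falls through and returns None

def keep_row(x):
    return x != '1 2 3'   # explicit boolean predicate (hypothetical name)

# Python's built-in filter, like RDD.filter, keeps elements whose result is truthy
via_foo = list(filter(foo, lines))
via_bool = list(filter(keep_row, lines))
print(via_foo == via_bool)          # True

# The two differ only on falsy input such as an empty line
print(bool(foo('')), keep_row(''))  # False True
```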

In [78]:
rdd_quiz1 = sc.textFile('/Users/manideep/Learning/PySpark/RDD_QUIZ1.txt')
rdd_quiz1.collect()

['This mango company animal',
 'cat dog mic ant laptop',
 'chair switch mobile am charger cover',
 'amanda any alarm ant']

In [96]:
rdd_quiz1.flatMap(lambda x: x.split(' ')).filter(lambda x: not (x.startswith('c') or x.startswith('a'))).collect()

['This', 'mango', 'dog', 'mic', 'laptop', 'switch', 'mobile']

In [102]:
def filterAandC(x):
    # Keep only words that do not start with 'a' or 'c'
    if x.startswith('a') or x.startswith('c'):
        return False
    else:
        return True

rdd_quiz1.flatMap(lambda x: x.split(' ')).filter(filterAandC).collect()

['This', 'mango', 'dog', 'mic', 'laptop', 'switch', 'mobile']

# GroupByKey

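`groupByKey` works on pair RDDs: every element must be a (key, value) tuple, such as ("Apple", 1) or ("Ball", 2). It gathers all values that share a key into one iterable per key. `mapValues(list)` turns each iterable into a list so the result can be printed.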
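Conceptually, `groupByKey().mapValues(list)` builds one list of values per key. Here is a minimal plain-Python sketch with `collections.defaultdict`, applied to a few (word, length) pairs from the quiz data; the helper name `group_by_key` is hypothetical. In Spark, the groups are formed after a shuffle across partitions, and the order of keys in the result is not guaranteed.

```python
from collections import defaultdict

def group_by_key(pairs):
    """Hypothetical helper: gather all values per key, like groupByKey().mapValues(list)."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    return dict(groups)

words = ['cat', 'dog', 'mic', 'ant', 'amanda', 'any', 'alarm', 'ant']  # from RDD_QUIZ1.txt
pairs = [(w, len(w)) for w in words]   # (key, value) tuples
grouped = group_by_key(pairs)
print(grouped['ant'])     # [3, 3]
print(grouped['amanda'])  # [6]
```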

### rdd.groupByKey()

In [105]:
rdd_quiz1.map(lambda x: (x, len(x.split(' ')))).collect()

[('This mango company animal', 4),
 ('cat dog mic ant laptop', 5),
 ('chair switch mobile am charger cover', 6),
 ('amanda any alarm ant', 4)]

In [125]:
rdd_quiz1.flatMap(lambda x: x.split(' ')).map(lambda x: (x,len(x))).groupByKey().mapValues(list).collect()

[('mango', [5]),
 ('cat', [3]),
 ('ant', [3, 3]),
 ('laptop', [6]),
 ('chair', [5]),
 ('switch', [6]),
 ('mobile', [6]),
 ('am', [2]),
 ('This', [4]),
 ('company', [7]),
 ('animal', [6]),
 ('dog', [3]),
 ('mic', [3]),
 ('charger', [7]),
 ('cover', [5]),
 ('amanda', [6]),
 ('any', [3]),
 ('alarm', [5])]

# ReduceByKey

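`reduceByKey` combines the values of each key in a pair RDD using a function of two arguments (for example, addition). The function should be associative and commutative, because Spark first combines values within each partition and then merges the partial results across partitions.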
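Here is a toy plain-Python model of that two-stage process. `reduce_by_key` is a hypothetical helper, and partitions are just Python lists. It also shows why the function must be associative: with subtraction, the same data gives different answers depending on how it is split into partitions.

```python
def reduce_by_key(partitions, func):
    """Toy model of reduceByKey: combine values per key inside each partition,
    then merge the per-partition results (the part Spark does after a shuffle)."""
    partials = []
    for part in partitions:
        local = {}
        for key, value in part:
            local[key] = func(local[key], value) if key in local else value
        partials.append(local)

    merged = {}
    for local in partials:
        for key, value in local.items():
            merged[key] = func(merged[key], value) if key in merged else value
    return merged


# Two hypothetical partitions of (word, 1) pairs
partitions = [
    [('Apple', 1), ('Chair', 1), ('Apple', 1)],
    [('Chair', 1), ('Apple', 1)],
]
print(reduce_by_key(partitions, lambda x, y: x + y))  # {'Apple': 3, 'Chair': 2}

# Subtraction is not associative, so the answer depends on the partitioning:
print(reduce_by_key([[('k', 5), ('k', 1), ('k', 1)]], lambda x, y: x - y))    # {'k': 3}
print(reduce_by_key([[('k', 5)], [('k', 1), ('k', 1)]], lambda x, y: x - y))  # {'k': 5}
```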

### rdd.reduceByKey()

In [129]:
rdd_quiz1.flatMap(lambda x:x.split(' ')).map(lambda x: (x,len(x))).reduceByKey(lambda x,y:x+y).collect()

[('mango', 5),
 ('cat', 3),
 ('ant', 6),
 ('laptop', 6),
 ('chair', 5),
 ('switch', 6),
 ('mobile', 6),
 ('am', 2),
 ('This', 4),
 ('company', 7),
 ('animal', 6),
 ('dog', 3),
 ('mic', 3),
 ('charger', 7),
 ('cover', 5),
 ('amanda', 6),
 ('any', 3),
 ('alarm', 5)]

In [132]:
rdd_words = sc.textFile('/Users/manideep/Learning/PySpark/WordData.txt')
rdd_words.map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y).collect()

[('Apple', 10),
 ('Chair', 15),
 ('Bag', 5),
 ('Mobile', 5),
 ('Book', 5),
 ('Mic', 10),
 ('Laptop', 5)]

In [156]:
rdd_words1 = sc.textFile('/Users/manideep/Learning/PySpark/WordData1.txt')
rdd_count = rdd_words1.flatMap(lambda x: x.split(' ')).map(lambda x:(x,1)).reduceByKey(lambda x,y:x+y)
rdd_count.collect()

[('mango', 4),
 ('Cat', 1),
 ('ant', 2),
 ('laptop', 3),
 ('Chair', 1),
 ('switch', 1),
 ('am', 1),
 ('This', 2),
 ('company', 1),
 ('animal', 1),
 ('charged', 1),
 ('cover', 1),
 ('Animalany', 1)]

# Count

In [160]:
rdd_words.count()

55

# CountByValue()

In [161]:
rdd_words.countByValue()

defaultdict(int,
            {'Apple': 10,
             'Mic': 10,
             'Laptop': 5,
             'Chair': 15,
             'Bag': 5,
             'Mobile': 5,
             'Book': 5})

In [163]:
rdd_quiz1.getNumPartitions()

2

# Repartition

Used to increase or decrease the number of partitions of an RDD. `repartition` always performs a full shuffle of the data.

In [168]:
rdd_repartition = rdd_quiz1.repartition(3)
rdd_repartition.getNumPartitions()

3

# Coalesce()

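Used to decrease the number of partitions of an RDD.
Like `repartition`, it is a transformation, so it returns a new RDD.
By default, it merges existing partitions without a full shuffle. For that reason, it cannot increase the number of partitions unless `shuffle=True` is passed.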

In [169]:
rdd_coalesce = rdd_repartition.coalesce(1)
rdd_coalesce.getNumPartitions()

1
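The difference between the two can be pictured with a toy model in which partitions are plain Python lists. `toy_repartition` and `toy_coalesce` are hypothetical helpers, and Spark's real placement logic is more involved. Repartitioning reassigns every element, which is the shuffle. Coalescing only merges whole existing partitions, so asking it for more partitions changes nothing.

```python
# Toy model of partitions as Python lists; Spark's real placement logic differs.

def toy_repartition(partitions, n):
    """Full shuffle: every element is reassigned (round-robin) to one of n new partitions."""
    out = [[] for _ in range(n)]
    i = 0
    for part in partitions:
        for x in part:
            out[i % n].append(x)
            i += 1
    return out


def toy_coalesce(partitions, n):
    """No shuffle: whole existing partitions are merged, so the count can only go down."""
    if n >= len(partitions):
        return [list(p) for p in partitions]  # asking for more partitions changes nothing
    out = [[] for _ in range(n)]
    for idx, part in enumerate(partitions):
        out[idx * n // len(partitions)].extend(part)  # neighboring partitions share a target
    return out


parts = [['a', 'b'], ['c'], ['d', 'e', 'f']]
print(toy_repartition(parts, 2))    # [['a', 'c', 'e'], ['b', 'd', 'f']]
print(toy_coalesce(parts, 1))       # [['a', 'b', 'c', 'd', 'e', 'f']]
print(len(toy_coalesce(parts, 5)))  # 3
```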

In [170]:
movie_ratings = sc.textFile('/Users/manideep/Learning/PySpark/movie_ratings.csv')
movie_ratings.collect()

['The Shawshank Redemption,3',
 'The Matrix,5',
 '12 Angry Men,3',
 '12 Angry Men,4',
 'The Matrix,5',
 'Pulp Fiction,4',
 'The Godfather,5',
 'The Shawshank Redemption,2',
 'Pulp Fiction,4',
 'The Godfather,5',
 '12 Angry Men,2',
 'The Godfather,3',
 'Pulp Fiction,4',
 '12 Angry Men,1',
 'The Shawshank Redemption,2',
 '12 Angry Men,1',
 'The Shawshank Redemption,5',
 'Pulp Fiction,5',
 'Pulp Fiction,2',
 'The Matrix,4']

In [180]:
tt = movie_ratings.map(lambda x: (x.split(',')[0],(int(x.split(',')[1]),1)))
tt.collect()

[('The Shawshank Redemption', (3, 1)),
 ('The Matrix', (5, 1)),
 ('12 Angry Men', (3, 1)),
 ('12 Angry Men', (4, 1)),
 ('The Matrix', (5, 1)),
 ('Pulp Fiction', (4, 1)),
 ('The Godfather', (5, 1)),
 ('The Shawshank Redemption', (2, 1)),
 ('Pulp Fiction', (4, 1)),
 ('The Godfather', (5, 1)),
 ('12 Angry Men', (2, 1)),
 ('The Godfather', (3, 1)),
 ('Pulp Fiction', (4, 1)),
 ('12 Angry Men', (1, 1)),
 ('The Shawshank Redemption', (2, 1)),
 ('12 Angry Men', (1, 1)),
 ('The Shawshank Redemption', (5, 1)),
 ('Pulp Fiction', (5, 1)),
 ('Pulp Fiction', (2, 1)),
 ('The Matrix', (4, 1))]

In [199]:
add = tt.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))
avg_rating = add.map(lambda x : (x[0], x[1][0]/x[1][1]))
avg_rating.collect()

[('The Shawshank Redemption', 3.0),
 ('12 Angry Men', 2.2),
 ('The Godfather', 4.333333333333333),
 ('The Matrix', 4.666666666666667),
 ('Pulp Fiction', 3.8)]
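Why carry a (sum, count) pair instead of averaging directly? Averaging two values at a time is not associative, so a reducer like `lambda x, y: (x + y) / 2` gives a result that depends on the order in which values are combined. Summing pairs element-wise has no such problem. Here is a plain-Python check with `functools.reduce` on The Matrix ratings from the data above (5, 5, 4):

```python
from functools import reduce

matrix = [5, 5, 4]  # The Matrix ratings from movie_ratings.csv above

def merge(a, b):
    # Add (sum, count) pairs element-wise; associative and commutative
    return (a[0] + b[0], a[1] + b[1])

total, n = reduce(merge, [(r, 1) for r in matrix])
print(total, n)     # 14 3
print(total / n)    # 4.666666666666667

# Tempting but wrong: averaging two values at a time
wrong = reduce(lambda x, y: (x + y) / 2, matrix)
print(wrong)        # 4.5
```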

# Average Quiz

In [200]:
avg_quiz = sc.textFile('/Users/manideep/Learning/PySpark/average_quiz_sample.csv')
avg_quiz.collect()

['JAN,NY,3.0',
 'JAN,PA,1.0',
 'JAN,NJ,2.0',
 'JAN,CT,4.0',
 'FEB,PA,1.0',
 'FEB,NJ,1.0',
 'FEB,NY,2.0',
 'FEB,VT,1.0',
 'MAR,NJ,2.0',
 'MAR,NY,1.0',
 'MAR,VT,2.0',
 'MAR,PA,3.0']

In [207]:
split = avg_quiz.map(lambda x: (x.split(',')[0],(float(x.split(',')[2]),1)))
split.collect()

[('JAN', (3.0, 1)),
 ('JAN', (1.0, 1)),
 ('JAN', (2.0, 1)),
 ('JAN', (4.0, 1)),
 ('FEB', (1.0, 1)),
 ('FEB', (1.0, 1)),
 ('FEB', (2.0, 1)),
 ('FEB', (1.0, 1)),
 ('MAR', (2.0, 1)),
 ('MAR', (1.0, 1)),
 ('MAR', (2.0, 1)),
 ('MAR', (3.0, 1))]

In [214]:
add_split = split.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))

In [215]:
avg = add_split.map(lambda x: (x[0],x[1][0]/x[1][1]))
avg.collect()

[('JAN', 2.5), ('FEB', 1.25), ('MAR', 2.0)]

# Min and Max

In [222]:
movie_ratings.collect()

['The Shawshank Redemption,3',
 'The Matrix,5',
 '12 Angry Men,3',
 '12 Angry Men,4',
 'The Matrix,5',
 'Pulp Fiction,4',
 'The Godfather,5',
 'The Shawshank Redemption,2',
 'Pulp Fiction,4',
 'The Godfather,5',
 '12 Angry Men,2',
 'The Godfather,3',
 'Pulp Fiction,4',
 '12 Angry Men,1',
 'The Shawshank Redemption,2',
 '12 Angry Men,1',
 'The Shawshank Redemption,5',
 'Pulp Fiction,5',
 'Pulp Fiction,2',
 'The Matrix,4']

In [None]:
movie_split = movie_ratings.map(lambda x: (x.split(',')[0],int(x.split(',')[1])))

# MIN

In [223]:
min_rating = movie_split.reduceByKey(lambda x,y: x if x < y else y)
min_rating.collect()

[('The Shawshank Redemption', 2),
 ('12 Angry Men', 1),
 ('The Godfather', 3),
 ('The Matrix', 4),
 ('Pulp Fiction', 2)]

# MAX

In [224]:
max_rating = movie_split.reduceByKey(lambda x,y: x if x > y else y)
max_rating.collect()

[('The Shawshank Redemption', 5),
 ('12 Angry Men', 4),
 ('The Godfather', 5),
 ('The Matrix', 5),
 ('Pulp Fiction', 5)]
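The two lambdas above simply return the smaller or the larger of two values. Python's built-in `min` and `max` accept two arguments, so they should work as drop-in reducers, for example `movie_split.reduceByKey(min)`; this suggestion is not from the original notebook. Here is a plain-Python check with `functools.reduce` on The Godfather ratings (5, 5, 3):

```python
from functools import reduce

godfather = [5, 5, 3]  # The Godfather ratings from movie_ratings.csv above

print(reduce(lambda x, y: x if x < y else y, godfather))  # 3
print(reduce(min, godfather))                             # 3
print(reduce(max, godfather))                             # 5
```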

# MIN, MAX Quiz

In [225]:
avg_quiz.collect()

['JAN,NY,3.0',
 'JAN,PA,1.0',
 'JAN,NJ,2.0',
 'JAN,CT,4.0',
 'FEB,PA,1.0',
 'FEB,NJ,1.0',
 'FEB,NY,2.0',
 'FEB,VT,1.0',
 'MAR,NJ,2.0',
 'MAR,NY,1.0',
 'MAR,VT,2.0',
 'MAR,PA,3.0']

In [227]:
split_city = avg_quiz.map(lambda x: (x.split(',')[1],float(x.split(',')[2])))
split_city.collect()

[('NY', 3.0),
 ('PA', 1.0),
 ('NJ', 2.0),
 ('CT', 4.0),
 ('PA', 1.0),
 ('NJ', 1.0),
 ('NY', 2.0),
 ('VT', 1.0),
 ('NJ', 2.0),
 ('NY', 1.0),
 ('VT', 2.0),
 ('PA', 3.0)]

In [228]:
min_city = split_city.reduceByKey(lambda x,y: x if x < y else y)
min_city.collect()

[('NY', 1.0), ('CT', 4.0), ('PA', 1.0), ('NJ', 1.0), ('VT', 1.0)]

In [229]:
max_city = split_city.reduceByKey(lambda x,y: x if x > y else y)
max_city.collect()

[('NY', 3.0), ('CT', 4.0), ('PA', 3.0), ('NJ', 2.0), ('VT', 2.0)]

# Mini Project

In [235]:
student_data = sc.textFile('/Users/manideep/Learning/PySpark/StudentData.csv')
headers = student_data.first()
student_data = student_data.filter(lambda x: x != headers).map(lambda x:x.split(','))
student_data.collect()

[['28',
  'Female',
  'Hubert Oliveras',
  'DB',
  '02984',
  '59',
  'Annika Hoffman_Naoma Fritts@OOP.com'],
 ['29',
  'Female',
  'Toshiko Hillyard',
  'Cloud',
  '12899',
  '62',
  'Margene Moores_Marylee Capasso@DB.com'],
 ['28',
  'Male',
  'Celeste Lollis',
  'PF',
  '21267',
  '45',
  'Jeannetta Golden_Jenna Montague@DSA.com'],
 ['29',
  'Female',
  'Elenore Choy',
  'DB',
  '32877',
  '29',
  'Billi Clore_Mitzi Seldon@DB.com'],
 ['28',
  'Male',
  'Sheryll Towler',
  'DSA',
  '41487',
  '41',
  'Claude Panos_Judie Chipps@OOP.com'],
 ['28',
  'Male',
  'Margene Moores',
  'MVC',
  '52771',
  '32',
  'Toshiko Hillyard_Clementina Menke@MVC.com'],
 ['28',
  'Male',
  'Neda Briski',
  'OOP',
  '61973',
  '69',
  'Alberta Freund_Elenore Choy@DB.com'],
 ['28',
  'Female',
  'Claude Panos',
  'Cloud',
  '72409',
  '85',
  'Sheryll Towler_Alberta Freund@Cloud.com'],
 ['28',
  'Male',
  'Celeste Lollis',
  'MVC',
  '81492',
  '64',
  'Nicole Harwood_Claude Panos@MVC.com'],
 ['29',
  'Mal

## No. of students in the file

In [234]:
student_data.count()

1000

## Total marks achieved by Female and Male Students

In [242]:
marks = student_data.map(lambda x:(x[1],int(x[5])))
marks.collect()

[('Female', 59),
 ('Female', 62),
 ('Male', 45),
 ('Female', 29),
 ('Male', 41),
 ('Male', 32),
 ('Male', 69),
 ('Female', 85),
 ('Male', 64),
 ('Male', 51),
 ('Female', 35),
 ('Male', 53),
 ('Female', 27),
 ('Female', 55),
 ('Male', 42),
 ('Female', 27),
 ('Male', 36),
 ('Male', 22),
 ('Male', 56),
 ('Female', 62),
 ('Female', 96),
 ('Female', 23),
 ('Female', 60),
 ('Female', 58),
 ('Female', 68),
 ('Female', 83),
 ('Female', 20),
 ('Female', 99),
 ('Female', 43),
 ('Female', 82),
 ('Female', 59),
 ('Female', 79),
 ('Female', 24),
 ('Female', 62),
 ('Female', 44),
 ('Male', 63),
 ('Male', 62),
 ('Male', 45),
 ('Male', 36),
 ('Male', 47),
 ('Female', 27),
 ('Female', 42),
 ('Male', 66),
 ('Female', 25),
 ('Female', 34),
 ('Female', 29),
 ('Male', 51),
 ('Male', 35),
 ('Male', 57),
 ('Female', 60),
 ('Female', 43),
 ('Male', 76),
 ('Male', 77),
 ('Male', 53),
 ('Female', 28),
 ('Female', 43),
 ('Male', 85),
 ('Female', 41),
 ('Male', 52),
 ('Male', 83),
 ('Female', 48),
 ('Female', 82)

In [244]:
Total_marks_by_gender = marks.reduceByKey(lambda x,y:x+y) 
Total_marks_by_gender.collect()

[('Female', 29636), ('Male', 30461)]

## Total Number of Students that have passed and failed

In [245]:
marks_1 = student_data.map(lambda x:(x[2],int(x[5])))
marks_1.collect()

[('Hubert Oliveras', 59),
 ('Toshiko Hillyard', 62),
 ('Celeste Lollis', 45),
 ('Elenore Choy', 29),
 ('Sheryll Towler', 41),
 ('Margene Moores', 32),
 ('Neda Briski', 69),
 ('Claude Panos', 85),
 ('Celeste Lollis', 64),
 ('Cordie Harnois', 51),
 ('Kena Wild', 35),
 ('Ernest Rossbach', 53),
 ('Latia Vanhoose', 27),
 ('Latia Vanhoose', 55),
 ('Neda Briski', 42),
 ('Latia Vanhoose', 27),
 ('Loris Crossett', 36),
 ('Annika Hoffman', 22),
 ('Santa Kerfien', 56),
 ('Mickey Cortright', 62),
 ('Loris Crossett', 96),
 ('Niki Klimek', 23),
 ('Kena Wild', 60),
 ('Jc Andrepont', 58),
 ('Anna Santos', 68),
 ('Alberta Freund', 83),
 ('Maybell Duguay', 20),
 ('Paris Hutton', 99),
 ('Cheri Kenney', 43),
 ('Dustin Feagins', 82),
 ('Claude Panos', 59),
 ('Anna Santos', 79),
 ('Cheri Kenney', 24),
 ('Loris Crossett', 62),
 ('Mickey Cortright', 44),
 ('Hubert Oliveras', 63),
 ('Sebrina Maresca', 62),
 ('Jalisa Swenson', 45),
 ('Kizzy Brenner', 36),
 ('Toshiko Hillyard', 47),
 ('Kizzy Brenner', 27),
 ('An

In [247]:
Students_passed = marks_1.filter(lambda x: x[1]>50)
Students_passed.count()

630

In [253]:
Students_failed = marks_1.filter(lambda x: x[1] <= 50)
Students_failed.count()

370
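Counting passes and failures with two separate filters runs two Spark jobs over the data. A single pass is possible by labelling each student and then counting the labels, for example `marks_1.map(lambda x: 'passed' if x[1] > 50 else 'failed').countByValue()`. This is an untested suggestion, not part of the original notebook. Here is the same idea in plain Python, using the first eight marks from the output above:

```python
from collections import Counter

marks = [59, 62, 45, 29, 41, 32, 69, 85]  # first eight marks from the output above

# Label each mark once (pass means more than 50), then count the labels
status = Counter('passed' if m > 50 else 'failed' for m in marks)
print(status['passed'], status['failed'])  # 4 4
```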

## Total Number of Students Enrolled per course

In [257]:
course = student_data.map(lambda x:(x[3],1))
course.collect()

[('DB', 1),
 ('Cloud', 1),
 ('PF', 1),
 ('DB', 1),
 ('DSA', 1),
 ('MVC', 1),
 ('OOP', 1),
 ('Cloud', 1),
 ('MVC', 1),
 ('OOP', 1),
 ('DSA', 1),
 ('DB', 1),
 ('DB', 1),
 ('MVC', 1),
 ('PF', 1),
 ('DB', 1),
 ('MVC', 1),
 ('OOP', 1),
 ('PF', 1),
 ('DB', 1),
 ('PF', 1),
 ('PF', 1),
 ('Cloud', 1),
 ('DSA', 1),
 ('Cloud', 1),
 ('OOP', 1),
 ('Cloud', 1),
 ('DSA', 1),
 ('Cloud', 1),
 ('DSA', 1),
 ('Cloud', 1),
 ('DB', 1),
 ('MVC', 1),
 ('PF', 1),
 ('DSA', 1),
 ('OOP', 1),
 ('PF', 1),
 ('PF', 1),
 ('DB', 1),
 ('DB', 1),
 ('Cloud', 1),
 ('DSA', 1),
 ('PF', 1),
 ('OOP', 1),
 ('Cloud', 1),
 ('DSA', 1),
 ('PF', 1),
 ('OOP', 1),
 ('DB', 1),
 ('DSA', 1),
 ('MVC', 1),
 ('Cloud', 1),
 ('OOP', 1),
 ('PF', 1),
 ('OOP', 1),
 ('DB', 1),
 ('DSA', 1),
 ('PF', 1),
 ('OOP', 1),
 ('DSA', 1),
 ('DB', 1),
 ('Cloud', 1),
 ('DSA', 1),
 ('Cloud', 1),
 ('PF', 1),
 ('MVC', 1),
 ('MVC', 1),
 ('MVC', 1),
 ('PF', 1),
 ('DSA', 1),
 ('PF', 1),
 ('MVC', 1),
 ('Cloud', 1),
 ('DSA', 1),
 ('MVC', 1),
 ('MVC', 1),
 ('MVC', 1),


In [258]:
enroll_per_course = course.reduceByKey(lambda x,y:x+y)
enroll_per_course.collect()

[('DB', 157),
 ('Cloud', 192),
 ('PF', 166),
 ('MVC', 157),
 ('OOP', 152),
 ('DSA', 176)]

## Total Marks per course

In [262]:
marks_2  = student_data.map(lambda x:(x[3],int(x[5])))
marks_2.collect()

[('DB', 59),
 ('Cloud', 62),
 ('PF', 45),
 ('DB', 29),
 ('DSA', 41),
 ('MVC', 32),
 ('OOP', 69),
 ('Cloud', 85),
 ('MVC', 64),
 ('OOP', 51),
 ('DSA', 35),
 ('DB', 53),
 ('DB', 27),
 ('MVC', 55),
 ('PF', 42),
 ('DB', 27),
 ('MVC', 36),
 ('OOP', 22),
 ('PF', 56),
 ('DB', 62),
 ('PF', 96),
 ('PF', 23),
 ('Cloud', 60),
 ('DSA', 58),
 ('Cloud', 68),
 ('OOP', 83),
 ('Cloud', 20),
 ('DSA', 99),
 ('Cloud', 43),
 ('DSA', 82),
 ('Cloud', 59),
 ('DB', 79),
 ('MVC', 24),
 ('PF', 62),
 ('DSA', 44),
 ('OOP', 63),
 ('PF', 62),
 ('PF', 45),
 ('DB', 36),
 ('DB', 47),
 ('Cloud', 27),
 ('DSA', 42),
 ('PF', 66),
 ('OOP', 25),
 ('Cloud', 34),
 ('DSA', 29),
 ('PF', 51),
 ('OOP', 35),
 ('DB', 57),
 ('DSA', 60),
 ('MVC', 43),
 ('Cloud', 76),
 ('OOP', 77),
 ('PF', 53),
 ('OOP', 28),
 ('DB', 43),
 ('DSA', 85),
 ('PF', 41),
 ('OOP', 52),
 ('DSA', 83),
 ('DB', 48),
 ('Cloud', 82),
 ('DSA', 45),
 ('Cloud', 75),
 ('PF', 27),
 ('MVC', 92),
 ('MVC', 89),
 ('MVC', 28),
 ('PF', 42),
 ('DSA', 26),
 ('PF', 99),
 ('MVC', 

In [263]:
total_marks_per_course = marks_2.reduceByKey(lambda x,y:x+y)
total_marks_per_course.collect()

[('DB', 9270),
 ('Cloud', 11443),
 ('PF', 9933),
 ('MVC', 9585),
 ('OOP', 8916),
 ('DSA', 10950)]

## Average Marks per course

In [265]:
marks_3  = student_data.map(lambda x:(x[3],(int(x[5]),1)))
marks_3.collect()

[('DB', (59, 1)),
 ('Cloud', (62, 1)),
 ('PF', (45, 1)),
 ('DB', (29, 1)),
 ('DSA', (41, 1)),
 ('MVC', (32, 1)),
 ('OOP', (69, 1)),
 ('Cloud', (85, 1)),
 ('MVC', (64, 1)),
 ('OOP', (51, 1)),
 ('DSA', (35, 1)),
 ('DB', (53, 1)),
 ('DB', (27, 1)),
 ('MVC', (55, 1)),
 ('PF', (42, 1)),
 ('DB', (27, 1)),
 ('MVC', (36, 1)),
 ('OOP', (22, 1)),
 ('PF', (56, 1)),
 ('DB', (62, 1)),
 ('PF', (96, 1)),
 ('PF', (23, 1)),
 ('Cloud', (60, 1)),
 ('DSA', (58, 1)),
 ('Cloud', (68, 1)),
 ('OOP', (83, 1)),
 ('Cloud', (20, 1)),
 ('DSA', (99, 1)),
 ('Cloud', (43, 1)),
 ('DSA', (82, 1)),
 ('Cloud', (59, 1)),
 ('DB', (79, 1)),
 ('MVC', (24, 1)),
 ('PF', (62, 1)),
 ('DSA', (44, 1)),
 ('OOP', (63, 1)),
 ('PF', (62, 1)),
 ('PF', (45, 1)),
 ('DB', (36, 1)),
 ('DB', (47, 1)),
 ('Cloud', (27, 1)),
 ('DSA', (42, 1)),
 ('PF', (66, 1)),
 ('OOP', (25, 1)),
 ('Cloud', (34, 1)),
 ('DSA', (29, 1)),
 ('PF', (51, 1)),
 ('OOP', (35, 1)),
 ('DB', (57, 1)),
 ('DSA', (60, 1)),
 ('MVC', (43, 1)),
 ('Cloud', (76, 1)),
 ('OOP', (77

In [272]:
marks3_add = marks_3.reduceByKey(lambda x,y: (x[0]+y[0],x[1]+y[1]))
marks3_add.collect()

[('DB', (9270, 157)),
 ('Cloud', (11443, 192)),
 ('PF', (9933, 166)),
 ('MVC', (9585, 157)),
 ('OOP', (8916, 152)),
 ('DSA', (10950, 176))]

In [275]:
avg_marks_per_course = marks3_add.map(lambda x:(x[0],x[1][0]/x[1][1]))
avg_marks_per_course.collect()

[('DB', 59.044585987261144),
 ('Cloud', 59.598958333333336),
 ('PF', 59.83734939759036),
 ('MVC', 61.05095541401274),
 ('OOP', 58.6578947368421),
 ('DSA', 62.21590909090909)]

In [276]:
marks3_add.mapValues(lambda x:x[0]/x[1]).collect()

[('DB', 59.044585987261144),
 ('Cloud', 59.598958333333336),
 ('PF', 59.83734939759036),
 ('MVC', 61.05095541401274),
 ('OOP', 58.6578947368421),
 ('DSA', 62.21590909090909)]

## Minimum and maximum marks per course

In [279]:
marks_4  = student_data.map(lambda x:(x[3],int(x[5])))
marks_4.collect()

[('DB', 59),
 ('Cloud', 62),
 ('PF', 45),
 ('DB', 29),
 ('DSA', 41),
 ('MVC', 32),
 ('OOP', 69),
 ('Cloud', 85),
 ('MVC', 64),
 ('OOP', 51),
 ('DSA', 35),
 ('DB', 53),
 ('DB', 27),
 ('MVC', 55),
 ('PF', 42),
 ('DB', 27),
 ('MVC', 36),
 ('OOP', 22),
 ('PF', 56),
 ('DB', 62),
 ('PF', 96),
 ('PF', 23),
 ('Cloud', 60),
 ('DSA', 58),
 ('Cloud', 68),
 ('OOP', 83),
 ('Cloud', 20),
 ('DSA', 99),
 ('Cloud', 43),
 ('DSA', 82),
 ('Cloud', 59),
 ('DB', 79),
 ('MVC', 24),
 ('PF', 62),
 ('DSA', 44),
 ('OOP', 63),
 ('PF', 62),
 ('PF', 45),
 ('DB', 36),
 ('DB', 47),
 ('Cloud', 27),
 ('DSA', 42),
 ('PF', 66),
 ('OOP', 25),
 ('Cloud', 34),
 ('DSA', 29),
 ('PF', 51),
 ('OOP', 35),
 ('DB', 57),
 ('DSA', 60),
 ('MVC', 43),
 ('Cloud', 76),
 ('OOP', 77),
 ('PF', 53),
 ('OOP', 28),
 ('DB', 43),
 ('DSA', 85),
 ('PF', 41),
 ('OOP', 52),
 ('DSA', 83),
 ('DB', 48),
 ('Cloud', 82),
 ('DSA', 45),
 ('Cloud', 75),
 ('PF', 27),
 ('MVC', 92),
 ('MVC', 89),
 ('MVC', 28),
 ('PF', 42),
 ('DSA', 26),
 ('PF', 99),
 ('MVC', 

In [281]:
Min_marks = marks_4.reduceByKey(lambda x,y : x if x < y else y)
Min_marks.collect()

[('DB', 20), ('Cloud', 20), ('PF', 20), ('MVC', 22), ('OOP', 20), ('DSA', 20)]

In [282]:
Max_marks = marks_4.reduceByKey(lambda x,y : x if x > y else y)
Max_marks.collect()

[('DB', 98), ('Cloud', 99), ('PF', 99), ('MVC', 99), ('OOP', 99), ('DSA', 99)]

## Average age of Male and Female Students

In [293]:
age = student_data.map(lambda x: (x[1], (int(x[0]),1)))
age.collect()

[('Female', (28, 1)),
 ('Female', (29, 1)),
 ('Male', (28, 1)),
 ('Female', (29, 1)),
 ('Male', (28, 1)),
 ('Male', (28, 1)),
 ('Male', (28, 1)),
 ('Female', (28, 1)),
 ('Male', (28, 1)),
 ('Male', (29, 1)),
 ('Female', (29, 1)),
 ('Male', (29, 1)),
 ('Female', (28, 1)),
 ('Female', (29, 1)),
 ('Male', (29, 1)),
 ('Female', (29, 1)),
 ('Male', (29, 1)),
 ('Male', (29, 1)),
 ('Male', (29, 1)),
 ('Female', (28, 1)),
 ('Female', (29, 1)),
 ('Female', (29, 1)),
 ('Female', (28, 1)),
 ('Female', (28, 1)),
 ('Female', (29, 1)),
 ('Female', (28, 1)),
 ('Female', (28, 1)),
 ('Female', (29, 1)),
 ('Female', (29, 1)),
 ('Female', (28, 1)),
 ('Female', (29, 1)),
 ('Female', (28, 1)),
 ('Female', (28, 1)),
 ('Female', (28, 1)),
 ('Female', (28, 1)),
 ('Male', (28, 1)),
 ('Male', (28, 1)),
 ('Male', (29, 1)),
 ('Male', (28, 1)),
 ('Male', (28, 1)),
 ('Female', (28, 1)),
 ('Female', (29, 1)),
 ('Male', (28, 1)),
 ('Female', (28, 1)),
 ('Female', (29, 1)),
 ('Female', (28, 1)),
 ('Male', (28, 1)),
 (

In [300]:
age_add = age.reduceByKey(lambda x,y : (x[0]+y[0],x[1]+y[1]))
age_add.collect()

[('Female', (14273, 501)), ('Male', (14233, 499))]

In [301]:
avg_age = age_add.mapValues(lambda x:x[0]/x[1])
avg_age.collect()

[('Female', 28.489021956087825), ('Male', 28.52304609218437)]