Import the PySpark classes and create a SparkContext instance. The master setting local[*] runs Spark locally, with as many worker threads as there are logical cores.

In [1]:
from pyspark import SparkContext, SparkConf
scf = SparkConf().setAppName("1_1").setMaster("local[*]")
sct = SparkContext(conf=scf)

In [2]:
pl = [2.2, 3.4, 4.3, 2.4, 2.3, 4.0]  # List data
parpl = sct.parallelize(pl, 2)       # Distribute the list across 2 partitions
parpl.collect()                      # collect() returns every element to the driver; use it only on small data

[2.2, 3.4, 4.3, 2.4, 2.3, 4.0]

In [3]:
parpl.first()

2.2

In [7]:
parpl.take(2)

[2.2, 3.4]

In [8]:
parpl.getNumPartitions()

2

Convert temperature from Fahrenheit to Celsius

C = (F – 32) × 5/9

• Convert temperature from Fahrenheit to Celsius

• Get all the temperature data points greater than or equal to 13 °C

In [10]:
tempData = [59,57.2,53.6,55.4,51.8,53.6,55.4]
parTempData = sct.parallelize(tempData,2)
parTempData.collect()

[59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4]

In [11]:
def fahrenheitToCentigrade(temperature) :
    centigrade = (temperature-32)*5/9
    return centigrade

In [13]:
parCentigradeData = parTempData.map(fahrenheitToCentigrade)
parCentigradeData.collect()

[15.0, 14.000000000000002, 12.0, 13.0, 10.999999999999998, 12.0, 13.0]
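The values 14.000000000000002 and 10.999999999999998 are binary floating-point artifacts, not errors in the formula. A minimal plain-Python sketch, assuming two decimal places are enough for temperature readings, rounds each converted value. The helper name fahrenheit_to_celsius_rounded and the ndigits parameter are hypothetical and not part of the notebook; a function like this could also be passed to map() on the RDD.

```python
# Plain-Python sketch (no Spark required); names are illustrative only.
temp_data = [59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4]

def fahrenheit_to_celsius_rounded(temperature, ndigits=2):
    """Convert Fahrenheit to Celsius and round away floating-point noise."""
    return round((temperature - 32) * 5 / 9, ndigits)

rounded = [fahrenheit_to_celsius_rounded(t) for t in temp_data]
print(rounded)
```

Rounding before a threshold comparison can matter, because a value that should be exactly on the boundary may otherwise land slightly below it.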

Get all the temperature data points greater than or equal to 13 °C

We pass our tempMoreThanThirteen function to the filter() function. filter()
applies tempMoreThanThirteen to each value in the parCentigradeData RDD. The
function returns True if the value is greater than or equal to 13. Only the
values for which tempMoreThanThirteen returns True are kept in
filteredTemprature:

In [14]:
def tempMoreThanThirteen(temperature):
    return temperature >=13

In [15]:
filteredTemprature = parCentigradeData.filter(tempMoreThanThirteen)
filteredTemprature.collect()

[15.0, 14.000000000000002, 13.0, 13.0]

The named predicate can be replaced with an equivalent lambda function.

In [16]:
filteredTemprature = parCentigradeData.filter(lambda x : x>=13)
filteredTemprature.collect()

[15.0, 14.000000000000002, 13.0, 13.0]
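As a rough local analogue (not Spark itself), Python's built-in filter() follows the same pattern: it takes a predicate and keeps the elements for which the predicate returns True. The sketch below reuses the Celsius values printed earlier as a plain list; the variable and function names are illustrative.

```python
# Local analogue of RDD.filter() using Python's built-in filter().
centigrade = [15.0, 14.000000000000002, 12.0, 13.0,
              10.999999999999998, 12.0, 13.0]

def temp_more_than_thirteen(temperature):
    # Same predicate as in the notebook: True for values >= 13
    return temperature >= 13

# Named function and lambda are interchangeable as predicates
named = list(filter(temp_more_than_thirteen, centigrade))
with_lambda = list(filter(lambda x: x >= 13, centigrade))

print(named)
print(named == with_lambda)
```

Like the built-in, the RDD filter() is lazy: it only describes the computation, and nothing runs until an action such as collect() is called.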

Data Manipulation

You want to calculate the following:

• Average grades per semester, each year, for each student

• Top three students who have the highest average grades in the
second year

• Bottom three students who have the lowest average grades in the
second year

• All students who have earned more than an 80% average in the
second semester of the second year

Using the map() function is often helpful. In this example, the average grades per
semester, for each year, can be calculated using map().

Getting the top k elements is a common data science problem, for example finding the
k best-performing bonds. The PySpark takeOrdered() function returns the top k or
bottom k elements of an RDD, depending on the sort key it is given.

Students whose second-year average is 80% or higher can be selected using the
filter() function.
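To make the role of the sort key concrete, here is a plain-Python sketch using heapq.nsmallest as a local stand-in for takeOrdered(): both return the k smallest elements under a key, so negating the key yields the k largest. The rows are the second-year grades used in this example, reduced to student ID and two grade columns; the variable names are illustrative, and this is an analogy rather than a description of Spark's internals.

```python
import heapq

# Second-year rows: [student id, first grade, second grade]
second_year = [
    ["si1", 75.94, 76.75], ["si2", 85.49, 75.8], ["si3", 54.98, 87.72],
    ["si4", 71.26, 69.77], ["si5", 50.39, 68.58], ["si6", 58.29, 62.38],
    ["si7", 66.69, 56.92],
]

# Mean of the two grades per student
means = [[sid, (g1 + g2) / 2] for sid, g1, g2 in second_year]

# k smallest by key; a negated key turns "smallest" into "largest"
top_three = heapq.nsmallest(3, means, key=lambda x: -x[1])
bottom_three = heapq.nsmallest(3, means, key=lambda x: x[1])

print([row[0] for row in top_three])
print([row[0] for row in bottom_three])
```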

In [18]:
#   Making a List from a Given Table
studentMarksData = [
    ["si1","year1",62.08,62.4], ["si1","year2",75.94,76.75], ["si2","year1",68.26,72.95],
    ["si2","year2",85.49,75.8], ["si3","year1",75.08,79.84], ["si3","year2",54.98,87.72],
    ["si4","year1",50.03,66.85], ["si4","year2",71.26,69.77], ["si5","year1",52.74,76.27],
    ["si5","year2",50.39,68.58], ["si6","year1",74.86,60.8], ["si6","year2",58.29,62.38], 
    ["si7","year1",63.95,74.51], ["si7","year2",66.69,56.92]
                    ]

In [20]:
studentMarksDataRDD = sct.parallelize(studentMarksData,4)

In [21]:
#   Fetch the first two elements of the RDD
studentMarksDataRDD.take(2)

[['si1', 'year1', 62.08, 62.4], ['si1', 'year2', 75.94, 76.75]]

In [26]:
#   Calculating Average Semester Grades: each row becomes [student, year, sum, mean]
studentMarksMean = studentMarksDataRDD.map(lambda x : [x[0], x[1], (x[2]+x[3]), (x[2]+x[3])/2] )
studentMarksMean.take(2)

[['si1', 'year1', 124.47999999999999, 62.239999999999995],
 ['si1', 'year2', 152.69, 76.345]]

In [29]:
#   Filtering Student Average Grades in the Second Year
secondYearMarks = studentMarksMean.filter(lambda x : x[1] == 'year2')
secondYearMarks.take(2)

[['si1', 'year2', 152.69, 76.345], ['si2', 'year2', 161.29, 80.645]]

In [31]:
#   Finding the Top Three Students
SortedSecondYearMarks = secondYearMarks.sortBy(keyfunc= lambda x : -x[3])
SortedSecondYearMarks.take(3)

[['si2', 'year2', 161.29, 80.645],
 ['si1', 'year2', 152.69, 76.345],
 ['si3', 'year2', 142.7, 71.35]]

In [32]:
#   A better way to find the top three students: takeOrdered() avoids sorting the whole RDD
topThreeStudents = secondYearMarks.takeOrdered(num=3, key = lambda x :-x[3])
topThreeStudents

[['si2', 'year2', 161.29, 80.645],
 ['si1', 'year2', 152.69, 76.345],
 ['si3', 'year2', 142.7, 71.35]]

In [36]:
#   Finding the Bottom 3 Students
bottomThreeStudents = secondYearMarks.takeOrdered(num=3, key = lambda x :x[3])
bottomThreeStudents

[['si5', 'year2', 118.97, 59.485],
 ['si6', 'year2', 120.67, 60.335],
 ['si7', 'year2', 123.61, 61.805]]

In [39]:
#   Getting All Students with an Average of at Least 80%
Students80 = secondYearMarks.filter(lambda x : x[3] >=80)
Students80.take(2)

[['si2', 'year2', 161.29, 80.645]]

Set Operations

• How many research projects were initiated in the three years?

• How many projects were completed in the first year?

• How many projects were completed in the first two years?


PySpark RDDs support pseudo set operations such as union(), intersection(), and
subtract(). They are called pseudo set operations because some of them, such as
union(), do not remove duplicate elements.
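The difference between a true set union and a duplicate-preserving union can be sketched in plain Python. The project IDs below are hypothetical and chosen only for illustration; list concatenation plays the role of union(), and conversion to a set plays the role of distinct().

```python
# Hypothetical project IDs, for illustration only
year_a = ["P1", "P2", "P3"]
year_b = ["P2", "P3", "P4"]

# Concatenation keeps duplicates, as RDD.union() does
combined = year_a + year_b

# Converting to a set removes duplicates, as RDD.distinct() does
unique = sorted(set(combined))

print(combined)
print(unique)
```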

In [40]:
#   Creating a List of Research Data by Year
data2001 = ['RIN1', 'RIN2', 'RIN3', 'RIN4', 'RIN5', 'RIN6', 'RIN7']
data2002 = ['RIN3', 'RIN4', 'RIN7', 'RIN8', 'RIN9']
data2003 = ['RIN4', 'RIN8', 'RIN10', 'RIN11', 'RIN12']

In [45]:
#   Parallelize the Data
parData2001 = sct.parallelize(data2001,2)
parData2002 = sct.parallelize(data2002,2)
parData2003 = sct.parallelize(data2003,2)

In [46]:
#   Finding Projects Initiated in Three Years
Uni_1_2 = parData2001.union(parData2002)
Uni_123 = Uni_1_2.union(parData2003)
Uni_123.collect()

['RIN1',
 'RIN2',
 'RIN3',
 'RIN4',
 'RIN5',
 'RIN6',
 'RIN7',
 'RIN3',
 'RIN4',
 'RIN7',
 'RIN8',
 'RIN9',
 'RIN4',
 'RIN8',
 'RIN10',
 'RIN11',
 'RIN12']

Making Sets of Distinct Data

In [47]:
#   Remove duplicates
allUnique = Uni_123.distinct()
allUnique.collect()

['RIN1',
 'RIN10',
 'RIN12',
 'RIN2',
 'RIN3',
 'RIN5',
 'RIN8',
 'RIN4',
 'RIN9',
 'RIN11',
 'RIN6',
 'RIN7']

In [49]:
#   Counting Distinct Elements
Uni_123.distinct().count()

12

A more concise way is to chain the calls in a single expression:

In [50]:
parData2001.union(parData2002).union(parData2003).distinct().count()

12

In [51]:
#   Projects Completed in the First Year
firstYearCompletion = parData2001.subtract(parData2002)
firstYearCompletion.collect()

['RIN1', 'RIN2', 'RIN5', 'RIN6']

In [54]:
#   Projects Completed in the First Two Years
Completion_12 = (parData2001.union(parData2002)).subtract(parData2003)
Completion_12.distinct().collect()

['RIN1', 'RIN2', 'RIN3', 'RIN5', 'RIN9', 'RIN6', 'RIN7']

In [57]:
#   Finding Projects Started in 2001 That Continued into 2002 but Not 2003
one = parData2001.intersection(parData2002)
one.subtract(parData2003).collect()

['RIN3', 'RIN7']
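The RDD results can be cross-checked locally with Python's built-in set type, which provides true set semantics. This is a sketch for verification on small data, not a replacement for the distributed operations; the variable names on the left are illustrative.

```python
# Same project IDs as in the notebook, held as built-in sets
data2001 = {'RIN1', 'RIN2', 'RIN3', 'RIN4', 'RIN5', 'RIN6', 'RIN7'}
data2002 = {'RIN3', 'RIN4', 'RIN7', 'RIN8', 'RIN9'}
data2003 = {'RIN4', 'RIN8', 'RIN10', 'RIN11', 'RIN12'}

# union + distinct + count
initiated = data2001 | data2002 | data2003

# subtract: in 2001 but no longer present in 2002
completed_first_year = data2001 - data2002

# union then subtract: in 2001 or 2002 but not in 2003
completed_first_two = (data2001 | data2002) - data2003

# intersection then subtract: in both 2001 and 2002 but not in 2003
ended_after_2002 = (data2001 & data2002) - data2003

print(len(initiated))
print(sorted(completed_first_year))
print(sorted(completed_first_two))
print(sorted(ended_after_2002))
```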