In [1]:
# Small list of floats used below to demonstrate creating an RDD.
pythonList = [2.3, 3.4, 4.3, 2.4, 2.3, 4.0]
pythonList


In [2]:
# Distribute the list across two partitions (numSlices=2).
parPythonData = sc.parallelize(pythonList, numSlices=2)
# collect() returns every element to the driver: handy for debugging small
# data like this, but it is not recommended in production code.
parPythonData.collect()

In [3]:
# first(): the first element of the RDD.
parPythonData.first()


In [4]:
# take(n): the first n elements, returned as a Python list.
parPythonData.take(2)

In [5]:
# Number of partitions the RDD was split into (the parallelize call in In [2] asks for 2).
parPythonData.getNumPartitions()

In [6]:
# Temperature readings in degrees Fahrenheit, split into two partitions.
tempData = [59, 57.2, 53.6, 55.4, 51.8, 53.6, 55.4]
parTempData = sc.parallelize(tempData, numSlices=2)
parTempData.collect()

In [7]:
# Converting Temperature from Fahrenheit to Celsius
def fahrenheitToCentigrade(temperature):
    """Convert a Fahrenheit reading to degrees Celsius.

    Applies C = (F - 32) * 5 / 9. True division means int or float input
    yields a float result.
    """
    return (temperature - 32) * 5 / 9

In [8]:
# Sanity-check the converter on a single value before applying it to the RDD.
fahrenheitToCentigrade(59)


In [9]:
# map() applies the converter to every element, producing a new RDD of Celsius values.
parCentigradeData = parTempData.map(fahrenheitToCentigrade)
parCentigradeData.collect()

In [10]:
# Filtering Temperatures of 13 degrees C or More (inclusive check: >= 13)
def tempMoreThanThirteen(temperature):
    """Return True when a Celsius temperature is 13 degrees or higher.

    The comparison is inclusive (>= 13), so a reading of exactly 13 passes
    the filter. This matches the lambda used in the alternative cell.
    """
    return temperature >= 13

In [11]:
# Keep only the Celsius readings accepted by tempMoreThanThirteen (>= 13).
# NOTE(review): "Temprature" is misspelled; left as is in case later cells use the name.
filteredTemprature = parCentigradeData.filter(tempMoreThanThirteen)
filteredTemprature.collect()

In [12]:
# Alternative: the same inclusive (>= 13) filter written inline as a lambda.
filteredTemprature = parCentigradeData.filter(lambda celsius: celsius >= 13)
filteredTemprature.collect()

In [13]:
### Performing Basic Data Manipulation

In [14]:
# One row per student per year: [student id, year, mark, mark].
# NOTE(review): the two numeric columns are presumably the two semester marks
# that are averaged later -- confirm against the data source.
studentMarksData = [
    ["si1", "year1", 62.08, 62.4],
    ["si1", "year2", 75.94, 76.75],
    ["si2", "year1", 68.26, 72.95],
    ["si2", "year2", 85.49, 75.8],
    ["si3", "year1", 75.08, 79.84],
    ["si3", "year2", 54.98, 87.72],
    ["si4", "year1", 50.03, 66.85],
    ["si4", "year2", 71.26, 69.77],
    ["si5", "year1", 52.74, 76.27],
    ["si5", "year2", 50.39, 68.58],
    ["si6", "year1", 74.86, 60.8],
    ["si6", "year2", 58.29, 62.38],
    ["si7", "year1", 63.95, 74.51],
    ["si7", "year2", 66.69, 56.92],
]


In [15]:
# Spread the student records over four partitions.
studentMarksDataRDD = sc.parallelize(studentMarksData, numSlices=4)
studentMarksDataRDD.take(2)


In [16]:
# Calculating Average Semester Grades:
# each record becomes [student id, year, mean of the two marks].
studentMarksMean = studentMarksDataRDD.map(
    lambda rec: [rec[0], rec[1], (rec[2] + rec[3]) / 2]
)
studentMarksMean.take(2)

In [17]:
# Filtering Student Average Grades in the Second Year.
# Compare the year column (index 1) directly instead of testing membership in
# the whole record, so a "year2" value in any other field cannot match.
secondYearMarks = studentMarksMean.filter(lambda x: x[1] == "year2")
secondYearMarks.take(2)

In [18]:
# Finding the Top Three Students: sort second-year averages, highest first.
sortedMarksData = secondYearMarks.sortBy(lambda rec: rec[2], ascending=False)
sortedMarksData.collect()


In [19]:
# The first three rows of the descending sort are the top three students.
sortedMarksData.take(3)

In [20]:
# optimize using takeOrdered(): it returns only the n smallest elements by
# `key`, so negating the average yields the three highest averages without
# building a fully sorted RDD first.
topThreeStudents = secondYearMarks.takeOrdered(num=3, key=lambda x : -x[2])
topThreeStudents

In [21]:
# Finding the Bottom Three Students: the same call with the un-negated
# average returns the three lowest averages.
bottomThreeStudents = secondYearMarks.takeOrdered(num=3, key=lambda x : x[2])
bottomThreeStudents

In [22]:
# Getting all students whose second-year average is strictly above 80.
moreThan80Marks = secondYearMarks.filter(lambda rec: rec[2] > 80)
moreThan80Marks.collect()

In [23]:
# Run Set Operations
# Project IDs listed for each of the years 2001-2003.
data2001 = ['RIN1', 'RIN2', 'RIN3', 'RIN4', 'RIN5', 'RIN6', 'RIN7']
data2002 = ['RIN3', 'RIN4', 'RIN7', 'RIN8', 'RIN9']
data2003 = ['RIN4', 'RIN8', 'RIN10', 'RIN11', 'RIN12']

# Parallelize each year's list into its own two-partition RDD (in year order).
parData2001, parData2002, parData2003 = (
    sc.parallelize(yearIds, 2) for yearIds in (data2001, data2002, data2003)
)

In [24]:
# Finding Projects Initiated in Three Years.
# union() concatenates the two RDDs without removing duplicates, so IDs
# present in both 2001 and 2002 appear twice here.
unionOf20012002 = parData2001.union(parData2002)
unionOf20012002.collect()

In [25]:
# Add the 2003 projects; this is still a plain concatenation (duplicates kept).
allResearchs = unionOf20012002.union(parData2003)
allResearchs.collect()

In [26]:
# Making Sets of Distinct Data: distinct() drops the repeated project IDs.
# The order of the collected result may differ from the input order.
allUniqueResearchs = allResearchs.distinct()
allUniqueResearchs.collect()

In [27]:
# Counting Distinct Elements.
# allUniqueResearchs is already the output of distinct(), so count() is
# enough; calling distinct() again would only add a redundant shuffle.
allUniqueResearchs.count()

In [28]:
# Transformations can also be chained into one expression: union the three
# years, deduplicate, then count.
(
    parData2001
    .union(parData2002)
    .union(parData2003)
    .distinct()
    .count()
)

In [29]:
# Finding Projects Completed in the First Year:
# subtract() keeps the 2001 projects that do not appear in 2002.
firstYearCompletion = parData2001.subtract(parData2002)
firstYearCompletion.collect()

In [30]:
# Finding Projects Completed in the First Two Years:
# projects seen in 2001 or 2002 but not in 2003. union() keeps duplicates, so
# IDs listed in both 2001 and 2002 (and absent in 2003) appear twice here.
unionTwoYears = parData2001.union(parData2002)
unionTwoYears.subtract(parData2003).collect()


In [31]:
# distinct() removes the duplicates left over from the union.
unionTwoYears.subtract(parData2003).distinct().collect()


In [32]:
# Finding Projects Started in 2001 and Continued Through 2003.
# intersection() keeps the IDs present in both 2001 and 2002; its result
# contains no duplicates.
projectsInTwoYear = parData2001.intersection(parData2002)
projectsInTwoYear.collect()

In [33]:
# Of the projects present in both 2001 and 2002, keep those that also appear in
# 2003, i.e. the ones that continued through 2003. subtract() would do the
# opposite (return the projects that did NOT continue). intersection() already
# de-duplicates, so no distinct() is needed.
projectsInTwoYear.intersection(parData2003).collect()


In [34]:
# Calculate Summary Statistics
# Air-velocity readings (km/h, judging by the variable name) in two partitions.
airVelocityKMPH = [12, 13, 15, 12, 11, 12, 11]
parVelocityKMPH = sc.parallelize(airVelocityKMPH, numSlices=2)

In [35]:
# Getting the Number of Data Points
countValue = parVelocityKMPH.count()
countValue

In [36]:
# Summing Air Velocities in a Day
sumValue = parVelocityKMPH.sum()
sumValue


In [37]:
# Finding the Mean Air Velocity
meanValue = parVelocityKMPH.mean()
meanValue

In [38]:
# Finding the Variance of Air Data: variance() is the population variance
# (divides by n); compare with sampleVariance() in the next cell.
varianceValue = parVelocityKMPH.variance()
varianceValue

In [39]:
# Calculating Sample Variance: divides by (n - 1) instead of n.
sampleVarianceValue = parVelocityKMPH.sampleVariance()
sampleVarianceValue


In [40]:
# Calculating Standard Deviation (population; square root of variance()).
stdevValue = parVelocityKMPH.stdev()
stdevValue

In [41]:
# Calculating Sample Standard Deviation (square root of sampleVariance()).
sampleStdevValue = parVelocityKMPH.sampleStdev()
sampleStdevValue


In [42]:
# Calculating All Values in One Step using: stats()
parVelocityKMPH.stats()

In [43]:
# transformed into a dictionary by using the asDict() function:
parVelocityKMPH.stats().asDict()


In [44]:
# also can get individual elements by using different functions defined on StatCounter
parVelocityKMPH.stats().mean()

In [45]:
parVelocityKMPH.stats().stdev()


In [46]:
parVelocityKMPH.stats().count()


In [47]:
parVelocityKMPH.stats().min()


In [48]:
parVelocityKMPH.stats().max()


In [49]:
# NOTE(review): this reuses the name pythonList (previously a list of floats)
# for single letters used in the vowel-check demo.
pythonList = list("bdmteu")


In [50]:
# Two-partition RDD of the letters.
RDD1 = sc.parallelize(pythonList, numSlices=2)
RDD1.collect()


In [51]:
def vowelCheckFunction(data):
    """Return 1 if `data` equals one of the lowercase vowels a/e/i/o/u, else 0.

    Matching is by equality against each vowel, so uppercase letters and
    multi-character strings yield 0.
    """
    vowels = ("a", "e", "i", "o", "u")
    return int(data in vowels)

In [52]:
# A vowel maps to 1 ...
vowelCheckFunction('a')

In [53]:
# ... and a consonant maps to 0.
vowelCheckFunction('b')

In [54]:
# Pair each letter with its vowel flag: (letter, 1 or 0).
RDD2 = RDD1.map(lambda letter: (letter, vowelCheckFunction(letter)))
RDD2.collect()

In [55]:
# Fetching Keys from a Paired RDD: keys() keeps the first element of each
# (letter, flag) pair.
RDD2Keys = RDD2.keys()
RDD2Keys.collect()

In [56]:
# Fetching Values from a Paired RDD: values() keeps the second element (the 0/1 flag).
RDD2Values = RDD2.values()
RDD2Values.collect()

In [57]:
# Aggregate Data
# One row per measurement: [filament type, wattage, lifetime].
# NOTE(review): the third column is treated as the bulb lifetime by the cells
# below; its units are not stated here -- confirm against the data source.
filDataSingle = [
    ['filamentA', '100W', 605],
    ['filamentB', '100W', 683],
    ['filamentB', '100W', 691],
    ['filamentB', '200W', 561],
    ['filamentA', '200W', 530],
    ['filamentA', '100W', 619],
    ['filamentB', '100W', 686],
    ['filamentB', '200W', 600],
    ['filamentB', '100W', 696],
    ['filamentA', '200W', 579],
    ['filamentA', '200W', 520],
    ['filamentA', '100W', 622],
    ['filamentA', '100W', 668],
    ['filamentB', '200W', 569],
    ['filamentB', '200W', 555],
    ['filamentA', '200W', 541],
]

In [58]:
# Two-partition RDD of the filament measurements.
filDataSingleRDD = sc.parallelize(filDataSingle, numSlices=2)
filDataSingleRDD.take(3)

In [59]:
# Creating a Paired RDD: (filament type, lifetime) for each measurement.
filDataPairedRDD1 = filDataSingleRDD.map(lambda row: (row[0], row[2]))
filDataPairedRDD1.take(4)

In [60]:
# Finding the Mean Lifetime Based on Filament Type.
# Turn each value into a [lifetime, 1] pair so that adding pairs per key gives
# [total lifetime, number of measurements], the ingredients of the mean.
filDataPairedRDD11 = filDataPairedRDD1.map(lambda x : (x[0], [x[1], 1]))
filDataPairedRDD11.take(4)

In [61]:
# Add the [lifetime, 1] pairs element-wise per filament type, giving
# [total lifetime, count] for each key.
filDataSumandCount = filDataPairedRDD11.reduceByKey(
    lambda acc, nxt: [acc[0] + nxt[0], acc[1] + nxt[1]]
)
filDataSumandCount.collect()

In [62]:
# Number of (filament type, [lifetime, 1]) records: one per input row.
filDataPairedRDD11.count()


In [63]:
# map() keeps the parent's number of partitions, so this should match filDataSingleRDD.
filDataPairedRDD11.getNumPartitions()
