In [1]:
import findspark
findspark.init()
findspark.find()
import pyspark
findspark.find()

'H:\\Spark\\spark-3.0.0-bin-hadoop2.7'

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession

In [3]:
conf = pyspark.SparkConf().setAppName('appName').setMaster('local')
sc = pyspark.SparkContext(conf=conf)
spark = SparkSession(sc)

In [4]:
import pandas as pd
df = pd.read_csv('iris/iris_site.csv')
df.head()

Unnamed: 0,5.1,3.5,1.4,0.2,setosa
0,4.9,3.0,1.4,0.2,setosa
1,4.7,3.2,1.3,0.2,setosa
2,4.6,3.1,1.5,0.2,setosa
3,5.0,3.6,1.4,0.2,setosa
4,5.4,3.9,1.7,0.4,setosa


In [5]:
iris1 = sc.textFile('iris/iris_site.csv').map(lambda line:line.split(','))
iris1.take(5)

[['5.1', '3.5', '1.4', '0.2', 'setosa'],
 ['4.9', '3.0', '1.4', '0.2', 'setosa'],
 ['4.7', '3.2', '1.3', '0.2', 'setosa'],
 ['4.6', '3.1', '1.5', '0.2', 'setosa'],
 ['5.0', '3.6', '1.4', '0.2', 'setosa']]

## Extracting a particular column

In [6]:
iris1_mod = iris1.map(lambda col:col[0])
iris1_mod.take(5)

['5.1', '4.9', '4.7', '4.6', '5.0']

## Using for-loop with RDD object

In [7]:
for var1 in iris1.map(lambda col: float(col[0])).take(5):
    print(var1, type(var1))

5.1 <class 'float'>
4.9 <class 'float'>
4.7 <class 'float'>
4.6 <class 'float'>
5.0 <class 'float'>


## Typecasting all column values

In [8]:
iris1_mod = iris1.map(lambda var: [float(var[0]), float(var[1]), float(var[2]), float(var[3]), var[4]])
iris1_mod.take(5)

[[5.1, 3.5, 1.4, 0.2, 'setosa'],
 [4.9, 3.0, 1.4, 0.2, 'setosa'],
 [4.7, 3.2, 1.3, 0.2, 'setosa'],
 [4.6, 3.1, 1.5, 0.2, 'setosa'],
 [5.0, 3.6, 1.4, 0.2, 'setosa']]

## Creating key-value pairs in RDD

Cases where data must be in key-value pair: 
- key - name of column
- value- corresponding values for a particular column

In [9]:
iris1_mod = iris1.map(lambda var: (('Sepal Length', float(var[0])),
                       ('Sepal Width', float(var[1])),
                       ('Petal Length', float(var[2])),
                       ('Petal Width', float(var[3])),
                       ('Species', var[4])))

iris1_mod.take(5)

[(('Sepal Length', 5.1),
  ('Sepal Width', 3.5),
  ('Petal Length', 1.4),
  ('Petal Width', 0.2),
  ('Species', 'setosa')),
 (('Sepal Length', 4.9),
  ('Sepal Width', 3.0),
  ('Petal Length', 1.4),
  ('Petal Width', 0.2),
  ('Species', 'setosa')),
 (('Sepal Length', 4.7),
  ('Sepal Width', 3.2),
  ('Petal Length', 1.3),
  ('Petal Width', 0.2),
  ('Species', 'setosa')),
 (('Sepal Length', 4.6),
  ('Sepal Width', 3.1),
  ('Petal Length', 1.5),
  ('Petal Width', 0.2),
  ('Species', 'setosa')),
 (('Sepal Length', 5.0),
  ('Sepal Width', 3.6),
  ('Petal Length', 1.4),
  ('Petal Width', 0.2),
  ('Species', 'setosa'))]

__Observe:__ The key-value pair is present inside a double collection.

To remove the extra level of collection `flatMap` is used instead of `Map`.

In [10]:
iris1_mod = iris1.flatMap(lambda var: (('Sepal Length', float(var[0])),
                       ('Sepal Width', float(var[1])),
                       ('Petal Length', float(var[2])),
                       ('Petal Width', float(var[3])),
                       ('Species', var[4])))

iris1_mod.take(5)

[('Sepal Length', 5.1),
 ('Sepal Width', 3.5),
 ('Petal Length', 1.4),
 ('Petal Width', 0.2),
 ('Species', 'setosa')]

### Sorting RDD

In [11]:
iris1_mod = iris1.map(lambda var: [float(var[0]), float(var[1]), float(var[2]), float(var[3]), var[4]])
iris1_mod.take(5)

[[5.1, 3.5, 1.4, 0.2, 'setosa'],
 [4.9, 3.0, 1.4, 0.2, 'setosa'],
 [4.7, 3.2, 1.3, 0.2, 'setosa'],
 [4.6, 3.1, 1.5, 0.2, 'setosa'],
 [5.0, 3.6, 1.4, 0.2, 'setosa']]

In [12]:
iris1_mod.sortBy(lambda x:x[0]).take(5)

[[4.3, 3.0, 1.1, 0.1, 'setosa'],
 [4.4, 2.9, 1.4, 0.2, 'setosa'],
 [4.4, 3.0, 1.3, 0.2, 'setosa'],
 [4.4, 3.2, 1.3, 0.2, 'setosa'],
 [4.5, 2.3, 1.3, 0.3, 'setosa']]

### Sorting based on key

In [13]:
iris1_mod = iris1.map(lambda var: [var[4], (var[0], var[1], var[2], var[3])])

iris1_mod.take(5)

[['setosa', ('5.1', '3.5', '1.4', '0.2')],
 ['setosa', ('4.9', '3.0', '1.4', '0.2')],
 ['setosa', ('4.7', '3.2', '1.3', '0.2')],
 ['setosa', ('4.6', '3.1', '1.5', '0.2')],
 ['setosa', ('5.0', '3.6', '1.4', '0.2')]]

In [14]:
iris1_mod.sortBy(ascending = False, keyfunc = lambda k:k).take(5)

[['virginica', ('7.9', '3.8', '6.4', '2.0')],
 ['virginica', ('7.7', '3.8', '6.7', '2.2')],
 ['virginica', ('7.7', '3.0', '6.1', '2.3')],
 ['virginica', ('7.7', '2.8', '6.7', '2.0')],
 ['virginica', ('7.7', '2.6', '6.9', '2.3')]]

## Union

In [15]:
sl = iris1.map(lambda var1:['Sepal Length', float(var1[0])])
sw = iris1.map(lambda var1:['Sepal Width', float(var1[1])])
pl = iris1.map(lambda var1:['Petal Length', float(var1[2])])
pw = iris1.map(lambda var1:['Petal Width', float(var1[3])])
sp = iris1.map(lambda var1:['Species', var1[4]])
sp.take(5)

[['Species', 'setosa'],
 ['Species', 'setosa'],
 ['Species', 'setosa'],
 ['Species', 'setosa'],
 ['Species', 'setosa']]

In [16]:
type(sp)

pyspark.rdd.PipelinedRDD

In [17]:
sl.take(5)

[['Sepal Length', 5.1],
 ['Sepal Length', 4.9],
 ['Sepal Length', 4.7],
 ['Sepal Length', 4.6],
 ['Sepal Length', 5.0]]

In [18]:
union_data = sp.union(sl)

![](images\spark_union.PNG)

### intersection() / intersect()

In [19]:
x = sc.parallelize(['a', 'b', 'c', 'd', 'e'])
y = sc.parallelize(['d', 'e', 'f', 'g', 'h'])
x.intersection(y).collect()

['d', 'e']

### RDD joins

In [20]:
loc1 = sc.parallelize([('emp1', [3,1,2,5]), ('emp2', [2,4,1,5,7])])
loc2 = sc.parallelize([('emp2', [15,7,3,1]), ('emp1', [6,9,1,3,0])])
join1 = loc1.join(loc2)

In [21]:
join1.collect()

[('emp2', ([2, 4, 1, 5, 7], [15, 7, 3, 1])),
 ('emp1', ([3, 1, 2, 5], [6, 9, 1, 3, 0]))]

### Loop through values - `foreach`

An RDD will contain multiple elements. If a certain operation needs to be performed on each element of RDD, we can make use of iterative loops like `foreach`.

In [22]:
iris1_mod = iris1.flatMap(lambda var: (('Sepal Length', float(var[0])),
                       ('Sepal Width', float(var[1])),
                       ('Petal Length', float(var[2])),
                       ('Petal Width', float(var[3]))))

def fun(x): 
    print(x)
    
# iris1_mod.foreach(fun).take(5)   # This is giving error AttributeError: 'NoneType' object has no attribute 'take'

iris1_mod.map(lambda x: x[1]+1).take(5)

[6.1, 4.5, 2.4, 1.2, 5.9]

In [23]:
iris1_mod.take(4)

[('Sepal Length', 5.1),
 ('Sepal Width', 3.5),
 ('Petal Length', 1.4),
 ('Petal Width', 0.2)]

### Filtering data based on a condition

In [24]:
# remove all records whose Sepal_Length (1st column value) is less than 7
f1 = iris1.filter(lambda x: float(x[0])>7)
f1.take(5)

[['7.1', '3.0', '5.9', '2.1', 'virginica'],
 ['7.6', '3.0', '6.6', '2.1', 'virginica'],
 ['7.3', '2.9', '6.3', '1.8', 'virginica'],
 ['7.2', '3.6', '6.1', '2.5', 'virginica'],
 ['7.7', '3.8', '6.7', '2.2', 'virginica']]

In [25]:
# to print onlt first column values and values >6.5
f2 = iris1.filter(lambda x: float(x[0])>6.5).map(lambda x: x[0])
f2.take(5)

['7.0', '6.9', '6.6', '6.7', '6.6']

##  Combining multiple RDD data with same key

When there are key-value pairs in two seperate RDDs and we are required to collate all data with the same key together from both RDDs.  

To achieve the same we can amke use of `cogroup()` function.

In [26]:
p1 = sc.parallelize([('a', 1), ('b', 2)])
p2 = sc.parallelize([('a', 99), ('b', 80), ('c', 100)])
p1.cogroup(p2).collect()

[('b',
  (<pyspark.resultiterable.ResultIterable at 0x1bfd3129608>,
   <pyspark.resultiterable.ResultIterable at 0x1bfd319fcc8>)),
 ('c',
  (<pyspark.resultiterable.ResultIterable at 0x1bfd30ae608>,
   <pyspark.resultiterable.ResultIterable at 0x1bfd30ff0c8>)),
 ('a',
  (<pyspark.resultiterable.ResultIterable at 0x1bfd3134ac8>,
   <pyspark.resultiterable.ResultIterable at 0x1bfd3134908>))]

Observe that for each key there is a collection of value as a result of cogroup function.

Also  
- Result of cogroup function can be realized by combining for loop with tuple, map and list function.
- for each key we get an iterable object.
- Using for loop we iterate through each key,value pair of the cogroup result and we place each value in a list, and the collection of the list is in turn placed inside a tuple.

In [27]:
[(x,tuple(map(list,y)))  for x,y in p1.cogroup(p2).collect()]

[('b', ([2], [80])), ('c', ([], [100])), ('a', ([1], [99]))]

Case when there are key-value pairs in multiple RDDs and we are required to collate all data with the same key together from all RDDs. Done using `groupWith` function.

`groupWith`works exactly like `cogroup` except that it deals with more RDDs.

[How to pass list of RDDs to groupWith in Pyspark](https://stackoverflow.com/questions/37621853/how-to-pass-list-of-rdds-to-groupwith-in-pyspark)

In [31]:
d1 = sc.parallelize([('a',5), ('b', 6)])
d2 = sc.parallelize([('c',5), ('d', 6)])
d3 = sc.parallelize([('a', 100)])
d4 = sc.parallelize([('d', 1000), ('b', 999)])

m = [d1,d2,d3,d4]

gw = m[0].groupWith(*m[1:])  # Since groupWith accepts varargs all you have to do is to unpack arguments

[(x,tuple(map(list,y)))  for x,y in gw.collect()]

[('b', ([6], [], [], [999])),
 ('c', ([], [5], [], [])),
 ('a', ([5], [], [100], [])),
 ('d', ([], [6], [], [1000]))]

## Counting occurence

### 1. Count occurence of each key element

To count the key element `countByKey` function.

In [32]:
p1 = sc.parallelize([('a', 5), ('b', 12), ('a', 23), ('c', 87), ('c', 65)])
p1.countByKey().items()

dict_items([('a', 2), ('b', 1), ('c', 2)])

In [33]:
iris1_mod = iris1.map(lambda x:(x[4],1))
iris1_mod.countByKey().items()

dict_items([('setosa', 50), ('versicolor', 50), ('virginica', 50)])

### 2. Count occurence of each value

Count of distinct occurence of each value in a particular column

In [34]:
iris1.map(lambda x:x[4]).countByValue().items()

dict_items([('setosa', 50), ('versicolor', 50), ('virginica', 50)])

Counting total number of values in a particular dataset

In [35]:
iris1.take(5)

[['5.1', '3.5', '1.4', '0.2', 'setosa'],
 ['4.9', '3.0', '1.4', '0.2', 'setosa'],
 ['4.7', '3.2', '1.3', '0.2', 'setosa'],
 ['4.6', '3.1', '1.5', '0.2', 'setosa'],
 ['5.0', '3.6', '1.4', '0.2', 'setosa']]

As each row of the iris dataset is imported as a list so there are 150 lists within the master list.

In [36]:
iris1.count()

150

### Distinct

Distinct occurence of values in a particular column can be calculated using the distinct function.

In [37]:
iris1_mod = iris1.map(lambda x:x[4])
distinct1 = iris1_mod.distinct()
distinct1.collect()

['setosa', 'versicolor', 'virginica']

### Generating sequence of values

In [38]:
range1 = sc.range(start = 1, end = 10, step = 2)
type(range1)

pyspark.rdd.PipelinedRDD

In [39]:
range1.collect()

[1, 3, 5, 7, 9]

### Apply function for each key - Using `mapValues()`

In [42]:
data1 = sc.parallelize([('SL', [2,1,3,5,1]), ('SW', [5,6,1,3,0])])

def add_vals(para):
    return(sum(para))

In [43]:
data1.mapValues(add_vals).collect()

[('SL', 12), ('SW', 15)]

## Grouping and Aggregation

### 1. Measure data in RDD cab be aggregated using `fold` function

__Reference:__ [operator module in python](https://docs.python.org/3/library/operator.html)

[fold operations in spark](https://www.edureka.co/community/11994/can-anyone-explain-fold-operation-in-spark)


`operator.add(x, y)` == `x+y`

In [44]:
from operator import add

To add all values of Sepal Width column

In [47]:
iris1.map(lambda x:float(x[1])).fold(0, add)

458.60000000000014

### 2. To aggregate the value for each key
Used when data is in key-value format.

In [48]:
iris1_mod = iris1.flatMap(lambda var: (('Sepal Length', float(var[0])),
                       ('Sepal Width', float(var[1])),
                       ('Petal Length', float(var[2])),
                       ('Petal Width', float(var[3]))))

iris1_mod.take(5)

[('Sepal Length', 5.1),
 ('Sepal Width', 3.5),
 ('Petal Length', 1.4),
 ('Petal Width', 0.2),
 ('Sepal Length', 4.9)]

In [49]:
iris1_mod.foldByKey(0, add).collect()

[('Sepal Length', 876.5000000000002),
 ('Sepal Width', 458.60000000000014),
 ('Petal Length', 563.7000000000004),
 ('Petal Width', 179.90000000000012)]

## Reduce

- Function used to reduce the elements of an RDD (usually used for aggregation).
- Argument is function to be executed on RDD.

In [50]:
iris1_mod = iris1.map(lambda x: float(x[0]))
iris1_mod.reduce(add)

876.5000000000002

### reduceByKey()
- Used where it is required to aggregate each group of data
- function passed by the function is applied for all values in particular key.

In [51]:
iris1_mod = iris1.flatMap(lambda var: (('Sepal Length', float(var[0])),
                       ('Sepal Width', float(var[1])),
                       ('Petal Length', float(var[2])),
                       ('Petal Width', float(var[3]))))

iris1_mod.reduceByKey(add).collect()

[('Sepal Length', 876.5000000000002),
 ('Sepal Width', 458.60000000000014),
 ('Petal Length', 563.7000000000004),
 ('Petal Width', 179.90000000000012)]

### groupByKey()

We can group elements with the same key using `groupByKey()` function. On applying the function we group data for each column.

The grouped result is an iterable object.

In [52]:
iris1_mod = iris1.flatMap(lambda var: (('Sepal Length', float(var[0])),
                       ('Sepal Width', float(var[1])),
                       ('Petal Length', float(var[2])),
                       ('Petal Width', float(var[3]))))

iris1_mod.take(5)

[('Sepal Length', 5.1),
 ('Sepal Width', 3.5),
 ('Petal Length', 1.4),
 ('Petal Width', 0.2),
 ('Sepal Length', 4.9)]

In [53]:
iris1_mod.groupByKey().collect()

[('Sepal Length', <pyspark.resultiterable.ResultIterable at 0x1bfd319a908>),
 ('Sepal Width', <pyspark.resultiterable.ResultIterable at 0x1bfd319aac8>),
 ('Petal Length', <pyspark.resultiterable.ResultIterable at 0x1bfd319a708>),
 ('Petal Width', <pyspark.resultiterable.ResultIterable at 0x1bfd319ac88>)]

Use `mapValues()` to convert iterable objects to list or any other form.

In [57]:
iris1_mod.groupByKey().mapValues(list).take(1)

[('Sepal Length',
  [5.1,
   4.9,
   4.7,
   4.6,
   5.0,
   5.4,
   4.6,
   5.0,
   4.4,
   4.9,
   5.4,
   4.8,
   4.8,
   4.3,
   5.8,
   5.7,
   5.4,
   5.1,
   5.7,
   5.1,
   5.4,
   5.1,
   4.6,
   5.1,
   4.8,
   5.0,
   5.0,
   5.2,
   5.2,
   4.7,
   4.8,
   5.4,
   5.2,
   5.5,
   4.9,
   5.0,
   5.5,
   4.9,
   4.4,
   5.1,
   5.0,
   4.5,
   4.4,
   5.0,
   5.1,
   4.8,
   5.1,
   4.6,
   5.3,
   5.0,
   7.0,
   6.4,
   6.9,
   5.5,
   6.5,
   5.7,
   6.3,
   4.9,
   6.6,
   5.2,
   5.0,
   5.9,
   6.0,
   6.1,
   5.6,
   6.7,
   5.6,
   5.8,
   6.2,
   5.6,
   5.9,
   6.1,
   6.3,
   6.1,
   6.4,
   6.6,
   6.8,
   6.7,
   6.0,
   5.7,
   5.5,
   5.5,
   5.8,
   6.0,
   5.4,
   6.0,
   6.7,
   6.3,
   5.6,
   5.5,
   5.5,
   6.1,
   5.8,
   5.0,
   5.6,
   5.7,
   5.7,
   6.2,
   5.1,
   5.7,
   6.3,
   5.8,
   7.1,
   6.3,
   6.5,
   7.6,
   4.9,
   7.3,
   6.7,
   7.2,
   6.5,
   6.4,
   6.8,
   5.7,
   5.8,
   6.4,
   6.5,
   7.7,
   7.7,
   6.0,
   6.9,
   5.6,
   7.7

### Aggregating grouped RDD

Aggregation can be performed on grouped data by combining the aggregate function with `mapValues()` and `groupByKey()` function.

In [58]:
iris1_mod.groupByKey().mapValues(sum).collect()

[('Sepal Length', 876.5000000000002),
 ('Sepal Width', 458.60000000000014),
 ('Petal Length', 563.7000000000004),
 ('Petal Width', 179.90000000000012)]

## Basic Descriptive statistics

### 1. max()

In [59]:
iris1.map(lambda x: float(x[1])).max()

4.4

### 2. min()

In [60]:
iris1.map(lambda x: float(x[1])).min()

2.0

## 3. mean()

In [61]:
iris1.map(lambda x: float(x[1])).mean()

3.0573333333333315

Measuring spread of data

### 4. std dev

In [63]:
iris1.map(lambda x: float(x[1])).stdev()

0.4344109677354942

### 5. variance

In [65]:
iris1.map(lambda x: float(x[1])).variance()

0.18871288888888857

### 6. Summary of data

In [66]:
iris1.map(lambda x: float(x[1])).stats()

(count: 150, mean: 3.0573333333333315, stdev: 0.4344109677354942, max: 4.4, min: 2.0)

### 7. subtractByKey()

Return each (key, value) pair in self that has no pair with matching key in other

In [67]:
x = sc.parallelize([('a',1), ('b',5),('a',5)])
y = sc.parallelize([('a',3), ('c',None)])
x.subtractByKey(y).collect()

[('b', 5)]

### 8. subtract()

In [68]:
x = sc.parallelize([('a',1), ('b',5),('a',3)])
y = sc.parallelize([('a',3), ('c',None), ('b', 2)])
x.subtract(y).collect()

[('a', 1), ('b', 5)]

In [69]:
sc.stop()