Reference

https://colab.research.google.com/github/pnavaro/big-data/blob/master/notebooks/15-PySpark.ipynb#scrollTo=Kx0hch3XePOx

In [1]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop3.2.tgz
!tar xf spark-3.0.0-bin-hadoop3.2.tgz
!pip install -q findspark

In [2]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop3.2"

In [3]:
import findspark
findspark.init()

In [4]:
from pyspark import SparkContext , SparkConf
from pyspark.sql import SparkSession

conf = SparkConf().set('spark.ui.port', '4050')

In [5]:
sc = SparkContext(conf=conf)
spark = SparkSession.builder.master("local[*]").getOrCreate()

## DATASET

In [20]:
import pandas as pd
import numpy as np

df = pd.read_csv('wineQuality.csv')

In [7]:
df.describe()

Unnamed: 0.2,Unnamed: 0,Unnamed: 0.1,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
count,6463.0,6463.0,6463.0,6463.0,6463.0,6463.0,6463.0,6463.0,6463.0,6463.0,6463.0,6463.0,6463.0,6463.0
mean,3255.125793,3255.125793,7.217755,0.339589,0.318758,5.443958,0.056056,30.516865,115.694492,0.994698,3.218332,0.53115,10.492825,5.818505
std,1869.906597,1869.906597,1.297913,0.164639,0.145252,4.756852,0.035076,17.758815,56.526736,0.003001,0.16065,0.148913,1.193128,0.873286
min,0.0,0.0,3.8,0.08,0.0,0.6,0.009,1.0,6.0,0.98711,2.72,0.22,8.0,3.0
25%,1639.5,1639.5,6.4,0.23,0.25,1.8,0.038,17.0,77.0,0.99233,3.11,0.43,9.5,5.0
50%,3257.0,3257.0,7.0,0.29,0.31,3.0,0.047,29.0,118.0,0.99489,3.21,0.51,10.3,6.0
75%,4872.5,4872.5,7.7,0.4,0.39,8.1,0.065,41.0,156.0,0.997,3.32,0.6,11.3,6.0
max,6496.0,6496.0,15.9,1.58,1.66,65.8,0.611,289.0,440.0,1.03898,4.01,2.0,14.9,9.0


In [8]:
df.isnull().sum()

Unnamed: 0              0
Unnamed: 0.1            0
type                    0
fixed acidity           0
volatile acidity        0
citric acid             0
residual sugar          0
chlorides               0
free sulfur dioxide     0
total sulfur dioxide    0
density                 0
pH                      0
sulphates               0
alcohol                 0
quality                 0
dtype: int64

In [None]:
# df.dropna(inplace=True)

In [None]:
# df.to_csv('wineQuality.csv')

In [None]:
# df[df['volatile acidity'] >1.4 ]

Unnamed: 0.1,Unnamed: 0,type,fixed acidity,volatile acidity,citric acid,residual sugar,chlorides,free sulfur dioxide,total sulfur dioxide,density,pH,sulphates,alcohol,quality
20,20,white,6.2,0.660,0.48,1.2,0.029,29.0,75.0,0.98920,3.33,0.39,12.8,8
23,23,white,7.6,0.670,0.14,1.5,0.074,25.0,168.0,0.99370,3.05,0.51,9.3,5
178,178,white,6.0,0.670,0.07,1.2,0.060,9.0,108.0,0.99310,3.11,0.35,8.7,4
221,221,white,7.2,0.685,0.21,9.5,0.070,33.0,172.0,0.99710,3.00,0.55,9.1,6
372,372,white,6.6,0.905,0.19,0.8,0.048,17.0,204.0,0.99340,3.34,0.56,10.0,5
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6475,6475,red,6.2,0.700,0.15,5.1,0.076,13.0,27.0,0.99622,3.54,0.60,11.9,6
6476,6476,red,6.8,0.670,0.15,1.8,0.118,13.0,20.0,0.99540,3.42,0.67,11.3,6
6480,6480,red,6.1,0.715,0.10,2.6,0.053,13.0,27.0,0.99362,3.57,0.50,11.9,5
6487,6487,red,6.6,0.725,0.20,7.8,0.073,29.0,79.0,0.99770,3.29,0.54,9.2,5


## CREATE RDD

1) Using parallelize


In [None]:
RDD1 = sc.parallelize(df)
rddCollect = RDD1.collect()
print("Number of Partitions: "+str(RDD1.getNumPartitions()))
print("Action: First element: "+str(RDD1.first()))
print(rddCollect)

Number of Partitions: 2
Action: First element: type
['type', 'fixed acidity', 'volatile acidity', 'citric acid', 'residual sugar', 'chlorides', 'free sulfur dioxide', 'total sulfur dioxide', 'density', 'pH', 'sulphates', 'alcohol', 'quality']


In [None]:
print("is Empty RDD? : "+str(RDD1.isEmpty()))

is Empty RDD? : False


2) read.csv

In [9]:
RDD2 = spark.read.csv('wineQuality.csv',inferSchema=True, header =True).rdd
RDD2

MapPartitionsRDD[14] at javaToPython at NativeMethodAccessorImpl.java:0

3) textfile

In [None]:
RDD3 = sc.textFile('wineQuality.csv')

In [None]:
RDD3

wineQuality.csv MapPartitionsRDD[107] at textFile at NativeMethodAccessorImpl.java:0

## MAP

In [None]:
RDD2.map(lambda x:x).collect()

[Row(_c0=0, type='white', fixed acidity=7.0, volatile acidity=0.27, citric acid=0.36, residual sugar=20.7, chlorides=0.045, free sulfur dioxide=45.0, total sulfur dioxide=170.0, density=1.001, pH=3.0, sulphates=0.45, alcohol=8.8, quality=6),
 Row(_c0=1, type='white', fixed acidity=6.3, volatile acidity=0.3, citric acid=0.34, residual sugar=1.6, chlorides=0.049, free sulfur dioxide=14.0, total sulfur dioxide=132.0, density=0.994, pH=3.3, sulphates=0.49, alcohol=9.5, quality=6),
 Row(_c0=2, type='white', fixed acidity=8.1, volatile acidity=0.28, citric acid=0.4, residual sugar=6.9, chlorides=0.05, free sulfur dioxide=30.0, total sulfur dioxide=97.0, density=0.9951, pH=3.26, sulphates=0.44, alcohol=10.1, quality=6),
 Row(_c0=3, type='white', fixed acidity=7.2, volatile acidity=0.23, citric acid=0.32, residual sugar=8.5, chlorides=0.058, free sulfur dioxide=47.0, total sulfur dioxide=186.0, density=0.9956, pH=3.19, sulphates=0.4, alcohol=9.9, quality=6),
 Row(_c0=4, type='white', fixed aci

In [None]:
RDD2.filter(lambda x : x['volatile acidity'] > 1.2 ).collect()

[Row(_c0=5024, Unnamed: 0=5024, type='red', fixed acidity=8.2, volatile acidity=1.33, citric acid=0.0, residual sugar=1.7, chlorides=0.081, free sulfur dioxide=3.0, total sulfur dioxide=12.0, density=0.9964, pH=3.53, sulphates=0.49, alcohol=10.9, quality=5),
 Row(_c0=5025, Unnamed: 0=5025, type='red', fixed acidity=8.1, volatile acidity=1.33, citric acid=0.0, residual sugar=1.8, chlorides=0.082, free sulfur dioxide=3.0, total sulfur dioxide=12.0, density=0.9964, pH=3.54, sulphates=0.48, alcohol=10.9, quality=5),
 Row(_c0=5570, Unnamed: 0=5570, type='red', fixed acidity=9.8, volatile acidity=1.24, citric acid=0.34, residual sugar=2.0, chlorides=0.079, free sulfur dioxide=32.0, total sulfur dioxide=151.0, density=0.998, pH=3.15, sulphates=0.53, alcohol=9.5, quality=5),
 Row(_c0=6197, Unnamed: 0=6197, type='red', fixed acidity=7.6, volatile acidity=1.58, citric acid=0.0, residual sugar=2.1, chlorides=0.137, free sulfur dioxide=5.0, total sulfur dioxide=9.0, density=0.99476, pH=3.5, sulpha

In [11]:
# flatmap
rdd_filtered_map = RDD2.flatMap(lambda x : [x['citric acid']/2 , x['pH']/14] )
rdd_filtered_map.take(10)

[0.18,
 0.21428571428571427,
 0.17,
 0.2357142857142857,
 0.2,
 0.23285714285714285,
 0.16,
 0.22785714285714284,
 0.16,
 0.22785714285714284]

In [17]:
# sortby
rdd_sorted = RDD2.sortBy(lambda x: x['fixed acidity'])
rdd_sorted.take(20)

[Row(_c0=4259, Unnamed: 0=4259, type='white', fixed acidity=3.8, volatile acidity=0.31, citric acid=0.02, residual sugar=11.1, chlorides=0.036, free sulfur dioxide=20.0, total sulfur dioxide=114.0, density=0.99248, pH=3.75, sulphates=0.44, alcohol=12.4, quality=6),
 Row(_c0=4787, Unnamed: 0=4787, type='white', fixed acidity=3.9, volatile acidity=0.225, citric acid=0.4, residual sugar=4.2, chlorides=0.03, free sulfur dioxide=29.0, total sulfur dioxide=118.0, density=0.989, pH=3.57, sulphates=0.36, alcohol=12.8, quality=8),
 Row(_c0=2872, Unnamed: 0=2872, type='white', fixed acidity=4.2, volatile acidity=0.17, citric acid=0.36, residual sugar=1.8, chlorides=0.029, free sulfur dioxide=93.0, total sulfur dioxide=161.0, density=0.98999, pH=3.65, sulphates=0.89, alcohol=12.0, quality=7),
 Row(_c0=3265, Unnamed: 0=3265, type='white', fixed acidity=4.2, volatile acidity=0.215, citric acid=0.23, residual sugar=5.1, chlorides=0.041, free sulfur dioxide=64.0, total sulfur dioxide=157.0, density=0

## ACTIONS

In [18]:
# first
rdd_sorted.first()

Row(_c0=4259, Unnamed: 0=4259, type='white', fixed acidity=3.8, volatile acidity=0.31, citric acid=0.02, residual sugar=11.1, chlorides=0.036, free sulfur dioxide=20.0, total sulfur dioxide=114.0, density=0.99248, pH=3.75, sulphates=0.44, alcohol=12.4, quality=6)

In [22]:
# reduce
data = np.arange(10)
print(data)
sc.parallelize(data).reduce(add)

[0 1 2 3 4 5 6 7 8 9]


45

In [24]:
# take
rdd_sorted.take(10)

[Row(_c0=4259, Unnamed: 0=4259, type='white', fixed acidity=3.8, volatile acidity=0.31, citric acid=0.02, residual sugar=11.1, chlorides=0.036, free sulfur dioxide=20.0, total sulfur dioxide=114.0, density=0.99248, pH=3.75, sulphates=0.44, alcohol=12.4, quality=6),
 Row(_c0=4787, Unnamed: 0=4787, type='white', fixed acidity=3.9, volatile acidity=0.225, citric acid=0.4, residual sugar=4.2, chlorides=0.03, free sulfur dioxide=29.0, total sulfur dioxide=118.0, density=0.989, pH=3.57, sulphates=0.36, alcohol=12.8, quality=8),
 Row(_c0=2872, Unnamed: 0=2872, type='white', fixed acidity=4.2, volatile acidity=0.17, citric acid=0.36, residual sugar=1.8, chlorides=0.029, free sulfur dioxide=93.0, total sulfur dioxide=161.0, density=0.98999, pH=3.65, sulphates=0.89, alcohol=12.0, quality=7),
 Row(_c0=3265, Unnamed: 0=3265, type='white', fixed acidity=4.2, volatile acidity=0.215, citric acid=0.23, residual sugar=5.1, chlorides=0.041, free sulfur dioxide=64.0, total sulfur dioxide=157.0, density=0

In [25]:
# join 
x = sc.parallelize([("a", 1), ("b", 2)])
y = sc.parallelize([("a", 3), ("a", 4), ("b", 5)])
x.join(y).collect()

[('b', (2, 5)), ('a', (1, 3)), ('a', (1, 4))]

In [27]:
# groupByKey
rdd = sc.parallelize([('B',5),('B',4),('A',3),('C',1),('A',2),('A',1)])
rdd = rdd.groupByKey()
[(j[0], list(j[1])) for j in rdd.collect()]

[('C', [1]), ('B', [5, 4]), ('A', [3, 2, 1])]

In [None]:
RDD4 = sc.parallelize([("a", 1), ("b", 1), ("a", 1), ("c",2), ("a",2) , ("d",1)])

In [13]:
# reduceByKey
from operator import add
sorted(RDD4.reduceByKey(add).collect())

[('a', 4), ('b', 1), ('c', 2), ('d', 1)]

In [32]:
# countByKey
sorted(RDD4.countByKey().items())


[('a', 3), ('b', 1), ('c', 1), ('d', 1)]

## WORD COUNT

In [36]:
rdd = sc.textFile("content.txt")
rdd.collect()

['Love, hate, or feel meh about Harry Potter, it’s hard to argue that J.K. Rowling filled the books with ',
 'intentional writing choices. From made up words to the meanings of names to the well-scripted first and last lines of each novel, ',
 'Rowling wanted to the writing to match the intricate fantasy world she created for the now-iconic boy wizard. To examine a few of ',
 'these choices, I’ll be taking a closer look at the first line of Harry Potter, as well as the last lines, from all of the Harry Potter ',
 'novels']

In [40]:
rdd2 = rdd.flatMap(lambda x: x.split(" "))
rdd2.take(10)

['Love,',
 'hate,',
 'or',
 'feel',
 'meh',
 'about',
 'Harry',
 'Potter,',
 'it’s',
 'hard']

In [41]:
rdd3 = rdd2.map(lambda x: (x,1))
rdd3.take(10)

[('Love,', 1),
 ('hate,', 1),
 ('or', 1),
 ('feel', 1),
 ('meh', 1),
 ('about', 1),
 ('Harry', 1),
 ('Potter,', 1),
 ('it’s', 1),
 ('hard', 1)]

In [39]:
rdd4 = rdd3.reduceByKey(lambda a,b: a+b)
rdd4.take(10)

[('hate,', 1),
 ('feel', 1),
 ('meh', 1),
 ('Potter,', 2),
 ('it’s', 1),
 ('filled', 1),
 ('books', 1),
 ('', 4),
 ('intentional', 1),
 ('writing', 2)]

In [43]:
rdd5 = rdd4.map(lambda x: (x[1],x[0])).sortByKey()
rdd5.collect()

[(1, 'hate,'),
 (1, 'feel'),
 (1, 'meh'),
 (1, 'it’s'),
 (1, 'filled'),
 (1, 'books'),
 (1, 'intentional'),
 (1, 'choices.'),
 (1, 'names'),
 (1, 'match'),
 (1, 'world'),
 (1, 'created'),
 (1, 'boy'),
 (1, 'wizard.'),
 (1, 'these'),
 (1, 'choices,'),
 (1, 'I’ll'),
 (1, 'look'),
 (1, 'at'),
 (1, 'line'),
 (1, 'Love,'),
 (1, 'or'),
 (1, 'about'),
 (1, 'hard'),
 (1, 'argue'),
 (1, 'that'),
 (1, 'J.K.'),
 (1, 'with'),
 (1, 'From'),
 (1, 'made'),
 (1, 'up'),
 (1, 'words'),
 (1, 'meanings'),
 (1, 'well-scripted'),
 (1, 'and'),
 (1, 'lines'),
 (1, 'each'),
 (1, 'novel,'),
 (1, 'wanted'),
 (1, 'intricate'),
 (1, 'fantasy'),
 (1, 'she'),
 (1, 'for'),
 (1, 'now-iconic'),
 (1, 'To'),
 (1, 'examine'),
 (1, 'few'),
 (1, 'be'),
 (1, 'taking'),
 (1, 'closer'),
 (1, 'well'),
 (1, 'lines,'),
 (1, 'from'),
 (1, 'all'),
 (1, 'Potter'),
 (1, 'novels'),
 (2, 'Potter,'),
 (2, 'writing'),
 (2, 'last'),
 (2, 'as'),
 (2, 'Rowling'),
 (2, 'first'),
 (2, 'a'),
 (3, 'Harry'),
 (4, ''),
 (5, 'of'),
 (5, 'to'),
 (9