In [18]:
import os.path as osp
import string


from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

from pyspark.sql.types import IntegerType, StringType
from pyspark.sql.functions import mean, avg, stddev, udf

import pandas as pd
import numpy as np

from itertools import cycle
from operator import add, mul

from functools import reduce


In [19]:
session = SparkSession.builder.getOrCreate()

sc = session.sparkContext

# SparkContext (RDD)

- filter, foreach, distinct, first, reduce
- map, mapValues, flatMap, flatMapValues, collectAsMap, mapPartitions, mapPartitionsWithSplit
- groupBy, groupByKey, reduceByKey, countByKey, combineByKey, foldByKey, cogroup
- glom
- cartesian
- count, countByValue
- take
- join, union, rightOuterJoin, leftOuterJoin
- pipe

## Simple RDD

In [20]:
n = zip(cycle(['even', 'odd']), np.arange(10))
numbers = sc.parallelize(n)
print(numbers.collect())

[('even', 0), ('odd', 1), ('even', 2), ('odd', 3), ('even', 4), ('odd', 5), ('even', 6), ('odd', 7), ('even', 8), ('odd', 9)]


In [21]:
print(numbers.getNumPartitions())
numbers.glom().collect()

4


[[('even', 0), ('odd', 1)],
 [('even', 2), ('odd', 3)],
 [('even', 4), ('odd', 5)],
 [('even', 6), ('odd', 7), ('even', 8), ('odd', 9)]]

In [22]:
print(numbers.aggregateByKey(seqFunc=lambda x, y: x * y,
                      
                            combFunc=lambda x, y: x-y, zeroValue=1).collect())

#print(numbers.combineByKey(createCombiner=lambda x: [x], mergeCombiners=lambda x, y: x-y, mergeValue=lambda x,y: x*y).collect())

print(numbers.reduceByKey(func=lambda x, y: x * y).collect())
print(numbers.foldByKey(zeroValue=2, func=lambda x, y: x * y ).collect()) # (2*1) * (2*3) * (2*5) * (2*7*9) 

[('even', -54), ('odd', -70)]
[('even', 0), ('odd', 945)]
[('even', 0), ('odd', 15120)]


In [8]:
print(numbers.groupByKey().mapValues(list).collect())

[('even', [0, 2, 4, 6, 8]), ('odd', [1, 3, 5, 7, 9])]


In [23]:
print(numbers.countByKey())
print(numbers.countByKey()['even'])

defaultdict(<class 'int'>, {'even': 5, 'odd': 5})
5


## map / flatMap / mapValues / flatMapValues

In [24]:
print(numbers.map(lambda s: s[0]).collect())
print('\n')
print(numbers.flatMap(lambda s: s[0]).collect())
print('\n')
print(numbers.mapValues(lambda s: s**2).collect())
print('\n')
numbers_with_list_as_value = sc.parallelize([('even', [0, 2, 4, 6, 8]), ('odd', [1, 3, 5, 7, 9]), ('odd', [11])])
numbers_with_list_as_value.flatMapValues(lambda s: s).collect()

['even', 'odd', 'even', 'odd', 'even', 'odd', 'even', 'odd', 'even', 'odd']


['e', 'v', 'e', 'n', 'o', 'd', 'd', 'e', 'v', 'e', 'n', 'o', 'd', 'd', 'e', 'v', 'e', 'n', 'o', 'd', 'd', 'e', 'v', 'e', 'n', 'o', 'd', 'd', 'e', 'v', 'e', 'n', 'o', 'd', 'd']


[('even', 0), ('odd', 1), ('even', 4), ('odd', 9), ('even', 16), ('odd', 25), ('even', 36), ('odd', 49), ('even', 64), ('odd', 81)]




[('even', 0),
 ('even', 2),
 ('even', 4),
 ('even', 6),
 ('even', 8),
 ('odd', 1),
 ('odd', 3),
 ('odd', 5),
 ('odd', 7),
 ('odd', 9),
 ('odd', 11)]

##  <font color='blue'>  Word count in text file </font>

In [25]:
txtpath = 'input.txt'
data = sc.textFile(txtpath, minPartitions=20)
data.collect()

['Oak is strong and also gives shade.',
 'Cats and dogs each hate the other.',
 'The Pipe began to rush whicle new.',
 "Open the crate but don't break the glass.",
 'Add the sum to the product of these three.',
 'Thieves who rob friends deserves jail.',
 'The ripe taste of cheese improves with age.',
 'Act on these orders with great speed.',
 'The hog crawled under the high frence.',
 'Move the vat over the hot fire.s']

In [12]:
# Remove punctuation from the words. This applies to every punctuation mark regardless its position.
a = '.?lk-jsf./.m./.nalsdf!asdf!'
translator = str.maketrans("", "", string.punctuation)
a.translate(translator)

'lkjsfmnalsdfasdf'

In [13]:
punctuation_marks = ['.', '?', '-']

def remove_punctuation_mark(string, marks_list):
    for mark in marks_list:
        if string.endswith(mark):
            return string[:-1]
    else:
        return string

test_string = 'lkafj?'
remove_punctuation_mark(test_string, punctuation_marks)

'lkafj'

In [26]:
print(sorted(data.flatMap(lambda s: s.split()).map(lambda w: (remove_punctuation_mark(w.lower(), punctuation_marks), 1))\
       .reduceByKey(lambda x, y: x+y).collect(),\
       key=lambda n: n[1], reverse=True ))

[('the', 11), ('to', 2), ('and', 2), ('of', 2), ('with', 2), ('these', 2), ('hate', 1), ('ripe', 1), ('pipe', 1), ('hot', 1), ('add', 1), ('cheese', 1), ('high', 1), ('gives', 1), ('who', 1), ('orders', 1), ('dogs', 1), ('crate', 1), ('crawled', 1), ('move', 1), ('is', 1), ('deserves', 1), ('product', 1), ('hog', 1), ('jail', 1), ('over', 1), ('vat', 1), ('each', 1), ('rush', 1), ('break', 1), ('on', 1), ('strong', 1), ('began', 1), ('open', 1), ('rob', 1), ('act', 1), ('frence', 1), ("don't", 1), ('taste', 1), ('other', 1), ('but', 1), ('oak', 1), ('also', 1), ('friends', 1), ('improves', 1), ('fire.s', 1), ('age', 1), ('great', 1), ('shade', 1), ('new', 1), ('glass', 1), ('three', 1), ('thieves', 1), ('under', 1), ('cats', 1), ('whicle', 1), ('speed', 1), ('sum', 1)]


## Union / Join

In [27]:
x = sc.parallelize([('even', 10), ('odd', 21), ('even', 20)])
y = sc.parallelize([('even', 0), ('even', 20), ('odd', 1), ('even', 20)])

In [28]:
y.join(x).collect() # Join every element of y to every element of x based on the same key.

[('even', (0, 10)),
 ('even', (0, 20)),
 ('even', (20, 10)),
 ('even', (20, 20)),
 ('even', (20, 10)),
 ('even', (20, 20)),
 ('odd', (1, 21))]

In [29]:
y.union(x).collect() # Union of the two RDDs. If there is a common element in the two RDDs, it is included twice.

[('even', 0),
 ('even', 20),
 ('odd', 1),
 ('even', 20),
 ('even', 10),
 ('odd', 21),
 ('even', 20)]

# SparkSession

## DataFrames

There are two ways to query the DataFrame.

- 
```
df = session.createDataFrame(data)
df.createOrReplaceTempView('name')
```

- If we have already created a spark DataFrame through `.toDF`, and we want to query the dataframe we first need to register it, e.g. `df.registerTempView('name')`.

### Spark Syntax

In [30]:
filepath = '/home/thanasissdr/Downloads/sample.csv'

df = session.read.csv(filepath, sep=',', header=True)
df.printSchema()

AnalysisException: 'Path does not exist: file:/home/thanasissdr/Downloads/sample.csv;'

In [18]:
df.select(['policyID', 'county']).show(5)

+--------+-----------+
|policyID|     county|
+--------+-----------+
|  119736|CLAY COUNTY|
|  448094|CLAY COUNTY|
|  206893|CLAY COUNTY|
|  333743|CLAY COUNTY|
|  172534|CLAY COUNTY|
+--------+-----------+
only showing top 5 rows



In [19]:
df.select('county').groupby('county').count().orderBy('count', ascending=False).limit(10).show()

+-------------------+-----+
|             county|count|
+-------------------+-----+
|  MIAMI DADE COUNTY| 4315|
|     BROWARD COUNTY| 3193|
|  PALM BEACH COUNTY| 2791|
|       DUVAL COUNTY| 1894|
|      ORANGE COUNTY| 1811|
|    PINELLAS COUNTY| 1774|
|        POLK COUNTY| 1629|
|     VOLUSIA COUNTY| 1367|
|HILLSBOROUGH COUNTY| 1166|
|      MARION COUNTY| 1138|
+-------------------+-----+



In [20]:
data = sc.parallelize([('a', 10), ('b', 2), ('a', 2), ('a', 20), ('b', 10)])

small_df = session.createDataFrame(data, ['BLOOD_TYPE', 'AGE'])
small_df = small_df.toDF('blood_type', 'age')
small_df.show()

+----------+---+
|blood_type|age|
+----------+---+
|         a| 10|
|         b|  2|
|         a|  2|
|         a| 20|
|         b| 10|
+----------+---+



In [21]:
small_df.where(small_df.age < 10).show()

+----------+---+
|blood_type|age|
+----------+---+
|         b|  2|
|         a|  2|
+----------+---+



In [22]:
small_df.groupBy('blood_type').count().show()

+----------+-----+
|blood_type|count|
+----------+-----+
|         b|    2|
|         a|    3|
+----------+-----+



In [23]:
double = lambda s: s**2
small_df.withColumn('squared_age', double(small_df.age)).show()
small_df.withColumnRenamed('age', 'new_column').show()

+----------+---+-----------+
|blood_type|age|squared_age|
+----------+---+-----------+
|         a| 10|      100.0|
|         b|  2|        4.0|
|         a|  2|        4.0|
|         a| 20|      400.0|
|         b| 10|      100.0|
+----------+---+-----------+

+----------+----------+
|blood_type|new_column|
+----------+----------+
|         a|        10|
|         b|         2|
|         a|         2|
|         a|        20|
|         b|        10|
+----------+----------+



### SQL Syntax

In [24]:
df.createOrReplaceTempView('df')
small_df.createOrReplaceTempView('small_df')

In [25]:
query = 'SELECT policyID, county FROM df'
session.sql(query).show(5)

+--------+-----------+
|policyID|     county|
+--------+-----------+
|  119736|CLAY COUNTY|
|  448094|CLAY COUNTY|
|  206893|CLAY COUNTY|
|  333743|CLAY COUNTY|
|  172534|CLAY COUNTY|
+--------+-----------+
only showing top 5 rows



In [26]:
query = 'SELECT county, COUNT(county) as `c` FROM df GROUP BY county ORDER BY c DESC LIMIT 10'
session.sql(query).show()

+-------------------+----+
|             county|   c|
+-------------------+----+
|  MIAMI DADE COUNTY|4315|
|     BROWARD COUNTY|3193|
|  PALM BEACH COUNTY|2791|
|       DUVAL COUNTY|1894|
|      ORANGE COUNTY|1811|
|    PINELLAS COUNTY|1774|
|        POLK COUNTY|1629|
|     VOLUSIA COUNTY|1367|
|HILLSBOROUGH COUNTY|1166|
|      MARION COUNTY|1138|
+-------------------+----+



## Applying rdd operations 
We can apply map/filter into a DataFrame  by applying 
`DataFrame.rdd.map` or `DataFrame.rdd.filter` etc.

In [27]:
query = 'SELECT * FROM small_df WHERE small_df.age <= 12'
teenagers = session.sql(query)
teenagers.rdd.map(lambda s: s['age']*2).collect()

[20, 4, 4, 20]

In [28]:
teenagers.rdd.mapValues(lambda s: s**2).toDF(['blood_type', 'age']).show()

+----------+---+
|blood_type|age|
+----------+---+
|         a|100|
|         b|  4|
|         a|  4|
|         b|100|
+----------+---+



# Writing a `parquet` file.

In [32]:
if not osp.exists('sample.parquet'):
    df.write.parquet('sample.qarquet')

In [30]:
parquetFile = session.read.parquet('sample.parquet')
small_df.show()

+----------+---+
|blood_type|age|
+----------+---+
|         a| 10|
|         b|  2|
|         a|  2|
|         a| 20|
|         b| 10|
+----------+---+



## Applying an operation to a DataFrame column.

In [33]:
def cube(x):
    return x**3

small_df.withColumn('age', cube(small_df.age)).show()

+----------+------+
|blood_type|   age|
+----------+------+
|         a|1000.0|
|         b|   8.0|
|         a|   8.0|
|         a|8000.0|
|         b|1000.0|
+----------+------+



In [34]:
numbers = sc.parallelize(zip(cycle(['a', 'b', 'ab']), [int(i) for i in np.random.randint(low=1, high=30, size=20)], \
                        [int(j) for j in np.random.randint(low=1, high=100, size=20)]))
cols = ['blood_type', 'age', 'days_to_birthday']

In [35]:
df = numbers.toDF(cols)
df.show()

+----------+---+----------------+
|blood_type|age|days_to_birthday|
+----------+---+----------------+
|         a| 25|               5|
|         b| 25|              60|
|        ab| 17|               7|
|         a| 20|              93|
|         b| 14|              70|
|        ab| 26|              43|
|         a| 10|              91|
|         b| 22|              88|
|        ab| 28|              89|
|         a| 18|              98|
|         b|  1|              64|
|        ab| 22|              19|
|         a| 23|              23|
|         b| 27|               9|
|        ab|  9|              81|
|         a| 13|              25|
|         b|  3|              31|
|        ab| 19|              61|
|         a| 20|              38|
|         b| 19|              84|
+----------+---+----------------+



## Creating a new column in an existing DataFrame

In [36]:
multiply_by_ten = udf(lambda s: s*10)
extended_df = df.withColumn('age_multiplied_by_ten', multiply_by_ten(df.age))
extended_df.show()

+----------+---+----------------+---------------------+
|blood_type|age|days_to_birthday|age_multiplied_by_ten|
+----------+---+----------------+---------------------+
|         a| 25|               5|                  250|
|         b| 25|              60|                  250|
|        ab| 17|               7|                  170|
|         a| 20|              93|                  200|
|         b| 14|              70|                  140|
|        ab| 26|              43|                  260|
|         a| 10|              91|                  100|
|         b| 22|              88|                  220|
|        ab| 28|              89|                  280|
|         a| 18|              98|                  180|
|         b|  1|              64|                   10|
|        ab| 22|              19|                  220|
|         a| 23|              23|                  230|
|         b| 27|               9|                  270|
|        ab|  9|              81|               

## Calculating `mean`, `std` 

In [45]:
import time
start = time.time()
print(df.select('age').rdd.map(lambda s: s[0]).mean())
end = time.time()
print(end - start)
#print(df.rdd.map(lambda s: s['age']).stdev())

18.05
0.12297463417053223


In [51]:
start = time.time()
print(df.rdd.map(lambda s: s['age']).mean())
end = time.time()
print(end - start)

18.05
0.10369205474853516


In [52]:
start = time.time()
print(df.select(mean("age")))
end = time.time()
print(end - start)

DataFrame[avg(age): double]
0.009303569793701172
