# Spark (WIP)


**TODO: Merging with ds_pragmatic_programming_pyspark**

To run this notebook, you need to run Jupyter from inside the Docker container. It does not work in Binder right now.

* How to run the notebook using the Docker image for Spark:

```sh

# Install images
docker pull ucsddse230/cse255-dse230

# Run container
docker run --name ds_pragmatic -it -p 8890:8888 -v /media/leandroohf/sdb1/leandro/ds_pragmatic_programming:/home/ucsddse230/ ucsddse230/cse255-dse230 /bin/bash

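# If you need a shell inside the running container
docker exec -it ds_pragmatic /bin/bash

# Run jupyter inside container
# (if the page is not reachable from the host, try: jupyter notebook --ip=0.0.0.0 --port=8888)
jupyter notebook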

```

1. Open http://localhost:8890/tree (`-p 8890:8888` maps host port 8890 to the container's port 8888)
2. Copy and paste the token printed by `jupyter notebook` to log in

 
* refs:

    * https://courses.edx.org/courses/course-v1:BerkeleyX+CS105x+1T2016/course/
    * https://courses.edx.org/courses/course-v1:BerkeleyX+CS105x+1T2016/courseware/d1f293d0cb53466dbb5c0cd81f55b45b/fe9a95cc542d4c30b855e632663c4797/8?activate_block_id=block-v1%3ABerkeleyX%2BCS105x%2B1T2016%2Btype%40vertical%2Bblock%4083ff2d3b4e93489b9b7b4861811e0872



In [1]:
# findspark must run before pyspark is imported so that Python can locate Spark
import findspark
findspark.init()

import numpy as np
import pandas as pd

from scipy import stats

import matplotlib
import matplotlib.pyplot as plt
%matplotlib inline

import IPython
from IPython.core.interactiveshell import InteractiveShell
# display every expression of a cell, not only the last one
InteractiveShell.ast_node_interactivity = "all"

from sqlalchemy import create_engine
import datetime as dt

from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql.types import Row, StructField, StructType, StringType, IntegerType

# start the SparkContext with 4 local worker threads
sc = SparkContext(master="local[4]")


In [31]:
?SparkContext

In [2]:
!pwd
!ls data/Weather/

/home/ucsddse230
NY.tgz


## Creation

```python
# example of reading from HDFS
# Spark can read TEXT, CSV, Avro, Parquet and JSON; sc.textFile() reads plain text, one element per line
rddFromFile = sc.textFile("hdfs://nn1home:8020/text01.txt")
```

In [3]:
# from a text file
!head data/Moby-Dick.txt

# textFile() reads a text file from HDFS, S3 or any Hadoop-supported file system;
# each line of the file becomes one element of the RDD
text_file = sc.textFile('data/Moby-Dick.txt')
type(text_file)


# from a Python collection
pair_rdd = sc.parallelize([(1,2), (3,4)])
print(pair_rdd.collect())

The Project Gutenberg EBook of Moby Dick; or The Whale, by Herman Melville

This eBook is for the use of anyone anywhere at no cost and with
almost no restrictions whatsoever.  You may copy it, give it away or
re-use it under the terms of the Project Gutenberg License included
with this eBook or online at www.gutenberg.org


Title: Moby Dick; or The Whale



pyspark.rdd.RDD

[(1, 2), (3, 4)]



## Transformations

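* The function passed to `map` is applied to each element independently, so it must not depend on other elements or on the order in which elements are processed. Operations that combine elements, such as reductions, must be order-independent too: order-dependent operators like subtraction or division can give different results on different runs (see *Properties of reduce operations* below).

Transformations: **RDD $\to$ RDD**. Some work on any RDD (`map`, `filter`, `sample`); others work on (key, value) RDDs (`flatMapValues`, `reduceByKey`, ...).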

### map, filter, sample and flatMapValues

* **No communication needed**.

In [5]:
regular_rdd = sc.parallelize([1, 2, 3, 4, 2, 5, 6])

# x*x depends only on x, so the result does not depend on processing order
pair_rdd = regular_rdd.map( lambda x: (x, x*x) )
print(pair_rdd.collect())

print(regular_rdd.filter( lambda x: x > 3 ).collect())

# sample(withReplacement, fraction, seed) is a transformation: printing it shows
# the RDD object, not its elements (add .collect() to see the sampled values)
print(regular_rdd.sample(True, 0.5, 11))


rdd = sc.parallelize([(1,2), (2,4), (2,6)])
print("Original RDD :", rdd.collect())

# LHOF Notes
x = 3
print('list: ', list(range(x,x+2)))

# for each value x the lambda returns the list [x, x+1]; flatMapValues emits
# one (key, element) pair per list element, keeping the original key
print("After transformation : ", rdd.flatMapValues(lambda x: list(range(x,x+2))).collect())

[(1, 1), (2, 4), (3, 9), (4, 16), (2, 4), (5, 25), (6, 36)]
[4, 5, 6]
PythonRDD[6] at RDD at PythonRDD.scala:48
Original RDD : [(1, 2), (2, 4), (2, 6)]
list:  [3, 4]
After transformation :  [(1, 2), (1, 3), (2, 4), (2, 5), (2, 6), (2, 7)]


### groupByKey, reduceByKey and sortByKey

**Shuffles:** RDD $\to$ RDD, **shuffle** needed

**Shuffles are costly transformations**

* **Examples:** sort, distinct, repartition, sortByKey, reduceByKey, join [More](http://spark.apache.org/docs/latest/rdd-programming-guide.html#shuffle-operations)
  * **A LOT** of communication might be needed.
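Why does `reduceByKey` need a shuffle? The values of one key can sit in several partitions, so they have to be brought together first. A minimal plain-Python sketch of the idea, assuming simple hash partitioning by key (`hash(key) % num_partitions`); Spark's own partitioner follows the same principle but with its own hash function, and the helper names here are hypothetical:

```python
def hash_partition(pairs, num_partitions):
    """Route each (key, value) pair to partition hash(key) % num_partitions.

    Illustration only: Spark uses its own hash function, so the actual
    partition numbers in a real shuffle can differ.
    """
    partitions = [[] for _ in range(num_partitions)]
    for key, value in pairs:
        partitions[hash(key) % num_partitions].append((key, value))
    return partitions

def reduce_by_key_local(partition, func):
    """Reduce the values of each key inside a single partition."""
    acc = {}
    for key, value in partition:
        acc[key] = func(acc[key], value) if key in acc else value
    return acc

# Two input partitions: the values of key 2 live in both of them.
input_partitions = [[(1, 2), (2, 4)], [(2, 6), (3, 1)]]

# Shuffle: every pair, from every input partition, moves to its target partition.
all_pairs = [pair for part in input_partitions for pair in part]
shuffled = hash_partition(all_pairs, num_partitions=2)

# After the shuffle each key lives in exactly one partition,
# so every partition can be reduced independently.
result = {}
for part in shuffled:
    result.update(reduce_by_key_local(part, lambda a, b: a + b))

print(shuffled)  # [[(2, 4), (2, 6)], [(1, 2), (3, 1)]]
print(result)    # {2: 10, 1: 2, 3: 1}
```

The real `reduceByKey` also pre-combines values inside each input partition before shuffling (a map-side combine), which reduces how much data crosses the network.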


**Properties of reduce operations**

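* Reduce operations **must not depend on the order**; formally, the operator must be **commutative** and **associative**
  * Order of operands should not matter (commutative: a + b = b + a)
  * Order of application of reduce operator should not matter (associative: (a + b) + c = a + (b + c))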

* Multiplication and summation are good:

```
                (1 + 3) + (5 + 2) = 11             (5 + 3) + (1 + 2) = 11
```

* Division and subtraction are bad:

```
                (1 - 3) - (5 - 2) = -5             1 - (3 - (5 - 2)) = 1
```
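The grouping problem can be reproduced without Spark. A minimal sketch, assuming the data is cut into contiguous chunks that are reduced separately and then combined (Spark's real partitioning is not modeled; `partitioned_reduce` is a hypothetical helper):

```python
from functools import reduce

def partitioned_reduce(values, func, num_partitions):
    """Mimic a distributed reduce: reduce each chunk, then reduce the partial results."""
    size = -(-len(values) // num_partitions)  # ceiling division
    chunks = [values[i:i + size] for i in range(0, len(values), size)]
    partials = [reduce(func, chunk) for chunk in chunks]
    return reduce(func, partials)

values = [1, 3, 5, 2]
add = lambda a, b: a + b
sub = lambda a, b: a - b

# Addition: same answer however the data is split.
print([partitioned_reduce(values, add, n) for n in (1, 2, 4)])  # [11, 11, 11]

# Subtraction: the answer depends on how the data is split.
print([partitioned_reduce(values, sub, n) for n in (1, 2, 4)])  # [-9, -5, -9]
```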


**groupByKey():**
Returns a new RDD of `(key,<iterator>)` pairs where the iterator iterates over the values associated with the key.


[Iterators](http://anandology.com/python-practice-book/iterators.html) are Python objects that generate a sequence of values. In Python 2, writing a loop over `n` elements as 
```python
for i in range(n):
    ##do something
```
was inefficient because `range(n)` first allocated a list of `n` elements and then iterated over it.
Using the iterator `xrange(n)` achieved the same result without materializing the list; instead, elements were generated on the fly. In Python 3, `range(n)` already works this way: it produces its values on demand instead of building a list.

To materialize the list of values returned by an iterator we will use the list comprehension command:
```python
[a for a in <iterator>]
```
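A plain-Python sketch (no Spark needed) of what `groupByKey` followed by materializing each iterator produces on one machine. Spark returns a lazily evaluated iterable per key; here a generator per key stands in for it, and `group_by_key` is a hypothetical helper:

```python
from collections import defaultdict

# Python 3: range() is lazy, so this does not allocate a billion-element list
lazy = range(10**9)
print(lazy[5], len(lazy))  # 5 1000000000

def group_by_key(pairs):
    """Plain-Python analogue of rdd.groupByKey(): (key, iterator over its values)."""
    groups = defaultdict(list)
    for key, value in pairs:
        groups[key].append(value)
    # a generator per key mimics values being produced on demand
    return [(key, (v for v in values)) for key, values in groups.items()]

pairs = [(1, 3), (3, 100), (1, -5), (3, 2)]
grouped = group_by_key(pairs)

# Like mapValues(lambda x: [elem for elem in x]): materialize each iterator.
materialized = [(key, [elem for elem in it]) for key, it in grouped]
print(materialized)  # [(1, [3, -5]), (3, [100, 2])]
```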


In [6]:
# groupByKey return (key, <iterator>)

A = sc.parallelize([(1,3), (3,100),(1,-5),(3,2)])
A.groupByKey().mapValues(lambda x: [elem for elem in x ]) # <== x is an iterator; without collect() this only displays the RDD object

# output
#[ (1, [3,-5]), (3, [100, 2]) ]
print(A.groupByKey().map(lambda elem: (elem[0],[x for x in elem[1] ])).collect())

rdd = sc.parallelize([(1,2), (2,4), (2,6)])
print("Original RDD :", rdd.collect())

# output
# [(1,2),(2,10)]
print("After transformation : ", rdd.reduceByKey(lambda a,b: a+b).collect())


rdd = sc.parallelize([(2,2), (1,4), (3,6)])
print("Original RDD :", rdd.collect())

# output
# [(1,4),(2,2),(3,6)]
print("After transformation : ", rdd.sortByKey().collect())

# Using sortBy
# output
# [(3,6),(1,4),(2,2)]  <== sorted by value, descending
print("After transformation : ", rdd.sortBy(lambda x: x[1],ascending=False).collect())


PythonRDD[14] at RDD at PythonRDD.scala:48

[(1, [3, -5]), (3, [100, 2])]
Original RDD : [(1, 2), (2, 4), (2, 6)]
After transformation :  [(1, 2), (2, 10)]
Original RDD : [(2, 2), (1, 4), (3, 6)]
After transformation :  [(1, 4), (2, 2), (3, 6)]
After transformation :  [(3, 6), (1, 4), (2, 2)]


### Operations on two RDDs (set operations)

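**subtractByKey** and **subtract**.

* `subtractByKey` removes from RDD1 all elements whose key is present in RDD2.
* `subtract` removes from RDD1 all elements that also appear in RDD2; whole elements are compared, not just keys.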
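The difference between the two is easiest to see on plain Python lists. A minimal sketch of the semantics, reusing the data of the next cell; the helper names are hypothetical, and, like Spark's `subtract`, duplicates on the left side are kept (so it is not a strict set difference):

```python
def subtract_by_key(left, right):
    """Keep the pairs of `left` whose key does not occur as a key in `right`."""
    right_keys = {key for key, _ in right}
    return [(k, v) for k, v in left if k not in right_keys]

def subtract(left, right):
    """Keep the elements of `left` that do not occur, as whole elements, in `right`."""
    right_elems = set(right)
    return [elem for elem in left if elem not in right_elems]

rdd1 = [(1, 2), (2, 1), (2, 2)]
rdd2 = [(2, 5), (3, 1)]
print(subtract_by_key(rdd1, rdd2))  # [(1, 2)]

x = [("a", 1), ("b", 4), ("b", 5), ("a", 3)]
y = [("a", 3), ("c", None)]
print(sorted(subtract(x, y)))  # [('a', 1), ('b', 4), ('b', 5)]
```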


In [8]:
# LHOF Notes

rdd1 = sc.parallelize([(1,2), (2,1), (2,2)])
rdd2 = sc.parallelize([(2,5), (3,1)])

print('rdd1: ', rdd1.collect())
print('rdd2: ', rdd2.collect())
print('subtractByKey: ', rdd1.subtractByKey(rdd2).collect())

print()
# Pay attention: subtract compares whole (key, value) elements, not just the keys
x = sc.parallelize([("a", 1), ("b", 4), ("b", 5), ("a", 3)])
y = sc.parallelize([("a", 3), ("c", None)])

print('x: ', x.collect())
print('y: ', y.collect())
print('subtract: ', sorted(x.subtract(y).collect()))

rdd1:  [(1, 2), (2, 1), (2, 2)]
rdd2:  [(2, 5), (3, 1)]
subtractByKey:  [(1, 2)]

x:  [('a', 1), ('b', 4), ('b', 5), ('a', 3)]
y:  [('a', 3), ('c', None)]
subtract:  [('a', 1), ('b', 4), ('b', 5)]


**join**

* A fundamental operation in relational databases.
* assumes two tables have a **key** column in common. 
* merges rows with the same key.


When `join` is called on datasets of type `(K, V)` and `(K, W)`, it returns a dataset of `(K, (V, W))` pairs with all pairs of elements for each key. The bullets below refer to an example (not reproduced here) in which the left dataset has the keys `John, Jill, Kate` and the right dataset has the keys `Jill, Grace, John`.


There are four variants of `join`, which differ in how they treat keys that appear in one dataset but not the other.
* `join` is an *inner* join, which means that keys that appear in only one dataset are eliminated. The result in our example contains the keys `John, Jill`.
* `leftOuterJoin` keeps all keys from the left dataset even if they don't appear in the right dataset. The result of `leftOuterJoin` in our example contains the keys `John, Jill, Kate`.
* `rightOuterJoin` keeps all keys from the right dataset even if they don't appear in the left dataset. The result of `rightOuterJoin` in our example contains the keys `Jill, Grace, John`.
* `fullOuterJoin` keeps all keys from both datasets. The result of `fullOuterJoin` in our example contains the keys `Jill, Grace, John, Kate`.

In outer joins, if a key appears in only one dataset, the missing side of the `(K, (V, W))` pair is represented by `None`.
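The four variants can be sketched in plain Python on lists of `(key, value)` tuples. This is an illustration of the semantics only (quadratic, single machine); the function names are hypothetical stand-ins for `join`, `leftOuterJoin`, `rightOuterJoin` and `fullOuterJoin`, and Spark may return the pairs in a different order:

```python
def _join(left, right, keep_left_only, keep_right_only):
    """Pair every left value with every right value of the same key.

    Unmatched keys are dropped or padded with None, depending on the flags.
    """
    left_keys = {k for k, _ in left}
    result = []
    for k, v in left:
        matches = [w for rk, w in right if rk == k]
        if matches:
            result.extend((k, (v, w)) for w in matches)
        elif keep_left_only:
            result.append((k, (v, None)))
    if keep_right_only:
        result.extend((k, (None, w)) for k, w in right if k not in left_keys)
    return result

def inner_join(left, right):        # like rdd.join
    return _join(left, right, False, False)

def left_outer_join(left, right):   # like rdd.leftOuterJoin
    return _join(left, right, True, False)

def right_outer_join(left, right):  # like rdd.rightOuterJoin
    return _join(left, right, False, True)

def full_outer_join(left, right):   # like rdd.fullOuterJoin
    return _join(left, right, True, True)

rdd1 = [(1, 2), (2, 1), (2, 2)]
rdd2 = [(2, 5), (3, 1)]
for join in (inner_join, left_outer_join, right_outer_join, full_outer_join):
    print(join.__name__, join(rdd1, rdd2))
```

The left and right outer results match the Spark outputs in the next cell.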

In [9]:
# rightOuterJoin
print('rdd1=',rdd1.collect())
print('rdd2=',rdd2.collect())
print("Result:", rdd1.rightOuterJoin(rdd2).collect())

print()

# leftOuterJoin
print('rdd1=',rdd1.collect())
print('rdd2=',rdd2.collect())
print("Result:", rdd1.leftOuterJoin(rdd2).collect())


rdd1= [(1, 2), (2, 1), (2, 2)]
rdd2= [(2, 5), (3, 1)]
Result: [(2, (1, 5)), (2, (2, 5)), (3, (None, 1))]

rdd1= [(1, 2), (2, 1), (2, 2)]
rdd2= [(2, 5), (3, 1)]
Result: [(1, (2, None)), (2, (1, 5)), (2, (2, 5))]


## Actions

Actions (several of them specific to (key, value) RDDs). **RDD $\to$ Python object on the driver (head node).**

In [15]:
#  countByKey: returns dictionary
A = sc.parallelize([(1,3), (3,100),(1,-5),(3,2)])

A.countByKey()

# output (a defaultdict)
# {1:2, 3:2}

# lookup (key): returns the list of all of the values associated with key
A = sc.parallelize([(1,3), (3,100),(1,-5),(3,2)])

len(A.lookup(3))  # <== This brings the matching values into the driver's memory
A.lookup(3)

# output (list)
# [100,2]

#  collectAsMap(): like collect() (which returns a list of tuples), but returns a map (dictionary)
A = sc.parallelize([(1,3), (3,100),(1,-5),(3,2)])
A.collectAsMap()

# output Dictionary: only one value is kept per key (a later pair overwrites an earlier one)
# {1: -5, 3: 2}

regular_rdd = sc.parallelize([1, 2, 3, 4, 2, 5, 6])

# takeSample(withReplacement, num, [seed])
print(regular_rdd.takeSample(True, 5, 11))


defaultdict(int, {1: 2, 3: 2})

2

[100, 2]

{1: -5, 3: 2}

[3, 3, 3, 3, 3]
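What these actions return can be mimicked on a plain list of pairs. A sketch for intuition, assuming the pairs arrive in the order they were parallelized; in PySpark, `collectAsMap()` essentially builds `dict(rdd.collect())`, which is why a later pair overwrites an earlier one with the same key:

```python
from collections import Counter

pairs = [(1, 3), (3, 100), (1, -5), (3, 2)]

# countByKey(): number of elements per key
counts = Counter(key for key, _ in pairs)

# lookup(3): all values stored under key 3
values_for_3 = [v for k, v in pairs if k == 3]

# collectAsMap(): a dict built from the pairs; duplicate keys keep the last value
as_map = dict(pairs)

print(dict(counts), values_for_3, as_map)  # {1: 2, 3: 2} [100, 2] {1: -5, 3: 2}
```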


## Famous word count example (hello world)

In [25]:
## Famous word count example (hello world)

words = ['this', 'is', 'the', 'best', 'mac', 'every', 'is', 'it', 'the', 'the']

words_rdd = sc.parallelize(words)


words_pairs_rdd = words_rdd.map(lambda w: (w,1))

count_words_rdd = words_pairs_rdd.reduceByKey(lambda c1,c2: c1 + c2)

print("words counter")
count_words_rdd.collect()

print()

print("largest word")
# Find the longest word (most characters)
words_rdd.reduce(lambda w,v: w if len(w) > len(v) else v)


words counter


[('best', 1),
 ('the', 3),
 ('every', 1),
 ('this', 1),
 ('is', 2),
 ('mac', 1),
 ('it', 1)]


largest word


'every'
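The same map / reduceByKey / reduce pipeline in plain Python, useful to sanity-check the Spark result on a small input (a sketch; the local loop plays the role of `reduceByKey`):

```python
from collections import Counter
from functools import reduce

words = ['this', 'is', 'the', 'best', 'mac', 'every', 'is', 'it', 'the', 'the']

# map step: each word becomes a (word, 1) pair
pairs = [(w, 1) for w in words]

# reduceByKey step: sum the 1s of each word
counts = {}
for w, c in pairs:
    counts[w] = counts.get(w, 0) + c

# cross-check with the standard library
assert counts == Counter(words)

# reduce step for the longest word; on ties the lambda keeps the second argument
longest = reduce(lambda w, v: w if len(w) > len(v) else v, words)
print(counts['the'], counts['is'], longest)  # 3 2 every
```

In a distributed run, which of two equally long words wins depends on how Spark combines the partial results, because the lambda breaks ties by argument position.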

## Dataframe 

DataFrames are like RDDs with a schema (metadata: column names and types). Because the schema tells Spark how the data is structured, Spark can optimize DataFrame operations more than it can optimize arbitrary RDD code.


Cheat sheet:
https://s3.amazonaws.com/assets.datacamp.com/blog_assets/PySpark_SQL_Cheat_Sheet_Python.pdf

### Create Dataframes



In [32]:
# from pyspark.sql import Row
# #from pyspark import sqlContext


## Option 1 
sqlContext = SQLContext(sc)

rdd = sc.parallelize([Row(name=u"John",  age=19),
                      Row(name=u"Smith", age=23),
                      Row(name=u"Sarah", age=18)])

rdd.collect()
# Output:
# [Row(name=u"John",  age=19),
#  Row(name=u"Smith", age=23),
#  Row(name=u"Sarah", age=18)]

df = sqlContext.createDataFrame(rdd)
df.printSchema()


# Option 2: explicit schema
rdd2 = sc.parallelize([("John",19),("Smith",23),("Sarah",18)])


schema = StructType([StructField("person_name", StringType(),False),
                     StructField("person_age",  IntegerType(),False)
])

df2 = sqlContext.createDataFrame(rdd2,schema)
df2.printSchema()


[Row(age=19, name='John'),
 Row(age=23, name='Smith'),
 Row(age=18, name='Sarah')]

root
 |-- age: long (nullable = true)
 |-- name: string (nullable = true)

root
 |-- person_name: string (nullable = false)
 |-- person_age: integer (nullable = false)



### Join Dataframes

In [34]:

emp = [(1, "Smith", -1, "2018", "10", "M", 3000),
       (2, "Rose", 1, "2010", "20", "M", 4000),
       (3, "Williams", 1, "2010", "10", "M", 1000),
       (4, "Jones", 2, "2005", "10", "F", 2000),
       (5, "Brown", 2, "2010", "40", "", -1),
       (6, "Brown", 2, "2010", "50", "", -1)]

emp_columns = ["emp_id", "name", "superior_emp_id", "year_joined",
               "emp_dept_id", "gender", "salary"]

emp_df = sqlContext.createDataFrame(data=emp, schema=emp_columns)
emp_df.printSchema()

emp_df.show(truncate=False)

dept = [("Finance", 10),
        ("Marketing", 20),
        ("Sales", 30),
        ("IT", 40)]

dept_columns = ["dept_name", "dept_id"]
dept_df = sqlContext.createDataFrame(data=dept, schema=dept_columns)
dept_df.printSchema()
dept_df.show(truncate=False)


root
 |-- emp_id: long (nullable = true)
 |-- name: string (nullable = true)
 |-- superior_emp_id: long (nullable = true)
 |-- year_joined: string (nullable = true)
 |-- emp_dept_id: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- salary: long (nullable = true)

+------+--------+---------------+-----------+-----------+------+------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|
+------+--------+---------------+-----------+-----------+------+------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |
|2     |Rose    |1              |2010       |20         |M     |4000  |
|3     |Williams|1              |2010       |10         |M     |1000  |
|4     |Jones   |2              |2005       |10         |F     |2000  |
|5     |Brown   |2              |2010       |40         |      |-1    |
|6     |Brown   |2              |2010       |50         |      |-1    |
+------+--------+---------------+-----------+-----------+------+------+

In [40]:
# inner join
emp_df.join(dept_df,emp_df.emp_dept_id == dept_df.dept_id,"inner") \
      .show(truncate=False)

# left join (a.k.a. left outer join): keeps all rows of emp_df
emp_df.join(dept_df,emp_df.emp_dept_id == dept_df.dept_id,"left") \
      .show(truncate=False)

# outer join (a.k.a. full or fullouter join): returns all rows from both datasets
emp_df.join(dept_df,emp_df.emp_dept_id == dept_df.dept_id,"outer") \
      .show(truncate=False)

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|1     |Smith   |-1             |2018       |10         |M     |3000  |Finance  |10     |
|3     |Williams|1              |2010       |10         |M     |1000  |Finance  |10     |
|4     |Jones   |2              |2005       |10         |F     |2000  |Finance  |10     |
|2     |Rose    |1              |2010       |20         |M     |4000  |Marketing|20     |
|5     |Brown   |2              |2010       |40         |      |-1    |IT       |40     |
+------+--------+---------------+-----------+-----------+------+------+---------+-------+

+------+--------+---------------+-----------+-----------+------+------+---------+-------+
|emp_id|name    |superior_emp_id|year_joined|emp_dept_id|gender|salary|dept_name|dept_id|
+------+-

### Read and write from disk


* Parquet files (stored on disk as a folder of part files) are a popular and efficient columnar format for disk I/O
* Parquet files can be queried directly from disk with SQL (see *Query* below)

In [27]:
## Read
parquet_file = 'data/users.parquet'
print(parquet_file)

df = sqlContext.read.load(parquet_file)

df.show()

# Write
# save() writes Parquet by default and, in the default save mode, fails if the
# path already exists, so remove any previous output first
# (alternatively: df.write.save("df.parquet", mode="overwrite"))
!rm -rf df.parquet

df.write.save("df.parquet")
!ls 
!echo
!ls df.parquet

data/users.parquet
+------+--------------+----------------+
|  name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa|          null|  [3, 9, 15, 20]|
|   Ben|           red|              []|
+------+--------------+----------------+

data
df.parquet
Dockerfile
Dockerfile-spark
ds_pragamatic_MLOPS.ipynb
ds_pragamatic_mongo_db.ipynb
ds_pragmatic_computer_vision.ipynb
ds_pragmatic_numpy_vectorization.ipynb
ds_pragmatic_programming_casual.ipynb
ds_pragmatic_programming_code_best_practices.ipynb
ds_pragmatic_programming_geodata.ipynb
ds_pragmatic_programming_graph_data.ipynb
ds_pragmatic_programming_machine_learning.ipynb
ds_pragmatic_programming_NLP.ipynb
ds_pragmatic_programming_python.ipynb
ds_pragmatic_programming_python_time_series.ipynb
ds_pragmatic_programming_python_visualization.ipynb
ds_pragmatic_programming_reiforcement_learning.ipynb
ds_pragmatic_programming_scrapping.ipynb
ds_pragmati

### Query 


* parquet file
* Dataframe

In [28]:
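# Query the Parquet file directly with SQL. The file sits on the local disk
# (it is not distributed across an HDFS cluster), so there is little parallelism here.
# Note: data/Weather/ only contains NY.tgz (see `ls` above); the archive presumably has
# to be extracted first, otherwise the query fails with the AnalysisException below.
parquet_file = "data/Weather/NY.parquet"

query="""
SELECT station,measurement,year 
FROM parquet.`%s` 
WHERE measurement="SNOW" 
"""%(parquet_file)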

print(query)

df2 = sqlContext.sql(query)
print(df2.count(),df2.columns)

df2.show(5)


SELECT station,measurement,year 
FROM parquet.`data/Weather/NY.parquet` 
WHERE measurement="SNOW" 



AnalysisException: 'Path does not exist: file:/home/ucsddse230/data/Weather/NY.parquet;; line 3 pos 5'

In [None]:
path = "data/people.json"

# Create a DataFrame from the file(s) pointed to by path
people = sqlContext.read.json(path)
#print('people is a',type(people))

print(people.count())

# The inferred schema can be visualized using the printSchema() method.
people.printSchema()
people.show()


# The DataFrame must be registered as a temporary view before SQL can refer to it
people.createOrReplaceTempView("people")

teenagers = sqlContext.sql("SELECT name FROM people WHERE age >= 13 AND age <= 19")
for each in teenagers.collect():
    print(each[0])

### Data wrangling

#### Slicing

In [None]:
df.select("name","favorite_color").show(5)

#### Aggregations

In [None]:
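# assumes df holds the weather data (columns station, measurement, year)
df.groupby('measurement').agg({'year': 'min', 'station':'count'}).show(5)

# exact distinct count vs. the faster approximate one
print("True value: {}".format(df.select('station').distinct().count()))
df.agg({'station':'approx_count_distinct'}).show()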

### Dataframe Stat Functions

Methods for statistics functionality. [documented here](http://takwatanabe.me/pyspark/generated/generated/pyspark.sql.DataFrameStatFunctions.html)

**Some of these functions compute approximate estimates** (for example `approxQuantile` and `freqItems`), which is much faster than an exact computation when the DataFrame or RDD is huge (as in big data).

* **approxQuantile(col, probabilities, relativeError)**	Calculates the approximate quantiles of a numerical column of a DataFrame.
* **corr(col1, col2[, method])**	Calculates the correlation of two columns of a DataFrame as a double value.
* **cov(col1, col2)**	Calculate the sample covariance for the given columns, specified by their names, as a double value.
* **crosstab(col1, col2)**	Computes a pair-wise frequency table of the given columns.
* **freqItems(cols[, support])**	Finding frequent items for columns, possibly with false positives.
* **sampleBy(col, fractions[, seed])**	Returns a stratified sample without replacement based on the fraction given on each stratum.
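What does the `relativeError` of `approxQuantile` promise? Spark's documentation states that for a DataFrame with N rows, probability p and error err, the returned value x satisfies floor((p - err) * N) <= rank(x) <= ceil((p + err) * N). A small plain-Python check of that bound, assuming a 1-based rank in the sorted data (the rank convention, the helper name and the `years` column are assumptions of this sketch):

```python
import math

def satisfies_quantile_guarantee(data, x, p, err):
    """Check floor((p - err) * N) <= rank(x) <= ceil((p + err) * N).

    rank(x) is taken as the 1-based position of x in sorted(data) (first
    occurrence); x must be a value from data, as approxQuantile returns one.
    """
    ordered = sorted(data)
    n = len(ordered)
    rank = ordered.index(x) + 1
    return math.floor((p - err) * n) <= rank <= math.ceil((p + err) * n)

years = list(range(1900, 2000))  # hypothetical column with N = 100 values

# Median (p = 0.5): with err = 0.1 any value ranked 40..60 is acceptable,
# with err = 0.01 only ranks 49..51 are.
print(satisfies_quantile_guarantee(years, 1945, 0.5, 0.1))   # True
print(satisfies_quantile_guarantee(years, 1945, 0.5, 0.01))  # False
```

A smaller error therefore narrows the window of acceptable answers, at the cost of more computation.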


In [None]:
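# probabilities 0.1, 0.2, ..., 0.9 (the deciles)
print([0.1*i for i in range(1,10)])


# approxQuantile uses an approximate algorithm (a variant of Greenwald-Khanna); the last
# argument is the relative error: smaller values are more accurate but more expensive.
# Assumes df holds the weather data (with a 'year' column).
print()
print("Get the quantiles")
print('with accuracy 0.1: ',df.approxQuantile('year', [0.1*i for i in range(1,10)], 0.1))
print('with accuracy 0.01: ',df.approxQuantile('year', [0.1*i for i in range(1,10)], 0.01))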

### Dataframes UDF

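**Similar to lambda functions**

User-defined functions (*UDFs*) wrap ordinary Python functions so they can be applied to DataFrame columns. Spark runs them on the executors, but it cannot optimize inside a Python UDF the way it optimizes its built-in functions, so prefer a built-in function when one exists.

Steps:
1. define a function that operates on a single value of the column
2. register (wrap) it with `udf()`, giving its return type
3. use the resulting UDF in DataFrame expressions such as `withColumn` or `select`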


In [29]:
from pyspark.sql.functions import udf

def packArray(a):
    """
    pack a numpy array into a bytearray that can be stored as a single 
    field in a spark DataFrame

    :param a: a numpy ndarray 
    :returns: a bytearray
    :rtype:

    """
    if type(a)!=np.ndarray:
        raise Exception("input to packArray should be numpy.ndarray. It is instead "+str(type(a)))
    return bytearray(a.tobytes())

def unpackArray(x,data_type=np.float16):
    """
    unpack a bytearray into a numpy.ndarray

    :param x: a bytearray
    :param data_type: The dtype of the array. This is important because it determines how many bytes go into each entry in the array.
    :returns: a numpy array
    :rtype: a numpy ndarray of dtype data_type.

    """
    return np.frombuffer(x,dtype=data_type)

def Count_nan(V):

    A=unpackArray(V,data_type=np.float16)

    return int(sum(np.isnan(A)))

# register: wrap Count_nan as a UDF that returns an integer
Count_nan_udf = udf(Count_nan, IntegerType())
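Before wrapping a function in a UDF, it can help to check its logic locally. A minimal NumPy-only sketch of the same pack / unpack / count-NaN idea (the snake_case names are local re-implementations for illustration, not the functions above); it also shows why `data_type` must match the dtype used when packing:

```python
import numpy as np

def pack_array(a):
    """Serialize a NumPy array into a bytearray (same idea as packArray)."""
    return bytearray(a.tobytes())

def unpack_array(x, data_type=np.float16):
    """Rebuild a 1-D array from bytes; data_type must match the packed dtype."""
    return np.frombuffer(x, dtype=data_type)

def count_nan(packed):
    """Same logic as Count_nan: number of NaN entries in a packed float16 array."""
    return int(np.isnan(unpack_array(packed, data_type=np.float16)).sum())

values = np.array([1.5, np.nan, 3.0, np.nan], dtype=np.float16)
packed = pack_array(values)

print(len(packed))        # 8: four float16 entries of 2 bytes each
print(count_nan(packed))  # 2

# Unpacking with the wrong dtype silently reinterprets the bytes:
# the 8 bytes read as float64 give a single, meaningless number.
print(unpack_array(packed, data_type=np.float64).shape)  # (1,)
```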


In [30]:
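# Assumes df is the weather DataFrame with a binary 'Values' column (packed float16 arrays).
# Here df still holds users.parquet, hence the AttributeError below.
df = df.withColumn("na_no", Count_nan_udf(df.Values))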

df.select("Station","Year","Measurement","na_no").show(5)

AttributeError: 'DataFrame' object has no attribute 'Values'