https://github.com/mahmoudparsian/pyspark-tutorial

# Start

- Download Spark: http://spark.apache.org/downloads.html and unzip
- pip/conda install pyspark `conda install -c conda-forge pyspark`
- Specify environment variables (`export $SPARK_HOME=...`)

In [1]:
# import findspark
# findspark.init()

In [2]:
import pyspark
import pyarrow
from pyspark.sql import SparkSession
from pyspark import SparkContext, SparkConf

In [3]:
conf = SparkConf().setAll([('spark.executor.memory', '8g'), 
                                   ('spark.executor.cores', '3'), 
                                   ('spark.cores.max', '3'), 
                                   ('spark.driver.memory','8g')])

In [4]:
sc = pyspark.SparkContext(conf=conf)
sc.getConf().getAll()

[('spark.app.id', 'local-1529450945242'),
 ('spark.executor.id', 'driver'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.cores.max', '3'),
 ('spark.driver.port', '50573'),
 ('spark.driver.host', 'us7025751-m001.clients.us.kworld.kpmg.com'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.memory', '8g'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.master', 'local[*]'),
 ('spark.executor.memory', '8g'),
 ('spark.executor.cores', '3'),
 ('spark.submit.deployMode', 'client'),
 ('spark.ui.showConsoleProgress', 'true')]

"SparkSession in Spark 2.0 provides builtin support for Hive features including the ability to write queries using HiveQL, access to Hive UDFs, and the ability to read data from Hive tables"

In [6]:
spark = SparkSession(sc)
spark

Enable Arrow-based columnar data transfers (e.g., `toPandas()`)

In [9]:
spark.conf.set("spark.sql.execution.arrow.enabled", "true")

In [6]:
print(sc.version)

2.3.1


# Basics

## RRD and its transformations

In [7]:
data = sc.parallelize(
    [('Amber', 22), ('Alfred', 23), ('Skye',4), ('Albert', 12), ('Amber', 9)]) #schema-less, allow diff types

In [8]:
data.take(2)

[('Amber', 22), ('Alfred', 23)]

In [7]:
#map
data.map(lambda row: (row[0], row[0])).take(2)

[('Amber', 'Amber'), ('Alfred', 'Alfred')]

In [8]:
#filter
data.filter(lambda row: row[1] < 20).take(3)

[('Skye', 4), ('Albert', 12), ('Amber', 9)]

In [9]:
#flatMap
data.flatMap(lambda row: (row[1], row[1]+1)).take(12)

[22, 23, 23, 24, 4, 5, 12, 13, 9, 10]

In [10]:
#distinct
data.map(lambda row: row[0]).distinct().collect()

['Amber', 'Skye', 'Alfred', 'Albert']

In [11]:
# Join
data_2 = sc.parallelize([('Amber', 2222), ('Alfred', 23)]) 
rdd = data.leftOuterJoin(data_2)
rdd.collect()

[('Amber', (22, 2222)),
 ('Amber', (9, 2222)),
 ('Skye', (4, None)),
 ('Alfred', (23, 23)),
 ('Albert', (12, None))]

In [12]:
rdd = data.join(data_2)
rdd.collect()

[('Amber', (22, 2222)), ('Amber', (9, 2222)), ('Alfred', (23, 23))]

In [13]:
#intersection
rdd = data.intersection(data_2)
rdd.collect()

[('Alfred', 23)]

## RRD and its actions

In [14]:
#count
data.count()

5

In [15]:
#sample
data.takeSample(num=3, withReplacement=False)

[('Skye', 4), ('Amber', 9), ('Albert', 12)]

In [16]:
#reduce
data.map(lambda row: row[1]).reduce(lambda x, y: x + y)

70

In [17]:
data.map(lambda row: row[1]).reduce(lambda x, y: x / y)

0.0022141706924315623

In [18]:
#reduce by key
data.reduceByKey(lambda x, y: x + y).collect() # note the 1st one turple

[('Amber', 31), ('Skye', 4), ('Alfred', 23), ('Albert', 12)]

In [19]:
# count by key
data.countByKey().items()

dict_items([('Amber', 2), ('Alfred', 1), ('Skye', 1), ('Albert', 1)])

In [None]:
# Save and Read
data.saveAsTextFile('./temp/data_key.txt') # partitioned into 8 pieces

In [21]:
def parseInput(row):
    import re
    
    pattern = re.compile(r'\(\'([A-Za-z]*)\', ([0-9]*)\)')
    row_split = pattern.split(row)
    
    return (row_split[1], int(row_split[2]))
    
data_key_reread = sc \
    .textFile('./temp/data_key.txt') \
    .map(parseInput)
    
data_key_reread.collect()

[('Amber', 22), ('Alfred', 23), ('Skye', 4), ('Albert', 12), ('Amber', 9)]

In [22]:
# for each
def function(x):
    print(x) #print to terminal
    
data.foreach(function)

## DataFrames

In [23]:
stringJSONRDD = sc.parallelize((""" 
  { "id": "123",
    "name": "Katie",
    "age": 19,
    "eyeColor": "brown"
  }""",
   """{
    "id": "234",
    "name": "Michael",
    "age": 22,
    "eyeColor": "green"
  }""", 
  """{
    "id": "345",
    "name": "Michael",
    "age": 23,
    "eyeColor": "green"
  }""")
)

In [24]:
# Create DataFrame from JSON
swimmersJSON = spark.read.json(stringJSONRDD)

In [25]:
#create table
swimmersJSON.createOrReplaceTempView("swimmersJSON")

In [26]:
#schema - auto-inferred
swimmersJSON.printSchema()

root
 |-- age: long (nullable = true)
 |-- eyeColor: string (nullable = true)
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)



In [27]:
#show
swimmersJSON.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 19|   brown|123|  Katie|
| 22|   green|234|Michael|
| 23|   green|345|Michael|
+---+--------+---+-------+



In [28]:
# SQL query
sql_query = "select * from swimmersJSON"
spark.sql(sql_query).collect()

[Row(age=19, eyeColor='brown', id='123', name='Katie'),
 Row(age=22, eyeColor='green', id='234', name='Michael'),
 Row(age=23, eyeColor='green', id='345', name='Michael')]

In [29]:
# DataFrame Query
swimmersJSON.select(swimmersJSON.age, swimmersJSON.eyeColor) \
            .filter(swimmersJSON.age > 20) \
            .show()

+---+--------+
|age|eyeColor|
+---+--------+
| 22|   green|
| 23|   green|
+---+--------+



In [30]:
# DataFrame Query
swimmersJSON.select(['age', 'eyeColor']) \
            .where(swimmersJSON.age > 20) \
            .show()

+---+--------+
|age|eyeColor|
+---+--------+
| 22|   green|
| 23|   green|
+---+--------+



In [31]:
import pyspark.sql.functions as fn

swimmersJSON.agg(
    fn.count('id').alias('count'),
    fn.countDistinct('id').alias('distinct')
).show()

+-----+--------+
|count|distinct|
+-----+--------+
|    3|       3|
+-----+--------+



In [32]:
# drop duplicated
df = swimmersJSON.dropDuplicates(
    subset=[c for c in swimmersJSON.columns \
            if c not in ['id','age']]) # exclude id column, drop duplicates
df.show()

+---+--------+---+-------+
|age|eyeColor| id|   name|
+---+--------+---+-------+
| 22|   green|234|Michael|
| 19|   brown|123|  Katie|
+---+--------+---+-------+



In [33]:
swimmersJSON.groupBy('eyeColor').count().show()

+--------+-----+
|eyeColor|count|
+--------+-----+
|   green|    2|
|   brown|    1|
+--------+-----+



In [34]:
swimmersJSON.describe('Age').show()

+-------+------------------+
|summary|               Age|
+-------+------------------+
|  count|                 3|
|   mean|21.333333333333332|
| stddev| 2.081665999466133|
|    min|                19|
|    max|                23|
+-------+------------------+



In [35]:
swimmersJSON.agg({'Age':'mean'}).show()

+------------------+
|          avg(Age)|
+------------------+
|21.333333333333332|
+------------------+



In [36]:
# Histrogram
hists = swimmersJSON.select('Age').rdd.flatMap(lambda row: row).histogram(2)

In [37]:
hists # size of bins calculated by workers before returning to driver

([19, 21, 23], [1, 2])

## Pandas UDF (Vectorized UDF)

Ref: https://databricks.com/blog/2017/10/30/introducing-vectorized-udfs-for-pyspark.html

In [34]:
from pyspark.sql.types import *
from pyspark.sql.functions import col, count, rand, collect_list, explode, struct, count, lit
from pyspark.sql.functions import pandas_udf, PandasUDFType

create synthetic data

In [35]:
df = spark.range(0, 10 * 1000 * 1000).withColumn('id', (col('id') / 10000).cast('integer')).withColumn('v', rand())
df.cache()
df.count()

10000000

In [36]:
df.show(2)

+---+--------------------+
| id|                   v|
+---+--------------------+
|  0| 0.49844744160544097|
|  0|0.018039739783799802|
+---+--------------------+
only showing top 2 rows



define udf function

In [37]:
@pandas_udf("double", PandasUDFType.SCALAR)
def pandas_plus_one(v):
    return v + 1

%timeit -n 1 -r 1 df.withColumn('v', pandas_plus_one(df.v)).agg(count(col('v'))).show()

+--------+
|count(v)|
+--------+
|10000000|
+--------+

3.72 s ± 0 ns per loop (mean ± std. dev. of 1 run, 1 loop each)


# ML

- MLlib package: operates on `RDD`, to be deprecated
- ML: operates on `DataFrames`

In [95]:
import pyspark.sql.types as typ
import pyspark.ml.feature as ft
import pyspark.ml.classification as cl
from pyspark.ml import Pipeline
from pyspark.ml import PipelineModel
import pyspark.ml.evaluation as ev
import pyspark.ml.tuning as tune

## Import data with scehma definition

In [60]:
labels = [
('INFANT_ALIVE_AT_REPORT', typ.IntegerType()),
('BIRTH_PLACE', typ.StringType()),
('MOTHER_AGE_YEARS', typ.IntegerType()),
('FATHER_COMBINED_AGE', typ.IntegerType()),
('CIG_BEFORE', typ.IntegerType()),
('CIG_1_TRI', typ.IntegerType()),
('CIG_2_TRI', typ.IntegerType()),
('CIG_3_TRI', typ.IntegerType()),
('MOTHER_HEIGHT_IN', typ.IntegerType()),
('MOTHER_PRE_WEIGHT', typ.IntegerType()),
('MOTHER_DELIVERY_WEIGHT', typ.IntegerType()),
('MOTHER_WEIGHT_GAIN', typ.IntegerType()),
('DIABETES_PRE', typ.IntegerType()),
('DIABETES_GEST', typ.IntegerType()),
('HYP_TENS_PRE', typ.IntegerType()),
('HYP_TENS_GEST', typ.IntegerType()),
('PREV_BIRTH_PRETERM', typ.IntegerType())
    ]
schema = typ.StructType([
    typ.StructField(e[0], e[1], False) for e in labels
    ])
births = spark.read.csv('./data/births_transformed.csv',
    header=True,
    schema=schema)

convert to pandas to take a look at first rows

In [107]:
births.toPandas().head(3)

Unnamed: 0,INFANT_ALIVE_AT_REPORT,BIRTH_PLACE,MOTHER_AGE_YEARS,FATHER_COMBINED_AGE,CIG_BEFORE,CIG_1_TRI,CIG_2_TRI,CIG_3_TRI,MOTHER_HEIGHT_IN,MOTHER_PRE_WEIGHT,MOTHER_DELIVERY_WEIGHT,MOTHER_WEIGHT_GAIN,DIABETES_PRE,DIABETES_GEST,HYP_TENS_PRE,HYP_TENS_GEST,PREV_BIRTH_PRETERM,BIRTH_PLACE_INT
0,0,1,29,99,0,0,0,0,99,999,999,99,0,0,0,0,0,1
1,0,1,22,29,0,0,0,0,65,180,198,18,0,0,0,0,0,1
2,0,1,38,40,0,0,0,0,63,155,167,12,0,0,0,0,0,1


## Define `transformer`

- select transformer
- inputCol
- outputCol

change column from charatcer to numeric

In [61]:
births = births \
    .withColumn('BIRTH_PLACE_INT', births['BIRTH_PLACE'] \
    .cast(typ.IntegerType()))

In [69]:
births.take(1)[0]['BIRTH_PLACE_INT']

1

define one-hot encoder

In [70]:
encoder = ft.OneHotEncoder(
    inputCol = 'BIRTH_PLACE_INT',
    outputCol = 'BIRTH_PLACE_VEC')

create a single column with all features

In [73]:
featuresCreator = ft.VectorAssembler(
    inputCols = [col[0] for col in labels[2:]] + [encoder.getOutputCol()],
    outputCol = 'features' 
)

## Define `Estimator`

- Classification
- Regression
- Clustering

logistic regression

In [79]:
logistic = cl.LogisticRegression(
    maxIter = 10,
    regParam = 0.01,
    featuresCol = 'features',
    labelCol='INFANT_ALIVE_AT_REPORT') # default is label

## Define `Pipeline`

- End to End transformation-estimation process
- Includes **transformation** and **estimation** (optional)
- pipeline.fit = transformer.transform + estimator.fit
    - Output: PipelineModel
    - Prediction: PipelineModel.fit

Encoder --> featureCreator --> logistic

In [80]:
pipeline = Pipeline(stages=[
    encoder,
    featuresCreator,
    logistic
    ])

pipeline saving and loading

In [91]:
pipelinePath = './temp/infant_oneHotEncoder_Logistic_Pipeline'
pipeline.write().overwrite().save(pipelinePath)
loadedPipeline = Pipeline.load(pipelinePath)

## Model fitting

split train/test

In [81]:
births_train, births_test = births.randomSplit([0.7, 0.3], seed=666)

In [82]:
model = pipeline.fit(births_train)

save and load model

In [93]:
modelPath = './temp/infant_oneHotEncoder_Logistic_PipelineModel'
model.write().overwrite().save(modelPath)
loadedPipelineModel = PipelineModel.load(modelPath)

apply fitted model and get prediction

In [94]:
test_model = model.transform(births_test)
test_model.take(1)

[Row(INFANT_ALIVE_AT_REPORT=0, BIRTH_PLACE='1', MOTHER_AGE_YEARS=13, FATHER_COMBINED_AGE=99, CIG_BEFORE=0, CIG_1_TRI=0, CIG_2_TRI=0, CIG_3_TRI=0, MOTHER_HEIGHT_IN=66, MOTHER_PRE_WEIGHT=133, MOTHER_DELIVERY_WEIGHT=135, MOTHER_WEIGHT_GAIN=2, DIABETES_PRE=0, DIABETES_GEST=0, HYP_TENS_PRE=0, HYP_TENS_GEST=0, PREV_BIRTH_PRETERM=0, BIRTH_PLACE_INT=1, BIRTH_PLACE_VEC=SparseVector(9, {1: 1.0}), features=SparseVector(24, {0: 13.0, 1: 99.0, 6: 66.0, 7: 133.0, 8: 135.0, 9: 2.0, 16: 1.0}), rawPrediction=DenseVector([1.0573, -1.0573]), probability=DenseVector([0.7422, 0.2578]), prediction=0.0)]

## Model evaluation

define evaluator

In [87]:
evaluator = ev.BinaryClassificationEvaluator(
    rawPredictionCol='probability',
    labelCol='INFANT_ALIVE_AT_REPORT')

In [88]:
print(evaluator.evaluate(test_model,
    {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(test_model,
    {evaluator.metricName: 'areaUnderPR'}))

0.7401301847095617
0.7139354342365674


## Hyper-param tuning

In [96]:
logistic = cl.LogisticRegression(
    labelCol='INFANT_ALIVE_AT_REPORT')
grid = tune.ParamGridBuilder() \
    .addGrid(logistic.maxIter,[2, 10, 50]) \
    .addGrid(logistic.regParam,[0.01, 0.05, 0.3]) \
    .build()

Define cv - Three components:
- estimator
- paramMaps
- evaluator

In [99]:
cv = tune.CrossValidator(
    estimator = logistic,
    estimatorParamMaps = grid,
    evaluator = evaluator
)

Redefine pipeline to have only transformer

In [100]:
pipeline = Pipeline(stages=[encoder ,featuresCreator])
data_transformer = pipeline.fit(births_train)

apply new fitted pipeline to cv

In [101]:
data_train = data_transformer.transform(births_train)
cvModel = cv.fit(data_train)

see results

In [102]:
results = cvModel.transform(data_train)
print(evaluator.evaluate(results,
    {evaluator.metricName: 'areaUnderROC'}))
print(evaluator.evaluate(results,
    {evaluator.metricName: 'areaUnderPR'}))

0.7393228708792929
0.7102457229997121
