### About **Spark**

Spark is a platform for cluster computing. Spark lets you spread data and computations over clusters with multiple nodes
(think of each node as a separate computer). Splitting up your data makes it easier to work with very large datasets
because each node only works with a small amount of data.

As each node works on its own subset of the total data, it also carries out a part of the total calculations required,
so that both data processing and computation are performed in parallel over the nodes in the cluster. Parallel
computation can make certain types of programming tasks much faster.
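
The split-then-combine idea can be sketched in plain Python. This is only an analogy and not Spark code: `partition` and `parallel_sum` are hypothetical helper names, and worker threads stand in for the nodes of a cluster.

```python
from concurrent.futures import ThreadPoolExecutor


def partition(data, n_parts):
    """Split a list into n_parts roughly equal, contiguous chunks."""
    size, rest = divmod(len(data), n_parts)
    chunks, start = [], 0
    for i in range(n_parts):
        end = start + size + (1 if i < rest else 0)
        chunks.append(data[start:end])
        start = end
    return chunks


def parallel_sum(data, n_parts=4):
    """Sum each chunk on its own worker, then combine the partial sums."""
    chunks = partition(data, n_parts)
    with ThreadPoolExecutor(max_workers=n_parts) as pool:
        partial_sums = list(pool.map(sum, chunks))
    return sum(partial_sums)


numbers = list(range(1, 101))
total = parallel_sum(numbers, n_parts=4)
```

Each worker only ever sees its own chunk, which is the property that lets a cluster handle data too large for one machine.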

Whether Spark is the right tool comes down to questions like the following:

- Is my data too big to work with on a single machine?
- Can my calculations be easily parallelized?

Spark's core data structure is the Resilient Distributed Dataset (RDD). RDDs are low-level and hard to work with
directly, so one usually uses the Spark DataFrame abstraction built on top of them. DataFrames are also better
optimized for complicated operations than RDDs.

Important:

 - Spark's machine-learning routines (`pyspark.ml`) only handle numeric data (integer, double), so string columns must be cast or encoded first
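
Turning a string column into numbers usually means indexing the labels and then one-hot encoding the index. The following is a plain-Python sketch of that idea, not the `pyspark.ml` implementation: `index_labels` and `one_hot` are hypothetical helpers, and the ordering rule (most frequent label first, ties broken alphabetically) is an assumption chosen for illustration.

```python
from collections import Counter


def index_labels(values):
    """Map each distinct string to an integer index (most frequent first)."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda v: (-counts[v], v))
    return {label: i for i, label in enumerate(ordered)}


def one_hot(index, n_categories):
    """Return a list with a single 1.0 at the given index."""
    return [1.0 if i == index else 0.0 for i in range(n_categories)]


origins = ["US", "US", "Asia", "Europe", "US", "Asia"]
mapping = index_labels(origins)
encoded = [one_hot(mapping[o], len(mapping)) for o in origins]
```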

#### Setup
To run Spark locally on a Windows machine, download the `hadoop` binaries (`winutils`) [here](https://github.com/cdarlint/winutils)
and add the following to the environment variables:
> `HADOOP_HOME=<your local hadoop folder, i.e. the folder that contains bin\winutils.exe (eg. C:\usr\bin\Hadoop\hadoop-3.2.2)>`
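
The variable can also be set from Python before the session is created. The sketch below assumes `winutils.exe` sits in a `bin` subfolder of the folder that `HADOOP_HOME` points to; `has_winutils` and `configure_hadoop_home` are hypothetical helper names, not part of Spark.

```python
import os
from pathlib import Path


def has_winutils(hadoop_home):
    """Return True if <hadoop_home>/bin/winutils.exe exists."""
    return (Path(hadoop_home) / "bin" / "winutils.exe").is_file()


def configure_hadoop_home(hadoop_home):
    """Set HADOOP_HOME for this process and put its bin folder on PATH."""
    if not has_winutils(hadoop_home):
        raise FileNotFoundError(f"winutils.exe not found under {hadoop_home}")
    os.environ["HADOOP_HOME"] = str(hadoop_home)
    bin_dir = str(Path(hadoop_home) / "bin")
    os.environ["PATH"] = bin_dir + os.pathsep + os.environ.get("PATH", "")
```

Calling `configure_hadoop_home(...)` with your local folder before `SparkSession.builder` fails early with a readable message if the folder is wrong.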


#### Session Initiation

In [39]:
import numpy as np
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
import pandas as pd

# create new spark session if necessary
spark = SparkSession.builder.\
    config("spark.driver.host", "127.0.0.1").\
    config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true").\
    config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true").\
    config("spark.sql.execution.arrow.pyspark.enabled", "true").\
    config("spark.sql.shuffle.partitions", "8").\
    config("spark.driver.memory", "8g").getOrCreate()
# list tables in cluster
spark.catalog.listTables()

[]

#### Reading / Importing Data

In [8]:
# 1. PySpark dataframe from pandas
# note: follow this process to make sure pyspark runs locally: https://github.com/cdarlint/winutils
dfp = pd.read_csv("data/brics.csv", index_col=0).rename_axis("country_code").reset_index()
dfs = spark.createDataFrame(dfp)
dfs.printSchema()
dfs.show()

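# 2. PySpark dataframe read directly from CSV
# note: without inferSchema=True every column is read as a string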
dfs_ = spark.read.csv("data/brics.csv", header=True)
dfs_ = dfs_.withColumnRenamed("_c0", "country_code")
dfs_.printSchema()
dfs_.show()

root
 |-- country_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- capital: string (nullable = true)
 |-- area: double (nullable = true)
 |-- population: double (nullable = true)

+------------+------------+---------+-----+----------+
|country_code|     country|  capital| area|population|
+------------+------------+---------+-----+----------+
|          BR|      Brazil| Brasilia|8.516|     200.4|
|          RU|      Russia|   Moscow| 17.1|     143.5|
|          IN|       India|New Delhi|3.286|    1252.0|
|          CH|       China|  Beijing|9.597|    1357.0|
|          SA|South Africa| Pretoria|1.221|     52.98|
+------------+------------+---------+-----+----------+

root
 |-- country_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- capital: string (nullable = true)
 |-- area: string (nullable = true)
 |-- population: string (nullable = true)

+------------+------------+---------+-----+----------+
|country_code|     country|  capital|

#### Running SQL queries

In [7]:
# register the DF as a SQL temporary view
dfs.createOrReplaceTempView("population")
df_sql = spark.sql("SELECT * FROM population")
df_sql.show()

# convert results to pandas DF
df_sql.toPandas()

+------------+------------+---------+-----+----------+
|country_code|     country|  capital| area|population|
+------------+------------+---------+-----+----------+
|          BR|      Brazil| Brasilia|8.516|     200.4|
|          RU|      Russia|   Moscow| 17.1|     143.5|
|          IN|       India|New Delhi|3.286|    1252.0|
|          CH|       China|  Beijing|9.597|    1357.0|
|          SA|South Africa| Pretoria|1.221|     52.98|
+------------+------------+---------+-----+----------+



|   | country_code | country | capital | area | population |
|---|---|---|---|---|---|
| 0 | BR | Brazil | Brasilia | 8.516 | 200.4 |
| 1 | RU | Russia | Moscow | 17.1 | 143.5 |
| 2 | IN | India | New Delhi | 3.286 | 1252.0 |
| 3 | CH | China | Beijing | 9.597 | 1357.0 |
| 4 | SA | South Africa | Pretoria | 1.221 | 52.98 |


#### Manipulating Data

In [26]:
# mutations & adding a new column
dfs_ = dfs_.withColumn("country_", F.lower(dfs_.country))
dfs_.show()
dfs_ = dfs_.withColumn("population", dfs_.population.cast("double"))


# filters
dfs_.filter("area > 5").show()

# selections
dfs_.select("country_code", "area", "population").show()
dfs_ed = dfs_.select("country_code", "population", dfs_.area.alias("area_"))
dfs_ed.show()

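# aggregations
# inferSchema=True reads weight as a number; as a string, max() would compare lexicographically
auto = spark.read.csv("data/auto-mpg.csv", header=True, inferSchema=True)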
auto.show()
auto.groupBy("origin").agg(F.max("weight")).show()

# joins
auto2 = [("US", "north america"),
         ("Asia", "emerging markets"),
         ("Europe", "europe")]
auto2_schema = StructType([StructField("origin", StringType(), True),
                           StructField("region", StringType(), True)])
auto_df = spark.createDataFrame(data=auto2, schema=auto2_schema)
auto_df.show()
auto.join(auto_df, on="origin", how="leftouter").show()


+------------+------------+---------+-----+----------+------------+
|country_code|     country|  capital| area|population|    country_|
+------------+------------+---------+-----+----------+------------+
|          BR|      Brazil| Brasilia|8.516|     200.4|      brazil|
|          RU|      Russia|   Moscow|17.10|     143.5|      russia|
|          IN|       India|New Delhi|3.286|      1252|       india|
|          CH|       China|  Beijing|9.597|      1357|       china|
|          SA|South Africa| Pretoria|1.221|     52.98|south africa|
+------------+------------+---------+-----+----------+------------+

+------------+-------+--------+-----+----------+--------+
|country_code|country| capital| area|population|country_|
+------------+-------+--------+-----+----------+--------+
|          BR| Brazil|Brasilia|8.516|     200.4|  brazil|
|          RU| Russia|  Moscow|17.10|     143.5|  russia|
|          CH|  China| Beijing|9.597|    1357.0|   china|
+------------+-------+--------+-----+--

Py4JJavaError: An error occurred while calling o1005.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 169.0 failed 1 times, most recent failure: Lost task 0.0 in stage 169.0 (TID 223) (D-S4FK4411.investecam.corp executor driver): org.apache.spark.SparkException: Python worker failed to connect back.
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:182)
	...
Caused by: java.net.SocketTimeoutException: Accept timed out
	at java.net.ServerSocket.accept(ServerSocket.java:513)
	at org.apache.spark.api.python.PythonWorkerFactory.createSimpleWorker(PythonWorkerFactory.scala:174)
	... 29 more
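
A `Python worker failed to connect back` error like the one above is often reported when Spark cannot launch a matching Python interpreter for its workers. One commonly suggested remedy, given here as an assumption and not as a confirmed fix for this notebook, is to point `PYSPARK_PYTHON` and `PYSPARK_DRIVER_PYTHON` at the interpreter that runs the notebook:

```python
import os
import sys

# Assumption: the workers fail because Spark cannot find the right Python
# executable. Both variables must be set before SparkSession.builder is used.
os.environ["PYSPARK_PYTHON"] = sys.executable
os.environ["PYSPARK_DRIVER_PYTHON"] = sys.executable
```

If a session already exists, it has to be stopped and recreated for the new settings to take effect.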


#### Machine Learning Pipelines

The core of `pyspark.ml` is the Transformer and Estimator classes:

- Transformer: Transformer classes have a `.transform()` method that takes a DataFrame and returns a new DataFrame,
usually the original one with a new column appended.

- Estimator: Estimator classes all implement a `.fit()` method. These methods also take a DataFrame, but instead of
returning another DataFrame they return a model object.
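
The two contracts can be illustrated with a toy example in plain Python. It is an analogy only and does not use the `pyspark.ml` base classes: `MeanCenterer` and `MeanCentererModel` are hypothetical names, and a list of dicts stands in for a DataFrame.

```python
class MeanCentererModel:
    """Transformer: holds the fitted mean and subtracts it from a column."""

    def __init__(self, column, mean):
        self.column = column
        self.mean = mean

    def transform(self, rows):
        # return new rows with an extra column; the input is left untouched
        return [{**row, self.column + "_centered": row[self.column] - self.mean}
                for row in rows]


class MeanCenterer:
    """Estimator: learns the column mean from data and returns a model."""

    def __init__(self, column):
        self.column = column

    def fit(self, rows):
        mean = sum(row[self.column] for row in rows) / len(rows)
        return MeanCentererModel(self.column, mean)


rows = [{"weight": 3000.0}, {"weight": 3500.0}, {"weight": 4000.0}]
model = MeanCenterer("weight").fit(rows)   # Estimator.fit -> model
centered = model.transform(rows)           # model.transform -> new rows
```

The object returned by `fit()` is itself a transformer, which is what lets a pipeline chain estimators and transformers in one sequence of stages.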


In [38]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
from pyspark.ml.classification import LogisticRegression
import pyspark.ml.evaluation as evals
import pyspark.ml.tuning as tune
auto.show()

# pyspark.ml needs numeric columns: cast weight in case the CSV was read as strings
auto_ml = auto.withColumn("weight", F.col("weight").cast("double"))
# ASSUMPTION: the notebook never defines a target column, so a binary label
# (mpg above 25) is derived here purely for illustration
auto_ml = auto_ml.withColumn("label", (F.col("mpg").cast("double") > 25).cast("integer"))

# create categorical feature
or_indexer = StringIndexer(inputCol="origin", outputCol="origin_index")
or_encoder = OneHotEncoder(inputCol="origin_index", outputCol="origin_fact")
vec_assembler = VectorAssembler(inputCols=["origin_fact", "weight"], outputCol="features")

# initialise pipeline and apply it to obtain the "features" column
auto_pipe = Pipeline(stages=[or_indexer, or_encoder, vec_assembler])
piped = auto_pipe.fit(auto_ml).transform(auto_ml)

# test-train set (randomSplit is a DataFrame method, not a Pipeline method)
train_, test_ = piped.randomSplit([.6, .4])

# fitting a model (uses the "features" and "label" columns by default)
lr = LogisticRegression()
evaluator = evals.BinaryClassificationEvaluator(metricName="areaUnderROC")

# create the hyper-parameter grid
grid = tune.ParamGridBuilder()
grid = grid.addGrid(lr.regParam, np.arange(0, 0.1, 0.01))
grid = grid.addGrid(lr.elasticNetParam, [0, 1])
grid = grid.build()

# cross-validation
cv = tune.CrossValidator(estimator=lr,
                         estimatorParamMaps=grid,
                         evaluator=evaluator)
models = cv.fit(train_)
best_lr = models.bestModel

# evaluation
test_res = best_lr.transform(test_)
# Evaluate the predictions
print(evaluator.evaluate(test_res))


+----+---+-----+---+------+-----+---+------+--------------------+
| mpg|cyl|displ| hp|weight|accel| yr|origin|                name|
+----+---+-----+---+------+-----+---+------+--------------------+
|18.0|  8|307.0|130|  3504| 12.0| 70|    US|chevrolet chevell...|
|15.0|  8|350.0|165|  3693| 11.5| 70|    US|   buick skylark 320|
|18.0|  8|318.0|150|  3436| 11.0| 70|    US|  plymouth satellite|
|16.0|  8|304.0|150|  3433| 12.0| 70|    US|       amc rebel sst|
|17.0|  8|302.0|140|  3449| 10.5| 70|    US|         ford torino|
|15.0|  8|429.0|198|  4341| 10.0| 70|    US|    ford galaxie 500|
|14.0|  8|454.0|220|  4354|  9.0| 70|    US|    chevrolet impala|
|14.0|  8|440.0|215|  4312|  8.5| 70|    US|   plymouth fury iii|
|14.0|  8|455.0|225|  4425| 10.0| 70|    US|    pontiac catalina|
|15.0|  8|390.0|190|  3850|  8.5| 70|    US|  amc ambassador dpl|
|15.0|  8|383.0|170|  3563| 10.0| 70|    US| dodge challenger se|
|14.0|  8|340.0|160|  3609|  8.0| 70|    US|  plymouth 'cuda 340|
|15.0|  8|

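Note: `VectorAssembler` raises `IllegalArgumentException: Data type string of column weight is not supported.` when
`weight` is still a string column, as it is when the CSV is read without `inferSchema=True` or an explicit cast.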
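
The cost of the grid search in the cell above can be estimated by enumerating the grid in plain Python. This sketch assumes that `np.arange(0, 0.1, 0.01)` yields ten values and that `CrossValidator` uses three folds when `numFolds` is not set; the variable names are illustrative only.

```python
from itertools import product

reg_params = [round(i * 0.01, 2) for i in range(10)]  # 0.0, 0.01, ..., 0.09
elastic_net_params = [0, 1]                           # 0 = ridge, 1 = lasso
num_folds = 3  # assumption: CrossValidator's default when numFolds is not set

# every (regParam, elasticNetParam) pair is one candidate model
candidates = list(product(reg_params, elastic_net_params))
total_fits = len(candidates) * num_folds
```

Every extra grid value multiplies the number of model fits, so the grid is worth keeping small on a local machine.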