# PySpark 

Apache Spark is a cluster-based distributed computing engine. Its packages support, among other things:
*   SQL querying
*   streaming data processing
*   machine learning

## Spark Session

In [1]:
from pyspark.sql import SparkSession
from pyspark.context import SparkContext
from pyspark.sql.functions import *
from pyspark.ml.feature import StringIndexer

In [2]:
# spark = SparkSession.builder.getOrCreate()
spark = SparkSession.builder.appName("FirstSparkApplication").config("spark.executor.memory", "8g").getOrCreate()

2022-05-28 17:52:09 WARN  Utils:66 - Your hostname, siul resolves to a loopback address: 127.0.1.1; using 172.25.140.151 instead (on interface eth0)
2022-05-28 17:52:09 WARN  Utils:66 - Set SPARK_LOCAL_IP if you need to bind to another address
2022-05-28 17:52:10 WARN  NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


# Exploratory Data Analysis

The “Titanic” dataset is used here. It can be downloaded from the Kaggle website [here](https://www.kaggle.com/c/titanic/data).

In [3]:
training_dataset = spark.read.format("csv").option("inferSchema", True).option("header", "true").load('data/train.csv')
test_dataset = spark.read.format("csv").option("inferSchema", True).option("header", "true").load('data/test.csv')

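training_dataset.printSchema()

root
 |-- PassengerId: integer (nullable = true)
 |-- Survived: integer (nullable = true)
 |-- Pclass: integer (nullable = true)
 |-- Name: string (nullable = true)
 |-- Sex: string (nullable = true)
 |-- Age: double (nullable = true)
 |-- SibSp: integer (nullable = true)
 |-- Parch: integer (nullable = true)
 |-- Ticket: string (nullable = true)
 |-- Fare: double (nullable = true)
 |-- Cabin: string (nullable = true)
 |-- Embarked: string (nullable = true)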

In [4]:
print("Renaming Column Name")
training_dataset = training_dataset.withColumnRenamed("Pclass","PassengerClasses").withColumnRenamed("Sex","Gender")
training_dataset

Renaming Column Name


DataFrame[PassengerId: int, Survived: int, PassengerClasses: int, Name: string, Gender: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

# Feature Engineering

The 'Name' column in the Titanic dataset also includes each passenger’s title. This information may be useful to the model, so let’s extract it as a new variable. The new 'Title' column can be created with the 'withColumn' operation.

In [5]:
# Raw string so that "\." reaches the regex engine as a literal dot
training_dataset = training_dataset.withColumn("Title", regexp_extract(col("Name"), r"([A-Za-z]+)\.", 1))
training_dataset.select("Name","Title").show(10) 

+--------------------+------+
|                Name| Title|
+--------------------+------+
|Braund, Mr. Owen ...|    Mr|
|Cumings, Mrs. Joh...|   Mrs|
|Heikkinen, Miss. ...|  Miss|
|Futrelle, Mrs. Ja...|   Mrs|
|Allen, Mr. Willia...|    Mr|
|    Moran, Mr. James|    Mr|
|McCarthy, Mr. Tim...|    Mr|
|Palsson, Master. ...|Master|
|Johnson, Mrs. Osc...|   Mrs|
|Nasser, Mrs. Nich...|   Mrs|
+--------------------+------+
only showing top 10 rows
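The pattern can also be tried outside Spark. The following is a minimal sketch using Python's built-in `re` module, not the notebook's own code: `extract_title` is a hypothetical helper, and the second and third sample names are made up for illustration. It mirrors `regexp_extract` in returning an empty string when the pattern does not match.

```python
import re

# Same pattern as in the notebook cell: one or more letters followed by a literal dot.
TITLE_PATTERN = re.compile(r"([A-Za-z]+)\.")

def extract_title(name: str) -> str:
    """Return the first run of letters that is directly followed by a period."""
    match = TITLE_PATTERN.search(name)
    # regexp_extract yields "" when nothing matches, so do the same here.
    return match.group(1) if match else ""

# The first name appears in the table above; the other two are invented.
names = ["Moran, Mr. James", "Doe, Mlle. Jane", "No title here"]
titles = [extract_title(n) for n in names]
print(titles)
```

The surname is skipped because it is followed by a comma rather than a period, so only the title satisfies the pattern.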



Some of the extracted titles are variants of the same title (for example `Mlle` and `Miss`), and others occur only a few times. The `replace` function can map them onto a small set of groups as follows.

In [7]:
# The two lists are paired by position; subset limits the replacement to the Title column.
feature_df = training_dataset.replace(
    ["Mme",
     "Mlle", "Ms",
     "Major", "Dr", "Capt", "Col", "Rev",
     "Lady", "Dona", "the Countess", "Countess", "Don", "Sir", "Jonkheer", "Master"],
    ["Mrs",
     "Miss", "Miss",
     "Ranked", "Ranked", "Ranked", "Ranked", "Ranked",
     "Royalty", "Royalty", "Royalty", "Royalty", "Royalty", "Royalty", "Royalty", "Royalty"],
    subset=["Title"])

feature_df.groupBy("Title").count().sort(desc("count")).show()

+-------+-----+
|  Title|count|
+-------+-----+
|     Mr|  517|
|   Miss|  185|
|    Mrs|  126|
|Royalty|   45|
| Ranked|   18|
+-------+-----+
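Because `replace` pairs its two lists by position, long lists are easy to misalign. As a sketch of an alternative, using only the values from the cell above, the same mapping can be written as a dictionary, which `DataFrame.replace` also accepts as its first argument. The snippet is plain Python and only exercises the mapping itself; `TITLE_GROUPS` and `group_title` are illustrative names, not part of the notebook.

```python
# Each original title points directly at its group, so the pairing is explicit.
TITLE_GROUPS = {
    "Mme": "Mrs",
    "Mlle": "Miss",
    "Ms": "Miss",
    **dict.fromkeys(["Major", "Dr", "Capt", "Col", "Rev"], "Ranked"),
    **dict.fromkeys(
        ["Lady", "Dona", "the Countess", "Countess", "Don", "Sir", "Jonkheer", "Master"],
        "Royalty",
    ),
}

def group_title(title: str) -> str:
    """Map a title to its group; titles without an entry are left unchanged."""
    return TITLE_GROUPS.get(title, title)

grouped = [group_title(t) for t in ["Mr", "Mlle", "Dr", "Master"]]
print(grouped)
```

In recent PySpark versions the dictionary could be passed as `training_dataset.replace(TITLE_GROUPS, subset=["Title"])`; whether that form is available depends on the installed Spark version.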



The 'replace' call above merges duplicated or variant titles into a few groups. The column types of the resulting DataFrame can be listed with 'dtypes'.

In [8]:
feature_df.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('PassengerClasses', 'int'),
 ('Name', 'string'),
 ('Gender', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string'),
 ('Title', 'string')]

In [9]:
df = feature_df.select(
    "Survived",
    "PassengerClasses",
    "SibSp",
    "Parch")

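# Drop rows with missing values. After this step no nulls remain,
# so fillna(0) changes nothing and is kept only as a safeguard.
df = df.dropna()
df = df.fillna(0)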
df.dtypes

[('Survived', 'int'),
 ('PassengerClasses', 'int'),
 ('SibSp', 'int'),
 ('Parch', 'int')]

## String Indexing

Before the modeling stage, the types of all features should be inspected. Since the prediction method requires numerical variables, all string-formatted columns must be converted to numerical types in the final modeling dataset.

In [10]:
from pyspark.ml.feature import StringIndexer
parchIndexer = StringIndexer(inputCol="Parch", outputCol="Parch_Ind").fit(df)
sibspIndexer = StringIndexer(inputCol="SibSp", outputCol="SibSp_Ind").fit(df)
passangerIndexer = StringIndexer(inputCol="PassengerClasses", outputCol="PassengerClasses_Ind").fit(df)
survivedIndexer = StringIndexer(inputCol="Survived", outputCol="Survived_Ind").fit(df)
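By default `StringIndexer` orders labels by descending frequency, so the most common value receives index 0.0. The following plain-Python sketch imitates that rule on a made-up column. `fit_index` is a hypothetical helper, and the alphabetical tie-break is an assumption about recent Spark versions rather than something shown in this notebook.

```python
from collections import Counter

def fit_index(values):
    """Map each distinct value (as a string) to an index, most frequent first."""
    counts = Counter(str(v) for v in values)
    # Sort by descending count; break ties alphabetically (assumed behaviour).
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}

# Invented sample: the value 3 is the most frequent, so it is indexed as 0.0.
sample = [3, 3, 3, 1, 1, 0]
mapping = fit_index(sample)
indexed = [mapping[str(v)] for v in sample]
print(mapping)
print(indexed)
```

The index therefore reflects how often a value occurs, not its numeric size.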

## Vector Assembler

After indexing and dropping the old string-formatted columns, every column in the DataFrame is numerical, so a feature vector can be built from them. The 'VectorAssembler' combines the chosen input columns into a single 'features' vector column.

In [11]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
  inputCols = ["PassengerClasses","SibSp","Parch"],
  outputCol = "features")
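Conceptually, the assembler concatenates the values of the input columns of each row into one vector of doubles. A rough plain-Python picture of that step, with a made-up row and a hypothetical `assemble` helper:

```python
INPUT_COLS = ["PassengerClasses", "SibSp", "Parch"]

def assemble(row, input_cols=INPUT_COLS):
    """Collect the selected columns of one row, in order, as floats."""
    return [float(row[col]) for col in input_cols]

# Invented row; the label column is not part of the feature vector.
row = {"Survived": 0, "PassengerClasses": 3, "SibSp": 1, "Parch": 0}
features = assemble(row)
print(features)
```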

## Split Train/Test

The 'randomSplit' method can be used to divide the data into train and test sets. 

In [12]:
(train, test) = df.randomSplit([0.8, 0.2], seed = 345)
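`randomSplit` assigns rows to the splits at random according to the weights, so the resulting sizes are only approximately 80% and 20%, and the seed makes the assignment repeatable. A minimal plain-Python sketch of that idea follows. It is not Spark's actual sampling code, and `random_split` is a hypothetical helper.

```python
import random

def random_split(rows, weights, seed):
    """Assign each row to one split by an independent weighted random draw."""
    rng = random.Random(seed)
    total = sum(weights)
    # Cumulative upper bounds of each split on the [0, 1) interval.
    bounds, acc = [], 0.0
    for w in weights:
        acc += w / total
        bounds.append(acc)
    splits = [[] for _ in weights]
    for row in rows:
        draw = rng.random()
        for i, bound in enumerate(bounds):
            # The last split also catches any floating-point remainder.
            if draw < bound or i == len(bounds) - 1:
                splits[i].append(row)
                break
    return splits

rows = list(range(1000))
train_rows, test_rows = random_split(rows, [0.8, 0.2], seed=345)
print(len(train_rows), len(test_rows))
```

Every row lands in exactly one split, and calling the function again with the same seed reproduces the same partition.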

# Modeling with Spark MLlib

## Define Classifier

In [13]:
from pyspark.ml.classification import DecisionTreeClassifier
classifier = DecisionTreeClassifier(featuresCol="features", labelCol="Survived")
classifier

DecisionTreeClassifier_f25ebaa8ff9e

## Create Pipeline

In [14]:
from pyspark.ml import Pipeline
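# Indexers run first, then the assembler and the classifier. The assembler
# reads the original numeric columns, so the *_Ind columns are added to the
# output but are not used as features.
pipeline = Pipeline(stages=[parchIndexer, sibspIndexer, passangerIndexer, survivedIndexer, assembler, classifier])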
pipeline

Pipeline_afbcbf0614e0


## Prepare training with ParamGridBuilder

Once the pipeline is created, the classifier's hyperparameters can be tuned with the help of 'ParamGridBuilder', which builds one parameter map for every combination of the listed values.

In [15]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

paramGrid = ParamGridBuilder() \
  .addGrid(classifier.maxDepth, [5, 10, 15, 20]) \
  .addGrid(classifier.maxBins, [25, 30]) \
  .build()
paramGrid

[{Param(parent='DecisionTreeClassifier_f25ebaa8ff9e', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5,
  Param(parent='DecisionTreeClassifier_f25ebaa8ff9e', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 25},
 {Param(parent='DecisionTreeClassifier_f25ebaa8ff9e', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5,
  Param(parent='DecisionTreeClassifier_f25ebaa8ff9e', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 30},
 {Param(parent='DecisionTreeClassifier_f25ebaa8ff9e', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.')
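The grid is the Cartesian product of the listed values, so 4 depths and 2 bin counts give 8 parameter maps. A plain-Python sketch with `itertools.product` shows the same enumeration. The dictionaries here use plain string keys instead of Spark `Param` objects.

```python
from itertools import product

max_depths = [5, 10, 15, 20]
max_bins = [25, 30]

# One dictionary per (maxDepth, maxBins) combination.
grid = [{"maxDepth": d, "maxBins": b} for d, b in product(max_depths, max_bins)]
print(len(grid))
print(grid[:2])
```

Each added value multiplies the number of models that have to be trained, which is worth keeping in mind when extending the grid.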

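'TrainValidationSplit' ties these pieces together: it takes the pipeline as the estimator, the parameter grid, and an evaluator that names the label column, the prediction column, and the metric to optimize.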

In [16]:
tvs = TrainValidationSplit(
  estimator=pipeline,
  estimatorParamMaps=paramGrid,
  evaluator=MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="weightedPrecision"),
  trainRatio=0.8)

tvs

TrainValidationSplit_43054fe595d4
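The evaluator above scores each parameter combination by weighted precision: the precision of each class, averaged with weights equal to the share of rows whose true label is that class. The following plain-Python sketch computes that quantity on made-up labels and predictions. `weighted_precision` is a hypothetical helper, and treating a class with no predictions as precision 0 is an assumption.

```python
from collections import Counter

def weighted_precision(labels, predictions):
    """Per-class precision, weighted by how often each class is the true label."""
    n = len(labels)
    label_counts = Counter(labels)
    total = 0.0
    for cls, count in label_counts.items():
        predicted_as_cls = sum(1 for p in predictions if p == cls)
        true_positives = sum(
            1 for y, p in zip(labels, predictions) if y == cls and p == cls
        )
        # Assumed convention: a class that is never predicted has precision 0.
        precision = true_positives / predicted_as_cls if predicted_as_cls else 0.0
        total += precision * count / n
    return total

# Invented example: class 0 has precision 2/2, class 1 has precision 2/3.
labels = [0, 0, 0, 1, 1]
predictions = [0, 0, 1, 1, 1]
score = weighted_precision(labels, predictions)
print(score)
```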


## Model Fitting

Once 'TrainValidationSplit' is configured, the model can be fitted on the training set.

In [17]:
model_generated = tvs.fit(train)

## Model Evaluation

Print the validation metric (weighted precision) for each parameter combination.

In [18]:
list(zip(model_generated.validationMetrics, model_generated.getEstimatorParamMaps()))

[(0.735344132067275,
  {Param(parent='DecisionTreeClassifier_f25ebaa8ff9e', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5,
   Param(parent='DecisionTreeClassifier_f25ebaa8ff9e', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 25}),
 (0.735344132067275,
  {Param(parent='DecisionTreeClassifier_f25ebaa8ff9e', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5,
   Param(parent='DecisionTreeClassifier_f25ebaa8ff9e', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 30}),
 (0.7359553281797201,
  {Param(parent='DecisionTreeClassifier_f25ebaa8ff9e', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., dep
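A list of (metric, parameters) pairs like this can be reduced to the best combination with `max`, since a larger weighted precision is better. The sketch below uses invented metric values and plain dictionaries rather than the notebook's actual results, which are truncated above.

```python
# Hypothetical (metric, params) pairs; the numbers are made up for illustration.
results = [
    (0.72, {"maxDepth": 5, "maxBins": 25}),
    (0.74, {"maxDepth": 10, "maxBins": 25}),
    (0.73, {"maxDepth": 15, "maxBins": 30}),
]

# Pick the pair with the highest metric.
best_metric, best_params = max(results, key=lambda pair: pair[0])
print(best_metric, best_params)
```

In the notebook itself, the fitted `TrainValidationSplitModel` already exposes the winning pipeline as `model_generated.bestModel`.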

# Model Serving with MLflow

Machine learning models built with PySpark can be logged and served with the help of the MLflow package.

## Execution of MLflow

After importing MLflow, open a run with 'start_run()'. The model that is fitted and logged inside the `with` block is recorded under that run.

In [22]:
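# Requires the mlflow package to be installed in the notebook environment
import mlflow
import mlflow.spark

with mlflow.start_run():
    # Fit inside the run and log the model that was just fitted
    model = tvs.fit(train)
    mlflow.spark.log_model(model, "sparkML-model")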

ModuleNotFoundError: No module named 'mlflow'

Inferences from the logged model can be obtained through the `mlflow.pyfunc` module. For this purpose, the model path and the dataset path are assigned separately. A Spark UDF is then generated from the model path, and the dataset is read into a DataFrame. In the final step, a new column holding the predictions is created with the help of that UDF.

In [None]:
import mlflow.pyfunc

# Export the training split with ';' as separator and without the pandas
# index, so that the file matches the reader options used below.
train.toPandas().to_csv('dataset.csv', sep=';', index=False)

model_path = '/Users/ersoyp/Documents/LAYER/ServingModelsWithApacheSpark/Scripts/mlruns/1/51ef199ab3b945e8a31b47cdfbf60912/artifacts/sparkML-model'
titanic_path = '/Users/ersoyp/Documents/LAYER/ServingModelsWithApacheSpark/Scripts/dataset.csv'
# Wrap the logged model as a Spark UDF
titanic_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("inferSchema", True).option("header", "true").option('delimiter', ';').load(titanic_path)

columns = ['PassengerClasses', 'SibSp', 'Parch']

# Add the predictions as a new column; truncate=False prints full cell values
df.withColumn('Inferences', titanic_udf(*columns)).show(truncate=False)

In [None]:
mlflow.end_run()