# PART 1: RDD Basics

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

!wget -q https://www-us.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz

!tar xf spark-2.4.8-bin-hadoop2.7.tgz

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.5-bin-hadoop2.7"

In [None]:
import findspark
findspark.init()

In [None]:
conda install pyspark

### Library Importing

In [2]:
from pyspark import SparkContext, SparkConf
from pyspark.sql import SparkSession
from pyspark.ml import Pipeline
from pyspark.sql.functions import *
from pyspark.ml.feature import *
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

### RDD Creation

In [1]:
spark_configurations = (SparkConf().setMaster("local[*]").setAppName("firstSparkSession").set("spark.executor.memory", "2g"))
spark_context = SparkContext(conf = spark_configurations)

**Retrieve SparkContext version**

In [2]:
spark_context.version

'3.0.0'

**Retrieve Python version**

In [3]:
spark_context.pythonVer

'3.7'

**Master URL to connect to**

In [4]:
spark_context.master

'local[*]'

**Retrieves application name**

In [5]:
spark_context.appName

'firstSparkSession'

**Retrieve application ID**

In [6]:
spark_context.applicationId

'local-1623274365399'

**Return default level of parallelism**

In [7]:
spark_context.defaultParallelism

12

**Return default minimun number of partitions**

In [None]:
spark_context.defaultMinPartitions

## RDD Operations

### Spark Actions
#### Collect

In [8]:
items = spark_context.parallelize ([4,13,13,28,36,47,56])

number_list = items.collect()
print ("Printing elements for collect: %s" % (number_list))

Printing elements for collect: [4, 13, 13, 28, 36, 47, 56]


#### First

In [9]:
items = spark_context.parallelize ([4,13,13,28,36,47,56])
first_element = items.first()
print ("Printing first element with first operation of RDD: %s" % (first_element))

Printing first element with first operation of RDD: 4


#### Take

In [10]:
items = spark_context.parallelize ([4,13,13,28,36,47,56])
take_element = items.take(3)
print ("Printing specified number of elements with take operation of RDD: %s" % (take_element))

Printing specified number of elements with take operation of RDD: [4, 13, 13]


#### Take Sample

In [12]:
items = spark_context.parallelize ([4,13,13,28,36,47,56])
items.takeSample(True, 5, 1)

[56, 13, 13, 13, 47]

#### Take Ordered

In [15]:
items = spark_context.parallelize ([44,131,836,147,56]).takeOrdered(6)
print(items)

[44, 56, 131, 147, 836]


#### Count

In [16]:
items = spark_context.parallelize ([4,13,13,28,36,47,56])
element_count = items.count()
print ("Printing number of instances for count operation of RDD: %i" % (element_count))

Printing number of instances for count operation of RDD: 7


#### CountByKey

In [17]:
countKey = spark_context.parallelize([("first_num", 300), ("second_num", 500), ("third_num", 900), ("second_num", 500), ])
sorted(countKey.countByKey().items())

[('first_num', 1), ('second_num', 2), ('third_num', 1)]

#### GetPartitions

In [18]:
partitioned = spark_context.parallelize ([4,13,13,28,36,47,56], 3)
partition_list = partitioned.getNumPartitions() #List the number of partitions
print ("Printing number of partitions for getNumPartitions operation of RDD: %i" % (partition_list))

Printing number of partitions for getNumPartitions operation of RDD: 3


#### Sum

In [19]:
summation_list = items.sum() 
print ("Printing sum of items for sum operation of RDD: %i" % (summation_list))

Printing sum of items for sum operation of RDD: 197


#### GroupBy

In [20]:
grouped_list = items.groupBy(lambda x : x%2).mapValues(list).collect()
print ("Printing grouped items for groupBy operation of RDD: ", (grouped_list))


Printing grouped items for groupBy operation of RDD:  [(0, [4, 28, 36, 56]), (1, [13, 13, 47])]


#### Repartition

In [21]:
repartitioned_list = items.repartition(4).collect()
print ("Printing repartitioned list for repartition operation of RDD: ", (repartitioned_list))

Printing repartitioned list for repartition operation of RDD:  [13, 28, 36, 4, 13, 56, 47]


#### SaveAsTextFile

In [None]:
saved_list= items.saveAsTextFile("items.txt")

### Spark Transformations

#### Map

In [23]:
items = spark_context.parallelize ([4,13,13,28,36,47,56])
mapped_list = items.map(lambda x: x+2).collect()
print ("Printing mapped items for map operation of RDD: ", (mapped_list))

Printing mapped items for map operation of RDD:  [6, 15, 15, 30, 38, 49, 58]


**MapPartitions**

In [24]:
partitioned = spark_context.parallelize ([4,13,13,28,36,47,56], 2)
def mapPartitionFunc(ind): yield sum(ind)
partitioned.mapPartitions(mapPartitionFunc).collect()

[30, 167]

**MapPartitionsByIndex**

In [25]:
partitioned = spark_context.parallelize ([4,13,13,28,36,47,56], 4)
def mapPartitionByIndexFunc(indSlicer, ind): yield indSlicer
partitioned.mapPartitionsWithIndex(mapPartitionByIndexFunc).sum()

6

**Filter**

In [22]:
items = spark_context.parallelize ([4,13,13,28,36,47,56])
filtered_list = items.filter(lambda x: x % 2 == 0).collect()
print ("Printing filtered list items for filter operation of RDD: ", (filtered_list))

Printing filtered list items for filter operation of RDD:  [4, 28, 36, 56]


**FlatMap**

In [26]:
items = spark_context.parallelize ([4,13,13,28,36,47,56])
items.flatMap(lambda x: range(1, x)).collect()

[1,
 2,
 3,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 1,
 2,
 3,
 4,
 5,
 6,
 7,
 8,
 9,
 10,
 11,
 12,
 13,
 14,
 15,
 16,
 17,
 18,
 19,
 20,
 21,
 22,
 23,
 24,
 25,
 26,
 27,
 28,
 29,
 30,
 31,
 32,
 33,
 34,
 35,
 36,
 37,
 38,
 39,
 40,
 41,
 42,
 43,
 44,
 45,
 46,
 47,
 48,
 49,
 50,
 51,
 52,
 53,
 54,
 55]

**Sample**

In [27]:
sampling_items = spark_context.parallelize(range(20), 4)
sampling_items.sample(True, 0.3).collect()

[3, 5, 5, 9, 10, 10, 11, 17, 19]

**Join**

In [36]:
list1 = spark_context.parallelize([("k", 98), ("m", 65)])
list2 = spark_context.parallelize([("k", 120), ("k", 43)])
sorted(list1.join(list2).collect())

[('k', (98, 43)), ('k', (98, 120))]

**Union**

In [28]:
union_items = spark_context.parallelize(range(5), 2)
union_items.union(union_items).collect()

[0, 1, 2, 3, 4, 0, 1, 2, 3, 4]

**Intersection**

In [30]:
group1 = spark_context.parallelize([2, 10, 17, 3, 14, 5])
group2 = spark_context.parallelize([2, 8, 5, 34, 42, 14])
group1.intersection(group2).collect()

[2, 5, 14]

**Distinct**

In [31]:
items = spark_context.parallelize ([4,13,13,28,36,47,56])
unique_element_list = items.distinct().collect()
print ("Printing distinct items for distinct operation of RDD: ", (unique_element_list))

Printing distinct items for distinct operation of RDD:  [36, 13, 4, 28, 56, 47]


**GroupByKey**

In [32]:
groupedKeys = spark_context.parallelize([("first_num", 300), ("second_num", 500), ("third_num", 900)])

print(sorted(groupedKeys.groupByKey().mapValues(len).collect()))
print(sorted(groupedKeys.groupByKey().mapValues(list).collect()))

[('first_num', 1), ('second_num', 1), ('third_num', 1)]
[('first_num', [300]), ('second_num', [500]), ('third_num', [900])]


**ReduceByKey**

In [33]:
from operator import sub
reducedKeys = spark_context.parallelize([("first_num", 300), ("second_num", 500), ("third_num", 900), ("second_num", 500), ])
sorted(reducedKeys.reduceByKey(sub).collect())

[('first_num', 300), ('second_num', 0), ('third_num', 900)]

**AggregateByKey**

In [34]:
item_group1 = spark_context.parallelize([('first',5),('first',3),('second',3)])
item_group2 = spark_context.parallelize(range(20))

# Aggregate RDD elements of each partition and then the results
firstGroup = (lambda x,y: (x[0]+y,x[1]+1))
aggregatedGroup = (lambda x,y:(x[0]+y[0],x[1]+y[1]))

print(item_group2.aggregate((0,0),firstGroup,aggregatedGroup))

#Aggregate values of each RDD key.collect()
print(item_group1.aggregateByKey((0,0),firstGroup,aggregatedGroup))

(190, 20)
PythonRDD[90] at RDD at PythonRDD.scala:53


**SortByKey**

In [35]:
item_list = [('first', 7), ('second', 9), ('third', 11), ('fourth', 34), ('fifth', 58)]
spark_context.parallelize(item_list).sortByKey().first()

('fifth', 58)

### RDD Persistence

In [37]:
item_list = spark_context.parallelize([('first',5),('first',3),('second',3)])

item_list.cache()

ParallelCollectionRDD[109] at readRDDFromFile at PythonRDD.scala:262

In [38]:
item_list.persist()

ParallelCollectionRDD[109] at readRDDFromFile at PythonRDD.scala:262

In [None]:
item_list.persist(pyspark.StorageLevel.MEMORY_AND_DISK )
item_list.getStorageLevel()
print(item_list.getStorageLevel())

In [None]:
item_list.persist(pyspark.StorageLevel.MEMORY_AND_DISK_2 )
item_list.getStorageLevel()
print(item_list.getStorageLevel())

# PART 2: Spark ML Model Serving

In [3]:
spark = SparkSession.builder.appName("FirstSparkApplication").config ("spark.executor.memory", "8g").getOrCreate()

In [4]:
training_dataset = spark.read.format("csv").option("inferSchema", True).option("header", "true").load('dataset/titanic_train.csv')
test_dataset = spark.read.format("csv").option("inferSchema", True).option("header", "true").load('dataset/titanic_test.csv')

training_dataset.printSchema

<bound method DataFrame.printSchema of DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]>

### Exploratory Data Analysis

In [7]:
print("Training Dataset Row Count")
training_dataset.count()

Training Dataset Row Count


891

In [8]:
print("Unique Passenger Counts")
training_dataset.agg(countDistinct("PassengerId")).show()

Unique Passenger Counts
+------------------+
|count(PassengerId)|
+------------------+
|               891|
+------------------+



In [9]:
print("Test Dataset Row Count")
test_dataset.count()

Test Dataset Row Count


418

In [10]:
training_dataset.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [11]:
training_dataset.show()

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
|          6|       0|     3|    Moran, Mr. James|  male|null|    0|    0|      

In [12]:
training_dataset.show(2, truncate=False, vertical=True)

-RECORD 0----------------------------------------------------------
 PassengerId | 1                                                   
 Survived    | 0                                                   
 Pclass      | 3                                                   
 Name        | Braund, Mr. Owen Harris                             
 Sex         | male                                                
 Age         | 22.0                                                
 SibSp       | 1                                                   
 Parch       | 0                                                   
 Ticket      | A/5 21171                                           
 Fare        | 7.25                                                
 Cabin       | null                                                
 Embarked    | S                                                   
-RECORD 1----------------------------------------------------------
 PassengerId | 2                                

In [13]:
training_dataset.describe().show(3,vertical=True)

-RECORD 0--------------------------
 summary     | count               
 PassengerId | 891                 
 Survived    | 891                 
 Pclass      | 891                 
 Name        | 891                 
 Sex         | 891                 
 Age         | 714                 
 SibSp       | 891                 
 Parch       | 891                 
 Ticket      | 891                 
 Fare        | 891                 
 Cabin       | 204                 
 Embarked    | 889                 
-RECORD 1--------------------------
 summary     | mean                
 PassengerId | 446.0               
 Survived    | 0.3838383838383838  
 Pclass      | 2.308641975308642   
 Name        | null                
 Sex         | null                
 Age         | 29.69911764705882   
 SibSp       | 0.5230078563411896  
 Parch       | 0.38159371492704824 
 Ticket      | 260318.54916792738  
 Fare        | 32.2042079685746    
 Cabin       | null                
 Embarked    | null         

In [14]:
# Counting the number of null values
from pyspark.sql.functions import *

print ("NaN values\n")
training_dataset.select([count(when(isnan(item), item)).alias(item) for item in training_dataset.columns]).show(5)

print ("Null values\n")
training_dataset.select([count(when(col(item).isNull(), item)).alias(item) for item in training_dataset.columns]).show(5)

print ("Not Null values\n")
training_dataset.select([count(when(col(item).isNotNull(), item)).alias(item) for item in training_dataset.columns]).show(5)

NaN values

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|  0|    0|    0|     0|   0|    0|       0|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+

Null values

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+

Not Null values

+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|

In [5]:
print("Renaming Column Name")
training_dataset = training_dataset.withColumnRenamed("Pclass","PassengerClasses").withColumnRenamed("Sex","Gender")
training_dataset

Renaming Column Name


DataFrame[PassengerId: int, Survived: int, PassengerClasses: int, Name: string, Gender: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [16]:
print("Counting the number of Passenger per Classes")
training_dataset.groupBy("PassengerClasses").count().sort("PassengerClasses").show()


print("Counting the number of Survivals by Classes")
training_dataset.groupBy("PassengerClasses",
                         "Gender",
                         "Survived").count().sort("PassengerClasses",
                                                  "Gender",
                                                  "Survived").show()

Counting the number of Passenger per Classes
+----------------+-----+
|PassengerClasses|count|
+----------------+-----+
|               1|  216|
|               2|  184|
|               3|  491|
+----------------+-----+

Counting the number of Survivals by Classes
+----------------+------+--------+-----+
|PassengerClasses|Gender|Survived|count|
+----------------+------+--------+-----+
|               1|female|       0|    3|
|               1|female|       1|   91|
|               1|  male|       0|   77|
|               1|  male|       1|   45|
|               2|female|       0|    6|
|               2|female|       1|   70|
|               2|  male|       0|   91|
|               2|  male|       1|   17|
|               3|female|       0|   72|
|               3|female|       1|   72|
|               3|  male|       0|  300|
|               3|  male|       1|   47|
+----------------+------+--------+-----+



In [17]:
print("Counting the number of Survivals by Classes")
training_dataset.groupBy("PassengerClasses",
                         "Gender",
                         "Survived").count().sort("PassengerClasses",
                                                  "Gender",
                                                  "Survived").show()

Counting the number of Survivals by Classes
+----------------+------+--------+-----+
|PassengerClasses|Gender|Survived|count|
+----------------+------+--------+-----+
|               1|female|       0|    3|
|               1|female|       1|   91|
|               1|  male|       0|   77|
|               1|  male|       1|   45|
|               2|female|       0|    6|
|               2|female|       1|   70|
|               2|  male|       0|   91|
|               2|  male|       1|   17|
|               3|female|       0|   72|
|               3|female|       1|   72|
|               3|  male|       0|  300|
|               3|  male|       1|   47|
+----------------+------+--------+-----+



## Feature Engineering

In [6]:
training_dataset = training_dataset.withColumn("Title", regexp_extract(col("Name"),"([A-Za-z]+)\.", 1))
training_dataset.select("Name","Title").show(10) 

+--------------------+------+
|                Name| Title|
+--------------------+------+
|Braund, Mr. Owen ...|    Mr|
|Cumings, Mrs. Joh...|   Mrs|
|Heikkinen, Miss. ...|  Miss|
|Futrelle, Mrs. Ja...|   Mrs|
|Allen, Mr. Willia...|    Mr|
|    Moran, Mr. James|    Mr|
|McCarthy, Mr. Tim...|    Mr|
|Palsson, Master. ...|Master|
|Johnson, Mrs. Osc...|   Mrs|
|Nasser, Mrs. Nich...|   Mrs|
+--------------------+------+
only showing top 10 rows



In [34]:
training_dataset.groupBy("Title").count().show()

+--------+-----+
|   Title|count|
+--------+-----+
|     Don|    1|
|    Miss|  182|
|Countess|    1|
|     Col|    2|
|     Rev|    6|
|    Lady|    1|
|  Master|   40|
|     Mme|    1|
|    Capt|    1|
|      Mr|  517|
|      Dr|    7|
|     Mrs|  125|
|     Sir|    1|
|Jonkheer|    1|
|    Mlle|    2|
|   Major|    2|
|      Ms|    1|
+--------+-----+



In [7]:
feature_df = training_dataset.\
replace(["Mme", 
         "Mlle","Ms",
         "Major","Dr", "Capt","Col","Rev",
         "Lady","Dona", "the Countess","Countess", "Don", "Sir", "Jonkheer","Master"],
        ["Mrs", 
         "Miss", "Miss",
         "Ranked","Ranked","Ranked","Ranked","Ranked",
         "Royalty","Royalty","Royalty","Royalty","Royalty","Royalty","Royalty","Royalty"])

feature_df.groupBy("Title").count().sort(desc("count")).show()

+-------+-----+
|  Title|count|
+-------+-----+
|     Mr|  517|
|   Miss|  185|
|    Mrs|  126|
|Royalty|   45|
| Ranked|   18|
+-------+-----+



In [37]:
feature_df.dtypes

[('PassengerId', 'int'),
 ('Survived', 'int'),
 ('PassengerClasses', 'int'),
 ('Name', 'string'),
 ('Gender', 'string'),
 ('Age', 'double'),
 ('SibSp', 'int'),
 ('Parch', 'int'),
 ('Ticket', 'string'),
 ('Fare', 'double'),
 ('Cabin', 'string'),
 ('Embarked', 'string'),
 ('Title', 'string')]

In [8]:
df = feature_df.select(
    "Survived",
    "PassengerClasses",
    "SibSp",
    "Parch")

df = df.dropna()
df = df.fillna(0)
df.dtypes

[('Survived', 'int'),
 ('PassengerClasses', 'int'),
 ('SibSp', 'int'),
 ('Parch', 'int')]

### String Indexing

In [9]:
from pyspark.ml.feature import StringIndexer
parchIndexer = StringIndexer(inputCol="Parch", outputCol="Parch_Ind").fit(df)
sibspIndexer = StringIndexer(inputCol="SibSp", outputCol="SibSp_Ind").fit(df)
passangerIndexer = StringIndexer(inputCol="PassengerClasses", outputCol="PassengerClasses_Ind").fit(df)
survivedIndexer = StringIndexer(inputCol="Survived", outputCol="Survived_Ind").fit(df)

### Vector Assembler

In [10]:
from pyspark.ml.feature import VectorAssembler
assembler = VectorAssembler(
  inputCols = ["PassengerClasses","SibSp","Parch"],
  outputCol = "features")

### Define Classifier

In [11]:
from pyspark.ml.classification import DecisionTreeClassifier
classifier = DecisionTreeClassifier(featuresCol="features", labelCol="Survived")
classifier

DecisionTreeClassifier_c34a79a4a517

### Create Pipeline

In [12]:
from pyspark.ml import Pipeline
pipeline = Pipeline(stages=[assembler, classifier, parchIndexer, sibspIndexer, passangerIndexer, survivedIndexer])
pipeline

Pipeline_a32dcc338182

### Prepare training with ParamGridBuilder

In [13]:
from pyspark.ml.tuning import ParamGridBuilder
from pyspark.ml.tuning import TrainValidationSplit
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

paramGrid = ParamGridBuilder() \
  .addGrid(classifier.maxDepth, [5, 10, 15, 20]) \
  .addGrid(classifier.maxBins, [25, 30]) \
  .build()
paramGrid

[{Param(parent='DecisionTreeClassifier_c34a79a4a517', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5,
  Param(parent='DecisionTreeClassifier_c34a79a4a517', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 25},
 {Param(parent='DecisionTreeClassifier_c34a79a4a517', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5,
  Param(parent='DecisionTreeClassifier_c34a79a4a517', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 30},
 {Param(parent='DecisionTreeClassifier_c34a79a4a517', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.')

In [14]:
tvs = TrainValidationSplit(
  estimator=pipeline,
  estimatorParamMaps=paramGrid,
  evaluator=MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="weightedPrecision"),
  trainRatio=0.8)

tvs

TrainValidationSplit_e439011fad8f

### Model Fitting

In [15]:
(train, test) = df.randomSplit([0.8, 0.2], seed = 345)

### Model Evaluation

#### Print accuracy results by each parameter

In [58]:
list(zip(model.validationMetrics, model.getEstimatorParamMaps()))

[(0.7014687100893997,
  {Param(parent='DecisionTreeClassifier_d5f6b1ac3be4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5,
   Param(parent='DecisionTreeClassifier_d5f6b1ac3be4', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 251}),
 (0.7014687100893997,
  {Param(parent='DecisionTreeClassifier_d5f6b1ac3be4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g., depth 0 means 1 leaf node; depth 1 means 1 internal node + 2 leaf nodes.'): 5,
   Param(parent='DecisionTreeClassifier_d5f6b1ac3be4', name='maxBins', doc='Max number of bins for discretizing continuous features.  Must be >=2 and >= number of categories for any categorical feature.'): 300}),
 (0.6830897733770511,
  {Param(parent='DecisionTreeClassifier_d5f6b1ac3be4', name='maxDepth', doc='Maximum depth of the tree. (>= 0) E.g.,

### Model Serving with MLFlow

In [63]:
import mlflow
from mlflow import spark
with mlflow.start_run(): 
    model = tvs.fit(train) 
    mlflow.spark.log_model(model, "sparkML-model")

In [27]:
import mlflow.pyfunc
from pyspark.sql import SQLContext

train.toPandas().to_csv('dataset.csv')

model_path = '/Users/ersoyp/Documents/LAYER/ServingModelsWithApacheSpark/Scripts/mlruns/1/51ef199ab3b945e8a31b47cdfbf60912/artifacts/sparkML-model'
titanic_path = '/Users/ersoyp/Documents/LAYER/ServingModelsWithApacheSpark/Scripts/dataset.csv'
titanic_udf = mlflow.pyfunc.spark_udf(spark, model_path)

df = spark.read.format("csv").option("inferSchema", True).option("header", "true").option('delimiter', ';').load(titanic_path)

columns = ['PassengerClasses', 'SibSp', 'Parch']
          
df.withColumn('Inferences', titanic_udf(*columns)).show(False)

+----------------+-----+-----+----------+
|PassengerClasses|SibSp|Parch|Inferences|
+----------------+-----+-----+----------+
|1               |0    |0    |0.39849624|
|1               |0    |0    |0.39849624|
|1               |0    |0    |0.39849624|
|1               |0    |0    |0.39849624|
|1               |0    |0    |0.39849624|
|1               |0    |0    |0.39849624|
|1               |0    |0    |0.39849624|
|1               |0    |0    |0.39849624|
|1               |0    |0    |0.39849624|
|1               |1    |0    |0.39849624|
|1               |1    |0    |0.39849624|
|1               |1    |2    |0.21428572|
|2               |0    |0    |0.6545454 |
|2               |0    |0    |0.6545454 |
|2               |0    |0    |0.6545454 |
|2               |0    |0    |0.6545454 |
|2               |0    |0    |0.6545454 |
|2               |0    |0    |0.6545454 |
|2               |0    |0    |0.6545454 |
|2               |1    |0    |0.6545454 |
+----------------+-----+-----+----

In [72]:
mlflow.end_run()