<div style="line-height:0.4">
<h1 style="color:#0FCBC6"> PySpark 3: Functions, Preprocessing, and Classifications </h1>
<span style="display: inline-block;">
    <h3 style="color: lightblue; display: inline;">Keywords:</h3> keyword_only + pandas inferSchema + VectorAssembler + mlflow + avgMetrics
</span>
</div>

<h3 style="color:#0FCBC6"> Recap: </h3>
<div style="margin-top: -8px;">
The findspark library is used to searche for the location of the Spark installation and sets the necessary environment variables to ensure <br> 
that the Python interpreter can find and use the Spark libraries and executables.

SPARK_HOME environment variable, which should point to the root directory where Spark is installed.

+ findspark.init() simplifies the process of setting up the local development environment for working with PySpark. <br>
It allows to import PySpark and work with Spark functionality directly 

</div>

In [1]:
#import findspark
#findspark.init()

In [1]:
import os

from sklearn.metrics import accuracy_score

from pyspark.sql import SparkSession
from pyspark import SparkFiles
from pyspark.ml.classification import LogisticRegression, GBTClassifier, RandomForestClassifier
from pyspark.ml.evaluation import BinaryClassificationEvaluator, MulticlassClassificationEvaluator
from pyspark.ml.tuning import CrossValidator, ParamGridBuilder
from pyspark.ml.feature import VectorAssembler, StringIndexer, QuantileDiscretizer
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.pipeline import Transformer,Estimator
from pyspark.ml.util import DefaultParamsReadable, DefaultParamsWritable

from pyspark.sql.functions import isnan, col, when, rand

from pyspark.ml.param.shared import Param, Params, HasOutputCols
from pyspark.sql.functions import count, lit, regexp_extract
from pyspark import keyword_only
from pyspark.sql.functions import monotonically_increasing_id

import mlflow
from mlflow import spark

<h3 style="color:#0FCBC6"> Recap: Possible Warnings </h3>
<div style="margin-top: -8px;">

- WARN Utils: Your hostname, hpmint resolves to a loopback address: 127.0.1.1; using 192.168.1.81 instead (on interface eno1)
- WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
For SparkR, use setLogLevel(newLevel).
- WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
- WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
- WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.

<h2 style="color:#0FCBC6"><u>Example 1 Logistic Regression</u></h2>

In [3]:
# Create Session
spark = SparkSession.builder \
    .appName("LogisticRegression with PySpark") \
    .getOrCreate()

<h3 style="color:#0FCBC6"> => Load and prepare data </h3>

In [12]:
# Get the current working directory
current_dir = os.getcwd()

# Construct the full path to the CSV file
csv_file_path = os.path.join(current_dir, "datasets_for_pyspark/social_network_ads.csv")

# Read the CSV file using the full path
df = spark.read.csv(csv_file_path, header=True, inferSchema=True)
df.show(3)

+--------+------+---+---------------+---------+
| User ID|Gender|Age|EstimatedSalary|Purchased|
+--------+------+---+---------------+---------+
|15624510|  Male| 19|          19000|        0|
|15810944|  Male| 35|          20000|        0|
|15668575|Female| 26|          43000|        0|
+--------+------+---+---------------+---------+
only showing top 3 rows



In [21]:
# Rename the columns to make easier to create the VectorAssembler with a unique feature
columns = [f'feature_{i}' for i in range(1, 6)]
columns

['feature_1', 'feature_2', 'feature_3', 'feature_4', 'feature_5']

In [22]:
dataset_ok = df.toDF(*columns)
dataset_ok.show(3)

+---------+---------+---------+---------+---------+
|feature_1|feature_2|feature_3|feature_4|feature_5|
+---------+---------+---------+---------+---------+
| 15624510|     Male|       19|    19000|        0|
| 15810944|     Male|       35|    20000|        0|
| 15668575|   Female|       26|    43000|        0|
+---------+---------+---------+---------+---------+
only showing top 3 rows



In [39]:
# Rename a column
old_column_name = "feature_5"
new_column_name = "target"
dataset_ok = dataset_ok.withColumnRenamed(old_column_name, new_column_name)
dataset_ok.show(3)

+---------+---------+---------+---------+------+
|feature_1|feature_2|feature_3|feature_4|target|
+---------+---------+---------+---------+------+
| 15624510|     Male|       19|    19000|     0|
| 15810944|     Male|       35|    20000|     0|
| 15668575|   Female|       26|    43000|     0|
+---------+---------+---------+---------+------+
only showing top 3 rows



In [28]:
dataset_columns = dataset_ok.columns
dataset_columns

['feature_1', 'feature_2', 'feature_3', 'feature_4', 'target']

In [33]:
# Remove the last column
column_to_remove = dataset_columns[-1]
df_removed = dataset_ok.drop(column_to_remove)

In [34]:
df_removed.show()

+---------+---------+---------+---------+
|feature_1|feature_2|feature_3|feature_4|
+---------+---------+---------+---------+
| 15624510|     Male|       19|    19000|
| 15810944|     Male|       35|    20000|
| 15668575|   Female|       26|    43000|
| 15603246|   Female|       27|    57000|
| 15804002|     Male|       19|    76000|
| 15728773|     Male|       27|    58000|
| 15598044|   Female|       27|    84000|
| 15694829|   Female|       32|   150000|
| 15600575|     Male|       25|    33000|
| 15727311|   Female|       35|    65000|
| 15570769|   Female|       26|    80000|
| 15606274|   Female|       26|    52000|
| 15746139|     Male|       20|    86000|
| 15704987|     Male|       32|    18000|
| 15628972|     Male|       18|    82000|
| 15697686|     Male|       29|    80000|
| 15733883|     Male|       47|    25000|
| 15617482|     Male|       45|    26000|
| 15704583|     Male|       46|    28000|
| 15621083|   Female|       48|    29000|
+---------+---------+---------+---

In [35]:
dd_columns = dataset_columns[:-1]
dd_columns

['feature_1', 'feature_2', 'feature_3', 'feature_4']

<div style="line-height:0.5">
<h3 style="color:#0FCBC6"> => Transform data </h3>
Encoding categorical data is neceeray to create the VectorAssembler 
</div>

In [48]:
# Create a StringIndexer for a single column
string_indexer = StringIndexer(inputCol="feature_2", outputCol="feature2_encoded")
string_indexer

StringIndexer_4eb872988d3f

In [49]:
# Fit and transform the DataFrame
dataframe_ok = string_indexer.fit(dataset_ok).transform(df_removed)
dataframe_ok.show()

+---------+---------+---------+---------+----------------+
|feature_1|feature_2|feature_3|feature_4|feature2_encoded|
+---------+---------+---------+---------+----------------+
| 15624510|     Male|       19|    19000|             1.0|
| 15810944|     Male|       35|    20000|             1.0|
| 15668575|   Female|       26|    43000|             0.0|
| 15603246|   Female|       27|    57000|             0.0|
| 15804002|     Male|       19|    76000|             1.0|
| 15728773|     Male|       27|    58000|             1.0|
| 15598044|   Female|       27|    84000|             0.0|
| 15694829|   Female|       32|   150000|             0.0|
| 15600575|     Male|       25|    33000|             1.0|
| 15727311|   Female|       35|    65000|             0.0|
| 15570769|   Female|       26|    80000|             0.0|
| 15606274|   Female|       26|    52000|             0.0|
| 15746139|     Male|       20|    86000|             1.0|
| 15704987|     Male|       32|    18000|             1.

In [50]:
# Replace the feature2 with 
dataframe_tmp  = dataframe_ok.withColumn("feature_2", col("feature2_encoded")).drop("feature2_encoded")
dataframe_tmp.show()

+---------+---------+---------+---------+
|feature_1|feature_2|feature_3|feature_4|
+---------+---------+---------+---------+
| 15624510|      1.0|       19|    19000|
| 15810944|      1.0|       35|    20000|
| 15668575|      0.0|       26|    43000|
| 15603246|      0.0|       27|    57000|
| 15804002|      1.0|       19|    76000|
| 15728773|      1.0|       27|    58000|
| 15598044|      0.0|       27|    84000|
| 15694829|      0.0|       32|   150000|
| 15600575|      1.0|       25|    33000|
| 15727311|      0.0|       35|    65000|
| 15570769|      0.0|       26|    80000|
| 15606274|      0.0|       26|    52000|
| 15746139|      1.0|       20|    86000|
| 15704987|      1.0|       32|    18000|
| 15628972|      1.0|       18|    82000|
| 15697686|      1.0|       29|    80000|
| 15733883|      1.0|       47|    25000|
| 15617482|      1.0|       45|    26000|
| 15704583|      1.0|       46|    28000|
| 15621083|      0.0|       48|    29000|
+---------+---------+---------+---

In [52]:
assembler = VectorAssembler(inputCols=dd_columns, outputCol="assembled_features")
dataframe = assembler.transform(dataframe_tmp)

In [67]:
df = dataframe.select("assembled_features")
df.show(3)

+--------------------+
|  assembled_features|
+--------------------+
|[1.562451E7,1.0,1...|
|[1.5810944E7,1.0,...|
|[1.5668575E7,0.0,...|
+--------------------+
only showing top 3 rows



In [62]:
# Select the "target" column from df2 and alias it as "target_column"
target_column = dataset_ok.select("target").alias("target_column")
target_column.show()

+------+
|target|
+------+
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     1|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     0|
|     1|
|     1|
|     1|
|     1|
+------+
only showing top 20 rows



In [72]:
""" Add the "target_column" to df1 using withColumn """
# Add a unique ID to both DataFrames
df1_with_id = df.withColumn("id", monotonically_increasing_id())
df2_with_id = dataset_ok.withColumn("id", monotonically_increasing_id())

print(df1_with_id.show(4))
print(df2_with_id.show(4))

# Join the DataFrames based on the common ID column
joined_df = df1_with_id.join(df2_with_id, on="id", how="inner").drop("id")

# Select the desired columns
result_df = joined_df.select("assembled_features", "target")

result_df.show()

+--------------------+---+
|  assembled_features| id|
+--------------------+---+
|[1.562451E7,1.0,1...|  0|
|[1.5810944E7,1.0,...|  1|
|[1.5668575E7,0.0,...|  2|
|[1.5603246E7,0.0,...|  3|
+--------------------+---+
only showing top 4 rows

None
+---------+---------+---------+---------+------+---+
|feature_1|feature_2|feature_3|feature_4|target| id|
+---------+---------+---------+---------+------+---+
| 15624510|     Male|       19|    19000|     0|  0|
| 15810944|     Male|       35|    20000|     0|  1|
| 15668575|   Female|       26|    43000|     0|  2|
| 15603246|   Female|       27|    57000|     0|  3|
+---------+---------+---------+---------+------+---+
only showing top 4 rows

None
+--------------------+------+
|  assembled_features|target|
+--------------------+------+
|[1.562451E7,1.0,1...|     0|
|[1.5810944E7,1.0,...|     0|
|[1.5668575E7,0.0,...|     0|
|[1.5603246E7,0.0,...|     0|
|[1.5804002E7,1.0,...|     0|
|[1.5728773E7,1.0,...|     0|
|[1.5598044E7,0.0,...|     0|


<h3 style="color:#0FCBC6"> => Train data </h3>

In [73]:
train_data, test_data = result_df.randomSplit([0.8, 0.2], seed=12)

In [74]:
train_data.show(5)

+--------------------+------+
|  assembled_features|target|
+--------------------+------+
|[1.5566689E7,0.0,...|     0|
|[1.5570769E7,0.0,...|     0|
|[1.5570932E7,1.0,...|     0|
|[1.5571059E7,0.0,...|     0|
|[1.5573452E7,0.0,...|     0|
+--------------------+------+
only showing top 5 rows



<h3 style="color:#0FCBC6"> => Logistic Regression </h3>

In [75]:
logistic_regression = LogisticRegression(featuresCol="assembled_features", labelCol="target")
model = logistic_regression.fit(train_data)

<h3 style="color:#0FCBC6"> => Inspect the model coefficients and intercept </h3>

In [77]:
coefficients = model.coefficients
intercept = model.intercept

print("Coefficients: ", coefficients)
print("Intercept: {:.3f}".format(intercept))

Coefficients:  [-8.708251472782162e-08,0.41870463194828345,0.23618911044528476,3.319963305616845e-05]
Intercept: -11.242


<h3 style="color:#0FCBC6"> => Evaluate the model on test data </h3>

In [82]:
predictions = model.transform(test_data)
predictions.show()

+--------------------+------+--------------------+--------------------+----------+
|  assembled_features|target|       rawPrediction|         probability|prediction|
+--------------------+------+--------------------+--------------------+----------+
|[1.5569641E7,0.0,...|     1|[-4.2553027869232...|[0.01399028859221...|       1.0|
|[1.5574305E7,1.0,...|     0|[2.15313286257781...|[0.89596116836296...|       0.0|
|[1.5579781E7,0.0,...|     0|[4.18864681938546...|[0.98505980052822...|       0.0|
|[1.5582492E7,1.0,...|     1|[1.48319526631109...|[0.81505472177296...|       0.0|
|[1.5583681E7,1.0,...|     1|[0.63814126480846...|[0.65433317020271...|       0.0|
|[1.5584114E7,1.0,...|     0|[5.74805626154796...|[0.99682116391712...|       0.0|
|[1.5587177E7,1.0,...|     0|[0.57491263980694...|[0.63989596665148...|       0.0|
|[1.5591433E7,1.0,...|     0|[1.95163493450095...|[0.87562480537294...|       0.0|
|[1.5592877E7,1.0,...|     0|[4.02530464734309...|[0.98245532948915...|       0.0|
|[1.

In [84]:
# AUC-ROC
evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction", labelCol="target")
auc = evaluator.evaluate(predictions)
print(f"AUC-ROC: {auc:.4f}")

AUC-ROC: 0.9300


In [85]:
# Accuracy, Precision, and Recall
multi_evaluator = MulticlassClassificationEvaluator(labelCol="target", predictionCol="prediction")
accuracy = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "accuracy"})
precision = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedPrecision"})
recall = multi_evaluator.evaluate(predictions, {multi_evaluator.metricName: "weightedRecall"})

print(f"Accuracy: {accuracy:.4f}")
print(f"Precision: {precision:.4f}")
print(f"Recall: {recall:.4f}")

Accuracy: 0.8590
Precision: 0.8599
Recall: 0.8590


In [None]:
spark.close()

<h2 style="color:#0FCBC6"><u>Example 2 Pipeline</u></h2>

<h3>PySpark pipeline: </h3>
<div style="margin-top: -15px;">
The pipeline is a sequence of stages (executed in order) when fitted, each of them acts as an estimator (or Transformer).    <br>
<div style="line-height:1.5">
GOAL => Predict if a person is survided or not, using the Titanic dataset.  <br>
</div>

In [71]:
""" Spark session creation """
spa = SparkSession \
        .builder \
        .appName('Example2TitanicData') \
        .getOrCreate()

In [72]:
path = "./datasets_for_pyspark/titanic.csv"
data_titanic_raw = spa.read.csv(path, inferSchema=True, header=True)
data_titanic_raw

DataFrame[PassengerId: int, Survived: int, Pclass: int, Name: string, Sex: string, Age: double, SibSp: int, Parch: int, Ticket: string, Fare: double, Cabin: string, Embarked: string]

In [73]:
data_titanic_raw.show(5)

+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       1|     1|Cumings, Mrs. Joh...|female|38.0|    1|    0|        PC 17599|71.2833|  C85|       C|
|          3|       1|     3|Heikkinen, Miss. ...|female|26.0|    0|    0|STON/O2. 3101282|  7.925| null|       S|
|          4|       1|     1|Futrelle, Mrs. Ja...|female|35.0|    1|    0|          113803|   53.1| C123|       S|
|          5|       0|     3|Allen, Mr. Willia...|  male|35.0|    0|    0|          373450|   8.05| null|       S|
+-----------+--------+------+--------------------+------+----+-----+-----+------

In [74]:
def check_if_there_are_null_values(df):
    """" Search for null values ()= 0) for each column of given DataFrame. 
    
    Details:
        - For each column "c":
            - counts the number of NULLs ('when' condition returns the column value)
            - alias the results with the original column name

    Returns:
        - Selection of rows from the df with null values [pyspark.sql.dataframe.DataFrame]
    """    
    df.select([count(when(col(c).isNull(),c)).alias(c) for c in df.columns]).show()
    return df

def check_if_there_are_nan_values(df):
    df.select([col_name for col_name in df.columns if df.select(isnan(col(col_name))).collect()[0][0]])
    return df

df1 = check_if_there_are_null_values(data_titanic_raw)
df2 = check_if_there_are_nan_values(data_titanic_raw)

print("The type of dataframes is: {}".format(type(df1)))
print(f"size of df1 is: {df1.count()}")
print(f"size of df2 is: {df2.count()}")

df1.show(4), df2.show(4)


+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|PassengerId|Survived|Pclass|Name|Sex|Age|SibSp|Parch|Ticket|Fare|Cabin|Embarked|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+
|          0|       0|     0|   0|  0|177|    0|    0|     0|   0|  687|       2|
+-----------+--------+------+----+---+---+-----+-----+------+----+-----+--------+

The type of dataframes is: <class 'pyspark.sql.dataframe.DataFrame'>
size of df1 is: 891
size of df2 is: 891
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|PassengerId|Survived|Pclass|                Name|   Sex| Age|SibSp|Parch|          Ticket|   Fare|Cabin|Embarked|
+-----------+--------+------+--------------------+------+----+-----+-----+----------------+-------+-----+--------+
|          1|       0|     3|Braund, Mr. Owen ...|  male|22.0|    1|    0|       A/5 21171|   7.25| null|       S|
|          2|       

(None, None)

<h3 style="color:#0FCBC6"> => Preprocessing </h3>

In [75]:
class preprocess_transform(Transformer, HasOutputCols, DefaultParamsReadable, DefaultParamsWritable):
	""" Custom Transformer class, to execute the whole process of refining data, preparing data in the proper way. \\
		It implements sklean-style fit/transform methods for use in ML pipelines.
	"""	
	value = Param(Params._dummy(),"value","value to fill")
	"""
	@keyword_only
	def __init__(self, df, outputCols=None): 
		super(preprocess_transform, self).__init__()
		self.df = df
		kwargs = self._input_kwargs
		self._set(**kwargs)
	"""
	@keyword_only
	def __init__(self, df, outputCols=None):
		super(preprocess_transform, self).__init__()

		# Define _input_kwargs
		self._input_kwargs = {}

		self.df = df
		kwargs = self._input_kwargs
		self._set(**kwargs)

	@keyword_only
	def setParams(self, outputCols=None):
		""" Set the output columns for the transformer.

		Parameters:
			- List of output columns after transformation [list]
		Returns: 
			- 
		"""
		kwargs = self._input_kwargs
		return self._set(**kwargs)

	def setValue(self, value):
		""" Set the value to use for imputing missing values.

		Parameters:
			- Value to use for missing value imputation
		Returns: 
			- 
		"""
		return self._set(value=value)

	def getValue(self):
		""" Get the currently set value for missing value imputation. """
		return self.getOrDefault(self.value)
		
	def feature_generation(self, df):
		""" Generate new feature columns. 
		
		Parameters:
			- DataFrame to check and adjust [pyspark.sql.dataframe.DataFrame]

		Details:
			- Genearate the following new features:
				- Initial: Extract title from Name as new Initial column:
					- Extract the first word (before a dot) in the "Name" column as a new column "Initial" using a regex pattern.
					- pattern => ([A-Za-z]+) matches one or more alphabetic characters
				- Normalized titles: Change titles into standard categories
				- Family_Size: Adds family size column by adding SibSp and Parch columns
				- Alone: Adds indicator for traveling alone. \\
					lit() takes a value as input and wraps it into a Column expression.	
		Returns:
			- Tranformed DataFrame with imputed data [pyspark.sql.dataframe.DataFrame]			
		

		"""
		# Extract title from Name as new Initial column
		df = df.withColumn("Initial", regexp_extract(col("Name"),"([A-Za-z]+)\.",1))
		
		# Change titles
		df = df.replace(['Mlle','Mme', 'Ms', 'Dr','Major','Lady','Countess','Jonkheer','Col','Rev','Capt','Sir','Don'],
						['Miss','Miss','Miss','Mr','Mr',  'Mrs',  'Mrs',  'Other',	'Other','Other','Mr','Mr','Mr'])
		
		# Create Family_Size 
		df = df.withColumn("Family_Size", col('SibSp') + col('Parch'))
		# Create Alone indicator (a literal/constant value).		
		df = df.withColumn('Alone', lit(0))
		# Set Alone to 1 if Family_Size is 0, to 0 otherwise
		df = df.withColumn("Alone",when(df["Family_Size"] ==0, 1).otherwise(df["Alone"]))
		
		return df
	
	def Age_impute(self, df):
		""" Impute missing age values based on the mean age for each title.
		
		Parameters:
			- Input DataFrame to check and adjust [pyspark.sql.dataframe.DataFrame]

		Details:
			- Calculate mean age for each title using groupby
			- Rename column to semantic name
			- Get list of titles for Initial adn Age features
				- Convert the df to an RDD to leverage RDD operations, flattens into a 1D RDD, and collects the RDD rows into a Python list.
			- Impute age with mean if null, for each title

		Returns:
			- Tranformed DataFrame with imputed data [pyspark.sql.dataframe.DataFrame]
		"""
		Age_mean = df.groupBy("Initial").avg('Age')
		Age_mean = Age_mean.withColumnRenamed('avg(Age)','mean_age')
		## Get lists 
		Initials_list = Age_mean.select("Initial").rdd.flatMap(lambda x: x).collect()
		Mean_list = Age_mean.select("mean_age").rdd.flatMap(lambda x: x).collect()
		
		# Iterate through the Initials_list and Mean_list simultaneously using zip and imputing the mean age if age is null for that title.
		for i, j in zip(Initials_list, Mean_list):
			df = df.withColumn("Age",when((df["Initial"] == i) & (df["Age"].isNull()), j).otherwise(df["Age"]))
		return df
	
	def Embark_impute(self, df):
		""" Impute missing embark values. 
		
		Parameters:
			- Dataframe to check and adjust [pyspark.sql.dataframe.DataFrame]

		Details:
			- Groups the data by the Embarked column
				- count the rows for each embarkation point
				- sort descending by the counts
				- collect to the driver and takes the top result
			
			- fills NA values in the DataFrame. \\
				{'Embarked': mode_value} specifies to fill NAs in the Embarked column with the mode value

		Returns:
			- Tranformed Dataframe with imputed data pyspark.sql.dataframe.DataFrame]			
		"""
		mode_value = df.groupBy('Embarked').count().sort(col('count').desc()).collect()[0][0]
		df = df.fillna({'Embarked':mode_value})
		return df

	def Fare_impute(self, df):
		""" Impute missing fare values based on the average fare for each passenger class ('Pclass').
		
		Parameters:
			- Data to check and adjust [pyspark.sql.dataframe.DataFrame]

		Details:
			- For each passenger class 'i' in 'Pclass':
				- 1 Calculate average fare for that class:
					- Group by class and compute average fare
					- Select only the class and mean columns
					- Filter for the current class
					- Get the mean fare value

				- 2 Update 'Fare' column:
					- Check if fare is missing
					- Check if class matches
					- Impute mean fare value if above conditions met
					- Keep original if not missing

		Returns:
			- Tranformed df [pyspark.sql.dataframe.DataFrame]			
		"""

		Select_pclass = df.filter(col('Fare').isNull()).select('Pclass')
		if Select_pclass.count() > 0:
			Pclass = Select_pclass.rdd.flatMap(lambda x: x).collect()
			for i in Pclass:
				mean_pclass_fare = df.groupBy('Pclass').mean().select('Pclass','avg(Fare)').filter(col('Pclass')== i).collect()[0][1]
				df = df.withColumn("Fare",when((col('Fare').isNull()) & (col('Pclass') == i), mean_pclass_fare).otherwise(col('Fare')))
		return df

	def all_impute_together(self, df):
		""" Impute all missing values.  """

		df = self.Age_impute(df)
		df = self.Embark_impute(df)
		df = self.Fare_impute(df)
		return df

	def stringToNumeric_conv(self, df, col_list):
		""" Convert categorical string columns to numeric. 
		Parameters:
			- Data to check and adjust [pyspark.sql.dataframe.DataFrame]
			- List of categorical columns to convert [list]

		Details:
			- Create a StringIndexer for each column (with list chìomprehension) to encode string columns \\
				Fit the StringIndexer on the DataFrame
			- Create pipeline with the indexers 
				Pipeline() chains together transformations into a pipeline
			- Fit the Pipeline and transform the DataFrame 

		Returns:
			- Converted DataFrame [pyspark.sql.dataframe.DataFrame]				
		"""
		indexer = [StringIndexer(inputCol=column,outputCol=column+"_index").fit(df) for column in col_list]
		string_change_pipeline = Pipeline(stages=indexer)
		df = string_change_pipeline.fit(df).transform(df)
		return df

	def drop_column(self, df, col_list):
		""" Remove given columns entirely. """
		for i in col_list:
			df = df.drop(col(i))
		return df

	def _transform(self):
		""" Main transformation logic. Apply all feature engineering steps and transformations to the DataFrame. """		
		print("...pending tranformations...")

		col_list = ["Sex","Embarked","Initial"]
		dataset = self.feature_generation(self.df)
		df_impute = self.all_impute_together(dataset)
		df_numeric = self.stringToNumeric_conv(df_impute, col_list)
		df_final = self.drop_column(df_numeric, ['Cabin','Name','Ticket','Family_Size','SibSp','Parch','Sex','Embarked','Initial'])
		
		print("Done.")
		return df_final

In [88]:
# If not on Google Colab, works only with less samples (10% of the original data)
data_titanic_raw = data_titanic_raw.sample(fraction=0.1) 
my_model = preprocess_transform(df=data_titanic_raw)

dataframe_final = my_model._transform()

...pending tranformations...
Done.


In [77]:
dataframe_final.show(4)

+-----------+--------+------+------------------+------+-----+---------+--------------+-------------+
|PassengerId|Survived|Pclass|               Age|  Fare|Alone|Sex_index|Embarked_index|Initial_index|
+-----------+--------+------+------------------+------+-----+---------+--------------+-------------+
|          3|       1|     3|              26.0| 7.925|    1|      1.0|           0.0|          1.0|
|         18|       1|     2|35.166666666666664|  13.0|    1|      0.0|           0.0|          0.0|
|         30|       0|     3|35.166666666666664|7.8958|    1|      0.0|           0.0|          0.0|
|         42|       0|     2|              27.0|  21.0|    0|      1.0|           0.0|          2.0|
+-----------+--------+------+------------------+------+-----+---------+--------------+-------------+
only showing top 4 rows



In [78]:
# Split dataset after transformation 
train_final, test_final = dataframe_final.randomSplit([0.7, 0.3]) 
train_final.show(4), test_final.show(4)

+-----------+--------+------+------------------+-------+-----+---------+--------------+-------------+
|PassengerId|Survived|Pclass|               Age|   Fare|Alone|Sex_index|Embarked_index|Initial_index|
+-----------+--------+------+------------------+-------+-----+---------+--------------+-------------+
|          3|       1|     3|              26.0|  7.925|    1|      1.0|           0.0|          1.0|
|         18|       1|     2|35.166666666666664|   13.0|    1|      0.0|           0.0|          0.0|
|         30|       0|     3|35.166666666666664| 7.8958|    1|      0.0|           0.0|          0.0|
|         49|       0|     3|35.166666666666664|21.6792|    0|      0.0|           1.0|          0.0|
+-----------+--------+------+------------------+-------+-----+---------+--------------+-------------+
only showing top 4 rows

+-----------+--------+------+------------------+------+-----+---------+--------------+-------------+
|PassengerId|Survived|Pclass|               Age|  Fare|Alo

(None, None)

In [79]:
# Merge multiple columns into a vector column, with a feature transformer assembler
feature = VectorAssembler(inputCols=['Pclass','Age','Fare','Alone','Sex_index','Embarked_index','Initial_index'],outputCol="features")

In [80]:
""" Classifiers """
lr = LogisticRegression(labelCol='Survived',featuresCol='features')
rf = RandomForestClassifier(labelCol="Survived", featuresCol="features", numTrees=10)
gb = GBTClassifier(labelCol="Survived", featuresCol="features", maxIter=10)

<h3 style="color:#0FCBC6"> => Grid Param Search for 3 classification techniques </h3>

In [81]:
""" 1) Pipeline for Logistic Regression. 
Pipeline stages are:
+ initilization
+ fit 
+ transform. 
"""
pipeline = Pipeline(stages=[feature, lr])

paramGrid = ParamGridBuilder().addGrid(lr.regParam, [0.1, 0.01]).addGrid(lr.elasticNetParam, [0.0, 0.5, 1.0]).build()

evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

crossval = CrossValidator(estimator = pipeline,
                        estimatorParamMaps = paramGrid,
                        evaluator = evaluator,
                        numFolds = 3) 

## Run cross-validation, and choose the best set of parameters
cvModel = crossval.fit(train_final)
prediction = cvModel.transform(train_final)
prediction.show(5)

DataFrame[PassengerId: int, Survived: int, Pclass: int, Age: double, Fare: double, Alone: int, Sex_index: double, Embarked_index: double, Initial_index: double, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [91]:
# Average metrics on the holdout folds for Logistic Regression. 
cvModel.avgMetrics

[0.7589495850365416,
 0.7589495850365416,
 0.7589495850365416,
 0.7461290722160286,
 0.7589495850365416,
 0.7589495850365416,
 0.7589495850365416,
 0.7589495850365416,
 0.77746810355506,
 0.77746810355506,
 0.77746810355506,
 0.77746810355506]

In [82]:
""" 2) Pipeline for Random Forest. 
Pipeline stages are:
+ initilization
+ fit 
+ transform. 
"""
pipeline = Pipeline(stages=[feature, rf])

paramGrid = ParamGridBuilder().addGrid(rf.numTrees, [100,300]).build()

evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

crossval = CrossValidator(estimator = pipeline,
                        estimatorParamMaps = paramGrid,
                        evaluator = evaluator,
                        numFolds = 3)  

# Run cross-validation
cvModel = crossval.fit(train_final)
prediction = cvModel.transform(train_final)
prediction.show(5)

23/08/18 15:42:31 WARN DAGScheduler: Broadcasting large task binary with size 1054.1 KiB
23/08/18 15:42:34 WARN DAGScheduler: Broadcasting large task binary with size 1140.3 KiB


+-----------+--------+------+------------------+-------+-----+---------+--------------+-------------+--------------------+--------------------+--------------------+----------+
|PassengerId|Survived|Pclass|               Age|   Fare|Alone|Sex_index|Embarked_index|Initial_index|            features|       rawPrediction|         probability|prediction|
+-----------+--------+------+------------------+-------+-----+---------+--------------+-------------+--------------------+--------------------+--------------------+----------+
|          3|       1|     3|              26.0|  7.925|    1|      1.0|           0.0|          1.0|[3.0,26.0,7.925,1...|[28.9016284899647...|[0.28901628489964...|       1.0|
|         18|       1|     2|35.166666666666664|   13.0|    1|      0.0|           0.0|          0.0|[2.0,35.166666666...|[34.6597009457690...|[0.34659700945769...|       1.0|
|         30|       0|     3|35.166666666666664| 7.8958|    1|      0.0|           0.0|          0.0|[3.0,35.166666666..

In [90]:
# Average metrics on the holdout folds for Random Forest.
cvModel.avgMetrics

[0.7589495850365416,
 0.7589495850365416,
 0.7589495850365416,
 0.7461290722160286,
 0.7589495850365416,
 0.7589495850365416,
 0.7589495850365416,
 0.7589495850365416,
 0.77746810355506,
 0.77746810355506,
 0.77746810355506,
 0.77746810355506]

In [83]:
""" 3) Pipeline for Gradient Boosting. 
Pipeline stages are:
+ initilization
+ fit 
+ transform. 
"""
pipeline = Pipeline(stages=[feature, gb])

paramGrid = ParamGridBuilder().addGrid(gb.maxDepth, [2, 4, 6]).addGrid(gb.maxBins, [20, 60]).addGrid(gb.maxIter, [10, 20]).build()

evaluator = MulticlassClassificationEvaluator(labelCol="Survived", predictionCol="prediction", metricName="accuracy")

crossval = CrossValidator(estimator = pipeline,
                        estimatorParamMaps = paramGrid,
                        evaluator= evaluator,
                        numFolds = 3)  

# Run cross-validation
cvModel = crossval.fit(train_final)
prediction = cvModel.transform(train_final)
prediction.show(5)

23/08/18 15:42:47 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 60 to 44 (= number of training instances)
23/08/18 15:42:49 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 60 to 44 (= number of training instances)
23/08/18 15:43:09 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 60 to 44 (= number of training instances)
23/08/18 15:43:12 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 60 to 44 (= number of training instances)
23/08/18 15:43:26 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 60 to 44 (= number of training instances)
23/08/18 15:43:30 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 60 to 44 (= number of training instances)
23/08/18 15:43:41 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 60 to 49 (= number of training instances)
23/08/18 15:43:43 WARN DecisionTreeMetadata: DecisionTree reducing maxBins from 60 to 49 (= number of training instances)
23/08/18 15:43:54 WARN D

DataFrame[PassengerId: int, Survived: int, Pclass: int, Age: double, Fare: double, Alone: int, Sex_index: double, Embarked_index: double, Initial_index: double, features: vector, rawPrediction: vector, probability: vector, prediction: double]

In [89]:
# Average metrics on the holdout folds for Gradient Boosting. 
cvModel.avgMetrics

[0.7589495850365416,
 0.7589495850365416,
 0.7589495850365416,
 0.7461290722160286,
 0.7589495850365416,
 0.7589495850365416,
 0.7589495850365416,
 0.7589495850365416,
 0.77746810355506,
 0.77746810355506,
 0.77746810355506,
 0.77746810355506]

In [84]:
# Store model 
cvModel.bestModel.write().overwrite().save('./datasets_for_pyspark/Titanic_model_saved')

In [87]:
# Load trained model to perform prediction => Returns the entire df with "prediction" column like before
my_model_laoded = PipelineModel.load('./datasets_for_pyspark/Titanic_model_saved')
predictions = my_model_laoded.transform(test_final)
predictions.show(10)

+-----------+--------+------+------------------+-------+-----+---------+--------------+-------------+--------------------+--------------------+--------------------+----------+
|PassengerId|Survived|Pclass|               Age|   Fare|Alone|Sex_index|Embarked_index|Initial_index|            features|       rawPrediction|         probability|prediction|
+-----------+--------+------+------------------+-------+-----+---------+--------------+-------------+--------------------+--------------------+--------------------+----------+
|         42|       0|     2|              27.0|   21.0|    0|      1.0|           0.0|          2.0|[2.0,27.0,21.0,0....|[-1.3259026792203...|[0.06587782434721...|       1.0|
|         63|       0|     1|              45.0| 83.475|    0|      0.0|           0.0|          0.0|(7,[0,1,2],[1.0,4...|[-0.9424782990884...|[0.13182058510852...|       1.0|
|         88|       0|     3|35.166666666666664|   8.05|    1|      0.0|           0.0|          0.0|[3.0,35.166666666..

<h3 style="color:#0FCBC6"> => Metrics </h3>

In [102]:
# Rename last column since BinaryClassificationEvaluator expects a column named "label" by default, 
predictions_renamed = predictions.withColumnRenamed("Survived", "label")

evaluator = BinaryClassificationEvaluator()

# Area under the ROC Curve
auroc = evaluator.evaluate(predictions_renamed, {evaluator.metricName: "areaUnderROC"})
# Area under the precision-recall curve
aurpr = evaluator.evaluate(predictions_renamed, {evaluator.metricName: "areaUnderPR"})

print(f"auroc is : {auroc}")
print(f"aurpr is : {aurpr}")

auroc is : 0.736
aurpr is : 0.2162962962962963


In [96]:
# Calculate accuracy on Pandas DataFrame for comparison

pred_pdf = predictions.toPandas()
accuracy = accuracy_score(pred_pdf['Survived'], pred_pdf['prediction'])
print(f"The accuracy is {accuracy}")

The accuracy is 0.6666666666666666
