# Building Machine Learning Pipelines using PySpark
- Spark is a framework for working with big data, and it has its own machine learning library (`pyspark.ml`) and workflow
- Using the libraries and functions available through the PySpark module, this project demonstrates a classic logistic regression workflow on a dataset of English Premier League players

## Starting Spark Session
- The `SparkSession` is the entry point to Spark's functionality, such as distributed execution and the DataFrame and SQL operations used for preprocessing
- Working through these APIs lets the data processing in a machine learning pipeline scale to larger datasets
- `master('local[1]')` runs Spark locally with a single worker thread, which is enough for a dataset of this size

In [17]:
import pyspark
from pyspark.sql import SparkSession
spark = SparkSession.builder \
                    .master('local[1]') \
                    .appName('SparkByExamples.com') \
                    .getOrCreate()

## Loading data
- Spark's DataFrame API makes reading CSV files simple, but defining the data's `schema` matters for feature engineering and model training
- As the output below shows, when no schema is supplied (and `inferSchema` is not enabled), every column is read as `StringType`; this can be fixed by defining a schema with `pyspark.sql.types`

In [18]:
data = spark.read.csv('epldata_final.csv', header=True) # loading data
data.printSchema()

root
 |-- name: string (nullable = true)
 |-- club: string (nullable = true)
 |-- age: string (nullable = true)
 |-- position: string (nullable = true)
 |-- position_cat: string (nullable = true)
 |-- market_value: string (nullable = true)
 |-- page_views: string (nullable = true)
 |-- fpl_value: string (nullable = true)
 |-- fpl_sel: string (nullable = true)
 |-- fpl_points: string (nullable = true)
 |-- region: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- new_foreign: string (nullable = true)
 |-- age_cat: string (nullable = true)
 |-- club_id: string (nullable = true)
 |-- big_club: string (nullable = true)
 |-- new_signing: string (nullable = true)



### Schema Definition

In [19]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType

defined_schema = StructType([StructField('name', StringType()), # defining schema
                             StructField('club', StringType()),
                             StructField('age', IntegerType()),
                             StructField('position', StringType()),
                             StructField('position_cat', IntegerType()),
                             StructField('market_value', DoubleType()),
                             StructField('page_views',  IntegerType()),
                             StructField('fpl_value', DoubleType()),
                             StructField('fpl_sel', DoubleType()),
                             StructField('fpl_points', IntegerType()),
                             StructField('region', StringType()),
                             StructField('nationality', StringType()),
                             StructField('new_foreign', IntegerType()),
                             StructField('age_cat', IntegerType()),
                             StructField('club_id', IntegerType()),
                             StructField('big_club', IntegerType()),
                             StructField('new_signing', IntegerType())])

data = spark.read.csv('epldata_final.csv', header=True, schema=defined_schema)
data.printSchema()

root
 |-- name: string (nullable = true)
 |-- club: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- position: string (nullable = true)
 |-- position_cat: integer (nullable = true)
 |-- market_value: double (nullable = true)
 |-- page_views: integer (nullable = true)
 |-- fpl_value: double (nullable = true)
 |-- fpl_sel: double (nullable = true)
 |-- fpl_points: integer (nullable = true)
 |-- region: string (nullable = true)
 |-- nationality: string (nullable = true)
 |-- new_foreign: integer (nullable = true)
 |-- age_cat: integer (nullable = true)
 |-- club_id: integer (nullable = true)
 |-- big_club: integer (nullable = true)
 |-- new_signing: integer (nullable = true)
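With a schema supplied, Spark casts each field as it reads the file. In the default `PERMISSIVE` read mode, a field that cannot be converted to its declared type is set to null instead of failing the read. The plain-Python sketch below imitates that behaviour; it is not Spark's implementation, and the schema and rows are hypothetical.

```python
import csv
import io


def cast_or_none(raw, caster):
    """Convert one raw CSV field; return None (a null) if it cannot be parsed."""
    try:
        return caster(raw)
    except (TypeError, ValueError):
        return None


# Hypothetical schema (column -> Python type) and rows, for illustration only
schema = {"name": str, "age": int, "market_value": float, "fpl_sel": float}
raw_csv = "name,age,market_value,fpl_sel\nPlayer A,27,10.5,12.5%\nPlayer B,31,3.0,0.4%\n"

rows = [
    {col: cast_or_none(value, schema[col]) for col, value in record.items()}
    for record in csv.DictReader(io.StringIO(raw_csv))
]
print(rows[0])  # the '12.5%' string cannot be cast to float, so fpl_sel is None
```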



### Dropping unneeded columns

In [20]:
data = data.drop(*['big_club', 'new_signing', 'new_foreign', 'age_cat']) # dropping unneeded columns
data.columns

['name',
 'club',
 'age',
 'position',
 'position_cat',
 'market_value',
 'page_views',
 'fpl_value',
 'fpl_sel',
 'fpl_points',
 'region',
 'nationality',
 'club_id']

## Exploratory Data Analysis
- Spark's querying functions make it easy to explore a dataset thoroughly: its size, summary statistics, group counts, and missing values


In [21]:
data.count(), len(data.columns) 

(461, 13)

In [22]:
data.select('market_value', 'fpl_value', 'page_views').describe().show() # exploring numerical data

+-------+------------------+------------------+-----------------+
|summary|      market_value|         fpl_value|       page_views|
+-------+------------------+------------------+-----------------+
|  count|               461|               461|              461|
|   mean|11.012039045553143| 5.447939262472885|763.7765726681127|
| stddev|12.257402881461974|1.3466952645024657| 931.805757407049|
|    min|              0.05|               4.0|                3|
|    max|              75.0|              12.5|             7664|
+-------+------------------+------------------+-----------------+
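`describe()` reports five statistics per column: count, mean, stddev, min and max, computed over non-null values. Spark's `stddev` is the sample standard deviation (dividing by n - 1). The plain-Python sketch below reproduces those five numbers for one hypothetical column, as an illustration rather than Spark's code.

```python
import statistics


def describe(values):
    """Summarise one column like DataFrame.describe(): count, mean, stddev, min, max."""
    present = [v for v in values if v is not None]  # nulls are skipped
    return {
        "count": len(present),
        "mean": statistics.mean(present),
        "stddev": statistics.stdev(present),  # sample standard deviation (n - 1)
        "min": min(present),
        "max": max(present),
    }


# Hypothetical column values with one null, not taken from the dataset
summary = describe([1.0, None, 2.0, 3.0, 4.0])
print(summary)
```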



In [23]:
data.groupBy('club').count().show() # group by statement
data.groupBy('nationality').count().show()

+-----------------+-----+
|             club|count|
+-----------------+-----+
|        Tottenham|   20|
|Brighton+and+Hove|   22|
|         West+Ham|   22|
|   Leicester+City|   24|
|          Arsenal|   28|
|Manchester+United|   25|
|        West+Brom|   19|
|          Burnley|   18|
|      Bournemouth|   24|
| Newcastle+United|   21|
|      Southampton|   23|
|          Swansea|   25|
|     Huddersfield|   28|
|   Crystal+Palace|   21|
|        Liverpool|   27|
|          Chelsea|   20|
|  Manchester+City|   20|
|          Watford|   24|
|          Everton|   28|
|       Stoke+City|   22|
+-----------------+-----+

+-------------+-----+
|  nationality|count|
+-------------+-----+
|      Senegal|    7|
|       Sweden|    3|
|      Germany|   16|
|       France|   25|
|       Greece|    1|
|     Congo DR|    4|
|      Algeria|    3|
|        Wales|   12|
|    Argentina|   17|
|      Belgium|   18|
|      Ecuador|    2|
|      Finland|    1|
|        Ghana|    5|
|   The Gambia|    1|
|
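`groupBy(...).count()` returns its groups in no guaranteed order, which is why the club and nationality tables above are unsorted; in PySpark the result can be sorted with `.orderBy(f.desc('count'))`. The plain-Python sketch below shows the same count-then-sort idea on hypothetical club labels.

```python
from collections import Counter

# Hypothetical club column values
clubs = ["Arsenal", "Chelsea", "Arsenal", "Everton", "Chelsea", "Arsenal"]

counts = Counter(clubs)  # analogous to data.groupBy('club').count()

# Sort by count descending, then by name, like .orderBy(f.desc('count'), 'club')
ordered = sorted(counts.items(), key=lambda kv: (-kv[1], kv[0]))
print(ordered)
```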

In [24]:
import pyspark.sql.functions as f # using sql functionality for null value check

aggregated_data = data.agg(*[f.count(f.when(f.isnull(c), c)).alias(c) for c in data.columns])
aggregated_data.show()

+----+----+---+--------+------------+------------+----------+---------+-------+----------+------+-----------+-------+
|name|club|age|position|position_cat|market_value|page_views|fpl_value|fpl_sel|fpl_points|region|nationality|club_id|
+----+----+---+--------+------------+------------+----------+---------+-------+----------+------+-----------+-------+
|   0|   0|  0|       0|           0|           0|         0|        0|    461|         0|     0|          0|      0|
+----+----+---+--------+------------+------------+----------+---------+-------+----------+------+-----------+-------+
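The null check shows that all 461 `fpl_sel` values are null, which suggests the raw strings could not be parsed as `DoubleType`. Assuming they are percentage strings such as `'12.5%'` (an assumption; check the raw CSV), one fix is to read the column as a string and strip the `%` before casting, which in PySpark could be written as `f.regexp_replace('fpl_sel', '%', '').cast('double')`. The plain-Python sketch below shows the same clean-up on hypothetical values.

```python
def parse_percent(raw):
    """Turn a string like '12.5%' into 12.5; return None if it is missing or not numeric."""
    if raw is None:
        return None
    try:
        return float(raw.strip().rstrip("%"))
    except ValueError:
        return None


# Hypothetical raw fpl_sel strings
cleaned = [parse_percent(v) for v in ["12.5%", " 0.4% ", "n/a", None]]
print(cleaned)
```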



## String Indexing
- `StringIndexer()` converts each categorical (string) column into numeric indices, so that a logistic regression model can use categorical variables as numerical features during training

In [25]:
from pyspark.ml.feature import StringIndexer # string indexing

si_name = StringIndexer(inputCol='name', outputCol='id_name')
si_club = StringIndexer(inputCol='club', outputCol='id_club')
si_nation = StringIndexer(inputCol='nationality', outputCol='id_nationality')

data = si_name.fit(data).transform(data)
data = si_club.fit(data).transform(data)
data = si_nation.fit(data).transform(data)
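By default (`stringOrderType='frequencyDesc'`), `StringIndexer` gives index 0.0 to the most frequent value, and Spark's documentation states that ties are broken alphabetically. That is consistent with the output below: England gets 0.0 for nationality, and Arsenal gets 0.0 for club despite tying with Everton and Huddersfield at 28 players. The plain-Python sketch below mimics that ordering on hypothetical labels.

```python
from collections import Counter


def string_index(values):
    """Map each distinct string to a float index: most frequent first, ties alphabetical."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


# Hypothetical club labels: Arsenal and Everton tie on frequency
mapping = string_index(["Everton", "Arsenal", "Burnley", "Arsenal", "Everton"])
print(mapping)
```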

In [26]:
data.select('name', 'id_name', 'club', 'id_club', 'nationality', 'id_nationality').show(5)

+-----------------+-------+-------+-------+--------------+--------------+
|             name|id_name|   club|id_club|   nationality|id_nationality|
+-----------------+-------+-------+-------+--------------+--------------+
|   Alexis Sanchez|   19.0|Arsenal|    0.0|         Chile|          31.0|
|       Mesut Ozil|  302.0|Arsenal|    0.0|       Germany|           7.0|
|        Petr Cech|  348.0|Arsenal|    0.0|Czech Republic|          32.0|
|     Theo Walcott|  418.0|Arsenal|    0.0|       England|           0.0|
|Laurent Koscielny|  250.0|Arsenal|    0.0|        France|           2.0|
+-----------------+-------+-------+-------+--------------+--------------+
only showing top 5 rows



## One Hot Encoding
- `OneHotEncoder()` then turns each index into a sparse binary vector, so the model does not treat the arbitrary order of the indices as meaningful

In [27]:
from pyspark.ml.feature import OneHotEncoder # one hot encoding

OHE = OneHotEncoder(inputCols=['id_name', 'id_club', 'id_nationality'],
                    outputCols=['ohe_name', 'ohe_club', 'ohe_nationality'])

data = OHE.fit(data).transform(data)

In [28]:
data.select('name', 'id_name', 'ohe_name').show(5)
data.select('club', 'id_club', 'ohe_club').show(5)
data.select('nationality', 'id_nationality', 'ohe_nationality').show(5)

+-----------------+-------+-----------------+
|             name|id_name|         ohe_name|
+-----------------+-------+-----------------+
|   Alexis Sanchez|   19.0| (460,[19],[1.0])|
|       Mesut Ozil|  302.0|(460,[302],[1.0])|
|        Petr Cech|  348.0|(460,[348],[1.0])|
|     Theo Walcott|  418.0|(460,[418],[1.0])|
|Laurent Koscielny|  250.0|(460,[250],[1.0])|
+-----------------+-------+-----------------+
only showing top 5 rows

+-------+-------+--------------+
|   club|id_club|      ohe_club|
+-------+-------+--------------+
|Arsenal|    0.0|(19,[0],[1.0])|
|Arsenal|    0.0|(19,[0],[1.0])|
|Arsenal|    0.0|(19,[0],[1.0])|
|Arsenal|    0.0|(19,[0],[1.0])|
|Arsenal|    0.0|(19,[0],[1.0])|
+-------+-------+--------------+
only showing top 5 rows

+--------------+--------------+---------------+
|   nationality|id_nationality|ohe_nationality|
+--------------+--------------+---------------+
|         Chile|          31.0|(60,[31],[1.0])|
|       Germany|           7.0| (60,[7],[1.0])|
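`OneHotEncoder` defaults to `dropLast=True`, so a column with k categories produces vectors of size k - 1 and the last category is encoded as all zeros. This matches the sizes above: 19 slots for the 20 clubs, and 460 for names (implying 461 distinct names, one per row). The sparse output `(size, [index], [1.0])` is sketched below in plain Python, as an illustration of the encoding rather than Spark's code.

```python
def one_hot(index, num_categories, drop_last=True):
    """Return a sparse one-hot vector as (size, indices, values).

    With drop_last=True (OneHotEncoder's default) the vector has
    num_categories - 1 slots, and the last category has no active slot.
    """
    size = num_categories - 1 if drop_last else num_categories
    idx = int(index)
    if idx < size:
        return (size, [idx], [1.0])
    return (size, [], [])


print(one_hot(0.0, 20))   # first of 20 clubs -> (19, [0], [1.0])
print(one_hot(19.0, 20))  # last of 20 clubs  -> (19, [], [])
```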

## Vector Assembler
- `VectorAssembler()` combines the listed numeric and vector columns into a single feature-vector column, which is the input format Spark's machine learning algorithms expect

In [29]:
from pyspark.ml.feature import VectorAssembler # vectorizing data

assembler = VectorAssembler(inputCols=['market_value',
                                       'fpl_value',
                                       'id_name',
                                       'ohe_name',
                                       'id_club',
                                       'ohe_club',
                                       'id_nationality',
                                       'ohe_nationality'],
                            outputCol='vector')

data = data.fillna(0) # replace numeric nulls with 0 (VectorAssembler raises an error on nulls by default)
final_data = assembler.transform(data)

final_data.select('vector').show()

+--------------------+
|              vector|
+--------------------+
|(544,[0,1,2,22,46...|
|(544,[0,1,2,305,4...|
|(544,[0,1,2,351,4...|
|(544,[0,1,2,421,4...|
|(544,[0,1,2,253,4...|
|(544,[0,1,2,162,4...|
|(544,[0,1,2,339,4...|
|(544,[0,1,2,325,4...|
|(544,[0,1,2,402,4...|
|(544,[0,1,2,19,46...|
|(544,[0,1,2,153,4...|
|(544,[0,1,2,20,46...|
|(544,[0,1,2,176,4...|
|(544,[0,1,2,6,464...|
|(544,[0,1,2,138,4...|
|(544,[0,1,2,141,4...|
|(544,[0,1,2,244,4...|
|(544,[0,1,2,388,4...|
|(544,[0,1,2,99,46...|
|(544,[0,1,2,365,4...|
+--------------------+
only showing top 20 rows
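The width of 544 is the sum of the inputs: 2 scalars (`market_value`, `fpl_value`) + 1 (`id_name`) + 460 (`ohe_name`) + 1 (`id_club`) + 19 (`ohe_club`) + 1 (`id_nationality`) + 60 (`ohe_nationality`). The plain-Python sketch below concatenates one row in that order, keeping only non-zero entries as a sparse vector does. The index values come from the Alexis Sanchez row shown earlier; the two market figures are hypothetical. The leading indices `[0, 1, 2, 22, 46...]` match the first row above.

```python
def assemble(parts):
    """Concatenate scalars and sparse (size, indices, values) vectors into one sparse vector."""
    offset, indices, values = 0, [], []
    for part in parts:
        if isinstance(part, tuple):          # a sparse vector block
            size, part_idx, part_vals = part
            for i, v in zip(part_idx, part_vals):
                if v != 0.0:
                    indices.append(offset + i)
                    values.append(v)
            offset += size
        else:                                # a single numeric column
            if part != 0.0:
                indices.append(offset)
                values.append(float(part))
            offset += 1
    return (offset, indices, values)


row = [
    10.0, 5.0,                   # market_value, fpl_value (hypothetical)
    19.0, (460, [19], [1.0]),    # id_name, ohe_name
    0.0, (19, [0], [1.0]),       # id_club, ohe_club (zero id is omitted)
    31.0, (60, [31], [1.0]),     # id_nationality, ohe_nationality
]
size, idx, vals = assemble(row)
print(size, idx)
```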



## Logistic Regression
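- Now that the data is vectorized, a `LogisticRegression()` model can be fitted on an 80/20 train-test split
- The model aims to predict the `id_name` column from the `vector` column produced by `VectorAssembler()`. Note that `id_name` and `ohe_name` are themselves inputs to that vector, so the label leaks into the features; and because every player name appears only once, no name in the test split is seen during training, which is why the test predictions below do not match `id_name`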

In [30]:
data_train, data_test = final_data.randomSplit([.8, .2], seed=1) # train-test-split
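`randomSplit` assigns each row to a split at random, so the 80/20 sizes are approximate. When every label is unique, as with player names, the test labels are necessarily absent from training. The plain-Python sketch below (a simplified Bernoulli split on hypothetical labels, not Spark's sampler) makes that check explicit; in PySpark a similar check could use `data_test.select('id_name').subtract(data_train.select('id_name')).count()`.

```python
import random


def split(rows, train_fraction, seed):
    """Assign each row independently to train or test, like a seeded random split."""
    rng = random.Random(seed)
    train, test = [], []
    for row in rows:
        (train if rng.random() < train_fraction else test).append(row)
    return train, test


# Hypothetical labels where every value is unique, like player names
labels = [f"player_{i}" for i in range(100)]
train, test = split(labels, 0.8, seed=1)

unseen = set(test) - set(train)
print(len(train), len(test), len(unseen) == len(test))  # every test label is unseen
```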

In [31]:
from pyspark.ml.classification import LogisticRegression # Logistic Regression

logit = LogisticRegression(featuresCol='vector', labelCol='id_name').fit(data_train)

prediction = logit.transform(data_test)
prediction.select('vector','id_name', 'rawPrediction',
                  'prediction', 'probability').show()

+--------------------+-------+--------------------+----------+--------------------+
|              vector|id_name|       rawPrediction|prediction|         probability|
+--------------------+-------+--------------------+----------+--------------------+
|(544,[0,1,2,7,463...|    4.0|[0.24329764999350...|     457.0|[0.00148177657594...|
|(544,[0,1,2,16,46...|   13.0|[0.18028050849506...|     115.0|[0.00178395079154...|
|(544,[0,1,2,17,46...|   14.0|[0.32276799015208...|     265.0|[1.47898190919739...|
|(544,[0,1,2,18,46...|   15.0|[0.31027327143040...|     265.0|[1.47278801917426...|
|(544,[0,1,2,20,46...|   17.0|[0.56966790497067...|      96.0|[0.00287209348982...|
|(544,[0,1,2,32,46...|   29.0|[-0.0083534992384...|       2.0|[0.00162949064057...|
|(544,[0,1,2,35,46...|   32.0|[0.08215885074565...|     343.0|[0.00106303727317...|
|(544,[0,1,2,36,46...|   33.0|[0.23828420684094...|     124.0|[8.28560608597087...|
|(544,[0,1,2,45,46...|   42.0|[0.14654659750553...|     389.0|[8.06873209394
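With more than two label classes, Spark's `LogisticRegression` fits a multinomial model: `rawPrediction` holds one score per class, `probability` is the softmax of those scores, and `prediction` is the index of the largest probability. The plain-Python sketch below shows that mapping for three hypothetical classes.

```python
import math


def softmax(raw):
    """Convert raw per-class scores into probabilities that sum to 1."""
    peak = max(raw)  # subtract the max for numerical stability
    exps = [math.exp(r - peak) for r in raw]
    total = sum(exps)
    return [e / total for e in exps]


raw_prediction = [0.2, 1.5, -0.3]  # hypothetical scores for classes 0, 1, 2
probability = softmax(raw_prediction)
prediction = float(max(range(len(probability)), key=probability.__getitem__))
print(probability, prediction)
```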

## Machine Learning Pipeline
- The same machine learning workflow can be packaged as a `Pipeline()`
- The `stages` below chain the indexers, encoder, assembler, and model, so one call to `fit()` fits them in order and one call to `transform()` applies them all; because each stage operates on Spark DataFrames, the same pipeline can scale to much larger datasets

In [32]:
from pyspark.ml import Pipeline

pipe_data = spark.read.csv('epldata_final.csv', header=True, schema=defined_schema)
stage_1 = StringIndexer(inputCol='name', outputCol='id_name_pipe')
stage_2 = StringIndexer(inputCol='club', outputCol='id_club_pipe')
stage_3 = StringIndexer(inputCol='nationality', outputCol='id_nationality_pipe')
stage_4 = OneHotEncoder(inputCols=['id_name_pipe', 'id_club_pipe', 'id_nationality_pipe'],
                        outputCols=['ohe_name_pipe', 'ohe_club_pipe', 'ohe_nationality_pipe'])
stage_5 = VectorAssembler(inputCols=['market_value',
                                     'fpl_value',
                                     'id_name_pipe',
                                     'ohe_name_pipe',
                                     'id_club_pipe',
                                     'ohe_club_pipe',
                                     'id_nationality_pipe',
                                     'ohe_nationality_pipe'],
                          outputCol='vector')
stage_6 = LogisticRegression(featuresCol='vector', labelCol='id_name_pipe')
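Conceptually, `Pipeline.fit` walks the stages in order: an estimator (such as `StringIndexer` or `LogisticRegression`) is fitted on the current data and its fitted model transforms the data for the next stage, while a plain transformer (such as `VectorAssembler`) just transforms it. Because the indexers here are fitted only on `pipe_train`, they see fewer distinct names, which is consistent with the vector width of 453 in the output below instead of 544. The sketch below is a simplified plain-Python model of that loop; the classes are hypothetical, and the `hasattr` check stands in for Spark's `Estimator`/`Transformer` distinction.

```python
class Doubler:
    """Hypothetical transformer: needs no fitting."""
    def transform(self, xs):
        return [2 * x for x in xs]


class MeanCenterer:
    """Hypothetical estimator: fit() learns a mean and returns a fitted model."""
    def fit(self, xs):
        return CenteringModel(sum(xs) / len(xs))


class CenteringModel:
    def __init__(self, mean):
        self.mean = mean

    def transform(self, xs):
        return [x - self.mean for x in xs]


def fit_pipeline(stages, data):
    """Fit stages in order, feeding each stage's output into the next."""
    fitted = []
    for stage in stages:
        model = stage.fit(data) if hasattr(stage, "fit") else stage
        data = model.transform(data)
        fitted.append(model)
    return fitted


def transform_pipeline(fitted, data):
    """Apply every fitted stage in order, like PipelineModel.transform."""
    for model in fitted:
        data = model.transform(data)
    return data


models = fit_pipeline([Doubler(), MeanCenterer()], [1.0, 2.0, 3.0])
print(transform_pipeline(models, [4.0]))
```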

In [33]:
regression_pipeline = Pipeline(stages=[stage_1, stage_2, stage_3, stage_4, stage_5, stage_6])
pipe_train, pipe_test = pipe_data.randomSplit([0.8,0.2], seed=1)

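model = regression_pipeline.fit(pipe_train)
# These predictions are on the training split. Transforming pipe_test would fail with the
# default StringIndexer(handleInvalid='error'), since the names in pipe_test never appear in pipe_train.
pipe_train = model.transform(pipe_train)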

pipe_train.select('vector','id_name_pipe', 'rawPrediction',
                  'prediction', 'probability').show()

+--------------------+------------+--------------------+----------+--------------------+
|              vector|id_name_pipe|       rawPrediction|prediction|         probability|
+--------------------+------------+--------------------+----------+--------------------+
|(453,[0,1,3,378,3...|         0.0|[43.5902413358697...|       0.0|[1.0,1.8295883451...|
|(453,[0,1,2,4,379...|         1.0|[0.43891107242437...|       1.0|[1.88641726789220...|
|(453,[0,1,2,5,378...|         2.0|[-0.0107105684804...|       2.0|[6.43543400427312...|
|(453,[0,1,2,6,378...|         3.0|[0.01211729401848...|       3.0|[3.49137053052877...|
|(453,[0,1,2,7,378...|         4.0|[0.02632240996252...|       4.0|[7.19104370614431...|
|(453,[0,1,2,8,378...|         5.0|[0.33565779776431...|       5.0|[1.36569827957330...|
|(453,[0,1,2,9,378...|         6.0|[0.42580820882416...|       6.0|[1.81296644406015...|
|(453,[0,1,2,10,37...|         7.0|[0.42895177708633...|       7.0|[1.88424604086132...|
|(453,[0,1,2,11,37...