# 07.2 - Pyspark use-case
_aka_  
__We get rid of our training wheels__

Let's do a micro-project. Before you can use data to train a model, you usually need to cross-reference databases, join records, add columns, replace values...

So far in this course we have been using complete and clean datasets.


We have a collection of databases (just two, for the sake of time) in the "sparkfiles" directory. The goal is to build a final dataset on which we can train a machine learning model.

The goal is to predict the housing __SalePrice__ target based on other KPIs we have available.

The __houses_A-L.csv__ database lists houses from neighbourhoods whose names start with a letter from A to L. It was collected by one team at your company. The __houses_M-Z.csv__ database was collected by another team. Let's take a look and see whether we can union both databases.

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('adpro').getOrCreate()

In [None]:
dfal = spark.read.csv("../sparkfiles/houses_A-L.csv")
dfal.printSchema()

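Unlike pandas, Spark does not assume that the first row is a header: by default it names the columns `_c0`, `_c1`, ... and reads the header row as data. We need to tell it explicitly.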

In [None]:
dfal = spark.read.option("header", True).csv("../sparkfiles/houses_A-L.csv")
dfal.printSchema()

In [None]:
dfal.show(5)

Usually, in a company environment, you don't keep data in CSVs. Companies usually resort to a "data lake" solution: a large data store open to members of the company, with tiers of security. Imagine that you work in customer relations. Under the GDPR, you may be granted access to customer information, provided you sign a [Non-Disclosure Agreement (NDA)](https://en.wikipedia.org/wiki/Non-disclosure_agreement). Breaching such an agreement has harsh consequences.

Technical teams often do not have access to customer information, only to the activity customers generate on the company's assets. For example, at a telco company, technical teams can see how many calls are being made through a cell tower (a company asset), but not who is making them.

So the customer table and the technical table can both reside in the same data lake, but with different permission settings. This is how a data lake can be kept __GDPR compliant__.

PySpark lets you register a DataFrame as a temporary table on the cluster side, which you can then query with SQL for as long as the Spark session is alive. We will first register our two tables to simulate accessing a company's data lake; only, instead of several TB of data, our tables are a few kB, for the sake of speed.

In [None]:
dfal.createOrReplaceTempView("houses_a_l")  # registerTempTable is deprecated since Spark 2.0

In [None]:
dfmz = spark.read.option("header", True).csv("../sparkfiles/houses_M-Z.csv")
dfmz.printSchema()

Someone used ";" instead of "," as the delimiter.

In [None]:
dfmz = spark.read.option("header", True).option("delimiter", ";").csv("../sparkfiles/houses_M-Z.csv")
dfmz.printSchema()
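When you are not sure which delimiter a file uses, you can peek at its first lines before handing it to Spark. The sketch below uses Python's standard `csv.Sniffer`; `detect_delimiter` is a hypothetical helper, and the sample rows are made up (only the column names come from this notebook).

```python
import csv


def detect_delimiter(sample: str, candidates: str = ",;\t|") -> str:
    """Guess the field delimiter of a CSV sample (hypothetical helper)."""
    # csv.Sniffer picks the candidate character that appears consistently per line
    dialect = csv.Sniffer().sniff(sample, delimiters=candidates)
    return dialect.delimiter


# In practice you could read the first lines of the real file, e.g.:
#   with open("../sparkfiles/houses_M-Z.csv") as fh:
#       sample = "".join(fh.readline() for _ in range(5))
sample_comma = "LotArea,Neighborhood,SalePrice\n1000,Alpha,150000\n2000,Beta,250000\n"
sample_semicolon = "LotArea;Neighborhood;SalePrice\n1000;Alpha;150000\n2000;Beta;250000\n"

print(detect_delimiter(sample_comma))      # ,
print(detect_delimiter(sample_semicolon))  # ;
```

The detected character can then be passed to `spark.read.option("delimiter", ...)`.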

In [None]:
dfmz.createOrReplaceTempView("houses_m_z")

At this point, your entry point to the data lake is SQL. If you are unfamiliar with SQL, don't worry: the queries below simply select the entire table and preview just the first five rows.

In [None]:
df1 = spark.sql("SELECT * FROM houses_a_l LIMIT 5")
df1.show()

In [None]:
df2 = spark.sql("SELECT * FROM houses_m_z LIMIT 5")
df2.show()

<div class="alert alert-info">
    <b>Question: how can we combine the two tables in a spark dataframe and can we do it right away?</b>
</div>

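Our prediction target, __SalePrice__, does not need to be the last column: Spark ML selects columns by name, so it rarely cares about column order. `union`, however, matches columns by *position*, not by name. Since both dataframes have exactly the same column names, we can sort the columns alphabetically in both and then union them safely: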

<div class="alert alert-danger">
    <b>We now want the entire table, so we need to rerun the SQL query</b>
</div>

In [None]:
df1 = spark.sql("SELECT * FROM houses_a_l")
df2 = spark.sql("SELECT * FROM houses_m_z")

In [None]:
df1 = df1.select(sorted(df1.columns))
df2 = df2.select(sorted(df2.columns))

In [None]:
df1.show(3)

In [None]:
df2.show(3)

<div class="alert alert-warning">
    <b>Remember: real tables often have hundreds of columns, so reordering them by hand is impractical.</b>
</div>
        
It is now safe to union the two dataframes.

In [None]:
df = df1.union(df2)
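To see why the sorting step matters, here is a plain-Python sketch of what a positional union does; the helpers and the rows are made up for illustration. PySpark 2.3+ also offers `df1.unionByName(df2)`, which matches columns by name directly.

```python
def union_by_position(cols_a, rows_a, cols_b, rows_b):
    """Mimic DataFrame.union: B's rows are appended value by value, in order;
    the result keeps A's column names (B's names are ignored)."""
    if len(cols_a) != len(cols_b):
        raise ValueError("union needs both tables to have the same number of columns")
    return cols_a, rows_a + rows_b


def sort_columns(cols, rows):
    """Reorder columns alphabetically, like df.select(sorted(df.columns))."""
    order = sorted(range(len(cols)), key=lambda i: cols[i])
    return [cols[i] for i in order], [tuple(row[i] for i in order) for row in rows]


# Made-up rows: same columns, different order in each table
cols_a, rows_a = ["LotArea", "SalePrice"], [(8000, 150000)]
cols_b, rows_b = ["SalePrice", "LotArea"], [(200000, 9000)]

# Naive union: B's SalePrice silently lands in the LotArea column
naive = union_by_position(cols_a, rows_a, cols_b, rows_b)
print(naive)    # (['LotArea', 'SalePrice'], [(8000, 150000), (200000, 9000)])

# Sorting both tables first aligns the columns
aligned = union_by_position(*sort_columns(cols_a, rows_a), *sort_columns(cols_b, rows_b))
print(aligned)  # (['LotArea', 'SalePrice'], [(8000, 150000), (9000, 200000)])
```

Note that the naive union raises no error: the mismatch is silent, which is what makes it dangerous.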

You can make SQL queries on top of __df__:

In [None]:
df.createOrReplaceTempView("houses")

spark.sql("SELECT * FROM houses WHERE kitchenqual='Gd'").show(3)

Since we are in Python, we can also run aggregations. Remember that aggregations are computed on the cluster side: only the result is sent back, and returning tens of rows is far cheaper than returning millions:

In [None]:
import pandas as pd
import seaborn as sns

In [None]:
dfpandas = df.groupBy("TotalRooms").count().toPandas()
dfpandas.head()
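In plain Python terms, `groupBy("TotalRooms").count()` returns one row per distinct key, however many input rows there are. A minimal sketch with made-up rows (note that the keys are strings, just like the CSV columns):

```python
from collections import Counter

# Made-up rows standing in for the (potentially millions of) rows on the cluster
rows = [{"TotalRooms": "6"}, {"TotalRooms": "7"}, {"TotalRooms": "6"},
        {"TotalRooms": "8"}, {"TotalRooms": "6"}]

# One output entry per distinct key: this is all that travels back to the driver
counts = Counter(r["TotalRooms"] for r in rows)
result = sorted(counts.items())
print(result)  # [('6', 3), ('7', 1), ('8', 1)]
```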

<div class="alert alert-danger">
    <b>'count' clashes with the pandas <code>DataFrame.count</code> method: <code>dfpandas.count</code> returns the method, not the column. Never name a column after a pandas method!</b>
</div>

In [None]:
dfpandas.columns = ["TotalRooms", "Counts"]

In [None]:
dfpandas.dtypes

TotalRooms is a string instead of an integer! Going forward, we need to keep an eye on column types.

In [None]:
dfpandas['TotalRooms'] = pd.to_numeric(dfpandas['TotalRooms'])

In [None]:
dfpandas.sort_values(by='TotalRooms', ascending=True)
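Why does the type matter for sorting? Strings are compared character by character, so "10" comes before "2". A small sketch with made-up values:

```python
room_counts = ["10", "2", "6", "12", "4"]

# Lexicographic (string) order: compares the first character, then the next...
as_strings = sorted(room_counts)
print(as_strings)  # ['10', '12', '2', '4', '6']

# Converting to integers first gives the numeric order we actually want
as_numbers = sorted(room_counts, key=int)
print(as_numbers)  # ['2', '4', '6', '10', '12']
```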

In [None]:
sns.barplot(x='TotalRooms', y='Counts', data=dfpandas)

American houses sure are big!

Our final dataframe is now almost ready to be modelled. We need to make sure the variables have their correct type.

In [None]:
df.printSchema()

In [None]:
df.show(3)

Whilst some variables are clearly categorical, others are numerical. We need to make sure the numerical columns actually hold numeric types.

In [None]:
import pyspark.sql.functions as F

In [None]:
cdf = df.withColumn("LivingArea", F.col("LivingArea").cast("bigint"))

In [None]:
cdf.printSchema()

Unfortunately, deciding which columns are numerical is not easy to automate, so we list them by hand.

In [None]:
cols_to_change = ['LivingArea', 'LotArea', 'MoSold', 'NumberBedroom', 'OverallCond', 'SalePrice', 'TotalRooms', 'YearBuilt', 'YearRemodAdd', 'YrSold']

for col in cols_to_change:
    df = df.withColumn(col, F.col(col).cast("bigint"))

df.printSchema()
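Two caveats are worth knowing here. First, with ANSI mode disabled (the default before Spark 4.0), casting a string that cannot be parsed yields `null` instead of an error, so a bad column can quietly fill up with nulls. Second, Spark's CSV reader has an `inferSchema` option that scans the data to guess types. If you want a starting point for the hand-picked list, a heuristic like the one below can flag candidate columns from a sample; `guess_integer_columns` is hypothetical, and the rows are made up.

```python
def parses_as_int(value) -> bool:
    """True if a value (typically a string read from CSV) parses as an int."""
    try:
        int(value)
        return True
    except (TypeError, ValueError):
        return False


def guess_integer_columns(sample_rows, columns):
    """Hypothetical heuristic: flag a column as integer-like when it has at
    least one non-missing sampled value and every such value parses as an int."""
    flagged = []
    for name in columns:
        present = [row.get(name) for row in sample_rows if row.get(name) is not None]
        if present and all(parses_as_int(v) for v in present):
            flagged.append(name)
    return flagged


# With Spark, a sample could come from e.g.:
#   sample_rows = [r.asDict() for r in df.limit(1000).collect()]
# Here: made-up rows with every value read as a string, as from the CSVs
sample_rows = [
    {"LotArea": "8450", "KitchenQual": "Gd", "YrSold": "2008"},
    {"LotArea": None, "KitchenQual": "TA", "YrSold": "2007"},
]
flagged = guess_integer_columns(sample_rows, ["LotArea", "KitchenQual", "YrSold"])
print(flagged)  # ['LotArea', 'YrSold']
```

The heuristic cannot decide whether an integer column such as MoSold should be treated as a quantity or a category; that remains a modelling decision.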

In [None]:
df.show(3)

In [None]:
df.createOrReplaceTempView("housing_market")

We now have a table on our cluster, ready for training.

PySpark has a distributed ML library. Many of the tools you know from scikit-learn (pipelines, encoders, imputers, models) have counterparts adapted to a distributed environment.

The methodology is the same. What differs is what is under the hood. 

---
## Spark MLlib: classification exercise

I would like to thank professor Susana Brandão for the help in this part.

Spark has a [Machine Learning library](https://spark.apache.org/mllib/) that enables you to do machine learning at scale.

__For a simple example ready to go, with numerical variables alone, [check this notebook](../Examples/04-SimpleLinearRegressionPyspark.ipynb).__

Let's "retrieve" the dataframe from our data lake:

In [None]:
hm = spark.sql("SELECT * FROM housing_market")

In [None]:
hm.show(3)

In [None]:
hm.printSchema()

## MLlib pipes
MLlib allows us to create pipelines similar to those in scikit-learn.  
MLlib models usually receive a single vector column of features as input.  
The first thing we need to do is convert the subset of features we are interested in into a vector.  
We can handle numerical and categorical features differently:  
**numerical** can be imputed   
**categorical** can be encoded 
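The key idea behind a pipeline is that stages run in order and each estimator is fitted on the output of the stages before it; the fitted pipeline then replays the same transformations on new data. The plain-Python toy below sketches that idea; all class names are hypothetical and this is not Spark's implementation.

```python
class MaxDivider:
    """Toy estimator (hypothetical): learns the max of a column during fit."""
    def __init__(self, col):
        self.col = col

    def fit(self, rows):
        return MaxDividerModel(self.col, max(r[self.col] for r in rows))


class MaxDividerModel:
    """Toy transformer: divides the column by the max learned during fit."""
    def __init__(self, col, max_value):
        self.col, self.max_value = col, max_value

    def transform(self, rows):
        return [{**r, self.col + "_scaled": r[self.col] / self.max_value} for r in rows]


class ToyPipeline:
    """Mimics the idea behind pyspark.ml.Pipeline.fit: stages run in order and
    each estimator is fitted on the output of the stages before it."""
    def __init__(self, stages):
        self.stages = stages

    def fit(self, rows):
        fitted = []
        for stage in self.stages:
            # Estimators are fitted; plain transformers are used as they are
            model = stage.fit(rows) if hasattr(stage, "fit") else stage
            rows = model.transform(rows)
            fitted.append(model)
        return ToyPipelineModel(fitted)


class ToyPipelineModel:
    def __init__(self, models):
        self.models = models

    def transform(self, rows):
        for model in self.models:
            rows = model.transform(rows)
        return rows


train = [{"LotArea": 5.0}, {"LotArea": 10.0}]  # made-up data
model = ToyPipeline([MaxDivider("LotArea"), MaxDivider("LotArea_scaled")]).fit(train)

# New data is transformed with the statistics learned on `train`
out = model.transform([{"LotArea": 20.0}])
print(out)  # [{'LotArea': 20.0, 'LotArea_scaled': 2.0, 'LotArea_scaled_scaled': 2.0}]
```

The second stage depends on a column that only exists after the first stage has run, which is why stage order matters.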

### Task 1: Start by checking which variables you need
- we will use the sale price as our target
- we will impute all numerical features with the mean,
- we will do one hot encoding on the categorical features

In [None]:
# Split the columns by type: non-string (numerical) vs string (categorical)
nonstring_features = [name for name, dtype in hm.dtypes if dtype != 'string']
string_features = [name for name, dtype in hm.dtypes if dtype == 'string']

In [None]:
nonstring_features

In [None]:
string_features

We need to extract the __SalePrice__, as that is our target.

In [None]:
target = ['SalePrice']
## This might come in handy later
all_nonstring_columns = nonstring_features.copy()
nonstring_features.remove('SalePrice')

In [None]:
nonstring_features

In [None]:
all_nonstring_columns

In [None]:
target

### Task 2: Indexing categories
https://spark.apache.org/docs/2.2.0/ml-features#stringindexer  
`StringIndexer` takes a column with a categorical variable and creates a new column, with a name of our choice, in which each category is replaced by a numeric index (stored as a double: 0.0, 1.0, ...).  
From the doc:


"Additionally, there are three strategies regarding how StringIndexer will handle unseen labels when you have fit a StringIndexer on one dataset and then use it to transform another:
- throw an exception (which is the default)
- skip the row containing the unseen label entirely
- put unseen labels in a special additional bucket, at index numLabels"

In our case, for each categorical variable we create a new column with the suffix _ix:  
Neighborhood - Neighborhood_ix    
KitchenQual - KitchenQual_ix  
Condition1 - Condition1_ix

In [None]:
from pyspark.ml.feature import VectorAssembler, StringIndexer, Imputer, OneHotEncoder
from pyspark.ml import Pipeline

Index the string features, i.e. convert strings to numbers. The indexer finds the distinct strings in the column and assigns a number to each of them; by default, labels are ordered by frequency, so the most frequent one gets index 0.

At this point, if you had 'null' results, you might want to consider additional cleaning, or just drop those elements.
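A plain-Python sketch of what the indexer learns and how `handleInvalid='keep'` treats unseen labels. It assumes the default `stringOrderType='frequencyDesc'` with alphabetical tie-breaking (as documented for recent Spark versions) and no null values; the helpers and data are hypothetical.

```python
from collections import Counter


def fit_string_indexer(values):
    """Sketch of StringIndexer's default ordering (frequencyDesc): the most
    frequent label gets 0.0; ties are broken alphabetically. Assumes no nulls."""
    counts = Counter(values)
    ordered = sorted(counts, key=lambda label: (-counts[label], label))
    return {label: float(i) for i, label in enumerate(ordered)}


def transform_keep(values, mapping):
    """handleInvalid='keep': labels not seen during fit get index numLabels."""
    num_labels = len(mapping)
    return [mapping.get(v, float(num_labels)) for v in values]


train = ["Norm", "Feedr", "Norm", "Artery", "Norm", "Feedr"]  # made-up values
mapping = fit_string_indexer(train)
print(mapping)                                     # {'Norm': 0.0, 'Feedr': 1.0, 'Artery': 2.0}
print(transform_keep(["Feedr", "PosN"], mapping))  # [1.0, 3.0]
```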

In [None]:
indexers = [StringIndexer(inputCol=c, outputCol=c + '_ix', handleInvalid='keep') for c in string_features]

In [None]:
indexers

In [None]:
pipe_indexers = Pipeline(stages=indexers)
pipe = pipe_indexers.fit(df)

df_indexer = pipe.transform(df)

df_indexer

In [None]:
df_indexer.select("Condition1", "Condition1_ix").distinct().toPandas()

### Task 3: Encoding categories

For each indexed categorical variable, we create a new column with the suffix _cVec:  

Neighborhood_ix - Neighborhood_cVec   
KitchenQual_ix - KitchenQual_cVec  
Condition1_ix - Condition1_cVec

This is what is called __one-hot encoding__
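A dense plain-Python sketch of one-hot encoding a single indexed value. Spark's `OneHotEncoder` actually produces sparse vectors, and by default (`dropLast=True`) it drops the last category, encoding it as all zeros; this avoids one-hot columns that always sum to 1, which would be perfectly collinear with an intercept. The helper is hypothetical.

```python
def one_hot(index, num_categories, drop_last=True):
    """Dense sketch of OneHotEncoder for one indexed value. With drop_last=True
    (Spark's default) the vector has num_categories - 1 slots and the last
    category is encoded as all zeros."""
    size = num_categories - 1 if drop_last else num_categories
    vector = [0.0] * size
    position = int(index)
    if position < size:
        vector[position] = 1.0
    return vector


# Three categories indexed 0.0, 1.0, 2.0 (e.g. Norm, Feedr, Artery)
for idx in (0.0, 1.0, 2.0):
    print(idx, one_hot(idx, 3))
# 0.0 [1.0, 0.0]
# 1.0 [0.0, 1.0]
# 2.0 [0.0, 0.0]
```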

In [None]:
encoders = [OneHotEncoder(inputCol=col + "_ix", outputCol=col + "_cVec") for col in string_features]

In [None]:
encoders

In [None]:
pipe_encoders = Pipeline(stages=encoders)
pipe = pipe_encoders.fit(df_indexer)
df_encoders = pipe.transform(df_indexer)

What we have done so far is transform "Condition1" into "Condition1_cVec":

In [None]:
df_encoders.select("Condition1", "Condition1_ix", "Condition1_cVec").distinct().toPandas()

### Task 4: Impute the numerical variables using a strategy.

We need to tackle missing numerical values. For that we need a "strategy": replace them with the mean of all values, the median, or a constant such as zero. We will use the mean and overwrite the original columns. However:

- In the Spark 2.2 version linked below, the Imputer only works with double or float columns, so we need to recast all numeric variables.
- Spark 2.2 offers two strategies, mean and median (newer versions also offer mode).  
https://spark.apache.org/docs/2.2.0/api/python/pyspark.ml.html#pyspark.ml.feature.Imputer
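A plain-Python sketch of mean imputation as a fit/transform pair. Following the Spark docs, the mean is computed only over non-missing values; nulls always count as missing, and so does the `missingValue` marker, which defaults to NaN. The helpers and values are hypothetical.

```python
import math


def is_missing(value):
    # Nulls always count as missing; NaN is the default missingValue marker
    return value is None or (isinstance(value, float) and math.isnan(value))


def fit_mean(values):
    """'fit': compute the surrogate (mean) over non-missing values only."""
    present = [v for v in values if not is_missing(v)]
    return sum(present) / len(present)


def impute(values, surrogate):
    """'transform': replace every missing entry with the learned surrogate."""
    return [surrogate if is_missing(v) else v for v in values]


lot_area = [1000.0, None, 3000.0, float("nan")]  # made-up values
surrogate = fit_mean(lot_area)
print(surrogate)                    # 2000.0
print(impute(lot_area, surrogate))  # [1000.0, 2000.0, 3000.0, 2000.0]
```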

Let's recast all numeric columns as double.

In [None]:
df_encoders = df_encoders.select([F.col(col).astype("double") if col in all_nonstring_columns else col for col in df_encoders.columns])

In [None]:
df_encoders

Fit the imputer with the mean strategy:

In [None]:
imputer = Imputer(strategy='mean', inputCols=nonstring_features, outputCols=nonstring_features)
pipe_imputer = Pipeline(stages=[imputer]) ## stages need to be a sequence!! 
pipe = pipe_imputer.fit(df_encoders)

df_imputer = pipe.transform(df_encoders)

In [None]:
df_imputer

### Task 5: Select the variables for our model

In [None]:
# the name of the column with a vector with all the assembled features
assembled_col = 'assembled_features'

In [None]:
# String feature cols after one-hot encoding
enc_string_features = [c + '_cVec' for c in string_features]
# Feature cols to assemble
assemble_features = enc_string_features + nonstring_features

In [None]:
# Assemble feature values to vector form
assembler = VectorAssembler(inputCols=assemble_features, outputCol=assembled_col)
pipe_assembler = Pipeline(stages=[assembler]) ## stages need to be a sequence!! 
pipe = pipe_assembler.fit(df_imputer)
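Conceptually, the assembler concatenates the listed columns into one feature vector per row, in the order given by `inputCols`: one-hot vectors are flattened and numbers are appended as-is. By default, `VectorAssembler` raises an error on null inputs, which is one reason the imputation step comes first. A plain-Python sketch for a single made-up row (the helper is hypothetical):

```python
def assemble(row, input_cols):
    """Sketch of VectorAssembler on one row: numeric columns are appended as
    single values and vector columns (plain lists here) are flattened, in the
    order given by input_cols."""
    features = []
    for name in input_cols:
        value = row[name]
        if isinstance(value, (list, tuple)):
            features.extend(float(x) for x in value)
        else:
            features.append(float(value))
    return features


# Made-up row: a one-hot encoded category followed by a numerical feature
row = {"KitchenQual_cVec": [0.0, 1.0, 0.0], "LotArea": 8450.0}
assembled = assemble(row, ["KitchenQual_cVec", "LotArea"])
print(assembled)  # [0.0, 1.0, 0.0, 8450.0]
```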

In [None]:
df_assembled = pipe.transform(df_imputer)

Here we convert a few rows to pandas to get a nicely formatted table preview:

In [None]:
df_assembled.limit(5).toPandas()

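So far, this has been a step-by-step example of preparing a table so that a model can be trained on it at scale. We can also do it all in one go with a single pipeline. The stage order matters: each encoder consumes the column produced by its indexer, and the assembler consumes the outputs of both the imputer and the encoders.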

### All tasks: Single pipe

In [None]:
stages_together = [imputer] + indexers + encoders + [assembler]

In [None]:
stages_together

In [None]:
df = df.select([F.col(col).astype("double") if col in all_nonstring_columns else col for col in df.columns])

In [None]:
pipe_together = Pipeline(stages=stages_together) ## stages need to be a sequence!! 
pipe = pipe_together.fit(df)
df_all_at_once = pipe.transform(df)

In [None]:
df_all_at_once.limit(5).toPandas()

### Now we're ready to use the table

All the steps so far were meant to make sure our numerical representation can be used to train a model: we have only performed "tidiness" operations.

Let's first use a Gradient Boosted Tree classifier to see whether we can predict if the price of a house is above a certain threshold (here, 100,000), given its properties.

In [None]:
from pyspark.ml.classification import GBTClassifier

In [None]:
df = df.withColumn("target", (F.col("SalePrice")>100000.0).astype("double"))

In [None]:
gbt = GBTClassifier(
            featuresCol=assembled_col,
            labelCol='target',
            predictionCol='preds',
            lossType='logistic',
            maxDepth=4,
            maxIter=20,
        )

In [None]:
stages_for_train = [imputer] + indexers + encoders + [assembler] + [gbt]

In [None]:
pipe_together = Pipeline(stages=stages_for_train) ## stages need to be a sequence!! 
pipe = pipe_together.fit(df)

In [None]:
df_ex = pipe.transform(df)

In [None]:
df_ex.select("target", "probability", "preds").toPandas().head()

#### Now a linear regression

In [None]:
from pyspark.ml.regression import LinearRegression

In [None]:
ldf = spark.sql("SELECT * FROM housing_market")

In [None]:
ldf.show(3)

In [None]:
ldf.printSchema()

In [None]:
lr = LinearRegression(featuresCol=assembled_col,
                      labelCol='SalePrice',
                      maxIter=10,
                      regParam=0.3,
                      elasticNetParam=0.8)
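In Spark's documentation, `regParam` ($\lambda$) and `elasticNetParam` ($\alpha$) combine into the elastic-net penalty added to the least-squares loss. With the values above, a worked sketch:

$$
\lambda \left( \alpha \lVert w \rVert_1 + \frac{1-\alpha}{2} \lVert w \rVert_2^2 \right)
= 0.3 \left( 0.8 \lVert w \rVert_1 + 0.1 \lVert w \rVert_2^2 \right)
= 0.24 \lVert w \rVert_1 + 0.03 \lVert w \rVert_2^2
$$

So `elasticNetParam=0.8` leans mostly towards the L1 (lasso) penalty, which can push some coefficients to exactly zero; `elasticNetParam=0` gives pure ridge (L2) and `elasticNetParam=1` pure lasso (L1).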

In [None]:
stages_for_train = [imputer] + indexers + encoders + [assembler] + [lr]

In [None]:
pipe_together = Pipeline(stages=stages_for_train) ## stages need to be a sequence!! 

In [None]:
lr_df = pipe_together.fit(df)

In [None]:
lr_df.stages

In [None]:
lr_df.stages[-1].summary.r2
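The `summary` of a fitted `LinearRegressionModel` is computed on the training data, so its $R^2$ tells you how well the model fits the data it has already seen. For reference, here is a plain-Python sketch of the coefficient of determination, $R^2 = 1 - SS_{res}/SS_{tot}$; the helper is hypothetical.

```python
def r2_score(y_true, y_pred):
    """Coefficient of determination: 1 - SS_res / SS_tot."""
    mean_y = sum(y_true) / len(y_true)
    ss_res = sum((y - p) ** 2 for y, p in zip(y_true, y_pred))  # residual error
    ss_tot = sum((y - mean_y) ** 2 for y in y_true)             # total variance
    return 1.0 - ss_res / ss_tot


print(r2_score([1.0, 2.0, 3.0], [1.0, 2.0, 3.0]))  # 1.0 (perfect predictions)
print(r2_score([1.0, 2.0, 3.0], [2.0, 2.0, 2.0]))  # 0.0 (no better than the mean)
```

To estimate performance on unseen houses, you could split the data with `df.randomSplit(...)`, fit on one part, and call the fitted model's `evaluate(...)` on the other.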

In [None]:
spark.stop()