### Visualization

In [None]:
# define binarizer
# 1 if the value exceeds the threshold, 0 otherwise
BI = Binarizer(threshold = 500000, inputCol = 'price', outputCol = 'price_bin')

# define string indexer
# maps each string category to a numeric index, for example, france is 0 and belgium is 1
SI_lab = StringIndexer(inputCol = 'price_bin', outputCol = 'label')

# define string indexer for categorical features
catColumns = ['waterfront', 'view', 'condition', 'grade', 'renovated']
catColumnsIDX = [col + "_IDX" for col in catColumns]
SI_cat = StringIndexer(inputCols = catColumns, outputCols = catColumnsIDX)

# define vector assembler for categorical features
# combines a given list of columns into a single vector column
VA_cat = VectorAssembler(inputCols = catColumnsIDX, outputCol = 'catFeatures')

# define vector assembler for numeric features
numColumns = ['sqft_living', 'sqft_lot', 'sqft_above', 'sqft_basement', 'age', 'bedrooms', 'bathrooms', 'floors']
VA_num = VectorAssembler(inputCols = numColumns, outputCol = 'numFeatures')
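The thresholding rule noted for the binarizer can be illustrated without Spark. The sketch below is plain Python, not the `Binarizer` implementation; the helper name `binarize` and the sample prices are made up for illustration. It assumes that values strictly greater than the threshold map to 1.0 and all other values map to 0.0.

```python
def binarize(values, threshold):
    """Map each value to 1.0 if it is strictly greater than threshold, else 0.0."""
    return [1.0 if v > threshold else 0.0 for v in values]

# hypothetical house prices, using the same threshold as the cell above
prices = [221900.0, 538000.0, 500000.0, 604000.0]
price_bin = binarize(prices, threshold=500000)
print(price_bin)
```

Note that a price exactly equal to the threshold ends up in the 0 class under this assumption.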


### Session 2

The data types of all columns can be defined manually with `StructType` and `StructField`. The latter takes three main parameters:
- `name` of the column
- `dataType` of the column
- `nullable`, which defines whether the column can be null: true/false

In [None]:
# define the schemas
transactionSchema  = StructType([StructField('_c0', IntegerType(), True),
                                StructField('InvoiceNo', StringType(), True),
                                StructField('StockCode', StringType(), True),
                                StructField('Quantity', IntegerType(), True)])

In [19]:
# change the datatype of InvoiceDate from string to timestamp
invoices = invoices.withColumn("InvoiceDate", F.to_timestamp("InvoiceDate", "M/d/yyyy H:m"))

In [None]:
# option 2: pyspark DataFrame functions
totalSold_2 = transactions.groupBy("StockCode") \
                            .agg({"Quantity": "sum"}) \
                            .withColumnRenamed("sum(Quantity)", "totalQuantitySold") \
                            .filter("totalQuantitySold > 25000") \
                            .sort("totalQuantitySold", ascending=False)

In [None]:
# get number of returned deliveries
df_ret = transactions.join(inventory, "StockCode") \
                    .select("InvoiceNo", (transactions.Quantity * inventory.UnitPrice).alias("Revenue")) \
                    .groupBy("InvoiceNo") \
                    .sum("Revenue") \
                    .withColumnRenamed("sum(Revenue)", "TotalRevenue") \
                    .filter("TotalRevenue < 0") \
                    .join(invoices, "InvoiceNo") \
                    .groupBy("CustomerID") \
                    .count() \
                    .withColumnRenamed("count", "nbReturned")

In [180]:
# check
df_ret.show(5)

+----------+----------+
|CustomerID|nbReturned|
+----------+----------+
|     13282|         3|
|     13610|         2|
|     15555|         4|
|     15271|         1|
|     14157|         1|
+----------+----------+
only showing top 5 rows



In [None]:
# define function
def add_one(var):
    var_new = var + 1
    return var_new

# wrap in udf
add_one_udf = udf(add_one, returnType=LongType())

# create new column
df = df.withColumn("x2", add_one_udf(df.value))

In [269]:
# check
df.show()

+---+-----+-------+---+
| id|value|and_one| x2|
+---+-----+-------+---+
|  A|    5|      6|  6|
|  B|   67|     68| 68|
|  C|  567|    568|568|
+---+-----+-------+---+



### Regression

In [None]:
# workflow: read data -> create basetable -> pipeline

In [None]:
# remove the observations containing missing values
houses = houses.dropna('any')

# Keyword 'any' removes the row if any value of that row is NULL
# Keyword 'all' removes the row only if all values of that row are NULL
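The difference between the two keywords can be made concrete with a small plain-Python sketch. It does not use Spark; the helper `drop_na` and the sample rows are hypothetical, with `None` standing in for NULL.

```python
def drop_na(rows, how="any"):
    """Drop rows (dicts) containing None, mimicking the 'any'/'all' keywords."""
    if how == "any":
        # keep a row only if none of its values is missing
        return [r for r in rows if all(v is not None for v in r.values())]
    if how == "all":
        # keep a row as long as at least one value is present
        return [r for r in rows if any(v is not None for v in r.values())]
    raise ValueError("how must be 'any' or 'all'")

rows = [
    {"price": 221900, "bedrooms": 3},     # complete row
    {"price": None, "bedrooms": 2},       # partially missing
    {"price": None, "bedrooms": None},    # entirely missing
]
kept_any = drop_na(rows, "any")
kept_all = drop_na(rows, "all")
print(len(kept_any), len(kept_all))
```

With `'any'` only the complete row survives, while `'all'` also keeps the partially missing row.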

<h4> Pipelines </h4>
   
    - Because we are working with Big Data processing infrastructure (using distributed processing), the way we code is slightly different from other data processing tools. The infrastructure has built-in functionality that optimizes the way our code is processed.
    - In short: the program chooses which steps to run when, and how they are distributed over the nodes.
    - For this to be done efficiently, we need to give as many instructions as possible at the same time. This way the machine can decide how to divide and conquer. This is done by using pipelines.
    - Each step in a pipeline is called a pipeline stage.
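The idea of chaining stages can be sketched in plain Python. This is a toy model of the fit/transform pattern, not Spark's `Pipeline` class; all class names below (`AddOne`, `Center`, `SimplePipeline`, ...) are hypothetical. It assumes that an estimator exposes `fit()` and returns a fitted transformer, while a transformer only exposes `transform()`.

```python
class AddOne:
    """Transformer: only has transform()."""
    def transform(self, data):
        return [x + 1 for x in data]

class Center:
    """Estimator: fit() learns the mean and returns a fitted transformer."""
    def fit(self, data):
        return CenterModel(sum(data) / len(data))

class CenterModel:
    def __init__(self, mean):
        self.mean = mean
    def transform(self, data):
        return [x - self.mean for x in data]

class SimplePipeline:
    def __init__(self, stages):
        self.stages = stages
    def fit(self, data):
        fitted = []
        for stage in self.stages:
            # estimators are fitted first; transformers are used as they are
            model = stage.fit(data) if hasattr(stage, "fit") else stage
            # the output of one stage is the input of the next stage
            data = model.transform(data)
            fitted.append(model)
        return SimplePipelineModel(fitted)

class SimplePipelineModel:
    def __init__(self, stages):
        self.stages = stages
    def transform(self, data):
        for stage in self.stages:
            data = stage.transform(data)
        return data

model = SimplePipeline([AddOne(), Center()]).fit([1, 2, 3])
result = model.transform([4, 5])
print(result)
```

The fitted pipeline remembers what each estimator learned during `fit`, so new data passes through the same steps with the same learned values.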

<br> **Pipelines consist of different stages (transformers & estimators)**, some examples:
- **`StringIndexer`**: <br> As a first general step we need to check whether there are any text variables (usually categorical variables) in the dataset. Not all ML algorithms are able to handle this type of data. That's why it's good practice to translate textual categories into numerical categories (e.g. A,B,C -> 0,1,2). For our dataset it is not needed.<br> https://spark.apache.org/docs/latest/ml-features.html#stringindexer <br>
- **`OneHotEncoderEstimator`**:<br> Another way to handle categorical labels is by transforming them into a vector with 0's and 1's. <br> https://spark.apache.org/docs/latest/ml-features.html#onehotencoderestimator <br>
- **`VectorIndexer`**: <br> Helps index categorical features in datasets of Vectors. Required for Tree methods.  <br>https://spark.apache.org/docs/latest/ml-features.html#vectorindexer <br>
- **`StandardScaler`**: <br> Transforms a dataset of Vector rows, normalizing each feature to have unit standard deviation and/or zero mean. <br>https://spark.apache.org/docs/latest/ml-features.html#standardscaler <br>
- **`VectorAssembler`**: <br> Transforms a number of input columns into one vector. This is used for combining features in order to train ML models like LR and DTs. <br>https://spark.apache.org/docs/latest/ml-features.html#vectorassembler <br>
- **Full overview**: <br> https://spark.apache.org/docs/latest/ml-features.html<br> Select only what is needed for the data you have at hand.
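To make the first two stages in the list more tangible, here is a plain-Python sketch of string indexing followed by one-hot encoding. It is an illustration only and does not call Spark; the helpers `fit_string_index` and `one_hot` and the sample data are hypothetical. It assumes the most frequent category receives index 0, and it keeps one vector slot per category.

```python
from collections import Counter

def fit_string_index(values):
    """Assign index 0 to the most frequent category, 1 to the next, and so on."""
    counts = Counter(values)
    # descending frequency; ties broken alphabetically (an assumption of this sketch)
    ordered = sorted(counts, key=lambda c: (-counts[c], c))
    return {category: idx for idx, category in enumerate(ordered)}

def one_hot(index, size):
    """Return a vector of zeros with a single 1.0 at the given index."""
    vec = [0.0] * size
    vec[index] = 1.0
    return vec

countries = ["france", "belgium", "france", "spain", "france", "belgium"]
mapping = fit_string_index(countries)
encoded = [one_hot(mapping[c], len(mapping)) for c in countries]
print(mapping)
print(encoded[0])
```

Spark's one-hot encoder drops the last category by default, so its output vectors are one element shorter than in this sketch.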

<br> **Exercise:** Apply several transformers and estimators to the dataset to create the final basetable
<br> **NOTE:** Take into account that some transformers need to be applied to the entire dataset, while others need to be applied to the train/test set separately.

In [None]:
# define the categorical variables
cat_cols = ['waterfront', 'view', 'floors', 'condition', 'grade', 'zipcode', 'renovated', 'bedrooms', 'bathrooms']

# define the assembler
VA_cat = VectorAssembler(inputCols=cat_cols, outputCol="cat_features")

In [None]:
# define indexer
VI = VectorIndexer(inputCol="cat_features", outputCol="cat_features_indexed")

<h5> Define, fit and apply Pipeline on data </h5>

In [None]:
# define pipeline model and fit on data
preprocessing_pipeline = Pipeline(stages=[VA_num, VA_cat, VI]).fit(houses)
# transform data by applying pipeline model on data
preprocessed_data = preprocessing_pipeline.transform(houses)

In [None]:
# select features and labels
preprocessed_data = preprocessed_data.select(["num_features", "cat_features_indexed", "price"])
# rename price to label
preprocessed_data = preprocessed_data.withColumnRenamed("price", "label")

In [23]:
# check
preprocessed_data.show(5)

+--------------------+--------------------+--------+
|        num_features|cat_features_indexed|   label|
+--------------------+--------------------+--------+
|[1180.0,5650.0,11...|[0.0,0.0,0.0,2.0,...|221900.0|
|[2570.0,7242.0,21...|[0.0,0.0,2.0,2.0,...|538000.0|
|[770.0,10000.0,77...|[0.0,0.0,0.0,2.0,...|180000.0|
|[1960.0,5000.0,10...|[0.0,0.0,0.0,4.0,...|604000.0|
|[1680.0,8080.0,16...|[0.0,0.0,0.0,2.0,...|510000.0|
+--------------------+--------------------+--------+
only showing top 5 rows



In [None]:
The StandardScaler should be fit on the training set only. The mean and standard deviation learned from the training set are then applied to both the training and the test set, because using the same statistics for both avoids methodological mistakes.
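A minimal plain-Python sketch of this rule follows. It is not the Spark `StandardScaler`; the helpers `fit_scaler` and `apply_scaler` and the sample values are hypothetical. The sketch both centers and scales, using the sample standard deviation.

```python
import statistics

def fit_scaler(train):
    """Learn mean and sample standard deviation from the training values only."""
    return statistics.mean(train), statistics.stdev(train)

def apply_scaler(values, mean, std):
    """Scale any set of values with previously learned statistics."""
    return [(v - mean) / std for v in values]

train_sqft = [1000.0, 2000.0, 3000.0]   # hypothetical training values
test_sqft = [2000.0, 4000.0]            # hypothetical test values

# statistics come from the training set ...
mean, std = fit_scaler(train_sqft)
# ... and are reused unchanged on the test set
train_scaled = apply_scaler(train_sqft, mean, std)
test_scaled = apply_scaler(test_sqft, mean, std)
print(train_scaled, test_scaled)
```

By default Spark's `StandardScaler` only scales to unit standard deviation (`withStd=True`, `withMean=False`), so the centering step here is an addition for illustration.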

In [None]:
# split data in train and test set
train, test = preprocessed_data.randomSplit([0.7, 0.3])

# define scaler
SC = StandardScaler(inputCol="num_features", outputCol="num_features_scaled")

# define assembler
VA = VectorAssembler(inputCols=["cat_features_indexed", "num_features_scaled"], outputCol="features")

In [None]:
# define linear regression model
LR = LinearRegression(featuresCol="features", labelCol="label")

# define decision tree model
DT = DecisionTreeRegressor(featuresCol="features", labelCol="label")

# define random forest model
RF = RandomForestRegressor(featuresCol="features", labelCol="label")

<h5> Define Pipeline for each model and fit on data </h5>

In [None]:
# define linear regression model pipeline and fit on training data
LR_Pipeline = Pipeline(stages=[SC, VA, LR]).fit(train)

# define decision tree model pipeline and fit on training data
DT_Pipeline = Pipeline(stages=[SC, VA, DT]).fit(train)


# define random forest model pipeline and fit on training data
RF_Pipeline = Pipeline(stages=[SC, VA, RF]).fit(train)

<h5> Get predictions on test set by applying each model pipeline on test data </h5>

In [None]:
# get predictions of linear regression model on test data
lr_preds = LR_Pipeline.transform(test)

# get predictions of decision tree model on test data
dt_preds = DT_Pipeline.transform(test)

# get predictions of random forest model on test data
rf_preds = RF_Pipeline.transform(test)

#### Evaluation - LR

In [None]:
# define evaluator
lrEvaluator = RegressionEvaluator(labelCol="label", predictionCol="prediction") 

# Get different metrics using your created evaluator object
lrsq = lrEvaluator.evaluate(lr_preds, {lrEvaluator.metricName: 'r2'})
lrmae = lrEvaluator.evaluate(lr_preds, {lrEvaluator.metricName: 'mae'})
lrrmse = lrEvaluator.evaluate(lr_preds, {lrEvaluator.metricName: 'rmse'})
lrmse = lrEvaluator.evaluate(lr_preds, {lrEvaluator.metricName: 'mse'})
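For reference, the four metric names requested above correspond to standard formulas, sketched here in plain Python. This is not the `RegressionEvaluator` code; the function `regression_metrics` and the sample numbers are hypothetical, and the sketch assumes the usual textbook definitions.

```python
import math

def regression_metrics(labels, predictions):
    """Compute mse, rmse, mae and r2 from paired labels and predictions."""
    n = len(labels)
    errors = [y - p for y, p in zip(labels, predictions)]
    ss_res = sum(e * e for e in errors)                   # residual sum of squares
    mean_label = sum(labels) / n
    ss_tot = sum((y - mean_label) ** 2 for y in labels)   # total sum of squares
    mse = ss_res / n
    return {
        "mse": mse,
        "rmse": math.sqrt(mse),
        "mae": sum(abs(e) for e in errors) / n,
        "r2": 1 - ss_res / ss_tot,
    }

labels = [1.0, 2.0, 3.0, 4.0]        # hypothetical true values
predictions = [1.0, 2.0, 3.0, 2.0]   # hypothetical model output
metrics = regression_metrics(labels, predictions)
print(metrics)
```

Lower is better for `mse`, `rmse` and `mae`, while `r2` is better the closer it gets to 1.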

#### Cross validation