# (EX) PySpark example with linear regression model

This set of notes covers the estimation and prediction with a linear regression model, and the use of PySpark in order to parallelize and scale in computation.  In addition, the notes provide a brief commentary on privacy preserving machine learning models.

## Linear regression refresher

To fit a linear regression (without any penalty or regularization) can be done via the least-squares estimation:

$$\min_\beta~\ell(\beta) = \|\mathbf{y} - \mathbf{X} \beta \|_2^2,$$

The corresponding analytical solution can be found by setting the partial derivatives to zero:
$$\frac{\partial \ell(\beta)}{\partial \beta} \equiv 0 \qquad \Rightarrow \qquad \widehat{\beta} = (\mathbf{X}^\mathsf{T}\mathbf{X})^{-1} \mathbf{X}^\mathsf{T}\mathbf{y}.$$

*Why do we need to think about parallelization at all?*

Say we have $n$ data points, i.e. $(\mathbf{x}_i, y_i)_{i=1}^n$. Take a look at the dimension of the quantities of interest:
$$\mathbf{X} \in \mathbb{R}^{n \times (d+1)}, \mathbf{y} \in \mathbb{R}^n.$$

The major computations involve three operations:  
1. Multiplication: $\mathbf{X}^\mathsf{T}\mathbf{X} = \sum_{i=1}^n \mathbf{x}_i \mathbf{x}_i^\mathsf{T},$
2. Multiplication: $\mathbf{X}^\mathsf{T}\mathbf{y} = \sum_{i=1}^n \mathbf{x_i} y_i,$
3. Inversion of matrix: $(\mathbf{X}^\mathsf{T}\mathbf{X})^{-1} \mathbf{X}^\mathsf{T}\mathbf{y}.$

*Rationale:*  
- In the case where $n$ is extremely large, the multiplication and summation, steps (1) and (2), are slow.
- Since the inversion of matrix is driven by the size of $\mathbf{X}^\mathsf{T}\mathbf{X}$, which is typically a relatively small $(d+1) \times (d+1)$ matrix.  Step (3) is relatively cheap in terms of computation.


## Coding component with the diamonds dataset as an example

In [None]:
# You typically do not need this

import os
os.environ["JAVA_HOME"] = r"C:\Program Files\Java\jdk-11.0.2"
os.environ["SPARK_HOME"] = r"C:\Program Files\Spark\spark-3.5.5-bin-hadoop3"

import findspark
findspark.init()

### Diamonds dataset

In [None]:
import seaborn as sns
diam = sns.load_dataset('diamonds', cache=True, data_home='dataset/')

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

# reading data
diamonds = (
    spark.read.format('csv')
    .options(header='true', inferSchema='true')
    .load('dataset/diamonds.csv')
    .cache()
)

In [None]:
diamonds.take(5)

In [None]:
# subsetting dataset for analysis
df = (
    diamonds
    .where(diamonds['price'] > 1000)
    .select(['cut', 'color', 'carat', 'clarity', 'price'])
)

In [None]:
df.show(5)

In [None]:
row = diamonds.take(1)

In [None]:
# assigning predictors and response
predictors = ['cut', 'color', 'carat', 'clarity']
categorical = set(['cut', 'color', 'clarity'])
response = 'price'

### Map functions explained - collecting necessary information


The individual (row) component within the sum can be calculated as:

\begin{align}
    \mathbf{x}_i \mathbf{x}_i^\mathsf{T} = 
    \begin{pmatrix} 
    x_{i1}^2 & x_{i1} x_{i2} & \ldots & x_{i1} x_{id} \\
    \vdots & \ddots & \ldots & \vdots \\
    x_{id}x_{i1} & x_{id}x_{i2} & \ldots & x_{id}^2 
    \end{pmatrix},
    \qquad 
    \mathbf{x}_i y_i = 
    \begin{pmatrix} 
    x_{i1}y_i \\ x_{i2} y_i \\ \vdots \\ x_{id} y_i
    \end{pmatrix}.
\end{align}

To keep track of which value corresponds to each column, the `yield` function keeps a record of the column names as keys.

In [None]:
# map functions for step (1) and (2)
def xtx_map(row):
    row = row.asDict()
    for i in predictors:
        (ki, vi) = (i, row[i]) if i not in categorical else (i+"_"+row[i], 1.0)
        for j in predictors:
            (kj, vj) = (j, row[j]) if j not in categorical else (j+"_"+row[j], 1.0)
            yield ((ki, kj), vi * vj)

def xty_map(row):
    row = row.asDict()
    for j in predictors:
        (kj, vj) = (j, row[j]) if j not in categorical else (j+"_"+row[j], 1.0)
        yield (kj, vj * row[response])

In [None]:
row = df.take(1)
row[0]

In [None]:
# inspecting the result from applying xty_map to one row
[a for a in xty_map(row[0])]

In [None]:
# inspecting the result from applying xtx_map to one row
[a for a in xtx_map(row[0])]

In [None]:
xtx_data = (df.rdd
            .flatMap(xtx_map)
            .reduceByKey(lambda a, b: a+b)
            .collect()
           )

In [None]:
xty_data = (df.rdd
            .flatMap(xty_map)
            .reduceByKey(lambda a, b: a+b)
            .collect()
           )

In [None]:
xty_data

### Indexing - keeping track of which columns the values belong to

Individual calculations from distributed resources will return in **any** order, therefore, calculating the index of each column key is necessary.

In [None]:
index = dict(zip([r[0] for r in xty_data], range(len(xty_data))))
p = len(index)

index

In [None]:
# arrange the individual elements back into matrices
import numpy as np

XTY = np.zeros((p, 1))
for (k, v) in xty_data:
  XTY[index[k]] = v

XTX = np.zeros((p,p))
for ((k1,k2),v) in xtx_data:
  XTX[index[k1], index[k2]] = v

In [None]:
XTX.shape, XTY.shape

### Estimation of $\beta$

In [None]:
beta = np.linalg.solve(XTX, XTY)

In [None]:
beta  # beta hat

## Prediction with $\widehat{\beta}$

In [None]:
# digression: some handy shape-conforming functions in numpy
beta.shape
np.squeeze(beta).shape
np.atleast_3d(beta).shape

In [None]:
## a new row contains a set of predictors
def predict(row):
    row = row.asDict()
    pred = 0.0
    for j in predictors:
        (kj, vj) = (j, row[j]) if j not in categorical else (j+"_"+row[j], 1.0)
        pred += vj * beta[index[kj], 0]
    return float(pred)



In [None]:
from pyspark.sql.functions import mean

# calculating error
rmse = np.sqrt(
    df.rdd
    .map(lambda row: (row[response], predict(row)))
    .map(lambda y: (y[1] - y[0])**2)
    .mean()
)

In [None]:
rmse = np.sqrt(
    df.rdd
    .map(lambda row: (row[response] - predict(row))**2)
    .mean()
)

In [None]:
rmse

## User-defined functions (UDF)

In many scenarios, storing the calculations in a new column of the RDD dataframe is advantageous. In a similar way, the computations are not performed until you try to retrieve the results, e.g. via "collect". 

User-defined functions allow for a clean way to achieve this purpose.

In [None]:
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import FloatType

predict_udf = udf(predict, FloatType())

df = df.withColumn("pred", predict_udf(struct(predictors)))


In [None]:
df = df.withColumn("resid", df['pred'] - df['price'])

In [None]:
df.printSchema()

In [None]:
sns.histplot(df.sample(False, 0.1).select('resid').toPandas())