< [Data Preprocessing and Visualization](../ica04/Data_Preprocessing_and_Visualization.ipynb) | Contents (TODO) |  [Distance and Similarity](../ica06/Distance_and_Similarity.ipynb) >

<a href="https://colab.research.google.com/github/stephenbaek/bigdata/blob/master/in-class-assignments/ica05/Supervised_Learning.ipynb"><img align="left" src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open in Colab" title="Open and Execute in Google Colaboratory"></a>

# Introduction to Supervised Learning for Big Data

In this example, we will take a look at the issues regarding supervised learning in the context of big data. Especially, the computational speed is the major concern we will address here.

Before we begin, here's a simple trick you can use to measure the time elapsed for an operation.

In [None]:
import time

In [None]:
start_time = time.time()
time.sleep(3)  # an operation you want to evaluate
elapsed_time = time.time() - start_time
print('Elapsed time: {} seconds'.format(elapsed_time))

With this simple trick in hands, let's measure how long it takes to solve a linear system.

First, let us consider matrices $X\in \mathbb{R}^{N\times d}$ and $Y \in \mathbb{R}^{N\times 1}$ for some positive integers $N$ and $d < N$.

In [None]:
import numpy as np

In [None]:
N = 10000
d = 500
X = np.random.normal(loc=0, scale=1, size=[N, d])
Y = np.random.normal(loc=0, scale=1, size=[N, 1])

print(X)
print(Y)

For a linear system of equations $Y = XA$, the least square solution to this system is known as:

\begin{equation*}
A = ((X^\top X)^{-1}X^\top)Y
\end{equation*}

To compute this, a straightforward approach would be to (1) compute $X^\top X$ first, (2) take the inverse $(X^\top X)^{-1}$, (3) multiply $X^\top$ to the result, and finally (4) multiply $Y$. The following is an analysis of how much of computational time is requred for each of the steps.

In [None]:
start_time = time.time()
XTX = np.matmul(X.T, X)
XTX_elapsed_time = time.time() - start_time
print('Elapsed time for XTX: {} seconds'.format(XTX_elapsed_time))

start_time = time.time()
inv = np.linalg.inv(XTX)
inv_elapsed_time = time.time() - start_time
print('Elapsed time for the inverse: {} seconds'.format(inv_elapsed_time))

start_time = time.time()
invXT = np.matmul(inv, X.T)
invXT_elapsed_time = time.time() - start_time
print('Elapsed time for the inverse times XT: {} seconds'.format(invXT_elapsed_time))

start_time = time.time()
A = np.matmul(invXT, Y)
A_elapsed_time = time.time() - start_time
print('Elapsed time for the inverse times XT times Y: {} seconds'.format(A_elapsed_time))

print('Total: {} seconds'.format(XTX_elapsed_time + inv_elapsed_time + invXT_elapsed_time + A_elapsed_time))

Now, a simple trick can make a huge difference in computational time. Consider the same equation as above, but this time, let us switch the order of computation a little bit.

\begin{equation*}
A = (X^\top X)^{-1}(X^\top Y)
\end{equation*}

That is, this time, we are going to (1) compute $X^\top X$ first, (2) take the inverse $(X^\top X)^{-1}$, (3) compute $X^\top Y$, and finally (4) multiply $(X^\top X)^{-1}$ and $X^\top Y$. Steps (1) and (2) are the same, but (3) and (4) is in different order. Let's take a look at how much time is required to compute the solution with this strategy.

In [None]:
start_time = time.time()
XTX = np.matmul(X.T, X)
XTX_elapsed_time = time.time() - start_time
print('Elapsed time for XTX: {} seconds'.format(XTX_elapsed_time))

start_time = time.time()
inv = np.linalg.inv(XTX)
inv_elapsed_time = time.time() - start_time
print('Elapsed time for the inverse: {} seconds'.format(inv_elapsed_time))

start_time = time.time()
XTY = np.matmul(X.T, Y)
XTY_elapsed_time = time.time() - start_time
print('Elapsed time for XTY: {} seconds'.format(XTY_elapsed_time))

start_time = time.time()
A = np.matmul(inv, XTY)
A_elapsed_time = time.time() - start_time
print('Elapsed time for the inverse times XTY: {} seconds'.format(A_elapsed_time))

print('Total: {} seconds'.format(XTX_elapsed_time + inv_elapsed_time + XTY_elapsed_time + A_elapsed_time))

Notice the significant reduction of computation time?

### Assignment
- Which step shows the greatest difference?
- Why?
- Fix $d = 500$ but try to increase $N$ from 10,000 to 20,000, 50,000, and 100,000. How does the computation time chanbge? Is there any trend?
- Fix $N = 10000$ but increase $d$ from 500 to 1,000, 2,000, and 5,000. How does the computation time change? Is there any trend?

### Note: Advanced Profiling

Measuring times for running operations part by part is called profiling. Using `time` library is quite simple, but sometimes we may need some more advanced method. For example, you may have already noticed that the computation time of the same code can vary each time you run the code.

One way of profiling your code is by using `%timeit` tag in front of the line you want to evaluate. For example:
```python
%timeit inv = np.linalg.inv(XTX)
```
runs `inv = np.linalg.inv(XTX)` multiple times and take the average and standard deviation of the computation time.

Another way of doing it is by using `%prun` tag in front of the line. For instance:
```python
%prun inv = np.linalg.inv(XTX)
```
will provide more in-depth breakdown of the process. If you are, however, not so familiar with computer programing, `%prun` might be too much, as it gives too detailed information. In this case, you should just be fine with `%timeit` or the `time.time()` method.

In [None]:
%timeit inv = np.linalg.inv(XTX)

In [None]:
%prun inv = np.linalg.inv(XTX)

## Distributed Optimization

Despite the computational trick above, it may still be difficult to compute the pseudo-inverse ($(X^\top X)^{-1}X^\top Y$) due to limitations in the memory space, etc. In fact, many big data analytics frameworks (including Spark) use distributed optimization strategies. Below, we are going to see an example of distributed gradient descent algorithm to get a feel of how things work.

[Gradient descent](https://en.wikipedia.org/wiki/Gradient_descent) is a popular first order optimization algorithm for finding the local minimum. We are not going to delve into what gradient descent algorithm really is, but here is a iterative update formula used by gradient descent algorithm:

$\theta \longleftarrow \theta - \alpha \frac{\partial\mathcal{L}}{\partial \theta}$

where $\theta$ is model parameters ($w$ and $b$ in case of linear regression). $\frac{\partial\mathcal{L}}{\partial \theta}$ is the first order derivative (gradient) of the loss (error) function $\mathcal{L}$ with respect to the model parameter $\theta$.

Recall, the loss function is formulated as:

$\mathcal{L}(w, b) = \frac{1}{N}\sum_{i=1}^N\|x^{(i)}w+b - y^{(i)}\|^2$

If we expand the summation symbol, the loss function will look like this:

$\mathcal{L} = \mathcal{L}^{(1)} + \mathcal{L}^{(2)} + \cdots + \mathcal{L}^{(N)}$

Note $\mathcal{L}^{(i)}$ indicates the $i$-th term in the loss function, corresponding to the $i$-th data point.
When we omit the superscripts for simplicity, each term $\mathcal{L}^{(i)}$ should look like this:

$\mathcal{L}^{(i)}=\frac{1}{N}\|xw+b - y\|^2=\frac{1}{N}\|\hat{y} - y\|^2$

Here a new notation $\hat{y}:= xw+b$ has been introduced to indicate the output predicted by the linear regression model. Under this notation, it takes only simple calculus to compute the first order derivatives:

$\frac{\partial\mathcal{L}^{(i)}}{\partial w} = \frac{2}{N}(\hat{y} - y) x^\top$

$\frac{\partial\mathcal{L}^{(i)}}{\partial b} = \frac{2}{N}(\hat{y} - y)$

Recall that the above derivatives are for the $i$-th term in the loss function, associated only with the $i$-th data point. In other words, the global gradient (the gradient of the entire loss function) is simply the summation of the local gradients (the gradient of $i$-th term), which can be computed independently of the other data points:

$\frac{\partial\mathcal{L}}{\partial W} = \frac{\partial\mathcal{L}^{(1)}}{\partial W} + \frac{\partial\mathcal{L}^{(2)}}{\partial W} + \cdots + \frac{\partial\mathcal{L}^{(N)}}{\partial W}$

This means that, no mater how data is distributed, we can simply compute the $i$-th local gradient at each data point $(i)$ and later on aggregate them to produce the global gradient.

To understand the concept more clearly, here's an example of 10,000 data points splitted into two (simulated) computing nodes. First, let us create a random data set with $N=10,000$ and $d=10%.

In [None]:
N = 10000
d = 10
X = np.random.normal(loc=0, scale=1, size=[N, d])
Y = np.random.normal(loc=0, scale=1, size=[N, 1])

Now, here's how we split the data set.

In [None]:
N_node1 = N//2
N_node2 = N - N_node1

X_node1 = X[:N_node1, :]
Y_node1 = Y[:N_node1]
X_node2 = X[N_node1:, :]
Y_node2 = Y[N_node1:]

Although, we do not physically split the data set, we pretend `X_node1` and `Y_node1` are only accessible from `node1` and `X_node2` and `Y_node2` are only accessible from `node2`. Now, at a `master` node, the following routine will be executed:

In [None]:
# Initialize model parameters
w = np.random.uniform(-1.0, 1.0, [d,1])
b = np.random.uniform(-1.0, 1.0)

MAX_ITER = 1000
learning_rate = 0.01
for i in range(MAX_ITER):
    # Talk to node1 and ask it to compute the local gradient.
    # (Pretend the following three lines are computed on node1)
    err_node1 = np.matmul(X_node1, w) - Y_node1
    dldw_node1 = np.mean(2*err_node1*X_node1, axis=0, keepdims=True)
    dldb_node1 = np.mean(2*err_node1, axis=0, keepdims=True)

    # Simultaneously, talk to node2 and ask the same.
    # (Pretend the following three lines are computed on node2)
    err_node2 = np.matmul(X_node2, w) - Y_node2
    dldw_node2 = np.mean(2*err_node2*X_node2, axis=0, keepdims=True)
    dldb_node2 = np.mean(2*err_node2, axis=0, keepdims=True)

    # Aggregate the gradients by weighting them with the number of data available at each node.
    dldw = (N_node1/N)*dldw_node1 + (N_node2/N)*dldw_node2
    dldb = (N_node1/N)*dldb_node1 + (N_node2/N)*dldb_node2

    # Update the model with the global gradient.
    w -= learning_rate*dldw.T
    b -= learning_rate*dldb.T

    # If the solution does not improve much, break out of the for loop
    if np.mean(dldw) < 1e-06:
        break

Of course, the above is quite simplified implementation of distributed gradient descent. However, with such a simple version, we can see the result is almost the same as the pseudo-inverse solution.

In [None]:
XTX = np.linalg.inv(np.matmul(X.T, X))
XTY = np.matmul(X.T, Y)
w_true = np.matmul(XTX, XTY)
print(w)   # distributed gradient descent solution
print(w_true)  # pseudo-inverse solution (ground truth)

With the above example, I hope the concept is clear now in your mind. Even if you are still not so confident if you could actually right a code like the above from the scratch, you should be completely fine, as far as you have a clear big picture. In fact, the implementation of the distributed optimization and such are taken cared by Spark. Instead, as a data scientist, you just need to have a basic understanding of how it actually works under the hood. So don't worry.

## Linear Regression in Spark

Spark has all the powerful, well-optimized implementation of such distributed optimization methods (and many more) under the hood. In fact, from the user's perspective, 99% of the times, you don't really need to care about what's happening behind the scene. Spark will pick the most suitable optimization algorithm for you and do all the heavy-lifting behind the scene.

To see how it works, let's first configure Spark on Colab. (If you are running this notebook on your local machine and have configured Spark already, you can skip this cell.)

In [None]:
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q https://www-us.apache.org/dist/spark/spark-2.4.7/spark-2.4.7-bin-hadoop2.7.tgz
!tar xf spark-2.4.7-bin-hadoop2.7.tgz
!pip install -q findspark

import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.7-bin-hadoop2.7"

import findspark
findspark.init()

Now, let's download a dataset to play with. For this tutorial, we will use the [wine quality dataset](https://archive.ics.uci.edu/ml/datasets/Wine+Quality) from the [UCI Machine Learning Repository](https://archive.ics.uci.edu/ml/index.php).

In [None]:
!wget https://archive.ics.uci.edu/ml/machine-learning-databases/wine-quality/winequality-red.csv

In the previous lecture, we learned how to read a CSV file as a Spark DataFrame. Here we repeat what we've learned:

In [None]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.appName('wine').getOrCreate()

In [None]:
df = spark.read.csv('winequality-red.csv', header=True, sep=';', inferSchema=True)
df.show()

Note that `sep` argument is set to `';'`. This is because the wine quality dataset is written in a weird (?) convention where the values are separated by semicolon (;) instead of comma (,).

Now, in order to use Spark ML library, you must first convert columns into a feature vector. For this data set, we are supposed to predict the quality grade of a wine (last column) using features such as acidity, sulfur dioxide contents, pH, density, etc. (all the other columns).

To this end, Spark provides a handy method called `VectorAssembler` to produce a feature vector by assembling DataFrame columns.

In [None]:
from pyspark.ml.feature import VectorAssembler
# Creates a new column called 'features' that contains feature vectors.
assembler = VectorAssembler(inputCols=df.columns[:-1], outputCol="features")
df_vec = assembler.transform(df)
df_vec.show()

Now that we created feature vectors, let us split the dataset into two---a training set and a test set. Spark DataFrame offers you an off-the-shelf method to do so:

In [None]:
df_train, df_test = df_vec.randomSplit([0.7, 0.3])  # 70% of the original data will be used for training and 30% for testing

Now, the actual learning part is quite simple as described below.

In [None]:
from pyspark.ml.regression import LinearRegression
lr = LinearRegression(featuresCol='features', labelCol='quality') 
lr_model = lr.fit(df_train)

After the training is done, results can be found by calling members of the model such as `lr_model.coefficients` or `lr_model.intercept`. For more details, see https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegressionModel

In [None]:
print( lr_model.coefficients )  # slope of the linear equation
print( lr_model.intercept )     # intercept of the line

In addition, you can also view the summary of training by calling members of the model summary (`lr_model.summary`). Again, I'll leave the full list of functions to [this URL](https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#pyspark.ml.regression.LinearRegressionModel), but only show some selected examples instead.

In [None]:
lr_model.summary.residuals

In [None]:
lr_model.summary.rootMeanSquaredError
# lr_model.summary.meanAbsoluteError
# lr_model.summary.meanSquaredError

In [None]:
lr_model.summary.coefficientStandardErrors

In [None]:
lr_model.summary.r2
lr_model.summary.r2adj
# lr_model.summary.pValues

A model trained using Spark can be tested by using `evaluate()` method.

In [None]:
evaluation_summary = lr_model.evaluate(df_test)

Evaluation summary object is essentially the same as the model summary object. In other words, what you've seen above are applicable to the evaluation summary:

In [None]:
evaluation_summary.rootMeanSquaredError

In [None]:
lr_model.summary.totalIterations

< [Data Preprocessing and Visualization](../ica04/Data_Preprocessing_and_Visualization.ipynb) | Contents (TODO) |  [Distance and Similarity](../ica06/Distance_and_Similarity.ipynb) >

<a href="https://colab.research.google.com/github/stephenbaek/bigdata/blob/master/in-class-assignments/ica05/Supervised_Learning.ipynb"><img align="left" src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open in Colab" title="Open and Execute in Google Colaboratory"></a>