# Overview

Previously we saw examples of running the k-means algorithm provided by scikit-learn. In this notebook we look at the Apache Spark MLlib implementation instead. Unlike the models packaged with scikit-learn, Apache Spark models are built to be distributed and can parallelize calculations. This can cause some headaches because of the assumptions and design constraints that the Spark framework imposes. We cover these in the Gotchas section.

It assumes you have already read the following notebooks:
- [Install Apache Spark Prerequisites](Install%20Apache%20Spark%20Prerequisites.ipynb)
- [Spark Pi - The Hello World Example For Apache spark](Spark%20Pi%20-%20The%20Hello%20World%20Example%20For%20Apache%20spark.ipynb)
- [Intro To Koalas](Intro%20To%20Koalas.ipynb)
- <a href="../Cluster%20Analysis/K-Means.ipynb">Cluster Analysis/K-Means</a>

Once Kubernetes is set up, the instructions are essentially the same as in [Running Scikit-Learn Apache Spark](Running%20Scikit-Learn%20Apache%20Spark.ipynb).

## Agenda
1. Create SparkContext
2. Setup Datastore
3. Create Web Server To Host Data
4. Load The Data
5. Submit Python Code To Spark Cluster
6. Cleanup Spark Cluster On Kubernetes

## Gotchas

### Spark Doesn't Support Nested Parallelism

Apache Spark doesn't support any form of nesting in terms of Spark-managed parallelism. Distributed operations can be initiated only by the driver (i.e. not by a worker process created by the driver). This includes access to distributed data structures, like a Spark DataFrame, and execution of parallel processing functions, like training an MLlib algorithm.

If you tried to have a spark worker create and train an MLlib algorithm you would see the following error pop up:

```
AttributeError: Cannot load _jvm from SparkContext. Is SparkContext initialized?
```

This took me a while to figure out, but it is pointed out [here](https://coderedirect.com/questions/310003/run-ml-algorithm-inside-map-function-in-spark).

This is inconvenient, as Spark provides a number of useful mechanisms for kicking off parallel processes based on our dataset. For example, in the previous notebook we used the groupby(criteria).apply(func) function to kick off a training job for each group of data in parallel. We cannot use this API with MLlib, because the function func is executed on a worker, where no SparkContext is available.

We can, however, manage the parallelism outside Spark from the driver node, using either multithreading or multiprocessing. Before we get into those topics, we need a refresher on operating system design.

An operating system is in charge of running processes. The OS allocates memory and CPU resources to each process, and once they are allocated the process can use them as it likes. Multiprocessing is when a program asks the OS to spin up multiple "subprocesses", each with its own separate memory and CPU allocation. Multithreading is when a single process creates multiple threads to execute work in parallel instead of creating multiple processes. Threads share the memory and CPU resources allocated to their process; separate processes do not.
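To make the shared-memory point concrete, here is a minimal sketch using only the Python standard library. The names `shared_results`, `results_lock` and `record_square` are made up for this illustration and do not appear elsewhere in the notebook.

```python
import threading
from multiprocessing.pool import ThreadPool

# A plain Python list owned by the main (driver) process
shared_results = []
results_lock = threading.Lock()

def record_square(n):
    # Every thread sees the same list object, so writes are guarded by a lock
    with results_lock:
        shared_results.append(n * n)

# Four threads inside one process work through the inputs
with ThreadPool(4) as pool:
    pool.map(record_square, range(5))

print(sorted(shared_results))
```

All five results end up in the one list because the threads live in the same process. With a process pool instead, each subprocess would append to its own copy of the list and the list in the parent process would stay empty, which is why the SparkContext held by the driver can be shared across threads but not across subprocesses.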


# 1. Create SparkContext

In [1]:
# Load a helper module
import importlib.util
spec = importlib.util.spec_from_file_location("spark_helper", "../../../Utilities/spark_helper.py")
spark_helper = importlib.util.module_from_spec(spec)
spec.loader.exec_module(spark_helper)

In [2]:
spark_app_name = "spark-jupyter-mlib"
docker_image = "tschneider/pyspark:v5"
k8_master_ip = "15.4.7.11"
spark_session = spark_helper.create_spark_session(spark_app_name, docker_image, k8_master_ip)
sc = spark_session.sparkContext

Setting SPARK_HOME
/opt/spark

Running findspark.init() function
['/opt/spark/python', '/opt/spark/python/lib/py4j-0.10.9-src.zip', '/usr/lib64/python36.zip', '/usr/lib64/python3.6', '/usr/lib64/python3.6/lib-dynload', '', '/usr/local/lib64/python3.6/site-packages', '/usr/local/lib/python3.6/site-packages', '/usr/lib64/python3.6/site-packages', '/usr/lib/python3.6/site-packages', '/usr/local/lib/python3.6/site-packages/IPython/extensions', '/root/.ipython']

Setting PYSPARK_PYTHON
/usr/bin/python3

Determining IP Of Server
The ip was detected as: 15.4.12.12

Configuring URL for kubernetes master
k8s://https://15.4.7.11:6443

Creating Spark Session

Done!


In [4]:
! kubectl -n spark get pods

NAME                                         READY     STATUS    RESTARTS   AGE
spark-jupyter-mlib-64173c7da4cf89cf-exec-1   1/1       Running   0          2m
spark-jupyter-mlib-64173c7da4cf89cf-exec-2   1/1       Running   0          2m
spark-jupyter-mlib-64173c7da4cf89cf-exec-3   1/1       Running   0          2m


# 2. Setup Datastore

In [15]:
import os

csv_file_name = "nasdaq_2019.csv"
csv_relative_file_path = "../../../Example Data Sets/{0}".format(csv_file_name)
csv_absolute_file_path = os.path.abspath(csv_relative_file_path)
csv_link_path = "/{0}".format(csv_file_name)

if os.path.exists(csv_link_path):
    if os.path.islink(csv_link_path):
        print("Symlink exists at {0}".format(csv_link_path))
    elif os.path.isfile(csv_link_path):
        print("File exists at {0}".format(csv_link_path))
    else:
        raise Exception("Something is wrong. An object exists where we want to create a symlink.")
else:
    os.symlink(csv_absolute_file_path, csv_link_path)
    print("Symlink created as {0} -> {1}".format(csv_link_path, csv_absolute_file_path))

Symlink exists at /nasdaq_2019.csv


# 3. Create web server to host data

## 3.1. Determine the project root directory

Note: Finding this location from inside a Jupyter notebook takes a trick, so we use a dedicated library (pyprojroot) to get it.

In [16]:
import pyprojroot
project_root_dir  = pyprojroot.here()
print(project_root_dir)

/root/ml-training-jupyter-notebooks
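As a rough idea of how a project root can be located, the sketch below walks up the directory tree until it finds a marker file. This is not pyprojroot's implementation; the function name `find_project_root` and the marker names are assumptions chosen for the illustration.

```python
from pathlib import Path
import tempfile

def find_project_root(start, markers=(".git", ".here")):
    """Walk upward from `start` until a directory contains one of `markers`."""
    current = Path(start).resolve()
    for candidate in [current, *current.parents]:
        if any((candidate / marker).exists() for marker in markers):
            return candidate
    raise FileNotFoundError("No project root marker found above {0}".format(start))

# Demonstrate on a throwaway directory tree
with tempfile.TemporaryDirectory() as tmp_dir:
    root = Path(tmp_dir).resolve() / "project"
    nested = root / "notebooks" / "spark"
    nested.mkdir(parents=True)
    (root / ".here").touch()

    # Starting two levels down, the search climbs back up to the marked root
    found_root = find_project_root(nested)
    root_was_found = (found_root == root)
    print(root_was_found)
```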


## 3.2. Load the module for the webserver from our utilities directory

In [17]:
# Import the module for the web server we wrote
import importlib.util
spec = importlib.util.spec_from_file_location("PythonHttpFileServer", "../../../Utilities/PythonHttpFileServer.py")
PythonHttpFileServer = importlib.util.module_from_spec(spec)
spec.loader.exec_module(PythonHttpFileServer)

## 3.3. Configure logging

In [18]:
# Configure the logger and log level
import logging
logger = logging.getLogger()
logger.setLevel(logging.INFO)

## 3.4. Start the webserver in a new thread

In [19]:
import os

data_sub_dir = "Example Data Sets"
web_root = os.path.join(project_root_dir, data_sub_dir)

if not os.path.exists(web_root):
    raise Exception("The web root for the server does not exist.")

In [20]:
# Start the webserver in a thread so the cell is not stuck in a running state
import threading

var_exists = 'web_server_thread' in locals() or 'web_server_thread' in globals()
if not var_exists:
    web_server_port = 80
    web_server_args = (web_server_port, web_root)
    web_server_thread = threading.Thread(target=PythonHttpFileServer.run_server, args=web_server_args)
    web_server_thread.start()
else:
    print("Web Server thread already exists")
    print("To kill it you need to restart the kernel.")

INFO:root:Starting server on port 80
INFO:root:Web root specified as: /root/ml-training-jupyter-notebooks/Example Data Sets


 * Serving Flask app 'PythonHttpFileServer' (lazy loading)
 * Environment: production
   Use a production WSGI server instead.
 * Debug mode: off


INFO:werkzeug: * Running on http://15.4.12.12:80/ (Press CTRL+C to quit)
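The PythonHttpFileServer module is our own Flask-based helper and is not shown here. As a sketch of the same idea (serve a directory from a background thread so the notebook cell returns immediately), the standard library alone is enough. This assumes Python 3.7 or newer; the file name `sample.csv` and the variable names are made up for the example.

```python
import functools
import http.server
import os
import shutil
import tempfile
import threading
import urllib.request

# Create a throwaway web root containing one small file
web_root_demo = tempfile.mkdtemp()
with open(os.path.join(web_root_demo, "sample.csv"), "w", newline="") as handle:
    handle.write("ticker,close\nAABA,70.57\n")

# Serve files from the web root; port 0 asks the OS for any free port
handler = functools.partial(http.server.SimpleHTTPRequestHandler, directory=web_root_demo)
server = http.server.ThreadingHTTPServer(("127.0.0.1", 0), handler)
server_thread = threading.Thread(target=server.serve_forever, daemon=True)
server_thread.start()

# Fetch the file back over HTTP, much as a Spark executor would.
# The empty ProxyHandler bypasses any HTTP proxy configured in the environment.
port = server.server_address[1]
opener = urllib.request.build_opener(urllib.request.ProxyHandler({}))
with opener.open("http://127.0.0.1:{0}/sample.csv".format(port)) as response:
    body = response.read().decode("utf-8")
print(body)

# Stop the server and remove the temporary web root
server.shutdown()
server.server_close()
shutil.rmtree(web_root_demo)
```

Keeping a reference to the server object means it can be shut down explicitly, whereas the thread started above can only be stopped by restarting the kernel.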


# 4. Load The Data

## 4.1. Add the file using Spark Context

In [21]:
ip_address = spark_helper.determine_ip_address()
csv_file_name = "nasdaq_2019.csv"
csv_file_url = "http://{0}:{1}/{2}".format(ip_address, web_server_port, csv_file_name)
print("Uploading file '{0}' to Spark cluster.".format(csv_file_url))
sc.addFile(csv_file_url)

INFO:root:Get /root/ml-training-jupyter-notebooks/Example Data Sets/nasdaq_2019.csv
INFO:werkzeug:15.4.12.12 - - [10/Dec/2021 14:54:20] "GET /nasdaq_2019.csv HTTP/1.1" 200 -


Uploading file 'http://15.4.12.12:80/nasdaq_2019.csv' to Spark cluster.


## 4.2. Use koalas to open the file on spark

Import the utility function that converts a date string to a datetime object from our utilities module.

In [22]:
# Import the utilities module we wrote
import importlib.util
spec = importlib.util.spec_from_file_location("utilities", "../../../Utilities/utilities.py")
utilities = importlib.util.module_from_spec(spec)
spec.loader.exec_module(utilities)

# Define a mapping to convert our data field to the correct type
converter_mapping = {
    "date": utilities.convert_date_string_to_date
}
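The body of utilities.convert_date_string_to_date is not shown in this notebook. Purely as an illustration of what a converter of this kind could look like, here is a hypothetical `parse_iso_date` function; the name, the date format and the return type are all assumptions.

```python
from datetime import date, datetime

def parse_iso_date(value, date_format="%Y-%m-%d"):
    """Parse a string such as '2019-07-01' into a datetime.date (hypothetical helper)."""
    return datetime.strptime(value, date_format).date()

# A converter mapping keyed by column name, in the same shape as the one above
example_mapping = {"date": parse_iso_date}

parsed = example_mapping["date"]("2019-07-01")
print(parsed)
```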

Load our OHLCV data into a koalas dataframe and pull out a single day in the same way we would in pandas.

In [24]:
# Avoid a warning
import os
os.environ["PYARROW_IGNORE_TIMEZONE"] = "1"
os.environ["SPARK_KOALAS_AUTOPATCH"] = "0"

from databricks import koalas
koalas_dataframe = koalas.read_csv(u"file:////nasdaq_2019.csv", converters=converter_mapping)

In [25]:
koalas_dataframe.head()

Unnamed: 0,ticker,interval,date,open,high,low,close,volume
0,AABA,D,2019-07-01,70.9,71.52,70.325,70.57,10234800
1,AAL,D,2019-07-01,33.14,33.6632,32.5301,32.88,8995100
2,AAME,D,2019-07-01,2.43,2.43,2.4,2.4,500
3,AAOI,D,2019-07-01,10.7,10.89,10.01,10.18,883100
4,AAON,D,2019-07-01,50.57,50.985,48.56,49.73,180200


## 4.3. Set some options

In [26]:
koalas.set_option('compute.ops_on_diff_frames', True)

# 5. Submit Python Code To Spark Cluster

In this section of the notebook we are going to apply the k-means algorithm from Spark MLlib to each date in our koalas_dataframe object.
To do this, we write a function that applies the algorithm to a dataframe, on the assumption that the dataframe only contains data for a single date.
Note: Most of this is a review and reworking of the content contained in 
<a href="../Cluster%20Analysis/K-Means.ipynb">Cluster Analysis/K-Means.ipynb</a>.

## 5.1. Setup Utility Function
We create our data frame for testing based on a subset of our real data.

In [27]:
df_01_02_2019 = koalas_dataframe.loc[koalas_dataframe["date"] == '2019-01-02'].copy()
df_01_02_2019.head()

Unnamed: 0,ticker,interval,date,open,high,low,close,volume
96799,AABA,D,2019-01-02,56.78,58.01,56.47,57.49,10532400
96800,AAL,D,2019-01-02,31.46,32.65,31.05,32.48,5229400
96801,AAME,D,2019-01-02,2.43,2.49,2.43,2.49,1700
96802,AAOI,D,2019-01-02,15.0,16.29,14.85,15.88,478300
96803,AAON,D,2019-01-02,34.57,35.4,34.37,35.07,124800


We then write and test our function.

In [28]:
import pandas
import pyspark
from pyspark.sql.functions import pandas_udf, udf
from pyspark.sql.types import *
from databricks import koalas
from pyspark.ml.clustering import KMeans

def perform_kmeans_on_dataframe(df, column_names):
    
    # Create a copy of our dataframe so we can play around
    tmp = df.copy()
    columns = column_names.copy()
    
    # Create our model
    model = KMeans().setK(5).setSeed(42)

    # Do some magic to get the data in the right format for the spark model
    from pyspark.ml.feature import VectorAssembler
    assembler = VectorAssembler(inputCols=column_names, outputCol="features")
    if isinstance(tmp, pandas.DataFrame):
        # Convert pandas input to koalas so both cases share one code path
        tmp = koalas.DataFrame(tmp)
    elif not isinstance(tmp, koalas.DataFrame):
        raise TypeError("Expected a koalas or pandas DataFrame, got {0}".format(type(tmp)))
    model_parameters = assembler.transform(tmp[columns].to_spark())

    # Train the model
    trained_model = model.fit(model_parameters)
    
    # Extract the cluster information for the training data
    predictions = trained_model.transform(model_parameters)
    cluster_indices = predictions.select("prediction")
    cluster_indices = koalas.DataFrame(cluster_indices).to_numpy().reshape(-1)
    cluster_indices = koalas.Series(cluster_indices, index=tmp.index.to_numpy())
    tmp["cluster_indices"] = cluster_indices
    cluster_centroids = trained_model.clusterCenters()
    tmp["cluster_centroids"] = tmp["cluster_indices"].apply(lambda i: str(cluster_centroids[i]))

    return tmp


In [29]:
perform_kmeans_on_dataframe(df_01_02_2019, column_names=["open", "close"]).head()



Unnamed: 0,ticker,interval,date,open,high,low,close,volume,cluster_indices,cluster_centroids
96829,ACLS,D,2019-01-02,17.48,18.14,16.92,17.79,284200,0,[12.97707127 13.2648608 ]
96923,ALLK,D,2019-01-02,51.26,53.94,50.115,51.99,151900,3,[66.74787778 67.59067778]
97216,BRPAU,D,2019-01-02,10.55,10.55,10.55,10.55,0,0,[12.97707127 13.2648608 ]
97658,DWFI,D,2019-01-02,22.21,22.24,22.2,22.2,41300,0,[12.97707127 13.2648608 ]
97699,EFAS,D,2019-01-02,15.08,15.08,15.08,15.08,0,0,[12.97707127 13.2648608 ]


In [30]:
perform_kmeans_on_dataframe(df_01_02_2019.to_pandas(), column_names=["open", "close"]).head()



Unnamed: 0,ticker,interval,date,open,high,low,close,volume,cluster_indices,cluster_centroids
96829,ACLS,D,2019-01-02,17.48,18.14,16.92,17.79,284200,1,[13.76070419 14.05760128]
96923,ALLK,D,2019-01-02,51.26,53.94,50.115,51.99,151900,0,[73.51000825 74.42198144]
97216,BRPAU,D,2019-01-02,10.55,10.55,10.55,10.55,0,1,[13.76070419 14.05760128]
97658,DWFI,D,2019-01-02,22.21,22.24,22.2,22.2,41300,1,[13.76070419 14.05760128]
97699,EFAS,D,2019-01-02,15.08,15.08,15.08,15.08,0,1,[13.76070419 14.05760128]


## 5.2. Run Utility Function In Parallel

Now that the utility function has been tested on smaller dataframes, we can run it on a large data set. We will time a run against a single day and a run against all days to show that the operations occur in parallel.
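The timing argument can be tried out without a cluster. In the sketch below each "training job" is simulated with a short sleep (an assumption standing in for a thread waiting on Spark), and the function names are made up for the illustration. If the jobs really run in parallel, five jobs on five threads should take about as long as one job.

```python
import time
from multiprocessing.pool import ThreadPool

def simulated_training_job(job_id):
    # Stand-in for one Spark training job; sleeping releases the GIL,
    # much as a thread does while it waits on the Spark cluster
    time.sleep(0.2)
    return job_id

def timed_run(job_ids, num_threads):
    # Run all jobs on a thread pool and measure the wall time
    started = time.perf_counter()
    with ThreadPool(num_threads) as pool:
        finished = pool.map(simulated_training_job, job_ids)
    return finished, time.perf_counter() - started

_, single_seconds = timed_run([0], num_threads=5)
finished_jobs, all_seconds = timed_run(range(5), num_threads=5)
print("one job: {0:.2f}s, five jobs: {1:.2f}s".format(single_seconds, all_seconds))
```

Run sequentially, the five jobs would need at least one second in total, so a wall time close to that of a single job indicates the work overlapped.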

In [31]:
# Get a list of the dates in our dataframe
dates = koalas_dataframe["date"].unique().sort_values().to_numpy()
date1 = dates[0]
date1

'2019-01-01'

In [32]:
del koalas_dataframe

In [33]:
del df_01_02_2019

In [34]:
# Define a function to train our kmeans model which will be executed in each thread
import time

def thread_func(params, retries=5):
   
    # Retrieve params
    date = params[0]
    progress_bar = params[1]
    lock = params[2]
    result_file_path = params[3]
    thread_result = None
    try:
        # Load data
        thread_df = koalas.read_csv(u"file:////nasdaq_2019.csv", converters=converter_mapping)
                
        while True:
            try:
                # Train the model
                date_df = thread_df.loc[thread_df["date"] == date]    
                thread_result = perform_kmeans_on_dataframe(date_df, column_names=["open", "close"])

                # Force spark to not be lazy and to do the computation
                thread_result.shape  

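                # Return the result to the caller (ThreadPool.map collects it)
                return thread_result

            except Exception:
                # Retry after a short pause until the retry budget is used up
                retries -= 1
                if retries > 0:
                    time.sleep(1)
                else:
                    raise
    finally:
        # Append any result to the shared file and update the progress bar.
        # The lock keeps threads from interleaving writes and counter updates.
        global completed_ops
        with lock:
            if thread_result is not None:
                # Write the CSV header only when the file is first created
                write_header = not os.path.exists(result_file_path)
                thread_result.to_pandas().to_csv(result_file_path, mode='a', header=write_header)
            completed_ops += 1
            progress_bar.update(completed_ops)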
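The retry loop inside thread_func can also be read in isolation. The sketch below factors the same pattern into a small helper; `call_with_retries` and `flaky_job` are hypothetical names used only for this illustration, and the failure is simulated.

```python
import time

def call_with_retries(func, retries=5, delay_seconds=1.0):
    """Call `func` until it succeeds or `retries` attempts have failed."""
    for attempt in range(1, retries + 1):
        try:
            return func()
        except Exception:
            # Re-raise the last error once the retry budget is used up
            if attempt == retries:
                raise
            time.sleep(delay_seconds)

attempts = []

def flaky_job():
    # Fails on the first two calls, then succeeds, like a worker that recovers
    attempts.append(1)
    if len(attempts) < 3:
        raise RuntimeError("simulated worker failure")
    return "trained"

outcome = call_with_retries(flaky_job, retries=5, delay_seconds=0.01)
print(outcome, len(attempts))
```

Using a bare `raise` keeps the original traceback, which helps when the underlying failure comes from a crashed Spark worker.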

In [35]:
import progressbar

def create_progress_bar(num_ops):

    progress_bar_widgets = [
        progressbar.Bar('=', '[', ']'), 
        ' ', 
        progressbar.FormatLabel('Processed: %(value)d / {0} ops'.format(num_ops)),
        ' ', 
        progressbar.ETA()
    ]
    return  progressbar.ProgressBar(maxval=num_ops, widgets=progress_bar_widgets)

In [36]:
# Create a ThreadPool and kick off the parallel training sessions
from multiprocessing.pool import ThreadPool
import itertools
from datetime import datetime
import threading

# Record the start time
start = datetime.now()
print("Starting: {0}".format(start))

# Create objects to help track multithreading progress
num_ops = len(dates[0:1])
bar = create_progress_bar(num_ops*2)
bar.start()
global completed_ops
completed_ops = 0

# Create vars to help with multi-threading
mutex = threading.Lock()
num_threads = 10
thread_pool = ThreadPool(num_threads)

# Create/cleanup a datafile to store results from threads
import os
result_file_path = "results.csv"
if os.path.exists(result_file_path):
    print("Cleaning up results")
    os.remove(result_file_path)

# Create an iterator of params for the thread function
iterator = zip(dates[0:1], 
               itertools.repeat(bar), 
               itertools.repeat(mutex), 
               itertools.repeat(result_file_path))

# Run training sessions for each date in parallel
results = thread_pool.map(thread_func, iterator)

# Record the end time
calc_end = datetime.now()
calc_diff = (calc_end - start).total_seconds()
print("Ending: {0}".format(calc_end))
print("Total calculation time: {0}s".format(calc_diff))

# Merge the results
print("Load results")
merged_df = koalas.read_csv(result_file_path, converters=converter_mapping)

# Record the end time
merge_end = datetime.now()
merge_diff = (merge_end - calc_end).total_seconds()
print("Ending: {0}".format(merge_end))
print("Total Merge time: {0}s".format(merge_diff)) 

# Print wall time
wall = (merge_end - start).total_seconds()
print("Total wall time: {0}s".format(wall)) 

# Calculate the time per date
date_diff = wall / num_ops
print("Time per date: {0}s".format(date_diff))

# Show the union df
merged_df.loc[merged_df["date"] == dates[0]].head()

[                                         ] Processed: 0 / 2 ops ETA:  --:--:--

Starting: 2021-12-10 15:00:12.819677




Ending: 2021-12-10 15:01:02.082435
Total calculation time: 49.262758s
Load results
Ending: 2021-12-10 15:01:03.907149
Total Merge time: 1.824714s
Total wall time: 51.087472s
Time per date: 51.087472s


Unnamed: 0,ticker,interval,date,open,high,low,close,volume
93620,AABA,D,2019-01-01,57.94,57.94,57.94,57.94,0
93621,AAL,D,2019-01-01,32.11,32.11,32.11,32.11,0
93622,AAME,D,2019-01-01,2.41,2.41,2.41,2.41,0
93623,AAOI,D,2019-01-01,15.43,15.43,15.43,15.43,0
93624,AAON,D,2019-01-01,35.06,35.06,35.06,35.06,0


If anything went wrong, we should ask the SparkContext to kill any abandoned jobs that may be lingering in ghosted threads from the thread pool.

In [None]:
sc.cancelAllJobs()

Now we can run the full set of dates using our thread pool.

In [None]:
# Create a ThreadPool and kick off the parallel training sessions
from multiprocessing.pool import ThreadPool
import itertools
from datetime import datetime
import threading

# Record the start time
start = datetime.now()
print("Starting: {0}".format(start))

# Create objects to help track multithreading progress
num_ops = len(dates)
bar = create_progress_bar(num_ops*2)
bar.start()
global completed_ops
completed_ops = 0

# Create vars to help with multi-threading
mutex = threading.Lock()
num_threads = 10
thread_pool = ThreadPool(num_threads)

# Create/cleanup a datafile to store results from threads
import os
result_file_path = "results.csv"
if os.path.exists(result_file_path):
    print("Cleaning up results")
    os.remove(result_file_path)

# Create an iterator of params for the thread function
iterator = zip(dates, 
               itertools.repeat(bar), 
               itertools.repeat(mutex), 
               itertools.repeat(result_file_path))

# Run training sessions for each date in parallel
results = thread_pool.map(thread_func, iterator)

# Record the end time
calc_end = datetime.now()
calc_diff = (calc_end - start).total_seconds()
print("Ending: {0}".format(calc_end))
print("Total calculation time: {0}s".format(calc_diff))

# Merge the results
print("Load results")
merged_df = koalas.read_csv(result_file_path, converters=converter_mapping)

# Record the end time
merge_end = datetime.now()
merge_diff = (merge_end - calc_end).total_seconds()
print("Ending: {0}".format(merge_end))
print("Total Merge time: {0}s".format(merge_diff)) 

# Print wall time
wall = (merge_end - start).total_seconds()
print("Total wall time: {0}s".format(wall)) 

# Calculate the time per date
date_diff = wall / num_ops
print("Time per date: {0}s".format(date_diff))

# Show the union df
merged_df.loc[merged_df["date"] == dates[0]].head()

Note: every time we see the CSV file get reloaded, we know that a worker crashed. I was watching the Linux OS hosting the Kubernetes/Spark workers; if CPU or memory was pegged at 100% utilization, the worker would crash.

In some cases, for example when merging all the tables, the driver may run out of memory with an error like the following:

```
Py4JJavaError: An error occurred while calling o689322.createOrReplaceTempView.
: java.lang.OutOfMemoryError: Java heap space
```

If anything went wrong, we should ask the SparkContext to kill any abandoned jobs that may be lingering in ghosted threads from the thread pool.

In [None]:
sc.cancelAllJobs()

We see that computing every date is almost as fast as computing a single date.

# 6. Cleanup Spark Cluster On Kubernetes

In [None]:
import os
if os.path.exists(csv_link_path) and os.path.islink(csv_link_path):
    os.unlink(csv_link_path)
    print("Deleted symlinked data file")

In [None]:
sc.stop()

In [None]:
! kubectl -n spark get pod