# STA 141B Data & Web Technologies for Data Analysis

### Lecture 6, 10/19/23, Concurrency


### Announcements

 - HW 2 is published. 
 - Midterm this Thursday. 

### Last week's topics

 - Numpy
 - Pandas

### Today's Topics

 - Concurrency
     - Threads and Processes
     - I/O-Concurrency
         - `threading`
     - CPU-Concurrency (Parallelization)
         - `multiprocessing` 
         - `Spark`
 
### References
- [SuperFastPython](https://superfastpython.com/thread-vs-process/) by Jason Brownlee
- [An introduction to parallel programming](https://sebastianraschka.com/Articles/2014_multiprocessing.html) by Sebastian Raschka
- [Speed Up Your Python Program With Concurrency](https://realpython.com/python-concurrency/) by Jim Anderson
- [An Intro to Threading in Python](https://realpython.com/intro-to-python-threading/) by Jim Anderson
- [3 Methods for Parallelization in Spark](https://towardsdatascience.com/3-methods-for-parallelization-in-spark-6a1a4333b473) by Ben Weber

### Threads and Processes

A computer can have multiple CPUs, and each CPU can have multiple cores (e.g., two quad-core CPUs). 
All the CPUs are connected to the same memory (e.g., 64 GB of memory). 
CPU cores can execute instructions in parallel. 

<div>
<img src="../images/fig1.png" alt="Drawing" style="width: 1000px;"/>
</div>

A __process__ is the *operating system’s spawned and controlled entity that encapsulates an executing application* ([Breshears: The Art of Concurrency](https://amzn.to/3J74TRr)). 

A __thread__ is a path of execution which belongs to a process. 

Each thread belongs to a process. In single-threaded processes, the process contains one thread. In multithreaded processes, the process contains more than one thread, and the process is accomplishing a number of things at the same time. 

Threads can share memory within a process. This means that functions executed in new threads can access the same data and state. These might be global variables or data shared via function arguments. As such, sharing state between threads is straightforward.
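As a minimal sketch of this shared state (the names `shared_results` and `record` are made up for illustration), the following threads all append to the same module-level list:

```python
import threading

shared_results = []  # hypothetical module-level list, visible to every thread

def record(value):
    # All threads write into the very same list object.
    # (list.append is thread-safe in CPython, so no lock is needed here.)
    shared_results.append(value * value)

threads = [threading.Thread(target=record, args=(i,)) for i in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The order of arrival may vary between runs, so we sort before printing
print(sorted(shared_results))
```

After all threads have been joined, the main thread sees every value written by the workers.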

Threads are sometimes called lightweight processes because they have their own stack but can access shared data. 

<div>
<img src="../images/fig3.png" alt="Drawing" style="width: 1000px;"/>
</div>

On the other hand, processes are 'share nothing', i.e., they execute independently without sharing memory or state. This makes it easier to turn a program into a distributed application, but sharing data between processes typically requires explicit mechanisms.

<div>
<img src="../images/fig4.png" alt="Drawing" style="width: 1000px;"/>
</div>
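The 'share nothing' behavior can be illustrated with a small sketch. It assumes a Unix-like system where the `fork` start method is available; `counter` and `increment` are hypothetical names.

```python
import multiprocessing

counter = {"value": 0}  # lives in the parent's memory

def increment():
    # Runs in the child process and changes the child's private copy only
    counter["value"] += 1

# Assumption: Unix-like system, where the "fork" start method exists
ctx = multiprocessing.get_context("fork")
child = ctx.Process(target=increment)
child.start()
child.join()

# The parent's copy was never touched by the child
print(counter["value"])
```

The same function run in a thread would have changed `counter` for the main thread as well.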

CPython, the standard Python interpreter, uses a global interpreter lock (GIL). This means that within one process only one thread can execute Python code at a time. This simplifies the implementation of the interpreter, but it prevents threads from running Python code in parallel. 

Today, we will explore the advantages of executing multiple processes and threads and discuss which approach is most appropriate under which circumstances. 

There are two major kinds of tasks that will slow down your program: CPU-bound and I/O-bound.

I/O-bound tasks cause your program to slow down because it frequently must wait for input/output (I/O) from some external resource. They arise when your program interacts with other sources, i.e., when you are *requesting* data from another source. 

<div>
<img src=https://files.realpython.com/media/IOBound.4810a888b457.png style="width: 1700px;"/>
</div>

CPU-bound tasks are those that require a lot of *computational* effort to complete. 

<div>
<img src=https://files.realpython.com/media/CPUBound.d2d32cb2626c.png style="width: 1700px;"/>
</div>

We will use threads (module `threading`) for I/O-bound tasks, and processes (module `multiprocessing`) for CPU-bound tasks.

### I/O-Concurrency

We’ll start with a non-concurrent version of an I/O-bound task. Namely, we will use `requests` to request data from a [website](https://anson.ucdavis.edu/~kramling/). For `requests.Session`, see [here](https://requests.readthedocs.io/en/latest/user/advanced/) and [docs](https://requests.readthedocs.io/en/latest/api/?#requests.Session). 

In [None]:
import requests, time

def download_site(url, session):
    session.get(url) # fetch information from url 
    # ... do something ... 
    
def download_all_sites(sites):
    session = requests.Session()
    for url in sites:  # one request after the other, reusing the same session
        download_site(url, session)
        
def task():
    sites = ["https://anson.ucdavis.edu/~kramling/"] * 80
    start_time = time.time()
    download_all_sites(sites)
    print(time.time() - start_time)

In [None]:
task()

We will now use concurrent threads to accomplish the same task more efficiently. Each thread still retrieves the data by calling `session.get`. 

In [None]:
import concurrent.futures, threading

thread_local = threading.local() # thread-local storage: each thread sees its own attributes (here: session)

def download_site(url):
    session = get_session()
    session.get(url)
    # ... do something ... 

def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
        executor.map(download_site, sites)

def get_session():
    '''Create a new requests.Session if there is none in thread_local'''
    if not hasattr(thread_local, "session"): 
        thread_local.session = requests.Session()
    return thread_local.session

In [None]:
task()

We have created a `concurrent.futures.ThreadPoolExecutor`. It creates up to five worker threads that run concurrently. 

Also, each thread gets its own separate `requests.Session`. This is one of the interesting and difficult issues with threading. Because the operating system is in control of switching between threads, any data that is shared between the threads needs to be protected, or thread-safe. Unfortunately, `requests.Session` is not thread-safe. If left untreated, *race conditions* can produce hard-to-detect bugs. 
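One common way to protect shared data is a lock. The sketch below is not part of the download example; it uses a made-up shared `counter` to show how `threading.Lock` makes the read-modify-write step safe across threads.

```python
import threading

counter = 0                      # shared by all threads
counter_lock = threading.Lock()  # guards every update of `counter`

def add_many(n):
    global counter
    for _ in range(n):
        # Only one thread at a time may execute the block below
        with counter_lock:
            counter += 1

threads = [threading.Thread(target=add_many, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)
```

Without the lock, two threads could read the same old value of `counter` and one of the increments could get lost.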

In `get_session`, we use `threading.local()` to instantiate an object that looks like a global but is specific to each individual thread. 

The code above is faster than the non-concurrent version because the threads overlap their waiting times: while one thread waits for a response, another thread can send its request. 
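The per-thread behavior of `threading.local()` can be checked in isolation. In this sketch, `worker` and `seen` are hypothetical names used only for the demonstration:

```python
import threading

thread_local = threading.local()
seen = {}  # hypothetical dict collecting what each thread observed

def worker(name):
    # Each thread sets and reads its own private `value` attribute
    thread_local.value = name
    seen[name] = thread_local.value

threads = [threading.Thread(target=worker, args=(f"t{i}",)) for i in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# The main thread never set `value`, so it does not see the workers' attributes
print(hasattr(thread_local, "value"))
print(seen)
```

Each worker reads back exactly what it stored, and nothing leaks into the main thread.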

<div>
<img src=https://files.realpython.com/media/Threading.3eef48da829e.png style="width: 1600px;"/>
</div>
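The effect can be reproduced without a network connection. The following sketch replaces the request by `time.sleep` as a stand-in for waiting on a server; `fake_download`, `sequential`, `threaded` and `timed` are made-up helper names.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(_):
    time.sleep(0.05)  # stand-in for waiting on a server response

def timed(run):
    start = time.perf_counter()
    run()
    return time.perf_counter() - start

def sequential():
    for i in range(10):
        fake_download(i)

def threaded():
    with ThreadPoolExecutor(max_workers=5) as executor:
        # list() consumes the iterator so that all tasks have finished
        list(executor.map(fake_download, range(10)))

sequential_time = timed(sequential)
threaded_time = timed(threaded)
print(f"sequential: {sequential_time:.2f}s, threaded: {threaded_time:.2f}s")
```

The sequential version waits ten times in a row, whereas five threads wait at the same time, so the threaded version should finish in a fraction of the time.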

### CPU-Concurrency (Parallelization)

The threading example above runs on a single CPU core only. This is due to the GIL. The `multiprocessing` module breaks down that barrier and runs code across multiple CPU cores. 

It does this by creating a new instance of the Python interpreter (a new process) to run on each CPU and then farming out part of your program to run on it.  Bringing up a separate Python interpreter is not as fast as starting a new thread in the current Python interpreter. 

In [None]:
import multiprocessing

session = None
multiprocessing.set_start_method("fork", force=True) # "fork": a new process starts as a copy of the parent process

def set_global_session():
    global session
    if not session:
        session = requests.Session()

def download_site(url):
    session.get(url)
    # ... do something ... 

def download_all_sites(sites):
    with multiprocessing.Pool(initializer=set_global_session, processes = 1) as pool: # change processes!
        pool.map(download_site, sites)
    
task()

With more processes, this may even be faster than with `threading`, since several processes run in parallel (e.g., up to 8 on a machine with 8 cores). However, `multiprocessing` cannot scale beyond the number of cores in your local machine. 
The default value of the argument `processes` of `multiprocessing.pool.Pool` ([docs](https://docs.python.org/3/library/multiprocessing.html?#module-multiprocessing.pool)) is the number of available cores on the machine. 

Remember that each process has its own memory space (share-nothing). That means that they cannot share things like a `requests.Session` object. We don’t want to create a new `Session` each time the function is called. Instead, we want to create one for each process, which then calls `session.get` multiple times in a row. 

The `initializer` function parameter is built for just this case. We initialize a global `session` variable to hold the single `requests.Session` *for each process*. Because each process has its own memory space, the global for each one will be different.

<div>
<img src=https://files.realpython.com/media/MProc.7cf3be371bbc.png style="width: 1600px;"/>
</div>
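The role of `initializer` can also be seen without any network access. The sketch below assumes a Unix-like system with the `fork` start method; `worker_state`, `init_worker` and `handle` are hypothetical names, and the dictionary stands in for a per-process resource such as a `requests.Session`.

```python
import multiprocessing
import os

worker_state = None  # each worker process gets its own copy of this global

def init_worker():
    global worker_state
    # Stand-in for an expensive per-process resource (e.g., a session)
    worker_state = {"pid": os.getpid(), "calls": 0}

def handle(item):
    worker_state["calls"] += 1  # reuses the state created by init_worker
    return item * 2, worker_state["pid"]

# Assumption: Unix-like system, where the "fork" start method exists
ctx = multiprocessing.get_context("fork")
with ctx.Pool(processes=2, initializer=init_worker) as pool:
    results = pool.map(handle, range(8))

doubled = [value for value, _ in results]
worker_pids = {pid for _, pid in results}
print(doubled)
print(len(worker_pids), "worker process(es) handled the items")
print(worker_state)  # still None: the parent never ran init_worker
```

`init_worker` runs once per worker process, so the global is set in the workers but stays `None` in the parent.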

This course will deal with fetching information from the web, usually via requests. While these requests will usually also be limited by the third parties who maintain the servers we are requesting from, they are in essence I/O-bound tasks. `multiprocessing`, however, is useful for CPU-bound tasks. Consequently, let's consider a computational problem. 

For the purposes of our example, we’ll use a somewhat silly function to create something that takes a long time to run on the CPU. This function computes the sum of the squares of each number from 0 up to, but not including, the passed-in value:

In [None]:
def problem(number):
    return sum(i * i for i in range(number))

def find_sums(numbers):
    for number in numbers:
        problem(number)

def task():
    numbers = [5_000_000 + x for x in range(20)]
    start_time = time.time()
    find_sums(numbers)
    print(time.time() - start_time)

In [None]:
task()
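As a sanity check, the result of `problem` can be compared with the closed form of the sum of squares, $\sum_{i=0}^{n-1} i^2 = \frac{(n-1)\,n\,(2n-1)}{6}$. The helper `closed_form` below is a hypothetical name and is not used elsewhere in the lecture; the slow loop is kept on purpose to generate CPU load.

```python
def problem(number):
    return sum(i * i for i in range(number))

def closed_form(number):
    # Sum of i^2 for i = 0, ..., number - 1
    return (number - 1) * number * (2 * number - 1) // 6

print(problem(10), closed_form(10))
```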

This code calls `problem` 20 times with a different large number each time. It does all of this on a single thread in a single process on a single CPU. The execution timing diagram looks like this:

<div>
<img src=https://files.realpython.com/media/CPUBound.d2d32cb2626c.png style="width: 1600px;"/>
</div>

Since there is no I/O waiting time, `threading` will not speed up this problem. We can, however, speed up the computation by using our multiple cores. 

In [None]:
# multiprocessing.set_start_method("fork", True) # has already been set
def find_sums(numbers):
    # the context manager shuts down the worker processes once the results are in
    with multiprocessing.Pool() as pool:
        return pool.map(problem, numbers)

In [None]:
task()

This code is similar to the code for the I/O-bound problem, but here you don’t need to worry about the `requests.Session` object. Notably, the speed-up is not equal to the number of cores, as each process has to set up its own Python interpreter. 

<div>
<img src=https://files.realpython.com/media/CPUMP.69c1a7fad9c4.png style="width: 1600px;"/>
</div>

This code is easy and fast. All the individual processes are automatically taken care of by `multiprocessing.Pool`. The results returned by `problem` are gathered by `Pool.map` in a `list`. 

In [None]:
find_sums(range(0, 4)) # returns a list

However, many solutions require communication between the processes. This can add some complexity to your solution that a non-concurrent program would not need to deal with.

A `multiprocessing.Queue` object provides a mechanism to pass data between a parent process and its descendant processes. It adheres to the *first in, first out* principle. We can retrieve items with the `Queue.get` method and add items with the `Queue.put` method. 

<div>
<img src=https://media.geeksforgeeks.org/wp-content/uploads/multiprocessing-python-4.png style="width: 1400px;"/>
</div>
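The *first in, first out* order can be checked within a single process. This is a minimal sketch; the items and the variable `received` are made up for illustration.

```python
import multiprocessing

q = multiprocessing.Queue()
for item in ["first", "second", "third"]:
    q.put(item)

# get() blocks until an item is available; the timeout guards against hanging
received = [q.get(timeout=5) for _ in range(3)]
print(received)
```

The items come out in the order in which they were put in.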

In [None]:
q = multiprocessing.Queue()
def myfun(q, i): 
    q.put(i)

In [None]:
# initialize process
processes = [multiprocessing.Process(target=myfun, args = (q, i)) for i in range(4)]
processes

In [None]:
!ps aux | grep "[1]4753"

The processes are initialized but not yet started. 

In [None]:
# run processes
for process in processes:
    process.start()

In [None]:
# join processes
for process in processes:
    process.join()

The main purpose of `Process.join` ([docs](https://docs.python.org/3/library/multiprocessing.html?#multiprocessing.Process.join)) is to ensure that a child process has completed before the main process does anything that depends on the work of the child process.

In [None]:
processes

The value `exitcode=0` means that the process has been completed successfully, without error ([docs](https://docs.python.org/3/library/multiprocessing.html?#multiprocessing.Process.exitcode)). 
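For comparison, the following sketch shows the exit codes of one successful and one failing process. It assumes a Unix-like system with the `fork` start method; `succeed` and `fail` are hypothetical functions, and the status `3` is an arbitrary non-zero value.

```python
import multiprocessing
import sys

def succeed():
    pass  # returns normally, so the exit code will be 0

def fail():
    sys.exit(3)  # arbitrary non-zero status signalling a failure

# Assumption: Unix-like system, where the "fork" start method exists
ctx = multiprocessing.get_context("fork")
ok = ctx.Process(target=succeed)
bad = ctx.Process(target=fail)

print(ok.exitcode)  # None, because the process has not been started yet

for p in (ok, bad):
    p.start()
for p in (ok, bad):
    p.join()  # wait until both children have terminated

print(ok.exitcode, bad.exitcode)
```

Checking `exitcode` after `join` is a simple way to detect that a child process did not finish its work.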

In [None]:
q.empty()

In [None]:
[q.get() for _ in processes] # one item per process

In [None]:
q.empty()

While `multiprocessing` allows you to steer the processes directly, many statistical problems are already implemented and ready for parallel computing, e.g. via `Spark`. 

### Spark

Apache Spark is a computational engine that works with huge sets of data by processing them in parallel and in batches. Spark is written in Scala, and [PySpark](https://www.dominodatalab.com/data-science-dictionary/pyspark) was released to make Spark usable from Python. 

The key data type used in PySpark is the Spark dataframe. This object can be thought of as a table distributed across a cluster, and has functionality that is similar to dataframes in `pandas`. If you want to do distributed computation using `PySpark`, then you’ll need to perform operations on Spark dataframes and not other Python data types.

Here we explore how to perform tasks using `PySpark`.

In [None]:
!pip install pyspark

We consider a simple regression model for predicting house prices. Let's first load the data as a regular (non-distributed) `pandas` data frame: 

In [None]:
import numpy as np
import pandas as pd

In [None]:
# load the california housing data set
from sklearn.datasets import fetch_california_housing
housing = fetch_california_housing()

# convert to a Pandas Data Frame
housing_pd = pd.DataFrame(data= np.c_[housing['data'],housing['target']], 
                          columns= np.append(housing['feature_names'], 'target')).sample(frac=1)

In [None]:
housing_pd.shape

In [None]:
housing_pd.head(5)

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

In [None]:
# convert to a Spark data frame
housing_sp = spark.createDataFrame(housing_pd)

In [None]:
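# convert to vector representation for MLlib
# (exclude the response 'target', otherwise it would be used as a feature)
feature_names = [name for name in housing_sp.schema.names if name != 'target']
assembler = VectorAssembler(inputCols=feature_names, 
                            outputCol="features")
housing = assembler.transform(housing_sp).select('features', 'target') 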

In [None]:
# linear regression with Spark
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [None]:
# linear regression with L1 penalization (elasticNetParam = 1.0 corresponds to the lasso)
lr = LinearRegression(labelCol="target", featuresCol="features", 
                      elasticNetParam = 1.0,
                      standardization = False, 
                      fitIntercept=False)

In [None]:
lrparamGrid = (ParamGridBuilder()
               .addGrid(lr.regParam, [0.001, 0.01, 0.1, 0.5, 1.0, 2.0])
               .build())

In [None]:
# Evaluate model
lrevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="target", metricName="mse")

In [None]:
# Create 5-fold CrossValidator
lrcv = CrossValidator(estimator = lr,
                      estimatorParamMaps = lrparamGrid,
                      evaluator = lrevaluator,
                      numFolds = 5)

In [None]:
# Run cross validations
lrcvModel = lrcv.fit(housing)

See [docs](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.LinearRegressionTrainingSummary.html). 

In [None]:
# Get Model Summary Statistics
lrcvSummary = lrcvModel.bestModel.summary
lrcvSummary.meanSquaredError

In [None]:
lrcvModel.bestModel.coefficients

### Summary 

- There are I/O-bound and CPU-bound problems
- Use `threading` for I/O-bound problems, `multiprocessing` for CPU-bound problems
- Communication between processes requires explicit mechanisms (e.g., `multiprocessing.Queue`) and adds complexity
- For many CPU-bound tasks, parallel implementations already exist (e.g., in `Spark`)