# STA 141B Data & Web Technologies for Data Analysis

### Lecture 6, 10/15/24, Concurrency


### Announcements

 - Midterm this Thursday. 

### Last week's Topics

 - `numpy`
 - `pandas`

### Reading Data

Pandas provides functions for reading (and writing) a variety of common formats. Most of their names begin with `read_`. For instance, we can read the dogs data from a CSV file:

In [162]:
dogs = pd.read_csv("../data/dogs_full.csv")

In [163]:
dogs.head()

| | breed | group | datadog | popularity_all | popularity | lifetime_cost | intelligence_rank | longevity | ailments | price | food_cost | grooming | kids | megarank_kids | megarank | size | weight | height |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | Border Collie | herding | 3.64 | 45 | 39.0 | 20143.0 | 1.0 | 12.52 | 2.0 | 623.0 | 324.0 | weekly | low | 1.0 | 29.0 | medium | NaN | 20.0 |
| 1 | Border Terrier | terrier | 3.61 | 80 | 61.0 | 22638.0 | 30.0 | 14.0 | 0.0 | 833.0 | 324.0 | weekly | high | 2.0 | 1.0 | small | 13.5 | NaN |
| 2 | Brittany | sporting | 3.54 | 30 | 30.0 | 22589.0 | 19.0 | 12.92 | 0.0 | 618.0 | 466.0 | weekly | medium | 3.0 | 11.0 | medium | 35.0 | 19.0 |
| 3 | Cairn Terrier | terrier | 3.53 | 59 | 48.0 | 21992.0 | 35.0 | 13.84 | 2.0 | 435.0 | 324.0 | weekly | high | 4.0 | 2.0 | small | 14.0 | 10.0 |
| 4 | Welsh Springer Spaniel | sporting | 3.34 | 130 | 81.0 | 20224.0 | 31.0 | 12.49 | 1.0 | 750.0 | 324.0 | weekly | high | 5.0 | 4.0 | medium | NaN | 18.0 |


### Inspecting Data

Series and data frames provide many of the same methods and attributes as NumPy arrays.

For a data frame, the `.dtypes` attribute gives the column types.

The type "object" means some non-numeric Python object, often a string.

In [164]:
dogs.dtypes

breed                 object
group                 object
datadog              float64
popularity_all         int64
popularity           float64
lifetime_cost        float64
intelligence_rank    float64
longevity            float64
ailments             float64
price                float64
food_cost            float64
grooming              object
kids                  object
megarank_kids        float64
megarank             float64
size                  object
weight               float64
height               float64
dtype: object

There are also several methods for quickly summarizing data.

In [165]:
dogs.describe()

| | datadog | popularity_all | popularity | lifetime_cost | intelligence_rank | longevity | ailments | price | food_cost | megarank_kids | megarank | weight | height |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| count | 87.0 | 172.0 | 87.0 | 91.0 | 132.0 | 135.0 | 148.0 | 146.0 | 87.0 | 87.0 | 87.0 | 86.0 | 159.0 |
| mean | 2.603678 | 87.122093 | 44.0 | 19819.538462 | 40.924242 | 10.956741 | 1.216216 | 876.815068 | 489.597701 | 43.954023 | 43.942529 | 44.97093 | 19.089623 |
| std | 0.570288 | 50.205335 | 25.258662 | 3102.475382 | 19.60356 | 1.995742 | 1.54981 | 461.172524 | 204.266894 | 25.288065 | 25.278153 | 35.52707 | 6.0124 |
| min | 0.99 | 1.0 | 1.0 | 12653.0 | 1.0 | 6.29 | 0.0 | 283.0 | 270.0 | 1.0 | 1.0 | 5.0 | 5.0 |
| 25% | 2.185 | 43.75 | 22.5 | 17816.5 | 27.0 | 9.7 | 0.0 | 587.25 | 324.0 | 22.5 | 22.5 | 17.5 | 14.0 |
| 50% | 2.71 | 87.5 | 44.0 | 20087.0 | 42.0 | 11.29 | 1.0 | 795.0 | 466.0 | 44.0 | 44.0 | 35.0 | 19.0 |
| 75% | 3.035 | 130.25 | 65.5 | 21798.0 | 54.25 | 12.365 | 2.0 | 1042.25 | 466.0 | 65.5 | 65.5 | 62.5 | 24.125 |
| max | 3.64 | 173.0 | 87.0 | 26686.0 | 80.0 | 16.5 | 9.0 | 3460.0 | 1349.0 | 87.0 | 87.0 | 175.0 | 32.0 |


To summarize the string columns, first select the `object` columns, then call `.describe()`:

In [168]:
dogs.select_dtypes(include = ["object"]).describe()

| | breed | group | grooming | kids | size |
|---|---|---|---|---|---|
| count | 172 | 172 | 112 | 112 | 172 |
| unique | 172 | 7 | 3 | 3 | 3 |
| top | Border Collie | terrier | weekly | high | medium |
| freq | 1 | 28 | 88 | 67 | 60 |


In [169]:
dogs.select_dtypes(include = ["int64"]).describe()

| | popularity_all |
|---|---|
| count | 172.0 |
| mean | 87.122093 |
| std | 50.205335 |
| min | 1.0 |
| 25% | 43.75 |
| 50% | 87.5 |
| 75% | 130.25 |
| max | 173.0 |


### Aggregation

Pandas also provides several methods for aggregating data, such as `.mean()`, `.median()`, `.std()`, and `.value_counts()`. They ignore missing values by default.

In [None]:
dogs.median(numeric_only=True)

In [None]:
dogs["price"].median()

In [None]:
dogs["group"].value_counts() # like R's table() with 1 arg

For counting one group against another (crosstabulating), use `pd.crosstab()`.

In [None]:
pd.crosstab(dogs["group"], dogs["kids"]) # like R's table() with 2+ arg
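
As a minimal sketch of the aggregation methods above, the following uses a small made-up data frame (the values are invented for illustration and are not taken from the dogs data). It shows that missing values are skipped by default, and what `.value_counts()` and `pd.crosstab()` return.

```python
import numpy as np
import pandas as pd

# Made-up toy data (not the dogs data set); np.nan marks a missing value.
toy = pd.DataFrame({
    "group": ["herding", "terrier", "terrier", "sporting"],
    "kids": ["low", "high", "high", "medium"],
    "price": [600.0, np.nan, 400.0, 500.0],
})

median_default = toy["price"].median()             # NaN is skipped: median of 400, 500, 600
median_strict = toy["price"].median(skipna=False)  # NaN propagates to the result
counts = toy["group"].value_counts()               # one-way table of group sizes
table = pd.crosstab(toy["group"], toy["kids"])     # two-way table: group against kids

print(median_default, median_strict)
print(counts)
print(table)
```

Passing `skipna=False` is useful when a missing value should invalidate the summary instead of being dropped silently.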

### Applying Functions

You can also use Pandas to apply your own aggregation functions to columns or rows.

* `.apply()` applies a function column-by-column or row-by-row.
* `.applymap()` applies a function element-by-element (since pandas 2.1 it is deprecated in favor of `DataFrame.map()`).

This is another way to vectorize code. Note that `.applymap()` is only available for data frames.


In [None]:
def spread(x):
    '''Returns spread. Input is a single column (or row)'''
    return x.max() - x.min()
    
dogs.select_dtypes(include = ["float64", "int64"]).apply(spread)
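
The difference between column-wise, row-wise, and element-wise application can be sketched on a small made-up data frame. The helper `double` and the fallback lookup for the element-wise method are illustrative assumptions; the lookup only exists so that the sketch runs on both older and newer pandas versions.

```python
import pandas as pd

toy = pd.DataFrame({"a": [1.0, 4.0, 9.0], "b": [2.0, 3.0, 10.0]})  # made-up values

def spread(x):
    """Range of a single column (or row)."""
    return x.max() - x.min()

def double(value):
    """Acts on one cell at a time."""
    return 2 * value

col_spread = toy.apply(spread)          # axis=0 (default): one result per column
row_spread = toy.apply(spread, axis=1)  # axis=1: one result per row

# Element-wise: DataFrame.map in pandas >= 2.1, DataFrame.applymap before that.
elementwise = getattr(toy, "map", None) or toy.applymap
doubled = elementwise(double)

print(col_spread)
print(row_spread)
print(doubled)
```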

### Grouping

Use the `.groupby()` method to group data before computing aggregate statistics.

In [None]:
dogs.head()

In [None]:
dogs.groupby("group").mean(numeric_only=True).reset_index()

By default, the groups become the index. You can keep them as regular columns by setting `as_index = False` when grouping.

In [None]:
dogs.groupby("group", as_index = False).mean(numeric_only=True)

You can group by multiple columns.

In [None]:
dogs.groupby(["group", "kids"]).mean(numeric_only=True).head()

On groups, the `.apply()` method computes group-by-group. It is the most general form of two other methods:

* `.agg()`, which applies a function to each group to compute summary statistics
* `.transform()`, which applies a function to each group to compute transformations (such as standardization)
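
A minimal sketch of the difference between `.agg()` and `.transform()`, using made-up prices: `.agg()` returns one row per group, whereas `.transform()` returns one value per original row.

```python
import pandas as pd

# Made-up toy data for illustration.
toy = pd.DataFrame({
    "group": ["herding", "herding", "terrier", "terrier"],
    "price": [600.0, 800.0, 400.0, 500.0],
})

grouped = toy.groupby("group")["price"]

summary = grouped.agg(["mean", "max"])                # one row per group
centered = grouped.transform(lambda x: x - x.mean())  # same length as the input

print(summary)
print(centered)
```

Because the result of `.transform()` is aligned with the original rows, it can be assigned back as a new column, e.g. `toy["centered"] = centered`.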

## Tidying a Dataset

Do Americans prefer low fat milk over whole milk?

The USDA publishes data about dairy production. We can answer the question with the [Milk Sales Dataset](https://www.ers.usda.gov/webdocs/DataFiles/48685/fluidmilk.xlsx?v=5010.6).

Many of Python's visualization packages expect [tidy data](https://vita.had.co.nz/papers/tidy-data.pdf), which means:

1. Each feature must have its own column.
2. Each observation must have its own row.
3. Each value must have its own cell.

Let's tidy up the Milk Sales Dataset so we can make a line plot that shows how milk sales have changed over time.

In [None]:
import numpy as np
import pandas as pd

In [None]:
milk = pd.read_excel("../data/fluidmilk.xlsx")
milk.head()

In [None]:
milk = pd.read_excel("../data/fluidmilk.xlsx", skiprows = 1)
milk.head()

In [None]:
milk.columns

In [None]:
milk.columns = milk.columns.str.replace('\n', '')
milk.head()

In [None]:
milk = milk.rename(columns=lambda x: x.strip(' 012'))
milk.head()

In [None]:
milk = milk.rename(columns = {'Unnamed:': 'Year'})
milk.head()

In [None]:
milk.columns.values[[2,3,5,6]] = np.array(['Reduced', 'Low', 
                                            'Flavored Whole', 'Flavored Other'])

In [None]:
milk.head()

In [None]:
milk.dtypes

In [None]:
milk = milk.set_index('Year')  

In [None]:
milk.head()

In [None]:
milk = pd.read_excel("../data/fluidmilk.xlsx", skiprows = 1)
milk.columns = milk.columns.str.replace('\n', '')
milk = milk.rename(columns=lambda df: df.strip(' 12'))
milk.columns.values[[0,2,3,5,6]] = np.array(['Year', 'Reduced', 'Low', 
                                             'Flavored Whole', 'Flavored Other'])
milk = milk[:-4] # get rid of the last four rows
milk = milk.set_index("Year") 
milk.head()

In [None]:
milk = milk.stack() 
milk

In [None]:
milk.index

In [None]:
milk = milk.reset_index()
milk

In [None]:
milk.columns.values[[False, True, True]] = np.array(["Kind", "Sales"])

In [None]:
milk.head()

In [None]:
milk.tail()
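
The wide-to-long reshaping can be sketched on a tiny made-up table (the numbers are invented and not taken from the USDA file). It mirrors the `.stack()` and `.reset_index()` route used above and compares it with `.melt()`, an alternative that names the new columns in one step.

```python
import pandas as pd

# Made-up wide table: one column per kind of milk.
wide = pd.DataFrame({"Year": [2000, 2001], "Whole": [10.0, 9.0], "Low": [5.0, 6.0]})

# Route 1: stack the columns into the index, then turn the index into columns.
stacked = wide.set_index("Year").stack().reset_index()
stacked.columns = ["Year", "Kind", "Sales"]
stacked = stacked.sort_values(["Year", "Kind"]).reset_index(drop=True)

# Route 2: melt does the reshaping and the renaming at once.
tidy = wide.melt(id_vars="Year", var_name="Kind", value_name="Sales")
tidy = tidy.sort_values(["Year", "Kind"]).reset_index(drop=True)

print(tidy)
```

Either way, each row now holds one observation (a year and a kind of milk) with its sales value, which is the tidy layout described above.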

### Today's Topics

 - Concurrency
     - Threads and Processes
     - I/O-Concurrency
         - `threading`
     - CPU-Concurrency (Parallelization)
         - `multiprocessing` 
         - `Spark`
 
### References
- [SuperFastPython](https://superfastpython.com/thread-vs-process/) by Jason Brownlee
- [An introduction to parallel programming](https://sebastianraschka.com/Articles/2014_multiprocessing.html) by Sebastian Raschka
- [Speed Up Your Python Program With Concurrency](https://realpython.com/python-concurrency/) by Jim Anderson
- [An Intro to Threading in Python](https://realpython.com/intro-to-python-threading/) by Jim Anderson
- [3 Methods for Parallelization in Spark](https://towardsdatascience.com/3-methods-for-parallelization-in-spark-6a1a4333b473) by Ben Weber

### Threads and Processes

A computer can have multiple CPUs, and each CPU can have multiple cores (e.g., two quad-core CPUs). 
All the CPUs are connected to memory (e.g., 64 GB of memory). 
CPU cores can execute in parallel. 

<div>
<img src="./source/fig1.png" alt="Drawing" style="width: 1000px;"/>
</div>

A __process__ is the *operating system’s spawned and controlled entity that encapsulates an executing application* ([Breshears: The Art of Concurrency](https://amzn.to/3J74TRr)). 

A __thread__ is a path of execution which belongs to a process. 

Each thread belongs to a process. In single-threaded processes, the process contains one thread. In multithreaded processes, the process contains more than one thread, and the process is accomplishing a number of things at the same time. 

Threads can share memory within a process. This means that functions executed in new threads can access the same data and state. These might be global variables or data shared via function arguments. As such, sharing state between threads is straightforward.

Threads are sometimes called lightweight processes because they have their own stack but can access shared data. 

<div>
<img src="./source/fig3.png" alt="Drawing" style="width: 1000px;"/>
</div>
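
A minimal sketch of threads sharing state: four threads increment the same counter. The names `counter`, `lock`, and `add` are illustrative. The lock is there because an unprotected read-modify-write on shared data can interleave between threads.

```python
import threading

counter = {"value": 0}   # shared state, visible to every thread in the process
lock = threading.Lock()  # protects the read-modify-write below

def add(n):
    for _ in range(n):
        with lock:
            counter["value"] += 1

threads = [threading.Thread(target=add, args=(1000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter["value"])  # 4000: all threads updated the same object
```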

On the other hand, processes are 'share nothing', i.e., they execute independently without sharing memory or state. This makes it easier to turn a program into a distributed application, but sharing data between processes typically requires explicit mechanisms.

<div>
<img src="./source/fig4.png" alt="Drawing" style="width: 1000px;"/>
</div>
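
The 'share nothing' behavior can be sketched as follows: a child process changes a global variable, but the parent never sees the change. The sketch assumes a Unix-like system where the `"fork"` start method is available; the names `state` and `child_update` are illustrative.

```python
import multiprocessing

ctx = multiprocessing.get_context("fork")  # assumption: Unix-like OS

state = {"value": 0}

def child_update():
    # Runs in the child process and modifies the child's own copy only.
    state["value"] = 42

p = ctx.Process(target=child_update)
p.start()
p.join()

print(state["value"])  # still 0 in the parent process
```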

CPython, the standard Python interpreter, uses a global interpreter lock (GIL). This means that only one thread can execute Python code at a time, even on a multi-core machine. This simplifies the implementation of the interpreter, but prevents threads from running Python code in parallel. 
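
A rough way to observe the effect of the GIL is to time a purely computational function with and without threads. This is only a sketch: `sum_of_squares` and the problem sizes are illustrative, timings vary between machines, and on a standard CPython build the threaded run is expected to take about as long as the sequential one.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def sum_of_squares(number):
    """Pure-Python computation with no waiting for I/O."""
    return sum(i * i for i in range(number))

numbers = [200_000 + x for x in range(8)]

start = time.perf_counter()
sequential = [sum_of_squares(n) for n in numbers]
sequential_time = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=4) as executor:
    threaded = list(executor.map(sum_of_squares, numbers))
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.3f}s, threads: {threaded_time:.3f}s")
```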

Today, we will explore the advantages of executing multiple processes and threads, and discuss which approach is most suitable under which circumstances. 

There are two major kinds of tasks that will slow down your program: CPU-bound and I/O-bound.

I/O-bound tasks cause your program to slow down because it frequently must wait for input/output (I/O) from some external resource. They arise when your program interacts with other sources, e.g., when you are *requesting* data from a server. 

<div>
<img src=https://files.realpython.com/media/IOBound.4810a888b457.png style="width: 1700px;"/>
</div>

CPU-bound tasks are those that require a lot of *computational* effort to complete. 

<div>
<img src=https://files.realpython.com/media/CPUBound.d2d32cb2626c.png style="width: 1700px;"/>
</div>

We will use threads and the module `threading` for I/O-bound tasks and processes and the module `multiprocessing` for CPU-bound tasks.

### I/O-Concurrency

We’ll start with a non-concurrent version of an I/O-bound task. Namely, we will use `requests` to request data from a [website](https://anson.ucdavis.edu/~kramling/). For `requests.Session`, see [here](https://requests.readthedocs.io/en/latest/user/advanced/) and the [docs](https://requests.readthedocs.io/en/latest/api/?#requests.Session). 

In [1]:
import requests, time

def download_site(url, session):
    session.get(url) # fetch information from url 
    # ... do something ... 
    
def download_all_sites(sites):
    session = requests.Session()
    for url in sites:  # plain loop: the calls are made for their side effect only
        download_site(url, session)
        
def task(): 
    sites = ["https://anson.ucdavis.edu/~kramling/"] * 80
    start_time = time.time()
    download_all_sites(sites)
    print(time.time() - start_time)

In [2]:
task()

0.764664888381958


We will now use concurrent threads to accomplish the same task more efficiently: several `session.get` calls wait for their responses at the same time. 

In [3]:
import concurrent.futures, threading

thread_local = threading.local() # thread-local storage: each thread sees its own attributes (here: session)

def download_site(url):
    session = get_session()
    session.get(url)
    # ... do something ... 

def download_all_sites(sites):
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        executor.map(download_site, sites)

def get_session():
    '''Create a new requests.Session if there is none in thread_local'''
    if not hasattr(thread_local, "session"): 
        thread_local.session = requests.Session()
    return thread_local.session

In [4]:
task()

0.14759087562561035


We have created a `concurrent.futures.ThreadPoolExecutor`. With `max_workers=8`, it uses up to eight threads that run concurrently. 

Also, each thread gets its own separate `requests.Session`. This is one of the interesting and difficult issues with threading. Because the operating system is in control of switching between threads, any data that is shared between the threads needs to be protected, or thread-safe. Unfortunately, `requests.Session` is not thread-safe. If left unaddressed, *race conditions* can produce hard-to-detect bugs. 

Here, we use `threading.local()` to instantiate an object that looks like a global but is specific to each individual thread. 
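
The behavior of `threading.local()` can be sketched without any network access. Here a plain `object()` stands in for `requests.Session()`, and the names `get_resource`, `created`, and `worker` are illustrative: each thread creates its resource once, reuses it afterwards, and never sees the resource of another thread.

```python
import threading

thread_local = threading.local()
created = []             # every resource ever created, across all threads
same_within_thread = []  # did a thread get the same object on its second call?
bookkeeping_lock = threading.Lock()

def get_resource():
    """Create a resource for the current thread if it has none yet."""
    if not hasattr(thread_local, "resource"):
        thread_local.resource = object()  # stand-in for requests.Session()
        with bookkeeping_lock:
            created.append(thread_local.resource)
    return thread_local.resource

def worker():
    first = get_resource()
    second = get_resource()
    with bookkeeping_lock:
        same_within_thread.append(first is second)

threads = [threading.Thread(target=worker) for _ in range(3)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(len(created), all(same_within_thread))
print(hasattr(thread_local, "resource"))  # the main thread never created one
```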

The threaded version is faster than the non-concurrent version because the threads overlap their waiting times: while one thread waits for a response, another can send its request. 

<div>
<img src=https://files.realpython.com/media/Threading.3eef48da829e.png style="width: 1600px;"/>
</div>
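
The overlap of waiting times can be reproduced without a web server. In this sketch, `time.sleep` stands in for waiting on a response, and `fake_download` is a hypothetical placeholder for a real request.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(i):
    time.sleep(0.1)  # stand-in for waiting on a server
    return i

jobs = list(range(8))

start = time.perf_counter()
sequential = [fake_download(i) for i in jobs]
sequential_time = time.perf_counter() - start

start = time.perf_counter()
with ThreadPoolExecutor(max_workers=8) as executor:
    threaded = list(executor.map(fake_download, jobs))
threaded_time = time.perf_counter() - start

print(f"sequential: {sequential_time:.2f}s, threads: {threaded_time:.2f}s")
```

The sequential run has to wait for all eight sleeps one after another, whereas the eight threads sleep at the same time. Note that `executor.map` returns the results in the order of the inputs.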

### CPU-Concurrency (Parallelization)

The concurrency example above runs on only a single CPU. This is due to the GIL. The `multiprocessing` module breaks down that barrier and runs code across multiple CPUs. 

It does this by creating a new instance of the Python interpreter (a new process) to run on each CPU and then farming out part of your program to run on it.  Bringing up a separate Python interpreter is not as fast as starting a new thread in the current Python interpreter. Regarding `.set_start_method`, see [here](https://docs.python.org/3/library/multiprocessing.html#contexts-and-start-methods). 

In [5]:
import multiprocessing
multiprocessing.cpu_count()

8

In [6]:
session = None
multiprocessing.set_start_method("fork", force=True) # "fork": each new process starts as a copy of the parent process

def set_global_session():
    global session
    if not session:
        session = requests.Session()

def download_site(url):
    session.get(url)
    # ... do something ... 

def download_all_sites(sites):
    with multiprocessing.Pool(initializer=set_global_session, processes = 8) as pool: # change processes!
        pool.map(download_site, sites)
    
task()

0.2803640365600586


In this run, `multiprocessing` (about 0.28 s) beats the sequential version (about 0.76 s) but not `threading` (about 0.15 s): the task is I/O-bound, and starting processes is more expensive than starting threads. Moreover, for CPU-bound work `multiprocessing` cannot scale beyond the number of cores in your local machine. 
If the argument `processes` of `multiprocessing.pool.Pool` ([docs](https://docs.python.org/3/library/multiprocessing.html?#module-multiprocessing.pool)) is not given, it defaults to the number of available cores on the machine. 

Remember that each process has its own memory space (share-nothing). That means that they cannot share things like a `requests.Session` object. We do not want to create a new session each time the function is called; instead, we create one session per process, which then calls `session.get` repeatedly. 

The `initializer` function parameter is built for just this case. We initialize a global `session` variable to hold the single `requests.Session` *for each process*. Because each process has its own memory space, the global for each one will be different.

<div>
<img src=https://files.realpython.com/media/MProc.7cf3be371bbc.png style="width: 1600px;"/>
</div>
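
The role of `initializer` can be sketched without a network connection. A string containing the process ID stands in for `requests.Session()`, and the names `init_worker` and `use_resource` are illustrative. The sketch assumes a Unix-like system where the `"fork"` start method is available.

```python
import multiprocessing
import os

resource = None  # one per worker process, set by the initializer

def init_worker():
    """Runs once in each worker process when the pool starts it."""
    global resource
    resource = f"resource-of-{os.getpid()}"  # stand-in for requests.Session()

def use_resource(i):
    # Every call inside the same worker sees that worker's resource.
    return (i, os.getpid(), resource)

ctx = multiprocessing.get_context("fork")  # assumption: Unix-like OS
with ctx.Pool(processes=2, initializer=init_worker) as pool:
    results = pool.map(use_resource, range(6))

for i, pid, res in results:
    print(i, pid, res)
print(resource)  # still None in the parent process
```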

This course will deal with fetching information from the web, usually via requests. While the speed of these requests also depends on the third parties who maintain the servers we are requesting from, they are in essence I/O-bound tasks. `multiprocessing`, however, is useful for CPU-bound tasks. Consequently, let's consider a computational problem. 

For the purposes of our example, we’ll use a somewhat silly function to create something that takes a long time to run on the CPU. This function computes the sum of the squares of all integers from 0 up to (but not including) the passed-in value:

In [7]:
def problem(number):
    return sum(i * i for i in range(number))

def find_sums(numbers):
    for number in numbers:
        problem(number)

def task():
    numbers = [5_000_000 + x for x in range(20)]
    start_time = time.time()
    find_sums(numbers)
    print(time.time() - start_time)

In [8]:
task()

6.951680898666382


The function `task` calls `problem` 20 times with a different large number each time. It does all of this on a single thread in a single process on a single CPU. The execution timing diagram looks like this:

<div>
<img src=https://files.realpython.com/media/CPUBound.d2d32cb2626c.png style="width: 1600px;"/>
</div>
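
As a sanity check on what `problem` computes, its result can be compared with the closed form $\sum_{i=0}^{n-1} i^2 = \frac{(n-1)\,n\,(2n-1)}{6}$. The helper `closed_form` is an illustrative addition; the loop version is kept in the lecture precisely because it is slow.

```python
def problem(number):
    return sum(i * i for i in range(number))

def closed_form(number):
    # Sum of i^2 for i = 0, ..., number - 1.
    return (number - 1) * number * (2 * number - 1) // 6

checks = [(n, problem(n), closed_form(n)) for n in [0, 1, 2, 3, 4, 1000]]
for n, looped, formula in checks:
    print(n, looped, formula)
```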

Since there is no I/O waiting time, `threading` will not speed up this problem. We can, however, speed up the computation by using our multiple cores. 

In [9]:
# multiprocessing.set_start_method("fork", True) # has already been set
def find_sums(numbers):
    # the with-block shuts the worker processes down once the results are in
    with multiprocessing.Pool() as pool:
        return pool.map(problem, numbers)

In [10]:
task()

1.599679946899414


This code is similar to the code for the I/O-bound problem, but here you don’t need to worry about the `requests.Session` object. Notably, the speed-up (here roughly fourfold, from about 7.0 s to 1.6 s) is smaller than the number of cores (8), in part because each process has to set up its own Python interpreter. 

<div>
<img src=https://files.realpython.com/media/CPUMP.69c1a7fad9c4.png style="width: 1600px;"/>
</div>

This code is short and fast: `multiprocessing.Pool` takes care of starting and managing the individual processes automatically. The results returned by `problem` are gathered by `pool.map` into a `list`. 

In [11]:
find_sums(range(0, 4)) # returns a list

[0, 0, 1, 5]

However, many solutions require communication between the processes. This can add some complexity to your solution that a non-concurrent program would not need to deal with.

A `multiprocessing.Queue` object provides a mechanism to pass data between a parent process and its descendant processes. It adheres to the *first in, first out* principle. We retrieve items with the `Queue.get` method and add items with the `Queue.put` method. 

<div>
<img src=https://media.geeksforgeeks.org/wp-content/uploads/multiprocessing-python-4.png style="width: 1400px;"/>
</div>

In [12]:
q = multiprocessing.Queue()
def myfun(q, i): 
    q.put(i)

In [13]:
# initialize process
processes = [multiprocessing.Process(target=myfun, args = (q, i)) for i in range(4)]
processes

[<Process name='Process-25' parent=8580 initial>,
 <Process name='Process-26' parent=8580 initial>,
 <Process name='Process-27' parent=8580 initial>,
 <Process name='Process-28' parent=8580 initial>]

In [15]:
!ps aux | grep "[8]580"

peter             8580   7.9  0.7 35486988  58624   ??  Ss    6:37PM   0:09.09 /Users/peter/opt/anaconda3/bin/python -m ipykernel_launcher -f /Users/peter/Library/Jupyter/runtime/kernel-d7b33bd9-2084-42ef-b03c-d4c5de79bf2f.json


The processes are initialized but not yet started. 

In [16]:
# run processes
for process in processes:
    process.start()

In [17]:
# join processes
for process in processes:
    process.join()

The main purpose of `Process.join` ([docs](https://docs.python.org/3/library/multiprocessing.html?#multiprocessing.Process.join)) is to ensure that a child process has completed before the main process does anything that depends on the work of the child process.

In [18]:
processes

[<Process name='Process-25' pid=8636 parent=8580 stopped exitcode=0>,
 <Process name='Process-26' pid=8637 parent=8580 stopped exitcode=0>,
 <Process name='Process-27' pid=8638 parent=8580 stopped exitcode=0>,
 <Process name='Process-28' pid=8639 parent=8580 stopped exitcode=0>]

The value `exitcode=0` means that the process has been completed successfully, without error ([docs](https://docs.python.org/3/library/multiprocessing.html?#multiprocessing.Process.exitcode)). 

In [19]:
q.empty()

False

In [20]:
[q.get() for process in processes]

[0, 1, 2, 3]

In [21]:
q.empty()

True
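
A common pattern is to let each child put a labeled result on the queue, so that the parent can match results to inputs. The sketch below makes a few assumptions: a Unix-like system with the `"fork"` start method, and an illustrative worker named `square`. Since the children run independently, the order in which results arrive is not guaranteed, so the parent sorts them. Reading from the queue before joining also avoids blocking when children put large amounts of data on it.

```python
import multiprocessing

ctx = multiprocessing.get_context("fork")  # assumption: Unix-like OS

def square(q, i):
    q.put((i, i * i))  # label the result with its input

q = ctx.Queue()
processes = [ctx.Process(target=square, args=(q, i)) for i in range(4)]
for p in processes:
    p.start()

results = [q.get() for _ in processes]  # blocks until each result has arrived
for p in processes:
    p.join()

results.sort()  # arrival order may differ from start order
print(results)
```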

While `multiprocessing` allows you to steer the processes directly, many statistical problems are already implemented and ready for parallel computing, e.g. via `Spark`. 

### Spark

Apache Spark is a computational engine for huge data sets that processes them in parallel and in batches, typically distributed across a cluster. Spark is written in Scala, and [PySpark](https://www.dominodatalab.com/data-science-dictionary/pyspark) is its Python interface. 

The key data type used in PySpark is the Spark dataframe. This object can be thought of as a table distributed across a cluster, and has functionality that is similar to dataframes in `pandas`. If you want to do distributed computation using `PySpark`, then you’ll need to perform operations on Spark dataframes and not other Python data types.

Here we explore how to perform tasks using `PySpark`.

In [None]:
!pip install pyspark

We consider a simple regression model for predicting house prices. Let's load the data into an ordinary (non-distributed) pandas data frame first: 

In [22]:
import numpy as np
import pandas as pd

In [23]:
# load the california housing data set
from sklearn.datasets import fetch_california_housing
housing = fetch_california_housing()

# convert to a Pandas Data Frame
housing_pd = pd.DataFrame(data= np.c_[housing['data'],housing['target']], 
                          columns= np.append(housing['feature_names'], 'target')).sample(frac=1)

In [24]:
housing_pd.shape

(20640, 9)

In [25]:
housing_pd.head(5)

| | MedInc | HouseAge | AveRooms | AveBedrms | Population | AveOccup | Latitude | Longitude | target |
|---|---|---|---|---|---|---|---|---|---|
| 5016 | 2.2054 | 47.0 | 5.271293 | 1.223975 | 895.0 | 2.823344 | 34.0 | -118.33 | 1.215 |
| 963 | 6.3132 | 18.0 | 6.735363 | 0.990632 | 1395.0 | 3.266979 | 37.69 | -121.91 | 2.592 |
| 11352 | 2.6442 | 23.0 | 4.134259 | 1.032407 | 1149.0 | 5.319444 | 33.75 | -117.92 | 1.563 |
| 18324 | 5.9277 | 38.0 | 6.356502 | 1.002242 | 1148.0 | 2.573991 | 37.44 | -122.13 | 4.466 |
| 8767 | 3.7778 | 35.0 | 6.035928 | 0.988024 | 391.0 | 2.341317 | 33.81 | -118.39 | 4.875 |


In [26]:
from pyspark.ml.feature import VectorAssembler
from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession

sc = SparkContext('local')
spark = SparkSession(sc)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/10/10 18:41:29 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [27]:
# convert to a Spark data frame
housing_sp = spark.createDataFrame(housing_pd)



In [28]:
features = housing_sp.schema.names[:]
target = features.pop()

In [29]:
# convert to vector representation for MLlib
assembler = VectorAssembler(inputCols=features, outputCol="features" )
housing = assembler.transform(housing_sp).select('features', 'target') 

See [docs](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.LinearRegression.html). 

In [30]:
# linear regression with Spark
from pyspark.ml.regression import LinearRegression
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.ml.evaluation import RegressionEvaluator

In [31]:
# linear regression with penalization
lr = LinearRegression(labelCol="target", featuresCol="features", 
                      elasticNetParam = 1.0, # lasso / l1-penalization
                      standardization = True, 
                      fitIntercept=False)

In [32]:
lrparamGrid = (ParamGridBuilder()
               .addGrid(lr.regParam, [0.001, 0.01, 0.1, 0.5, 1.0, 2.0])
               .build())

In [33]:
# Evaluate model
lrevaluator = RegressionEvaluator(predictionCol="prediction", labelCol="target", metricName="mse")

In [34]:
# Create 5-fold CrossValidator
lrcv = CrossValidator(estimator = lr,
                      estimatorParamMaps = lrparamGrid,
                      evaluator = lrevaluator,
                      numFolds = 5)

In [35]:
# Run cross validations
lrcvModel = lrcv.fit(housing)

24/10/10 18:41:36 WARN TaskSetManager: Stage 0 contains a task of very large size (1698 KiB). The maximum recommended task size is 1000 KiB.


[Stage 0:>                                                          (0 + 1) / 1]

24/10/10 18:41:37 WARN InstanceBuilder$JavaBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.VectorBLAS
24/10/10 18:41:37 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS
24/10/10 18:41:37 WARN InstanceBuilder$NativeBLAS: Failed to load implementation from:dev.ludovic.netlib.blas.ForeignLinkerBLAS


                                                                                


See [docs](https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.ml.regression.LinearRegressionTrainingSummary.html). 

In [36]:
# Get Model Summary Statistics
lrcvSummary = lrcvModel.bestModel.summary
lrcvSummary.meanSquaredError

0.6239342564332631

In [37]:
lrcvModel.bestModel.coefficients

DenseVector([0.437, 0.0145, -0.1131, 0.4914, 0.0, -0.0041, -0.0224, -0.0066])

In [38]:
housing_sp.schema.names

['MedInc',
 'HouseAge',
 'AveRooms',
 'AveBedrms',
 'Population',
 'AveOccup',
 'Latitude',
 'Longitude',
 'target']

### Summary 

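- Tasks that slow down a program are either I/O-bound or CPU-bound.
- Use `threading` for I/O-bound problems, `multiprocessing` for CPU-bound problems.
- Communication between processes requires explicit mechanisms (e.g., `multiprocessing.Queue`) and adds complexity.
- For many CPU-bound tasks, parallel implementations already exist (e.g., in `Spark`).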