## Global Interpreter Lock aka GIL

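The GIL is a mutex in CPython that allows only one *thread* to execute Python bytecode at any point in time, even on a multi-core machine. Threads within one process can therefore run concurrently but not in parallel when running pure-Python, CPU-bound code. Separate processes each have their own interpreter and their own GIL.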

https://realpython.com/python-gil/
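Here is a minimal sketch of what this means in practice, assuming a standard CPython build with the GIL enabled. A pure-Python, CPU-bound loop gains nothing from a second thread, because the two threads take turns holding the GIL. The function names are illustrative, and exact timings depend on your machine.

```python
import threading
import time


def countdown(n: int) -> None:
    """Pure-Python CPU-bound loop: every step needs the GIL."""
    while n > 0:
        n -= 1


def run_sequential(n: int) -> None:
    # Do the work twice, one after the other, in the main thread
    countdown(n)
    countdown(n)


def run_threaded(n: int) -> None:
    # Do the same total work in two threads started together
    threads = [threading.Thread(target=countdown, args=(n,)) for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()


def time_it(func, n: int) -> float:
    """Return the wall-clock seconds taken by func(n)."""
    start = time.perf_counter()
    func(n)
    return time.perf_counter() - start


if __name__ == "__main__":
    N = 5_000_000
    print(f"sequential:  {time_it(run_sequential, N):.2f} s")
    print(f"two threads: {time_it(run_threaded, N):.2f} s")
    # On a standard (GIL) CPython build the threaded run is typically no
    # faster than the sequential one: only one thread runs bytecode at a time.
```

Free-threaded CPython builds (3.13 and later, optional) can run without the GIL, so the comparison may differ there.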

## Multithreading

**Single thread, single process**

In [None]:
import urllib.request

In [None]:
urls = [
  'http://www.python.org',
  'https://docs.python.org/3/',
  'https://docs.python.org/3/whatsnew/3.7.html',
  'https://docs.python.org/3/tutorial/index.html',
  'https://docs.python.org/3/library/index.html',
  'https://docs.python.org/3/reference/index.html',
  'https://docs.python.org/3/using/index.html',
  'https://docs.python.org/3/howto/index.html',
  'https://docs.python.org/3/installing/index.html',
  'https://docs.python.org/3/distributing/index.html',
  'https://docs.python.org/3/extending/index.html',
  'https://docs.python.org/3/c-api/index.html',
  'https://docs.python.org/3/faq/index.html'
  ]

In [None]:
%%time
results = []
for url in urls:
    with urllib.request.urlopen(url) as src:
        results.append(src)

*************************************************************

In [None]:
import urllib.request
from concurrent.futures import ThreadPoolExecutor
# ThreadPoolExecutor manages a pool of worker threads that we can submit tasks to.
# The pool assigns each task to an available thread and schedules it to run.
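The comments above describe submitting tasks to the pool. Here is a sketch of that `submit` style, as an alternative to `map`. It submits one task per URL and handles each result as soon as that task finishes, using `concurrent.futures.as_completed`. To keep it runnable offline, `fake_fetch` is a hypothetical stand-in that sleeps instead of downloading. Replace it with a real download function, for example one that returns `urllib.request.urlopen(url).read()`.

```python
import random
import time
from concurrent.futures import ThreadPoolExecutor, as_completed


def fake_fetch(url: str) -> int:
    """Hypothetical stand-in for a download: sleep to simulate network latency."""
    time.sleep(random.uniform(0.05, 0.2))
    return len(url)  # pretend this is the size of the downloaded page


def fetch_all(urls, max_workers=4):
    sizes = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # submit() schedules one call and immediately returns a Future
        future_to_url = {executor.submit(fake_fetch, url): url for url in urls}
        # as_completed() yields futures in completion order, not submission order
        for future in as_completed(future_to_url):
            url = future_to_url[future]
            sizes[url] = future.result()  # re-raises any exception from the worker
            print(f"done: {url}")
    return sizes


if __name__ == "__main__":
    demo_urls = ["https://docs.python.org/3/", "https://docs.python.org/3/faq/index.html"]
    print(fetch_all(demo_urls))
```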

In [None]:
%%time

with ThreadPoolExecutor(4) as executor:
    # map calls the function once for each item in the iterable, spreading the
    # calls across the pool's threads; results come back in input order.
    # Leaving the with-block waits until every call has finished.
    results = executor.map(urllib.request.urlopen, urls)

In [None]:
%%time

with ThreadPoolExecutor(8) as executor:
    results = executor.map(urllib.request.urlopen, urls)

In [None]:
%%time

with ThreadPoolExecutor(13) as executor:
    results = executor.map(urllib.request.urlopen, urls)

In [None]:
%%time

with ThreadPoolExecutor(16) as executor:
    results = executor.map(urllib.request.urlopen, urls)

Threads are lightweight and share memory. This makes them a good fit for I/O-bound applications and for keeping user interfaces responsive.

Multiple threads live in the same process and share its address space. Each thread performs its own task and has its own stack, instruction pointer and registers, while all threads share the process's code and heap memory.

Multiple threads can significantly speed up I/O-bound tasks. I/O-bound programs spend most of their time waiting for input/output, and CPython releases the GIL while a thread is blocked on I/O, so other threads can run in the meantime. Multithreading is therefore very useful for tasks like web scraping: we can issue multiple fetches concurrently and process each page as it arrives.
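The following small offline sketch shows why threads help I/O-bound work despite the GIL. Here `time.sleep` stands in for waiting on the network. Like a blocking socket read, it releases the GIL, so the waits in different threads overlap. The helper names and the 0.2 s delay are illustrative assumptions.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def simulated_io(task_id: int, delay: float = 0.2) -> int:
    """Pretend to wait on I/O; time.sleep releases the GIL while blocked."""
    time.sleep(delay)
    return task_id


def time_sequential_io(n_tasks: int) -> float:
    start = time.perf_counter()
    for i in range(n_tasks):
        simulated_io(i)
    return time.perf_counter() - start


def time_threaded_io(n_tasks: int, workers: int) -> float:
    start = time.perf_counter()
    with ThreadPoolExecutor(max_workers=workers) as executor:
        # list() forces every result to be collected before we stop the clock
        list(executor.map(simulated_io, range(n_tasks)))
    return time.perf_counter() - start


if __name__ == "__main__":
    n = 8
    print(f"sequential: {time_sequential_io(n):.2f} s")   # roughly n * 0.2 s
    print(f"8 threads:  {time_threaded_io(n, 8):.2f} s")  # roughly 0.2 s
```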

## Multiprocessing

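Multiprocessing runs work in multiple separate processes instead of multiple threads.

Multiprocessing spawns each process with its own Python interpreter and its own memory space, and therefore its own GIL (Global Interpreter Lock). CPU-bound work can then run in parallel on multiple cores without the GIL holding it back.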

The `multiprocessing` module offers two main classes for running a function in parallel: the `Pool` class and the `Process` class.
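Here is a minimal sketch of the `Pool` class, using a toy CPU-bound function `square` (a hypothetical name). `Pool.map` splits the input across worker processes and returns the results in input order. The functions are defined at module level so they can be pickled and sent to the workers. The `if __name__ == "__main__"` guard is required with the spawn start method, which is the default on Windows and macOS. On those platforms, functions defined directly in a Jupyter notebook may not be importable by the workers; moving them into a `.py` module avoids that.

```python
from multiprocessing import Pool, cpu_count


def square(number: int) -> int:
    """Work for one item; module-level so worker processes can import it."""
    return number * number


def parallel_squares(numbers, processes=None):
    # processes=None makes Pool start os.cpu_count() worker processes
    with Pool(processes=processes) as pool:
        # map chunks `numbers`, sends the chunks to the workers,
        # and returns the results in the original input order
        return pool.map(square, numbers)


if __name__ == "__main__":
    # The guard stops child processes (under spawn/forkserver) from
    # re-running this block when they import the main module
    print("Number of processors:", cpu_count())
    print(parallel_squares(range(10), processes=4))
```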

If you spawn more CPU-bound processes than your machine has CPU cores, performance starts to drop. The extra processes cannot run at the same time anyway, and the operating system has to do more work swapping processes in and out of the CPU cores.

In [None]:
import multiprocessing as mp
print("Number of processors: ", mp.cpu_count())

In [None]:
# Example of the Process class
# Note: in Jupyter on Windows/macOS (spawn start method), doubler must live in an importable .py file
from multiprocessing import Process, current_process


# Function that will be run in parallel
def doubler(number):
    """
    A doubling function that can be used by a process
    """
    result = number * 2
    proc_name = current_process().name
    print('{0} doubled to {1} by: {2}'.format(
        number, result, proc_name))


if __name__ == '__main__':
    numbers = [5, 10, 15, 20, 25, 100]
    procs = []

    print("Begin multiprocessing")
    for number in numbers:
        # Instantiate a Process object with the target function and its input data (args)
        proc = Process(target=doubler, args=(number,))
        procs.append(proc)
        # Start running the function in a new child process
        proc.start()
    print('Multi process submission complete')

    # A process can also be given an explicit name
    proc = Process(target=doubler, name='Test', args=(2,))
    proc.start()
    procs.append(proc)

    for proc in procs:
        # Block until this child process finishes; joining also lets the
        # parent clean up the finished child instead of leaving it lingering
        proc.join()

* https://sebastianraschka.com/Articles/2014_multiprocessing.html
* https://www.journaldev.com/15631/python-multiprocessing-example

## Dask DataFrame

Dask is an open-source library that provides parallel versions of NumPy arrays (`dask.array`), pandas DataFrames (`dask.dataframe`) and lists of Python objects (`dask.bag`). You can run operations on them in parallel, either on the cores of one machine or across a cluster.

https://docs.dask.org/en/latest/

A **Dask DataFrame** is a large parallel DataFrame composed of many smaller Pandas DataFrames, split along the index. These Pandas DataFrames may live on disk for larger-than-memory computing on a single machine, or on many different machines in a cluster. One Dask DataFrame operation triggers many operations on the constituent Pandas DataFrames.
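To make the "many smaller pandas DataFrames" idea concrete, this sketch reproduces the pattern by hand with plain pandas. It splits a frame into row partitions, runs the same operation on each partition, and then combines the partial results. It only illustrates the blocked approach and is not Dask's actual implementation. `split_into_partitions`, `blocked_value_counts` and the toy `salary` data are hypothetical.

```python
import pandas as pd


def split_into_partitions(df: pd.DataFrame, n_partitions: int) -> list:
    """Split df into up to n_partitions contiguous row blocks along the index."""
    size = max(1, -(-len(df) // n_partitions))  # ceiling division, at least 1 row
    return [df.iloc[start:start + size] for start in range(0, len(df), size)]


def blocked_value_counts(partitions: list, column: str) -> pd.Series:
    # Step 1 (per partition, could run in parallel): count values locally
    partial_counts = [part[column].value_counts() for part in partitions]
    # Step 2 (combine): add up the partial counts, aligned on the value labels
    return pd.concat(partial_counts).groupby(level=0).sum()


if __name__ == "__main__":
    df = pd.DataFrame({"salary": [50, 70, 50, 90, 70, 50, 30]})
    parts = split_into_partitions(df, n_partitions=3)
    print([len(p) for p in parts])
    print(blocked_value_counts(parts, "salary"))
```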

In [None]:
from IPython.display import Image
Image("dask-dataframe.png")

In [None]:
import dask.dataframe as ddf
import pandas as pd

In [None]:
dask_df = ddf.read_csv('random_people.csv')

In [None]:
dask_df.shape

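Unlike pandas, a Dask DataFrame doesn't know how many rows your data has without first reading through all of it. That is why `dask_df.shape` returns a lazy placeholder for the row count instead of a number. Calling `len(dask_df)` reads the data and returns the actual count.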

In [None]:
pandas_df = pd.read_csv('random_people.csv')

In [None]:
pandas_df.shape

In [None]:
%%time
# Lazy: this only adds a step to the task graph, so %%time measures almost no work
dask_df['new_salary'] = dask_df['salary']**2
# dask_df.head()

In [None]:
%%time
pandas_df['new_salary'] = pandas_df['salary']**2
# pandas_df.head()

In [None]:
%%time
# Also lazy: the filter is recorded but not executed until .compute() (or .head())
df1 = dask_df[dask_df['new_salary']>5000]
# df1.head()

In [None]:
%%time
df2 = pandas_df[pandas_df['new_salary']>5000]
# df2.head()

In [None]:
pandas_df.salary.value_counts()

In [None]:
# dask_df.salary.value_counts()
dask_df.salary.value_counts().compute()

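Dask DataFrame operations, including aggregations and `apply`, are lazy: they only build a task graph. To trigger the computation and get a pandas result back, call the `.compute()` method. This is also why the Dask `%%time` cells above finish almost instantly.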

**Features of Dask**
* One can use Kubernetes to launch Dask workers
* Dask-Yarn deploys Dask on YARN clusters, such as are found in traditional Hadoop installations. Dask-Yarn provides an easy interface to quickly start, scale, and stop Dask clusters natively from Python.
* Dask.distributed is a lightweight library for distributed computing in Python. It extends both the `concurrent.futures` and Dask APIs to moderate-sized clusters.

* https://matthewrocklin.com/blog/work/2017/07/03/scaling
* https://www.anaconda.com/wp-content/uploads/2019/03/2018-11-Dask_CheatSheet-1.pdf
* https://ipython-books.github.io/511-performing-out-of-core-computations-on-large-arrays-with-dask/
* https://matthewrocklin.com/blog/work/2017/01/12/dask-dataframes
* https://www.youtube.com/watch?v=Q7XyGfS84l0&t=6s

## PySpark DataFrames

PySpark is the Python API for Spark. PySpark DataFrames are:
* **Immutable**: once a DataFrame is created, it can't be changed. Transformations such as `select` or `filter` return a new DataFrame instead of modifying the existing one.
* **Lazily evaluated**: transformations are only recorded, and nothing is executed until an action such as `show()`, `count()` or `collect()` is called.
* **Distributed**: the rows of a DataFrame are split into partitions that are spread across the nodes of a cluster and processed in parallel.

In [None]:
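from pyspark.sql import SparkSession

# SparkSession is the entry point to the DataFrame API. It replaces the older
# SparkContext + SQLContext pair (SQLContext is deprecated since Spark 3.0).
spark = SparkSession.builder.appName("random_people").getOrCreate()

In [None]:
# header=True takes column names from the first line; inferSchema=True scans the
# data to guess column types (otherwise every column is read as a string)
df = spark.read.csv("random_people.csv", header=True, inferSchema=True)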

In [None]:
df.show()

In [None]:
df.printSchema()

In [None]:
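# select() is a transformation: it returns a new DataFrame without running anything.
# Add .show() (an action) to actually compute and display the rows.
# df.select('name','salary').show()
df.select('name','salary')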

* https://mapr.com/blog/spark-101-what-it-what-it-does-and-why-it-matters/
* https://dzone.com/articles/pyspark-dataframe-tutorial-introduction-to-datafra