
## Concurrency in Python

* *[Watch Video Lesson 6.6:  Use concurrency methods in Python](https://www.safaribooksonline.com/videos/essential-machine-learning/9780135261118/9780135261118-EMLA_01_06_06)*

### Threads

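Threads are the beat-up Pinto of concurrency in Python. In standard CPython, the Global Interpreter Lock (GIL) lets only one thread execute Python bytecode at a time, so threads cannot spread CPU-bound work across multiple cores, and adding threads to CPU-heavy code often makes it slower. In most cases you should choose some other method of concurrency in Python.

*Typically they are used in situations where the work is I/O-bound (waiting on the network or disk), not CPU-bound.*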

![Pinto](https://homeprohub.files.wordpress.com/2013/03/cost-of-window-replacement.jpg)





##### Simple Threading Example

In [None]:
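import threading

def fight_club(num, x):
    """Print a message and compute x**x (threading.Thread discards the return value)"""
    print(f"Processing Thread# {num}: Calculating punch with attack strength {x} to the {x} power\n")
    return x**x

workers = []
for num in range(1, 6):
    print(f"Queuing thread # {num}\n")
    # Pass the thread number explicitly instead of reading the global `num`,
    # which the loop may already have changed by the time the thread runs
    thread = threading.Thread(target=fight_club, args=(num, num))
    workers.append(thread)
    thread.start()

# Block until every worker thread has finished
for thread in workers:
    thread.join()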
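Because a `threading.Thread` target's return value is thrown away, the standard library's `concurrent.futures.ThreadPoolExecutor` is often a more convenient way to use threads for I/O-bound work. This is a minimal sketch under stated assumptions: `fake_download` is a hypothetical function that simulates network latency with `time.sleep`, which, like real blocking I/O, releases the GIL while it waits.

```python
import time
from concurrent.futures import ThreadPoolExecutor

def fake_download(x):
    """Hypothetical I/O-bound task: sleeping stands in for waiting on the network."""
    time.sleep(0.2)
    return x**x

start = time.perf_counter()
# Five threads wait at the same time, so the total is about 0.2s instead of 1.0s
with ThreadPoolExecutor(max_workers=5) as executor:
    # map returns results in input order and, unlike Thread, keeps return values
    results = list(executor.map(fake_download, range(1, 6)))
elapsed = time.perf_counter() - start

print(results)
print(f"Elapsed: {elapsed:.2f}s")
```

If `fake_download` did CPU-bound arithmetic instead of sleeping, the GIL would serialize the threads and the speedup would disappear.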

#### Using the subprocess module

A general-purpose way to "shell out" to system commands.


In [None]:
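import subprocess
# A single string with shell=True runs through the shell, which invites
# shell-injection bugs, so prefer passing the command as a list of arguments
#res = subprocess.Popen("ls -l", shell=True, stdout=subprocess.PIPE) #not ideal
res = subprocess.Popen(["ls", "-l"], stdout=subprocess.PIPE)
out = res.stdout.readlines()  # a list of bytes objects, one per line of output
res.wait()  # reap the child process once its output has been read
print(out)
!ls -l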

In [None]:
# Also worth noting: Python 3.7+ adds subprocess.run(..., capture_output=True)
!python3 --version

In [None]:
!ls -l
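On Python 3.7 and later, `subprocess.run` with `capture_output=True` replaces most of the manual `Popen`/`PIPE` plumbing shown above. This sketch runs the current interpreter (`sys.executable`) rather than `ls` so that it works on any operating system; `text=True` decodes the output to `str` instead of `bytes`.

```python
import subprocess
import sys

# Run a command given as an argument list (no shell involved)
result = subprocess.run(
    [sys.executable, "--version"],
    capture_output=True,  # collect stdout and stderr (Python 3.7+)
    text=True,            # decode bytes to str
    check=True,           # raise CalledProcessError on a non-zero exit code
)
print(result.returncode)
print(result.stdout.strip())
```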

### Multiprocessing

#### Mapping processes to Functions

Processes are forked or spawned and run truly in parallel (unlike threads), because each process has its own interpreter and its own GIL.

In [None]:
from multiprocessing import Pool
import datetime
import time
import random

def fight_club(x):
  
    sleep_time = random.randrange(0,3)
    time.sleep(sleep_time)
    timestamp = datetime.datetime.now()
    print(f"Calculating punch with attack strength {x} to the {x} power: @timestamp {timestamp} with sleep {sleep_time}")
    return x**x

if __name__ == '__main__':
    # The context manager shuts the worker processes down when the block exits
    with Pool(5) as p:
        print(p.map(fight_club, [1, 2, 3, 10, 100]))

In [None]:
from multiprocessing import cpu_count
cpu_count()
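The same fan-out can also be written with `concurrent.futures.ProcessPoolExecutor`, which shares its interface with `ThreadPoolExecutor` but runs each task in a separate process. This is a minimal sketch, assuming a hypothetical CPU-bound function `punch` and sizing the pool with `cpu_count()`. The `if __name__ == '__main__':` guard matters on platforms that start workers by spawning a fresh interpreter (Windows and macOS by default), because each worker re-imports the main module.

```python
from concurrent.futures import ProcessPoolExecutor
from multiprocessing import cpu_count

def punch(x):
    """Hypothetical CPU-bound task: sum of i*i for i < x, computed in pure Python."""
    return sum(i * i for i in range(x))

if __name__ == '__main__':
    inputs = [10, 1_000, 100_000, 1_000_000]
    # One worker per core; each task runs in its own process with its own GIL
    with ProcessPoolExecutor(max_workers=cpu_count()) as executor:
        results = list(executor.map(punch, inputs))
    print(results)
```

Swapping `ProcessPoolExecutor` for `ThreadPoolExecutor` is a one-word change, which makes it easy to compare the two on your own workload.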

#### Process Joined on Queue (Thread-like Behavior)

Mimics the `threading` interface (`start()`, `join()`), but with true multi-core execution.

In [None]:
from multiprocessing import Process, Queue

def f(q):
    q.put(["armbar", "kimura",  "Mata Leão"])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(f"Grabbing some attacks: {q.get()}")    
    p.join()

### Async IO

#### Async IO in Python Examples

More info here:  https://docs.python.org/3/library/asyncio.html

**Using Python3 Async**

```python
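import asyncio
import logging
import time

LOG = logging.getLogger(__name__)

# firehose_client(), put_record() (a coroutine) and gen_uuid_events() are
# project-specific helpers that are not shown here.

async def _send_events(count, client):
    """Schedule one put_record() task per event and wait for all of them"""
    tasks = []
    for num in range(count):
        tasks.append(asyncio.create_task(put_record(gen_uuid_events(), client)))
        LOG.info(f"sending async events: COUNT {num}/{count}")
    await asyncio.gather(*tasks)

def send_async_firehose_events(count=100):
    """Async sends events to firehose"""

    start = time.time()
    client = firehose_client()
    extra_msg = {"aws_service": "firehose"}
    LOG.info(f"sending async events TOTAL {count}", extra=extra_msg)
    # asyncio.run creates, runs, and closes a fresh event loop (Python 3.7+)
    asyncio.run(_send_events(count, client))
    end = time.time()
    LOG.info(f"Total time: {end - start}")
```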
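Because `firehose_client`, `put_record`, and `gen_uuid_events` are not shown, the snippet above cannot run on its own. The self-contained sketch below shows the same schedule-everything-then-wait pattern using `asyncio.create_task` and `asyncio.gather`; here `put_record` is a hypothetical stand-in that awaits `asyncio.sleep` instead of calling AWS.

```python
import asyncio
import time

async def put_record(record_id, delay=0.1):
    """Hypothetical stand-in for a network call: sleeps instead of doing I/O."""
    await asyncio.sleep(delay)
    return record_id

async def send_all(count=10, delay=0.1):
    # create_task schedules every coroutine on the running event loop right away
    tasks = [asyncio.create_task(put_record(i, delay)) for i in range(count)]
    # gather waits for all of them and returns results in submission order
    return await asyncio.gather(*tasks)

start = time.perf_counter()
results = asyncio.run(send_all(count=10, delay=0.1))
elapsed = time.perf_counter() - start
print(f"{len(results)} records in {elapsed:.2f}s")
```

All ten sleeps overlap on a single thread, so the run takes roughly 0.1s rather than the 1.0s a sequential loop would need.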

**Using trollius library with Python 2:  DEPRECATED**

```python
"""Generates an Async MetaData call.  Note, this isn't available in Boto3
In [56]: res = all_metadata_async()
In [57]: res
Out[57]: 
[('ami-manifest-path', <Response [200]>),
 ('instance-type', <Response [200]>),
 ('instance-id', <Response [200]>),
 ('iam', <Response [200]>),
 ('local-hostname', <Response [200]>),
 ('network', <Response [200]>),
 ('hostname', <Response [200]>),
 ('ami-id', <Response [200]>),
 ('instance-action', <Response [200]>),
 ('profile', <Response [200]>),
 ('reservation-id', <Response [200]>),
 ('security-groups', <Response [200]>),
 ('metrics', <Response [200]>),
 ('mac', <Response [200]>),
 ('public-ipv4', <Response [200]>),
 ('services', <Response [200]>),
 ('local-ipv4', <Response [200]>),
 ('placement', <Response [200]>),
 ('ami-launch-index', <Response [200]>),
 ('public-hostname', <Response [200]>),
 ('public-keys', <Response [200]>),
 ('block-device-mapping', <Response [200]>)]
"""

import os
import requests
import trollius

def get_metadata_api_urls():
    """Retrieves the api endpoints for metadata"""

    full_urls = {}
    metadata_url = "http://169.254.169.254/latest/meta-data/"
    resp = requests.get(metadata_url)
    urls = resp.content.split()
    for url in urls:
        stripped_url = url.rstrip("/")
        full_urls[stripped_url]=(os.path.join(metadata_url, url))
    return full_urls

def _get(key_url):
    key,url = key_url
    return key, requests.get(url)

def _do_calls(urls):
    loop = trollius.get_event_loop()
    futures = []
    for url in urls:
        futures.append(loop.run_in_executor(None, _get, url))
    return futures

@trollius.coroutine
def call():
    results = []
    futures = _do_calls(get_metadata_api_urls().items())
    for future in futures:
        result = yield trollius.From(future)
        results.append(result)
    raise trollius.Return(results)

def all_metadata_async():
    """Retrieves all available metadata for an instance async"""

    loop = trollius.get_event_loop()
    res = loop.run_until_complete(call())
    return res
```
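In Python 3, the trollius pattern above maps onto the standard `asyncio` module: `loop.run_in_executor(None, ...)` pushes blocking calls such as `requests.get` onto the default thread pool, and `asyncio.gather` replaces the `yield trollius.From(future)` loop. This sketch assumes a hypothetical `blocking_get` that sleeps instead of contacting the EC2 metadata endpoint (which is only reachable from inside an instance); the `example.invalid` URLs are placeholders.

```python
import asyncio
import time

def blocking_get(key_url):
    """Hypothetical stand-in for requests.get: blocks the calling thread."""
    key, url = key_url
    time.sleep(0.1)  # simulate network latency
    return key, f"response from {url}"

async def fetch_all(urls):
    loop = asyncio.get_running_loop()
    # run_in_executor(None, ...) runs each blocking call on the default ThreadPoolExecutor
    futures = [loop.run_in_executor(None, blocking_get, item) for item in urls.items()]
    # gather returns the (key, response) pairs in the same order as urls
    return await asyncio.gather(*futures)

urls = {
    "instance-id": "http://example.invalid/instance-id",
    "hostname": "http://example.invalid/hostname",
}
results = asyncio.run(fetch_all(urls))
print(results)
```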


### Serverless or FaaS (Functions as a service)

#### AWS Lambda

#####  AWS Lambda and Chalice Example

Standalone Lambda with Chalice:  http://chalice.readthedocs.io/en/latest/

```python
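import os

from chalice import Chalice
from slackclient import SlackClient  # slackclient 1.x API

app = Chalice(app_name="slack-notifier")  # hypothetical app name
SLACK_TOKEN = os.environ["SLACK_TOKEN"]  # assumed to be set in the Lambda's environment

@app.lambda_function()
def send_message(event, context):
    """Send a message to a channel"""

    slack_client = SlackClient(SLACK_TOKEN)
    res = slack_client.api_call(
      "chat.postMessage",
      channel="#general",
      text=event
    )
    return res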
```


#### Fn Project


![Fn Project](https://camo.githubusercontent.com/aad13cfe0e267f38143fd8cc6816ab8adde37a56/687474703a2f2f666e70726f6a6563742e696f2f696d616765732f666e2d333030783132352e706e67)


*   [Fn Project](https://fnproject.io/)
*   [Fn Project Python Example](http://fnproject.io/tutorials/python/intro/)



```bash

fn init --runtime python --trigger http pythonfn

```



```python

import fdk
import json


def handler(ctx, data=None, loop=None):
    name = "World"
    if data and len(data) > 0:
        body = json.loads(data)
        # Fall back to "World" when the JSON body has no "name" key
        name = body.get("name", name)
    return {"message": "Hello {0}".format(name)}



if __name__ == "__main__":
    fdk.handle(handler)
```







### Large Scale Concurrency Solutions

#### Larger Scale Concurrency



*   [AWS Step Functions with Lambda](https://aws.amazon.com/step-functions/)

![AWS Step Functions workflow diagram](https://d1.awsstatic.com/product-marketing/Step%20Functions/OrderFullScreen.0e74c2f19d89a9325addb5bd746cd895b2e4c9c2.jpg)

*   [AWS Batch](https://aws.amazon.com/batch/)
![AWS Batch flowchart](https://d1.awsstatic.com/Test%20Images/Kate%20Test%20Images/Dilithium_flowchart%20diagrams_v3_kw-02.322877d73eda8ed71a44db216a1d195550befac0.png)

*   [RabbitMQ Worker Farms: IBM developerWorks Article](https://www.ibm.com/developerworks/cloud/library/cl-optimizepythoncloud1/index.html)

![RabbitMQ worker farm diagram](https://www.ibm.com/developerworks/cloud/library/cl-optimizepythoncloud2/figure1.gif)





### High Level Concurrency Overview for Machine Learning and HPC (High Performance Computing)


#### Diagram of Python Performance Problems


![63,000X Speedup for Matrix Multiply from Standard Python](https://user-images.githubusercontent.com/58792/45932870-37339000-bf38-11e8-8272-bf2addf56df1.png)

Source:  [Dave Patterson, UC Berkeley](https://www2.eecs.berkeley.edu/Faculty/Homepages/patterson.html)
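The first step of that diagram, leaving pure-Python loops for compiled code, is easy to feel on a small scale. This is a rough sketch, not a reproduction of the figure: it times a naive triple-loop matrix multiply on nested lists against NumPy's `@` operator on a hypothetical 128×128 random matrix. The ratio you see depends entirely on your machine and will be far smaller than the diagram's cumulative figure.

```python
import time
import numpy as np

def matmul_pure_python(a, b):
    """Naive triple-loop matrix multiply on nested Python lists."""
    n, m, p = len(a), len(b), len(b[0])
    out = [[0.0] * p for _ in range(n)]
    for i in range(n):
        for k in range(m):
            a_ik = a[i][k]
            for j in range(p):
                out[i][j] += a_ik * b[k][j]
    return out

rng = np.random.default_rng(0)
n = 128
a = rng.random((n, n))
b = rng.random((n, n))

start = time.perf_counter()
slow = matmul_pure_python(a.tolist(), b.tolist())
python_seconds = time.perf_counter() - start

start = time.perf_counter()
fast = a @ b  # dispatches to an optimized, compiled BLAS routine
numpy_seconds = time.perf_counter() - start

print(f"pure Python: {python_seconds:.3f}s, NumPy: {numpy_seconds:.5f}s")
print(f"speedup: {python_seconds / numpy_seconds:.0f}x")
```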

#### Numba

[Numba](http://numba.pydata.org/)

*   Open-source JIT (just-in-time) compiler
*   Translates a subset of Python and NumPy code into fast machine code
*   Can approach the speed of C
*   Can also parallelize: true multi-threading across cores and GPU execution





##### Install Numba

In [None]:
!apt-get install -y nvidia-cuda-toolkit
!pip3 install numba

import os
# Tell Numba where the CUDA toolkit's libdevice files and NVVM library live
os.environ['NUMBAPRO_LIBDEVICE'] = "/usr/lib/nvidia-cuda-toolkit/libdevice"
os.environ['NUMBAPRO_NVVM'] = "/usr/lib/x86_64-linux-gnu/libnvvm.so"

##### Use Numba

In [None]:
from numba import (cuda, vectorize)
import numba
import pandas as pd
import numpy as np

In [None]:
def real_estate_df():
    """30 Years of Housing Prices"""

    df = pd.read_csv("https://raw.githubusercontent.com/noahgift/real_estate_ml/master/data/Zip_Zhvi_SingleFamilyResidence.csv")
    df.rename(columns={"RegionName":"ZipCode"}, inplace=True)
    df["ZipCode"]=df["ZipCode"].map(lambda x: "{:.0f}".format(x))
    df["RegionID"]=df["RegionID"].map(lambda x: "{:.0f}".format(x))
    return df

def numerical_real_estate_array(df):
    """Converts df to numpy numerical array"""

    columns_to_drop = ['RegionID', 'ZipCode', 'City', 'State', 'Metro', 'CountyName']
    df_numerical = df.dropna()
    df_numerical = df_numerical.drop(columns_to_drop, axis=1)
    return df_numerical.values

def real_estate_array():
    """Returns Real Estate Array"""

    df = real_estate_df()
    rea = numerical_real_estate_array(df)
    return np.float32(rea)
  
rea = real_estate_array()

##### Use Numba decorator

In [None]:
@numba.jit(nopython=True)
def expmean_jit(rea):
    """Square the mean of the array"""

    val = rea.mean() ** 2
    return val
  
expmean_jit(rea)

##### Multi-threaded numba

True multi-threaded code (warning: this will use all cores on any machine that runs it).

In [None]:
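@numba.jit(nopython=True, parallel=True)
def add_sum_threaded(rea):
    """Sum the array row by row, spreading the rows across all cores"""

    x, _ = rea.shape
    total = 0.0
    # prange splits the loop iterations across threads; Numba treats
    # `total +=` as a reduction and combines the per-thread partial sums
    for i in numba.prange(x):
        total += rea[i].sum()
    return total

add_sum_threaded(rea)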

#### GPU 

GPUs are heavily used in deep learning.

*   NVIDIA
  - Numba [CUDA GPU ](http://numba.pydata.org/numba-doc/latest/cuda/index.html)
*  AMD
  - Numba [AMD ROC GPU](http://numba.pydata.org/numba-doc/latest/roc/index.html)



##### Use GPU

In [None]:
@vectorize(['float32(float32, float32)'], target='cuda')
def add_ufunc(x, y):
    return x + y
  
def cuda_operation():
    """Performs Vectorized Operations on GPU"""

    x = real_estate_array()
    y = real_estate_array()

    print("Moving calculations to GPU memory")
    x_device = cuda.to_device(x)
    y_device = cuda.to_device(y)
    out_device = cuda.device_array(
        shape=(x_device.shape[0],x_device.shape[1]), dtype=np.float32)
    print(x_device)
    print(x_device.shape)
    print(x_device.dtype)

    print("Calculating on GPU")
    add_ufunc(x_device,y_device, out=out_device)

    out_host = out_device.copy_to_host()
    print(f"Calculations from GPU {out_host}")
    
cuda_operation()

#### TPU

[Tensor Processing Unit](https://cloud.google.com/tpu/docs/tpus)



*   "Google’s custom-developed application-specific integrated circuits (ASICs) used to accelerate machine learning workloads"
*   Available both in Colab notebooks and on Google Cloud

