# Threads, Processes, and Concurrency


## Threading

- Typically, we introduce **concurrency** so that a program can make progress on other work while I/O is pending (e.g., a server can start processing a new request while waiting for data from a previous request to arrive)

- We can create objects that appear to be running independently, but simultaneously

- The job of threading is to enable an application to be responsive

- CPython, the default implementation of Python, has a Global Interpreter Lock (GIL), which allows only one thread to execute Python bytecode at a time; CPU time is rationed across your threads rather than used in parallel

    What that means: threads in CPython do not run Python code in parallel across cores. They still help with I/O-bound work, because a thread that is blocked waiting on I/O releases the GIL so another thread can run.
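
To make the I/O point concrete, here is a minimal sketch that compares running four blocking waits one after another with running them in threads. `fake_io`, `run_sequential`, and `run_threaded` are hypothetical names chosen for illustration, and `time.sleep` stands in for a real network or disk wait.

```python
import threading
import time

def fake_io(delay):
    """Stand-in for a blocking I/O call; time.sleep releases the GIL."""
    time.sleep(delay)

def run_sequential(n, delay):
    """Run n waits one after another and return the elapsed time."""
    start = time.perf_counter()
    for _ in range(n):
        fake_io(delay)
    return time.perf_counter() - start

def run_threaded(n, delay):
    """Run n waits in n threads and return the elapsed time."""
    start = time.perf_counter()
    threads = [threading.Thread(target=fake_io, args=(delay,)) for _ in range(n)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return time.perf_counter() - start

sequential = run_sequential(4, 0.2)
threaded = run_threaded(4, 0.2)
print(f"sequential: {sequential:.2f}s, threaded: {threaded:.2f}s")
```

The sequential run should take roughly four times as long, because the threaded waits overlap. A CPU-bound loop in place of `time.sleep` would not be expected to show the same gain under the GIL.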


### Simple Threading Example

In [8]:
import threading
import time

def worker(thread_id):
    print(f"Thread {thread_id} started.")
    time.sleep(2)  # Simulating some work
    print(f"Thread {thread_id} finished.")

# Create and start multiple threads
threads = []
for i in range(5):
    thread = threading.Thread(target=worker, args=(i,))
    threads.append(thread)
    thread.start()

# Wait for all threads to complete
for thread in threads:
    thread.join()

print("All threads completed.")

Thread 0 started.Thread 1 started.

Thread 2 started.
Thread 3 started.
Thread 4 started.
Thread 1 finished.Thread 3 finished.
Thread 0 finished.
Thread 4 finished.
Thread 2 finished.

All threads completed.


- The `worker` function represents the task that each thread will execute. It takes a `thread_id` as an argument and simulates some work by sleeping for 2 seconds.

- We create a list called `threads` to store the thread objects.

- Inside the loop, we create a new thread using `threading.Thread`, specifying the target function (`worker`) and its arguments (`args=(i,)`, a one-element tuple).

- We append each thread to the `threads` list and start it using the `start()` method.

- After starting all the threads, we use a loop to wait for each thread to complete using the `join()` method. This ensures that the main thread waits for all the worker threads to finish before proceeding.

- Finally, we print a message indicating that all threads have completed.

In [1]:
from threading import Thread

class InputReader(Thread):
    """Thread example, extends Thread class"""

    def run(self):
        """
        Whatever is in the run method (or called from
        it) is executed in a separate thread
        """
        self.line_of_text = input('Enter some text: ')

input('Are you ready? When you hit return the thread will start.')
thread = InputReader() # create thread object
thread.start() # cf. thread.run() for no concurrency

count, result = 1, 1

while thread.is_alive():
    result = count * count
    count += 1

print(f'calculated squares up to {count:,} * {count:,} = {result:,}')
print(f'while you typed "{thread.line_of_text}"')

calculated squares up to 15,599,133 * 15,599,133 = 243,332,919,153,424
while you typed "gekk"


### Think of threads like workers who are extremely efficient at "multi-tasking" by being an expert at switching task contexts


### But...the main problem with threads is also their primary advantage–shared memory

- all threads have access to all the memory

- what if two threads access the same data?

- synchronization is the solution, but it's tricky

- bugs due to incorrect synchronization can be very difficult to find due to ordering issues

- one solution is to force communication between threads to occur using a data structure that has built in locking, such as `queue.Queue`

- these disadvantages can be outweighed by the fact that shared memory is FAST, although in CPython the GIL still prevents threads from running Python code in parallel
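
As a rough sketch of the `queue.Queue` approach mentioned above, the example below passes work from one thread to another through a queue instead of sharing a variable directly. The `producer` and `consumer` functions and the `None` sentinel are illustrative choices, not the only way to structure this.

```python
import queue
import threading

def producer(q, items):
    """Put each item on the queue, then a sentinel to signal the end."""
    for item in items:
        q.put(item)
    q.put(None)  # sentinel: tells the consumer there is no more work

def consumer(q, results):
    """Take items off the queue until the sentinel arrives."""
    while True:
        item = q.get()  # blocks until an item is available
        if item is None:
            break
        results.append(item * item)

work_queue = queue.Queue()  # locking is handled inside the queue
squares = []

threads = [
    threading.Thread(target=producer, args=(work_queue, range(5))),
    threading.Thread(target=consumer, args=(work_queue, squares)),
]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(squares)  # [0, 1, 4, 9, 16]
```

Only the consumer thread ever touches `squares`, so no explicit lock is needed; the queue is the single point of contact between the two threads.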

### Repeat A Task Every So Often...
...using a `Timer` (a subclass of `Thread`)

In [10]:
from threading import Timer, Event
 
def every_so_often():
    if not done.is_set():
        print('Do the thing you want to every so often')
        Timer(5.0, every_so_often).start()
 
done = Event()
Timer(5.0, every_so_often).start()
 
for count in range(100_000_000):
    prod = count * count
    if count % 10_000_000 == 0:
        print('multiplied up to', count)
 
done.set()

multiplied up to 0
multiplied up to 10000000
multiplied up to 20000000
multiplied up to 30000000
multiplied up to 40000000
multiplied up to 50000000
multiplied up to 60000000
Do the thing you want to every so often
multiplied up to 70000000
multiplied up to 80000000
multiplied up to 90000000
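
One detail worth knowing when working with timers: a `Timer` that has been started but has not fired yet can be stopped with `cancel()`. The sketch below uses short intervals and a hypothetical `record` callback purely for illustration.

```python
from threading import Timer

fired = []

def record(label):
    """Callback that notes which timer actually ran."""
    fired.append(label)

kept = Timer(0.1, record, args=("kept",))
cancelled = Timer(0.1, record, args=("cancelled",))

kept.start()
cancelled.start()
cancelled.cancel()  # stops the timer because it has not fired yet

# Timer is a Thread subclass, so join() waits for each one to finish
kept.join()
cancelled.join()

print(fired)  # ['kept']
```

Keeping a reference to the pending `Timer` and cancelling it is one way to shut down a repeating task promptly, instead of waiting for the next interval to elapse.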


### **Thread Synchronization** and Object Sharing

When multiple threads access shared resources concurrently, synchronization mechanisms are necessary to prevent race conditions and ensure data integrity. Let's revisit the `BankAccount` class and add thread synchronization using locks.

The `lock` attribute is an instance of `threading.Lock`, which is used to synchronize access to the shared `balance` attribute.

In [11]:
import threading

class BankAccount:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()

    def deposit(self, amount):
        with self.lock:
            new_balance = self.balance + amount
            self.balance = new_balance
            print(f"Deposit: {amount}, New Balance: {self.balance}")

    def withdraw(self, amount):
        with self.lock:
            if self.balance >= amount:
                new_balance = self.balance - amount
                self.balance = new_balance
                print(f"Withdrawal: {amount}, New Balance: {self.balance}")
            else:
                print("Insufficient funds.")

account = BankAccount(1000)

def perform_transactions():
    account.deposit(500)
    account.withdraw(200)

threads = []
for _ in range(5):
    thread = threading.Thread(target=perform_transactions)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

Deposit: 500, New Balance: 1500
Withdrawal: 200, New Balance: 1300
Deposit: 500, New Balance: 1800
Withdrawal: 200, New Balance: 1600
Deposit: 500, New Balance: 2100
Withdrawal: 200, New Balance: 1900
Deposit: 500, New Balance: 2400
Withdrawal: 200, New Balance: 2200
Deposit: 500, New Balance: 2700
Withdrawal: 200, New Balance: 2500


- The `deposit` and `withdraw` methods use the `with` statement to acquire the lock before accessing the shared `balance` attribute. This ensures that only one thread can modify the balance at a time, preventing race conditions.

- The `perform_transactions` function performs a series of transactions (deposit and withdrawal) on the bank account.

- We create multiple threads, each executing the `perform_transactions` function concurrently.

- The `join()` method is used to wait for all the threads to complete before the program exits.
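
To see what the lock protects against, here is a sketch that forces the classic "lost update" race deterministically. It is not part of the original example: `UnsafeAccount`, `SafeAccount`, and `run_deposits` are hypothetical names, and a `threading.Barrier` is used only to make every thread read the balance before any thread writes it.

```python
import threading

N_THREADS = 5
barrier = threading.Barrier(N_THREADS)

class UnsafeAccount:
    def __init__(self, balance):
        self.balance = balance

    def deposit(self, amount):
        current = self.balance            # read
        barrier.wait()                    # every thread reads before any thread writes
        self.balance = current + amount   # write based on a stale read

class SafeAccount:
    def __init__(self, balance):
        self.balance = balance
        self.lock = threading.Lock()

    def deposit(self, amount):
        with self.lock:                   # read and write happen as one unit
            self.balance += amount

def run_deposits(account):
    """Deposit 100 from N_THREADS threads and return the final balance."""
    threads = [
        threading.Thread(target=account.deposit, args=(100,))
        for _ in range(N_THREADS)
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return account.balance

unsafe_total = run_deposits(UnsafeAccount(1000))
safe_total = run_deposits(SafeAccount(1000))
print(unsafe_total)  # 1100: four of the five deposits were lost
print(safe_total)    # 1500
```

In real code the bad interleaving happens only occasionally rather than every time, which is what makes these bugs hard to reproduce.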

### Concurrency: Making progress on multiple tasks over overlapping time periods, but not necessarily in parallel.

“Concurrency is two lines of customers ordering from a single cashier (lines take turns ordering), while Parallelism is two lines of customers ordering from two cashiers (each line with its own cashier)”

- Each cashier is a processing unit

- Each customer is a task that needs to be taken care of

### Parallelism: Executing multiple tasks at the same instant, utilizing multiple CPU cores
- The GIL limits true parallelism in CPython, even if you have multiple threads running
- Ways around this include relying on multiple processes instead of multiple threads


## Multiprocessing

#### Processes are separate instances of a program that run independently and have their own memory space.

- the Python `multiprocessing` library is designed for cases where CPU-bound jobs need to happen in parallel and multiple cores are available

Advantages:

- separate memory space for each process

- code is usually straightforward compared to threads

- **avoids GIL** limitation

- eliminates the need for synchronization (assuming no shared memory)

In [13]:
## RUN FROM COMMAND LINE -> See multiprocess.py
# Don't run this in Jupyter, run from command line
from multiprocessing import Process, cpu_count
import time
import os

class MuchCPU(Process):
    def run(self):
        print(os.getpid())
        for i in range(200000000):
            pass

if __name__ == '__main__':
    print('Running...')
    procs = [MuchCPU() for f in range(cpu_count())]
    t = time.time()
    for p in procs:
        p.start()
    
    for p in procs:
        p.join()
    
    print('work took {} seconds'.format(time.time() - t))

Running...
work took 0.07891678810119629 seconds


Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "/opt/homebrew/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/spawn.py", line 122, in spawn_main
    exitcode = _main(fd, parent_sentinel)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/homebrew/Cellar/python@3.11/3.11.8/Frameworks/Python.framework/Versions/3.11/lib/python3.11/multiprocessing/spawn.py", line 132, in _main
    self = reduction.pickle.load(from_parent)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: Can't get attribute 'MuchCPU' on <module '__main__' (built-in)>

## Multiprocessing Pools

- pools abstract away the overhead of figuring out what code is running in main process and what code is running in subprocess

    this abstraction restricts the number of places where code in different processes
    interacts, making it easier to keep track of

- pools also hide the passing of data between processes

- using a pool looks much like a function call–you pass data into a function, it's executed in another process or processes, and when the work is complete, a value is returned

- under the hood, a lot of work is being done to support this–objects in one process are being pickled (serialized) and passed into a pipe, then another process retrieves data from the pipe and unpickles it. 

    Work is done in the subprocess and a result is produced. The result is pickled and passed into a pipe. Eventually, the original process unpickles it and returns it.
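
The round trip described above can be sketched without starting any processes, by calling `pickle` directly. This is only a simulation in a single process (a real pool also manages pipes and worker scheduling), and `math.factorial` is used as a stand-in for the worker function.

```python
import math
import pickle

# Parent side: serialize a reference to the function plus its argument
request_bytes = pickle.dumps((math.factorial, 5))

# Worker side (simulated in this same process): deserialize and do the work
func, arg = pickle.loads(request_bytes)
response_bytes = pickle.dumps(func(arg))

# Parent side again: deserialize the result
result = pickle.loads(response_bytes)
print(result)  # 120

# Functions are pickled by name, so a lambda cannot be sent this way
try:
    pickle.dumps(lambda x: x * x)
    lambda_picklable = True
except (pickle.PicklingError, AttributeError):
    lambda_picklable = False
print(lambda_picklable)  # False
```

Because functions travel by name rather than by value, the worker process must be able to import the function it is asked to run. That is a likely reason pool-based examples are run from a script file rather than from a notebook cell.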

### Big Prime multiprocessing
--> Head to `multi_pool.py` in PROJECTS folder to try it out 
- Will not work in notebook

In [None]:
import random
import math
import os
from multiprocessing.pool import Pool

def prime_factor(value, level=0):
    factors = []
    if level:
        print('    ' * level, 'prime_factor(', value, ', ', level, ') ', os.getpid(), sep='')
    for divisor in range(2, value - 1):
        quotient, remainder = divmod(value, divisor)
        if not remainder:
            factors.extend(prime_factor(divisor, level + 1))
            factors.extend(prime_factor(quotient, level + 1))
            break
    else:
        factors = [value]
    return factors

if __name__ == '__main__': # distinguishes between running and importing
    pool = Pool()

    to_factor = [
        random.randint(40_000_000, 80_000_000) 
                for _ in range(64)
    ]
    print(to_factor)
    results = pool.map(prime_factor, to_factor)
    for value, factors in zip(to_factor, results):
        print("The factors of {} are {}".format(value, factors))
    #print(results)

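## another example...
no extra install is needed here: `concurrent.futures` is part of the standard library in Python 3 (the `futures` package on PyPI is only a backport for Python 2)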


In [None]:
### Go to get_news.py under PROJECTS

### Maybe have pythontutor.com  handy...

### file download managers...

In [None]:
import os
import urllib.request
 
from concurrent.futures import ThreadPoolExecutor
from concurrent.futures import as_completed
 
def downloader(url):
    """
    Downloads the specified URL and saves it to disk
    """
    req = urllib.request.urlopen(url)
    filename = os.path.basename(url)
    ext = os.path.splitext(url)[1]
    if not ext:
        raise RuntimeError('URL does not contain an extension')
 
    with open(filename, 'wb') as file_handle:
        while True:
            chunk = req.read(1024)
            if not chunk:
                break
            file_handle.write(chunk)
    msg = 'Finished downloading {filename}'.format(filename=filename)
    return msg
 
def main(urls):
    """
    Create a thread pool and download specified urls
    """
    with ThreadPoolExecutor(max_workers=5) as executor:
        futures = [executor.submit(downloader, url) for url in urls]
        for future in as_completed(futures):
            print(future.result())

urls = ["https://www.irs.gov/pub/irs-pdf/f1040.pdf",
        "https://www.irs.gov/pub/irs-pdf/f1040a.pdf",
        "https://www.irs.gov/pub/irs-pdf/f1040ez.pdf",
        "https://www.irs.gov/pub/irs-pdf/f1040es.pdf",
        "https://www.irs.gov/pub/irs-pdf/f1040sb.pdf"]
if __name__ == '__main__':
    main(urls)

### Asynchronous Programming with `asyncio`

Asynchronous programming allows for concurrent execution of tasks without the need for explicit threading or process management. 

Python's `asyncio` module provides a framework for writing **asynchronous code using coroutines and event loops**. 

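Asyncio - cooperative multitasking on a single thread; often the cleanest modern option for I/O-bound concurrency in Python

Threading - multiple worker threads, but can be cumbersome and can bloat code

Processing - parallelism through extra processes, suited to CPU-bound work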

Here's an example that demonstrates the use of `asyncio` -> async_news.py


In [15]:
import asyncio

async def fetch_data(url):
    print(f"Fetching data from {url}")
    await asyncio.sleep(2)  # Simulating network delay
    return f"Data from {url}"

async def main():
    urls = ["https://example.com", "https://example.org", "https://example.net"]
    tasks = []
    for url in urls:
        task = asyncio.create_task(fetch_data(url))
        tasks.append(task)

    results = await asyncio.gather(*tasks)
    for result in results:
        print(result)

asyncio.run(main())

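RuntimeError: asyncio.run() cannot be called from a running event loop

Note: this error appears because Jupyter already runs its own event loop. In a notebook cell, use `await main()` instead; `asyncio.run(main())` works when the code is run as a script.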

Explanation for above code:

- The `fetch_data` function is defined as an asynchronous function using the `async def` syntax. It takes a `url` as an argument and simulates fetching data from that URL by sleeping for 2 seconds using `asyncio.sleep()`.

- The `main` function is also defined as an asynchronous function. It contains the main logic of the program.

- Inside `main`, we define a list of URLs that we want to fetch data from.

- We create a list called `tasks` to store the asynchronous tasks.

- For each URL, we create an asynchronous task using `asyncio.create_task()`, passing the `fetch_data` coroutine with the URL as an argument. We append each task to the `tasks` list.

- We use `asyncio.gather()` to wait for all the tasks to complete and collect their results. The `*` operator is used to unpack the `tasks` list and pass each task as a separate argument to `gather()`.

- The `await` keyword is used to wait for the tasks to complete and retrieve their results.

- Finally, we iterate over the results and print each one.

- The `asyncio.run()` function is used to run the asynchronous `main` function and start the event loop.
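
A quick way to confirm that the tasks really overlap is to time them. The sketch below is meant to be run as a script; `fake_fetch` is a hypothetical stand-in for a network call, with `asyncio.sleep` simulating the delay.

```python
import asyncio
import time

async def fake_fetch(name, delay):
    """Pretend to fetch something; the sleep yields control to the event loop."""
    await asyncio.sleep(delay)
    return name

async def main():
    start = time.perf_counter()
    # Three 0.2-second waits are scheduled together and overlap
    results = await asyncio.gather(
        *(fake_fetch(f"task-{i}", 0.2) for i in range(3))
    )
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```

The total should be close to 0.2 seconds rather than 0.6, since each `await asyncio.sleep(...)` hands control back to the event loop while it waits.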

## Useful methods of `asyncio`

`asyncio.gather()` - runs awaitables concurrently and returns their results in the same order as the inputs

`asyncio.as_completed()` - yields results as each task finishes, so output order follows completion order rather than input order; which one to use depends on the required context
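
The ordering difference can be seen in a small sketch. The `job` coroutine and its delays are made up for illustration; the delays are chosen so that the tasks finish in a different order than they were submitted.

```python
import asyncio

async def job(name, delay):
    """Finish after `delay` seconds and report which job this was."""
    await asyncio.sleep(delay)
    return name

async def main():
    specs = [("a", 0.3), ("b", 0.1), ("c", 0.2)]

    # gather: results come back in submission order
    gathered = await asyncio.gather(*(job(n, d) for n, d in specs))

    # as_completed: results come back in finishing order
    completed = []
    for finished in asyncio.as_completed([job(n, d) for n, d in specs]):
        completed.append(await finished)

    return gathered, completed

gathered, completed = asyncio.run(main())
print(gathered)   # ['a', 'b', 'c']
print(completed)  # ['b', 'c', 'a']
```

`gather` is convenient when results must line up with inputs, while `as_completed` lets you start processing the fastest responses without waiting for the slowest one.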

## Exercises

### Concurrency and Parallelism Exercise: 

Implement a program that fetches data from multiple URLs concurrently using threads and another using processes. Compare the performance of each approach.

Scenario: You are building a web scraper that needs to fetch data from multiple websites simultaneously. Using concurrency and parallelism can significantly speed up the scraping process.

In [None]:
### THREADS IMPLEMENTATION
import concurrent.futures
import requests

def fetch_url(url):
    response = requests.get(url)
    return response.text

def fetch_urls_concurrently(urls):
    with concurrent.futures.ThreadPoolExecutor() as executor:
        futures = [executor.submit(fetch_url, url) for url in urls]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
    return results

urls = [...]  # List of URLs to fetch
data = fetch_urls_concurrently(urls)

In [None]:
### PROCESSES IMPLEMENTATION
import concurrent.futures
import requests

def fetch_url(url):
    response = requests.get(url)
    return response.text

def fetch_urls_parallel(urls):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        futures = [executor.submit(fetch_url, url) for url in urls]
        results = [future.result() for future in concurrent.futures.as_completed(futures)]
    return results

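# The guard keeps worker processes from re-running this code on import
# (needed on platforms that spawn new interpreters, e.g. Windows and macOS)
if __name__ == '__main__':
    urls = [...]  # List of URLs to fetch
    data = fetch_urls_parallel(urls)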

### Thread Synchronization and Object Sharing:
Exercise: Implement a thread-safe counter class that multiple threads can increment concurrently without causing race conditions.

Scenario: You are building a web server that needs to keep track of the number of requests processed. Multiple threads will be accessing and incrementing the counter simultaneously.

In [None]:
import threading

class ThreadSafeCounter:
    def __init__(self):
        self.count = 0
        self.lock = threading.Lock()

    def increment(self):
        with self.lock:
            self.count += 1

    def get_count(self):
        with self.lock:
            return self.count

counter = ThreadSafeCounter()

def worker():
    for _ in range(100000):
        counter.increment()

threads = []
for _ in range(5):
    thread = threading.Thread(target=worker)
    threads.append(thread)
    thread.start()

for thread in threads:
    thread.join()

print("Final count:", counter.get_count())

### Reading/Writing Files Asynchronously:

Exercise: Implement a program that reads multiple large files asynchronously and counts the occurrence of a specific word in each file.

Scenario: You are building a log analysis tool that needs to process multiple large log files efficiently to count the occurrence of a specific error keyword.

In [None]:
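import asyncio

def read_text(file_path):
    # Plain blocking read; asyncio itself has no file-opening API
    with open(file_path, 'r') as file:
        return file.read()

async def count_word_in_file(file_path, word):
    # Run the blocking read in a worker thread (asyncio.to_thread, Python 3.9+)
    content = await asyncio.to_thread(read_text, file_path)
    return content.lower().count(word.lower())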

async def main():
    file_paths = [...]  # List of file paths
    word = "error"
    tasks = [asyncio.create_task(count_word_in_file(file_path, word)) for file_path in file_paths]
    counts = await asyncio.gather(*tasks)

    for file_path, count in zip(file_paths, counts):
        print(f"File: {file_path}, Count: {count}")

asyncio.run(main())

### Calling REST APIs Asynchronously:

Exercise: Implement a program that makes asynchronous requests to a REST API to fetch user data and update a database.

Scenario: You are building a user management system that needs to fetch user data from an external API and update a local database asynchronously.

In [None]:
import asyncio
import aiohttp
import aiosqlite

async def fetch_user_data(session, user_id):
    url = f"https://api.example.com/users/{user_id}"
    async with session.get(url) as response:
        return await response.json()

async def update_user_in_db(db, user_data):
    async with db.execute("INSERT OR REPLACE INTO users (id, name, email) VALUES (?, ?, ?)",
                          (user_data['id'], user_data['name'], user_data['email'])):
        await db.commit()

async def main():
    async with aiosqlite.connect("users.db") as db:
        await db.execute("CREATE TABLE IF NOT EXISTS users (id INTEGER PRIMARY KEY, name TEXT, email TEXT)")

        async with aiohttp.ClientSession() as session:
            user_ids = [...]  # List of user IDs to fetch
            fetch_tasks = [asyncio.create_task(fetch_user_data(session, user_id)) for user_id in user_ids]
            user_data_list = await asyncio.gather(*fetch_tasks)

            update_tasks = [asyncio.create_task(update_user_in_db(db, user_data)) for user_data in user_data_list]
            await asyncio.gather(*update_tasks)

asyncio.run(main())