
# Asynchronous programming

Asynchronous programming lets your program start tasks and then move on to other work without waiting for those tasks to finish. It uses techniques like event loops, callbacks, promises, and async/await to handle the results later, which improves responsiveness and efficiency, especially for I/O-bound operations.

## asyncio library (`async` and `await`)

This Python code demonstrates asynchronous programming with `asyncio`. The `background_task()` coroutine simulates a task of a given duration using `asyncio.sleep()`. The `main()` coroutine starts two background tasks (2 seconds and 1 second) with `asyncio.create_task()` and attaches a done-callback to each one that prints the finished task. Importantly, `main()` continues executing other code (simulated by `asyncio.sleep(1)`) *without waiting* for the tasks to finish. The later `await asyncio.wait([t1,t2])` suspends `main()` until both tasks have completed. Finally, `asyncio.run(main())` within the `go()` function starts the event loop, which interleaves `main()` and the two background tasks on a single thread. This is why the 1-second task finishes before the 2-second one even though it was started second, showcasing how asynchronous code runs multiple operations seemingly at the same time.


In [24]:
%%writefile async.py
import asyncio

async def background_task(delay):
    await asyncio.sleep(delay)
    print("Background task completed!")

async def main():
    print("Starting the main function...")
    t1 = asyncio.create_task(background_task(2))  # Start the task without waiting
    t1.add_done_callback(lambda task: print("finished", task))
    t2 = asyncio.create_task(background_task(1))  # Start the task without waiting
    t2.add_done_callback(lambda task: print("finished", task))
    print("Continuing the main function...")
    await asyncio.sleep(1)  # Simulate doing something else
    print("Waiting for the completion....")
    await asyncio.wait([t1, t2])
    print("Main function completed!")

def go():
  asyncio.run(main())

go()

Overwriting async.py


In [25]:
!python async.py

Starting the main function...
Continuing the main function...
Waiting for the completion....
Background task completed!
finished <Task finished name='Task-3' coro=<background_task() done, defined at /content/async.py:3> result=None>
Background task completed!
finished <Task finished name='Task-2' coro=<background_task() done, defined at /content/async.py:3> result=None>
Main function completed!
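
To make the overlap visible, the following minimal sketch times two tasks. The helper names `nap` and `timed_run` and the shortened delays are assumptions made for illustration; they are not part of the cell above. If the tasks really run concurrently, the elapsed time should be close to the longest delay rather than the sum of both.

```python
import asyncio
import time

async def nap(delay):
    # Hypothetical helper: stands in for any awaitable I/O operation
    await asyncio.sleep(delay)
    return delay

async def timed_run():
    start = time.perf_counter()
    # Both tasks are scheduled right away and share one event loop
    t1 = asyncio.create_task(nap(0.3))
    t2 = asyncio.create_task(nap(0.2))
    results = [await t1, await t2]
    elapsed = time.perf_counter() - start
    return results, elapsed

results, elapsed = asyncio.run(timed_run())
print(results, f"elapsed: {elapsed:.2f}s")
```

Like `async.py` above, this is best run as a script: `asyncio.run()` raises an error when it is called from code that is already running inside an event loop, which is the case in a notebook cell.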


## Multi Threading

Threading achieves concurrency within a single program by dividing it into multiple execution paths called threads. These threads run within the same process and share the same memory space, which lets different parts of a program make progress seemingly simultaneously. In standard CPython, the Global Interpreter Lock (GIL) allows only one thread to execute Python bytecode at a time, so threads mainly help with tasks that spend their time waiting (blocking I/O, `time.sleep()`), not with CPU-bound computation. Because threads share memory, synchronization mechanisms such as locks (mutexes) are necessary to prevent race conditions and data corruption when multiple threads access and modify shared resources. Unlike asynchronous programming, which uses cooperative multitasking, threads are scheduled by the operating system (preemptive multitasking), meaning the OS decides when to switch between threads.

In [17]:
import threading
import time

def background_task(t):
    time.sleep(t)
    print(f"Background task completed after {t} seconds!")

def main():
    print("Starting the main function...")

    # Create and start threads for background tasks
    thread1 = threading.Thread(target=background_task, args=(2,))
    thread2 = threading.Thread(target=background_task, args=(1,))

    thread1.start()
    thread2.start()

    print("Continuing the main function...")
    time.sleep(1)  # Simulate doing something else
    print("Waiting for the completion....")

    # Wait for threads to finish
    thread1.join()
    thread2.join()

    print("Main function completed!")


main()

Starting the main function...
Continuing the main function...
Background task completed after 1 seconds!
Waiting for the completion....
Background task completed after 2 seconds!
Main function completed!
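
The threading paragraph mentions locks; the sketch below shows one guarding a shared counter. The names `counter` and `increment` and the iteration counts are illustrative assumptions. Without the `with lock:` block, the read-modify-write in `counter += 1` could interleave between threads and lose updates.

```python
import threading

counter = 0
lock = threading.Lock()

def increment(n):
    global counter
    for _ in range(n):
        # Only one thread at a time may run the read-modify-write below
        with lock:
            counter += 1

threads = [threading.Thread(target=increment, args=(10_000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter)  # 4 threads x 10,000 increments = 40000
```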


***When to use each one:***


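| Feature | Threading | Async |
|---|---|---|
| Task Type | I/O-bound (blocking calls) | I/O-bound (awaitable calls) |
| Parallelism | Concurrency only for Python code (the GIL in standard CPython lets one thread run bytecode at a time) | Concurrency on a single thread (no parallelism) |
| Resource Usage | Higher (one OS thread per task) | Lower (many tasks share one thread) |
| Complexity | More complex (requires careful synchronization) | Less complex (task switches happen only at `await`, so race conditions are easier to avoid) |
| Best For | Blocking libraries without async support | Network and I/O operations with async-aware libraries |

For CPU-bound, computationally intensive tasks, neither option gives true parallelism in standard CPython; use multiprocessing instead.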


## Multiprocessing

Multiprocessing in Python enables true parallelism by creating separate processes, each with its own memory space and Python interpreter, effectively bypassing the Global Interpreter Lock (GIL). This allows multiple CPU cores to execute Python code in parallel, which can significantly improve performance for CPU-bound tasks by distributing the workload across cores. The cost is higher resource usage than threading: each process has its own interpreter, and data passed between processes must be copied.

In [18]:
import multiprocessing
import time

def worker(t):
    print(f"Process (sleeping for {t}s): Starting...")
    time.sleep(t)  # Simulate some work
    print(f"Process (slept for {t}s): Finished!")

def main():
    p1 = multiprocessing.Process(target=worker, args=(2,))  # Task with 2-second sleep
    p2 = multiprocessing.Process(target=worker, args=(1,))  # Task with 1-second sleep
    ps = [p1,p2]

    for p in ps: p.start()

    print("Continuing the main function...")
    time.sleep(1)  # Simulate doing something else
    print("Waiting for the completion....")

    for p in ps: p.join()

    print("All processes completed.")

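# The guard is required when the start method launches fresh interpreters (e.g. Windows, macOS)
if __name__ == "__main__":
    main()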

Process (sleeping for 2s): Starting...
Process (sleeping for 1s): Starting...
Continuing the main function...
Process (slept for 1s): Finished!
Waiting for the completion....
Process (slept for 2s): Finished!
All processes completed.
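
The example above only sleeps, so it does not show the CPU-bound case that multiprocessing is meant for. As a minimal sketch of that case, the block below spreads a pure-Python computation over worker processes with `concurrent.futures.ProcessPoolExecutor`. The function names `sum_of_squares` and `run_parallel`, the inputs, and the worker count are assumptions chosen for illustration.

```python
from concurrent.futures import ProcessPoolExecutor

def sum_of_squares(n):
    # CPU-bound work: pure Python arithmetic, no waiting on I/O
    return sum(i * i for i in range(n))

def run_parallel(inputs):
    # Each input is handed to one of the worker processes;
    # map() returns the results in the same order as the inputs
    with ProcessPoolExecutor(max_workers=2) as pool:
        return list(pool.map(sum_of_squares, inputs))

if __name__ == "__main__":
    print(run_parallel([10, 100, 1000]))
```

Running this as a script is the safest option, because worker processes need to be able to import the function they execute.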


# Activity

The activity analyzes student grades across multiple courses. Given a dataset of individual grades and their corresponding courses, the objective is twofold. First, calculate the mean grade for each course, which indicates the average performance of students within that course. Second, calculate the mean of those course means, which summarizes performance across all courses. This is not the same as the mean of all grades, because every course counts equally regardless of how many grades it has. The input is a table of course names and numerical grades; the output consists of the individual course means and the average of those means.



```
    "course":['ADM','ADM','LAW','LAW','ECO','LAW','ADM','LAW','ECO','LAW','ADM','ADM'],
    "grade":[8.6,6,3.6,8.6,8.1,5.4,7.2,8.9,7.8,9.7,7.2,4.2]
```
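
Before adding concurrency, it helps to have a plain synchronous baseline to check results against. The following sketch uses only the standard library; the variable names (`by_course`, `course_means`, `mean_of_means`) are assumptions for illustration.

```python
from collections import defaultdict
from statistics import mean

courses = ['ADM','ADM','LAW','LAW','ECO','LAW','ADM','LAW','ECO','LAW','ADM','ADM']
grades = [8.6,6,3.6,8.6,8.1,5.4,7.2,8.9,7.8,9.7,7.2,4.2]

# Group the grades by course
by_course = defaultdict(list)
for course, grade in zip(courses, grades):
    by_course[course].append(grade)

# Mean per course, then the mean of those means
course_means = {course: mean(values) for course, values in by_course.items()}
mean_of_means = mean(list(course_means.values()))

for course, m in sorted(course_means.items()):
    print(course, round(m, 2))
print('Mean of means:', round(mean_of_means, 4))
```

The course means are 6.64 (ADM), 7.95 (ECO), and 7.24 (LAW), and their mean is about 7.2767, which is the value the concurrent versions should reproduce.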



## Async

### Have a callback and store the result per task

In [77]:
%%writefile async.py
import asyncio
from numpy import mean

import pandas as pd
grades = pd.DataFrame({
    "course":['ADM','ADM','LAW','LAW','ECO','LAW','ADM','LAW','ECO','LAW','ADM','ADM'],
    "grade":[8.6,6,3.6,8.6,8.1,5.4,7.2,8.9,7.8,9.7,7.2,4.2]
})

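async def background_task(df,f,means):
  # Compute the course mean once, record it in the shared list, and return it
  m = df[df['course']==f]['grade'].mean()
  print('background_task...'+str(m))
  means.append(m)
  print(means)
  return m

def add_total(tk,total):
  # Done-callback: add the finished task's result to the running total
  print('add_total...')
  total[0] = total[0] + tk.result()
  print(total[0])

async def main():
  total=[0]  # One-element list so the callback can mutate it
  means=[]
  tasks = []
  courses = ['ADM','ECO','LAW']
  for c in courses:
    t=asyncio.create_task(background_task(grades,c,means))
    t.add_done_callback(lambda x: add_total(x,total))
    await asyncio.sleep(2)  # Yield to the event loop so the new task can run
    tasks.append(t)
  await asyncio.wait(tasks)
  # Both approaches give the same value: callback total vs. stored means
  print('Mean:',total[0]/len(courses),mean(means))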

def go():
  asyncio.run(main())

go()

Overwriting async.py


### Not using callback

In [85]:
%%writefile async.py
import asyncio
from numpy import mean

import pandas as pd
grades = pd.DataFrame({
    "course":['ADM','ADM','LAW','LAW','ECO','LAW','ADM','LAW','ECO','LAW','ADM','ADM'],
    "grade":[8.6,6,3.6,8.6,8.1,5.4,7.2,8.9,7.8,9.7,7.2,4.2]
})

async def background_task(df,f):
  # Compute the course mean once; reuse it for the log line and the return value
  m = df[df['course']==f]['grade'].mean()
  print('background_task...'+str(m))
  return m


async def main():
  tasks = []
  for c in ['ADM','ECO','LAW']:
    t=asyncio.create_task(background_task(grades,c))
    await asyncio.sleep(2)  # Yield to the event loop so the new task can run
    tasks.append(t)
  await asyncio.wait(tasks)
  # All tasks are done here, so result() returns immediately
  total = 0
  for t in tasks: total = total + t.result()
  print('Mean:',total/len(tasks))

def go():
  asyncio.run(main())

go()

Overwriting async.py


### Dummy callback

In [87]:
%%writefile async.py
import asyncio
from numpy import mean

import pandas as pd
grades = pd.DataFrame({
    "course":['ADM','ADM','LAW','LAW','ECO','LAW','ADM','LAW','ECO','LAW','ADM','ADM'],
    "grade":[8.6,6,3.6,8.6,8.1,5.4,7.2,8.9,7.8,9.7,7.2,4.2]
})

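async def background_task(df,f):
  # Compute the course mean once; reuse it for the log line and the return value
  m = df[df['course']==f]['grade'].mean()
  print('background_task...'+str(m))
  return m

def add_total(tk):
  # "Dummy" callback: it only reports the finished task; the total is computed in main()
  print('finished:',tk,tk.result())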

async def main():
  tasks = []
  for c in ['ADM','ECO','LAW']:
    t=asyncio.create_task(background_task(grades,c))
    t.add_done_callback(add_total)
    await asyncio.sleep(2)
    tasks.append(t)
  await asyncio.wait(tasks)
  total = 0
  for t in tasks: total = total + t.result()
  print('Mean:',total/len(tasks))

def go():
  asyncio.run(main())

go()

Overwriting async.py


### Final py version

In [94]:
%%writefile async.py
import asyncio
from numpy import mean

import pandas as pd
grades = pd.DataFrame({
    "course":['ADM','ADM','LAW','LAW','ECO','LAW','ADM','LAW','ECO','LAW','ADM','ADM'],
    "grade":[8.6,6,3.6,8.6,8.1,5.4,7.2,8.9,7.8,9.7,7.2,4.2]
})

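async def background_task(df,f):
  # Compute the course mean once; reuse it for the log line and the return value
  m = df[df['course']==f]['grade'].mean()
  print('background_task...'+str(m))
  return m

def add_total(tk):
  # Done-callback: report the finished task and its result
  print('finished:',tk,tk.result())

def finalize(tasks):
  # Average the per-course means held by the completed tasks
  total = 0
  for t in tasks:
    total = total + t.result()
  return total/len(tasks)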


async def main():
  tasks = []
  for c in ['ADM','ECO','LAW']:
    t=asyncio.create_task(background_task(grades,c))
    t.add_done_callback(add_total)
    await asyncio.sleep(1)
    tasks.append(t)
  await asyncio.wait(tasks)
  print('Mean:',finalize(tasks))

def go():
  asyncio.run(main())

go()

Overwriting async.py


In [95]:
!python async.py

background_task...6.640000000000001
finished: <Task finished name='Task-2' coro=<background_task() done, defined at /content/async.py:10> result=6.640000000000001> 6.640000000000001
background_task...7.949999999999999
finished: <Task finished name='Task-3' coro=<background_task() done, defined at /content/async.py:10> result=7.949999999999999> 7.949999999999999
background_task...7.24
finished: <Task finished name='Task-4' coro=<background_task() done, defined at /content/async.py:10> result=7.24> 7.24
Mean: 7.276666666666666
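
As an alternative to `create_task()` plus `asyncio.wait()`, the same result can be sketched with `asyncio.gather()`, which returns the coroutine results in input order, so no callback or manual loop over tasks is needed. To stay free of third-party dependencies, this sketch assumes the grades have already been grouped per course in a plain dict (`GRADES`); that dict and the names `course_mean` and `mean_of_means` are illustrative assumptions, not part of the notebook. Note that computing a mean involves no waiting, so async gives no speed-up here; the structure is what carries over to real I/O.

```python
import asyncio
from statistics import mean

# Grades from the activity table, grouped by course (assumed pre-processing)
GRADES = {
    'ADM': [8.6, 6, 7.2, 7.2, 4.2],
    'ECO': [8.1, 7.8],
    'LAW': [3.6, 8.6, 5.4, 8.9, 9.7],
}

async def course_mean(course):
    # Yield to the event loop once, as a stand-in for real awaited I/O
    await asyncio.sleep(0)
    return mean(GRADES[course])

async def mean_of_means(courses):
    # gather() schedules all coroutines and returns results in input order
    means = await asyncio.gather(*(course_mean(c) for c in courses))
    return sum(means) / len(means)

overall = asyncio.run(mean_of_means(['ADM', 'ECO', 'LAW']))
print('Mean:', overall)
```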


### Javascript version

In [89]:
%%js
const grades = [
  { course: 'ADM', grade: 8.6 },
  { course: 'ADM', grade: 6 },
  { course: 'LAW', grade: 3.6 },
  { course: 'LAW', grade: 8.6 },
  { course: 'ECO', grade: 8.1 },
  { course: 'LAW', grade: 5.4 },
  { course: 'ADM', grade: 7.2 },
  { course: 'LAW', grade: 8.9 },
  { course: 'ECO', grade: 7.8 },
  { course: 'LAW', grade: 9.7 },
  { course: 'ADM', grade: 7.2 },
  { course: 'ADM', grade: 4.2 },
];

async function backgroundTask(rows, course) {
  // Compute the course mean once; reuse it for the log line and the return value
  const courseGrades = rows.filter(row => row.course === course).map(row => row.grade);
  const mean = courseGrades.reduce((acc, val) => acc + val, 0) / courseGrades.length;
  console.log(`background_task... ${mean}`);
  return mean;
}

// `then` passes the resolved value (not the promise) to its callback
function addTotal(result) {
  console.log('finished:', result);
}

async function main() {
  const tasks = [];
  const courses = ['ADM', 'ECO', 'LAW'];

  for (const course of courses) {
    const task = backgroundTask(grades, course);
    task.then(addTotal); // Attach callback with `then`
    await new Promise(resolve => setTimeout(resolve, 2000)); // Simulate 2-second delay
    tasks.push(task);
  }

  // Wait for all tasks to finish and collect their results
  const results = await Promise.all(tasks);
  const total = results.reduce((acc, val) => acc + val, 0);

  console.log('Mean:', total / courses.length);
}

main();

<IPython.core.display.Javascript object>

## Multi Thread

In [6]:
import threading
import queue

import pandas as pd

grades = pd.DataFrame({
    "course": ['ADM', 'ADM', 'LAW', 'LAW', 'ECO', 'LAW', 'ADM', 'LAW', 'ECO', 'LAW', 'ADM', 'ADM'],
    "grade": [8.6, 6, 3.6, 8.6, 8.1, 5.4, 7.2, 8.9, 7.8, 9.7, 7.2, 4.2]
})

def calculate_course_mean(df, course, result_queue):
    mean_grade = df[df['course'] == course]['grade'].mean()
    print(f'\nBackground task... Mean grade for {course}: {mean_grade}')
    result_queue.put(mean_grade)  # Put the result in the queue

def finalize(q):
    # Safe to drain without blocking: every producer thread has been joined
    total = 0
    qty = q.qsize()
    while not q.empty():
        total += q.get()
    return total / qty


result_queue = queue.Queue()  # Create a thread-safe queue
threads = []
courses = ['ADM', 'ECO', 'LAW']

for course in courses:
    thread = threading.Thread(target=calculate_course_mean, args=(grades.copy(), course, result_queue))
    threads.append(thread)
    thread.start()

for thread in threads: thread.join()  # Wait for all threads to finish

print('\nMEANs:',finalize(result_queue))



Background task... Mean grade for ADM: 6.640000000000001

Background task... Mean grade for ECO: 7.949999999999999

Background task... Mean grade for LAW: 7.24

MEANs: 7.276666666666666
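
The queue-based version can also be sketched with `concurrent.futures.ThreadPoolExecutor`, whose `map()` hands back each worker's return value directly, so no shared queue is needed. As an assumption for illustration, the grades are pre-grouped per course in a plain dict (`GRADES`) instead of a DataFrame, and the function names are hypothetical.

```python
from concurrent.futures import ThreadPoolExecutor
from statistics import mean

# Grades from the activity table, grouped by course (assumed pre-processing)
GRADES = {
    'ADM': [8.6, 6, 7.2, 7.2, 4.2],
    'ECO': [8.1, 7.8],
    'LAW': [3.6, 8.6, 5.4, 8.9, 9.7],
}

def course_mean(course):
    return mean(GRADES[course])

def mean_of_means(courses):
    # map() runs course_mean in worker threads and yields results in input order
    with ThreadPoolExecutor(max_workers=len(courses)) as pool:
        means = list(pool.map(course_mean, courses))
    return sum(means) / len(means)

print('MEANs:', mean_of_means(['ADM', 'ECO', 'LAW']))
```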
