## Multi-Process One-Thread

In [1]:
import multiprocessing
from multiprocessing import Process

multiprocessing ch·∫°y paralell k·∫øt h·ª£p concurency

In [3]:
# Check number of available cores
print("Number of cpu cores: ", multiprocessing.cpu_count())

Number of cpu cores:  20


In [4]:
import pandas as pd
from time import sleep, time

# Example of a processing function
def process_dataframe(chunk_id, chunk_data: pd.DataFrame):
    print(f"Processing chunk {chunk_id}")
    sleep(5)
    print(f"The chunk {chunk_id} has been processed successfully!")

In [5]:
# Make a sample dataframe
data = [
    ['tom', 10], ['nick', 15],
    ['juli', 14], ['peter', 20],
    ['jason', 27], ['anna', 11]
]

# Create the pandas DataFrame
df = pd.DataFrame(data, columns=['Name', 'Age'])

# Devide the dataframe into chunks
chunk_size = 2
chunks = [df[i:i+chunk_size].to_numpy() for i in range(0, len(df), chunk_size)]

# Mark the starting point to measure processing time
start = time()

procs = []
#l·∫∑p qua c√°c chunk, m·ªói chunk ƒë∆∞a v√†o 1 proc
# kh√¥ng ƒë·ªìng ƒë·ªÅu do ƒë√¥i khi corelogic ƒëang b·∫≠n x·ª≠ l√Ω c√°i kh√°c ch∆∞a x·ª≠ l√Ω k·ªãp h√†m process_dataframe
# tuy nhi√™n M·ªói l·∫ßn Process(...) ƒë∆∞·ª£c t·∫°o v√† start(), h·ªá ƒëi·ªÅu h√†nh s·∫Ω g√°n ti·∫øn tr√¨nh ƒë√≥ v√†o m·ªôt logic core c·ª• th·ªÉ 
# (ho·∫∑c lu√¢n chuy·ªÉn gi·ªØa c√°c core n·∫øu c·∫ßn), nh∆∞ng vi·ªác g√°n v√†o core n√†o l√† do h·ªá ƒëi·ªÅu h√†nh quy·∫øt ƒë·ªãnh.
for i, chunk in enumerate(chunks):
  # Define our process but not yet start
  proc = Process(target=process_dataframe, args=(i, chunk,))
  # Start the process
  proc.start()
  # Investigate process ID
  print(f"Process ID: {proc.pid}")
  # Manage all process definitions in a list
  procs.append(proc)

# Stop all processes to prevent resource scarcity
# imagine zombie processes
# ch·ªù c√°c proc x·ª≠ l√Ω xong h·∫øt ƒë·ªÉ tr√°nh t√¨nh tr·∫°ng zombie proc
for proc in procs:
    proc.join() # Wait for the process to finish

# Report total elapsed time
# n·∫øu kh√¥ng c√≥ join ph√≠a tr√™n ƒë√¥i khi d√≤ng d∆∞·ªõi c√≤n ch·∫°y xong tr∆∞·ªõc
print(f"Total time: {time() - start}s")

Process ID: 24243
Processing chunk 0
Process ID: 24248


Processing chunk 1
Processing chunk 2
Process ID: 24251
The chunk 0 has been processed successfully!
The chunk 1 has been processed successfully!
The chunk 2 has been processed successfully!
Total time: 5.08407735824585s


In [7]:
##############################################
# The 2nd way to do multiprocess is via Pool #
##############################################
from multiprocessing import Pool

# Define a pool of processes
NUM_PROCESSES = 2 # c√≥ 2 proc ƒëc t·∫°o ra
pool = Pool(NUM_PROCESSES)

procs = []
for i, chunk in enumerate(chunks):
    procs.append(
        # The main process does not need to wait for this function
        pool.apply_async(process_dataframe, args=(i, chunk))
    )

for proc in procs:
    proc.get() # Wait for the process to finish

Processing chunk 0Processing chunk 1



The chunk 1 has been processed successfully!The chunk 0 has been processed successfully!

Processing chunk 2
The chunk 2 has been processed successfully!


In [6]:
##############################################
# The 3rd way to do multiprocess is via Map #
##############################################
inputs = [(i, chunk) for i, chunk in enumerate(chunks)]

# If you have one arg only, please use `.map` instead
outputs = pool.starmap(
    process_dataframe,
    inputs
)

In [7]:
# Uhm... you can see that sometimes, two strings are concatnated w/o any `/n` ( race condition problem)
# this is because all the processes writes to stdout at the same time, we need
# to find a way to prevent other processes write to it while one is doing
from multiprocessing import Lock

lock = Lock()

def process_dataframe(chunk_id, chunk_data: pd.DataFrame):
    lock.acquire() #process ƒëang x√©t ƒëang c√≥ quy·ªÅn c·∫ßm kh√≥a
    # do ƒëang c·∫ßm kh√≥a n√™n ƒë√¢y l√† th·∫±ng duy nh·∫•t ƒë∆∞·ª£c ghi v√†o terminal
    #M·ªói khi b·∫°n d√πng lock.acquire(), b·∫°n ƒëang n√≥i v·ªõi ch∆∞∆°ng tr√¨nh r·∫±ng ‚Äúƒëo·∫°n m√£ ti·∫øp theo c·∫ßn ƒë∆∞·ª£c th·ª±c hi·ªán theo 
    # c√°ch an to√†n, ch·ªâ c√≥ m·ªôt lu·ªìng duy nh·∫•t ƒë∆∞·ª£c ph√©p thao t√°c v√†o t√†i nguy√™n n√†y t·∫°i m·ªôt th·ªùi ƒëi·ªÉm‚Äù.
    print(f"Processing chunk {chunk_id}")
    sleep(5)
    print(f"The chunk {chunk_id} has been processed successfully!")
    lock.release() #process ƒëang x√©t nh·∫£ kh√≥a ra

# Make a sample dataframe
data = [
    ['tom', 10], ['nick', 15],
    ['juli', 14], ['peter', 20],
    ['jason', 27], ['anna', 11]
]

# Create the pandas DataFrame
df = pd.DataFrame(data, columns=['Name', 'Age'])

# Devide the dataframe into chunks
chunk_size = 2
chunks = [df[i:i+chunk_size] for i in range(0, len(df), chunk_size)]

# Mark the starting point to measure processing time
start = time()

procs = []
for i, chunk in enumerate(chunks):
  # Define our process but not yet start
  proc = Process(target=process_dataframe, args=(i, chunk,))
  # Start the process
  proc.start()
  # Manage all process definitions in a list
  procs.append(proc)

# Stop all processes to prevent resource scarcity
# imagine zombie processes
for proc in procs:
    proc.join()

# Report total elapsed time
print(f"Total time: {time() - start}s")

Processing chunk 0
The chunk 0 has been processed successfully!
Processing chunk 1
The chunk 1 has been processed successfully!
Processing chunk 2
The chunk 2 has been processed successfully!
Total time: 15.079243898391724s


## Multi-Threaded

Python provides the same APIs to multiprocessing with `start()` and `join()`.

Thread c√πng chia s·∫ª v√πng nh·ªõ v·ªõi nhau khi n√≥ c√πng trong process ( proc c√≥ th·ªÉ t·∫°o nhi·ªÅu thread) 
GIL ( Khi c√≥ 1 proc c√≥ kh·∫£ nƒÉng excute nhi·ªÅu task, m·ªói task s·∫Ω ƒë∆∞·ª£c x·ª≠ l√Ω b·ªüi c√°c thread -> x·∫£y ra t√¨nh tr·∫°ng thread x·ª≠ l√Ω task n√†y nh·∫£y sang task kh√°c -> c√πng c·∫≠p nh·∫≠t variable X)

üß† GIL (Global Interpreter Lock) l√† g√¨?

GIL l√† m·ªôt c∆° ch·∫ø trong CPython (tr√¨nh th√¥ng d·ªãch Python ph·ªï bi·∫øn nh·∫•t) ƒë·∫£m b·∫£o r·∫±ng ch·ªâ m·ªôt thread Python ƒë∆∞·ª£c th·ª±c thi t·∫°i m·ªôt th·ªùi ƒëi·ªÉm, ngay c·∫£ khi b·∫°n s·ª≠ d·ª•ng nhi·ªÅu thread.

ü§î T·∫°i sao l·∫°i c√≥ GIL?

CPython kh√¥ng thread-safe khi truy c·∫≠p v√†o c√°c ƒë·ªëi t∆∞·ª£ng Python trong b·ªô nh·ªõ.

GIL gi√∫p ƒë∆°n gi·∫£n h√≥a vi·ªác qu·∫£n l√Ω b·ªô nh·ªõ v√† garbage collection.

Nh∆∞ng ƒë·ªïi l·∫°i, n√≥ g√¢y ra h·∫°n ch·∫ø v·ªõi c√°c ch∆∞∆°ng tr√¨nh ƒëa lu·ªìng (multi-threading), ƒë·∫∑c bi·ªát khi x·ª≠ l√Ω t√°c v·ª• CPU-bound.

üìå Khi n√†o GIL kh√¥ng ph·∫£i l√† v·∫•n ƒë·ªÅ?

V·ªõi I/O-bound tasks (nh∆∞ ƒë·ªçc ghi file, g·ªçi API, ch·ªù m·∫°ng...), GIL kh√¥ng ·∫£nh h∆∞·ªüng nhi·ªÅu.

V·ªõi CPU-bound tasks (t√≠nh to√°n n·∫∑ng), GIL l√†m multi-threading k√©m hi·ªáu qu·∫£. L√∫c n√†y, n√™n d√πng multiprocessing (ƒëa ti·∫øn tr√¨nh) thay v√¨ ƒëa lu·ªìng.

üì¶ M·ªôt s·ªë c√°ch ‚Äúv∆∞·ª£t qua‚Äù GIL:

S·ª≠ d·ª•ng multiprocessing thay v√¨ threading

D√πng th∆∞ vi·ªán C m·ªü r·ªông (nh∆∞ NumPy, ho·∫∑c c√°c th∆∞ vi·ªán Cython cho ph√©p th·∫£ GIL)

Chuy·ªÉn sang tr√¨nh th√¥ng d·ªãch kh√¥ng c√≥ GIL, nh∆∞:

PyPy (d√π hi·ªán v·∫´n c√≥ GIL)

Jython (ch·∫°y tr√™n JVM, kh√¥ng c√≥ GIL)

IronPython (ch·∫°y tr√™n .NET CLR)

In [8]:
from threading import Thread

my_threads = []
for i, chunk in enumerate(chunks):
    my_thread = Thread(target=process_dataframe, args=(i, chunk,))
    my_thread.start()
    my_threads.append(my_thread)

for my_thread in my_threads:
    my_thread.join()

Processing chunk 0
The chunk 0 has been processed successfully!
Processing chunk 1
The chunk 1 has been processed successfully!
Processing chunk 2
The chunk 2 has been processed successfully!


However, the way they share data is different. Let's take a look at the example below

In [13]:
from threading import Lock

# Define a lock to prevent race condition,
# which is multiple updates into the same variable
lock = Lock()

# Share data
shared_data = 0

def increment_function():
    global shared_data
    with lock: # This is equal to acquire() + release()
        shared_data += 1

my_threads = []
for _ in range(3):
    my_thread = Thread(target=increment_function)
    my_thread.start()
    my_threads.append(my_thread)

for my_thread in my_threads:
    my_thread.join()

print(f"Current value of shared_data: {shared_data}")

Current value of shared_data: 3


In [14]:
# Another way to access shared_data is via Queue
from queue import Queue

# Initialize the queue
shared_queue = Queue()

def increment_function():
    shared_queue.put(1)

my_threads = []
for _ in range(3):
    my_thread = Thread(target=increment_function)
    my_thread.start()
    my_threads.append(my_thread)

for i, my_thread in enumerate(my_threads):
    my_thread.join()

shared_data = 0
while not shared_queue.empty():
    shared_data += shared_queue.get()

print(f"Current value of shared_data: {shared_data}")

Current value of shared_data: 3


In [None]:
# Using a global variable is not a piece of cake in multiprocessing
# as in multithreading
import multiprocessing

# Define a shared value between oprocesses
shared_variable = multiprocessing.Value('i', 0)

def worker_function():
    global shared_variable
    with shared_variable.get_lock():
        shared_variable.value += 1

procs = []
for _ in range(3):
  # Define our process but not yet start
  proc = Process(target=worker_function)
  # Start the process
  proc.start()
  # Manage all process definitions in a list
  procs.append(proc)

for proc in procs:
    proc.join()

print(f"Current value of shared_data: {shared_data}")

In [None]:
# Or using queue
from multiprocessing import Queue
from multiprocessing import Process

def worker_function(shared_queue):
    shared_queue.put(1)

shared_queue = Queue()

procs = []
for _ in range(3):
    my_proc = Process(target=worker_function, args=(shared_queue,))
    my_proc.start()
    procs.append(my_proc)

for proc in procs:
    proc.join()

# The main process retrieves data from the queue
# and aggregate the result
result = 0
while not shared_queue.empty():
    result += shared_queue.get(1)

print(f"Results from multiprocessing queue: {result}")

## One-Process One-Thread
### V√†o event_loop_demo ƒë·ªÉ test k·ªπ

In [15]:
# Let's say you have one process with one thread only, how to deal with it?
# AsyncIO helps us to do it
#NOTE: CH·ªà N√äN D√ôNG AWAIT CHO C√ÅC T√ÅC V·ª§ I/O
import asyncio

#coroutine
async def download_my_1st_data_func():
    print("Starting my 1st data func...")
    await asyncio.sleep(2)
    print("Completed my 1st data func!")

async def download_my_2nd_data_func():
    print("Starting my 2nd data func...")
    await asyncio.sleep(2)
    print("Completed my 2nd data func!")

def main():
    loop = asyncio.get_event_loop() # n∆°i ch·ª©a task 1 thread c·∫ßn th·ª±c hi·ªán
    loop.run_until_complete(asyncio.gather(
        download_my_1st_data_func(),
        download_my_2nd_data_func(),
    ))
    loop.close()

# Run the event loop
main()

# YOU WILL MEET SOME WEIRD ERRORS HERE DUE TO IPYTHON NOTEBOOK,
# LET'S MOVE TO A PYTHON SCRIPT

RuntimeError: This event loop is already running

Starting my 1st data func...
Starting my 2nd data func...
Completed my 1st data func!
Completed my 2nd data func!
