In [1]:
import sys

In [2]:
# A small list object used to illustrate reference counting.
a = [1, 2, 3]

In [3]:
# Bind a second name to the same list object (assignment does not copy).
b = a

In [4]:
# Reference count of the list. The call itself holds a temporary reference to
# its argument, so the result is one higher than the number of names bound.
sys.getrefcount(a)

3

# Threading

In [5]:
def counter(n):
    """Count ``n`` down in a pure-Python busy loop.

    The loop keeps decrementing while the value is still ``>= 0`` and
    returns nothing; it is used as CPU-bound work for the timing cells.
    """
    remaining = n
    while True:
        if not remaining >= 0:
            break
        remaining -= 1

In [6]:
%%time
# Baseline: run the whole countdown in the main thread.
counter(10**8)

CPU times: user 4.8 s, sys: 0 ns, total: 4.8 s
Wall time: 4.79 s


In [7]:
from threading import Thread

In [8]:
# Three threads, each given one third of the countdown work.
t1, t2, t3 = (
    Thread(target=counter, args=(10**8 // 3,))
    for _ in range(3)
)

In [9]:
%%time

# Start all three threads before joining any of them, so that they are all
# running at the same time.
t1.start()
t2.start()
t3.start()
# Block until every thread has finished.
t1.join()
t2.join()
t3.join()

CPU times: user 7.92 s, sys: 77.9 ms, total: 7.99 s
Wall time: 7.98 s


# Multiprocessing

In [10]:
from multiprocessing import Pool

In [11]:
# Pool of six worker processes (the size is hard-coded here).
pool = Pool(processes=6)

In [12]:
%%time

# Submit six equal slices of the countdown; each apply_async call returns an
# AsyncResult handle without waiting for the work to finish.
r1, r2, r3, r4, r5, r6 = (
    pool.apply_async(func=counter, args=(10**8 // 6,))
    for _ in range(6)
)

# Stop accepting new tasks, then wait for the workers to exit.
pool.close()
pool.join()

CPU times: user 8.44 ms, sys: 3.97 ms, total: 12.4 ms
Wall time: 841 ms


In [2]:
import numpy as np
from multiprocessing import Process, Manager, Array, Pool, Pipe, cpu_count
import ctypes
import time
import os
from itertools import repeat

%env OMP_NUM_THREADS=1

env: OMP_NUM_THREADS=1


# Does not work

In [14]:
def func(d, i):
    """Append ``i`` to the list-like container ``d`` in place."""
    add_item = d.append
    add_item(i)

In [15]:
# Ordinary list owned by the parent process.
d = []
# Twelve processes; process i is asked to append i to the list it receives.
proc_list = [Process(target=func, args=(d, i,)) for i in range(12)]

In [16]:
# Launch every process first ...
for proc in proc_list:
    proc.start()
    
# ... then wait for all of them to exit.
for proc in proc_list:
    proc.join()

In [17]:
# Show the parent's list after the children have exited; the recorded output
# is empty, i.e. the children's appends did not reach this object.
d

[]

# Works

In [18]:
# Same experiment, but the list is a Manager-backed proxy that the processes
# can share.
manager = Manager()
d = manager.list()
proc_list = [Process(target=func, args=(d, i,)) for i in range(12)]

In [19]:
# Launch every process first ...
for proc in proc_list:
    proc.start()
    
# ... then wait for all of them to exit.
for proc in proc_list:
    proc.join()

In [20]:
# Copy the managed list's contents into a plain list for display.
list(d)

[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11]

In [21]:
# Stop the manager now that the shared list is no longer needed.
manager.shutdown()

# Ping pong

In [22]:
def proc(idx, n, pipe):
    """Play one side of a ping-pong exchange over ``pipe``.

    The participant with ``idx == 0`` serves first by sending ``0``. Each
    side then repeats: receive a value, print it with this process's PID,
    sleep one second, add one, and send the result back. A side stops as
    soon as the value it has just sent is no longer below ``n``.
    """
    if idx == 0:
        pipe.send(0)

    current = 0
    while current < n:
        received = pipe.recv()
        print(received, os.getpid())
        time.sleep(1)
        current = received + 1

        pipe.send(current)

In [23]:
# Pipe() returns the two connection ends; each process is given one of them.
pipe_l, pipe_r = Pipe()
# idx 0 serves first; both sides use n = 10 as their stopping threshold.
proc1 = Process(target=proc, args=(0, 10, pipe_l))
proc2 = Process(target=proc, args=(1, 10, pipe_r))

In [24]:
# Start both players, then wait until each has left its loop.
proc1.start()
proc2.start()

proc1.join()
proc2.join()

0 11357
1 11356
2 11357
3 11356
4 11357
5 11356
6 11357
7 11356
8 11357
9 11356
10 11357


# Matmul

In [25]:
# Two random n x n matrices (no seed is set, so values differ between runs).
n = 4_000
A = np.random.randn(n, n)
B = np.random.randn(n, n)

In [26]:
%%time

# Reference product computed directly in the parent process.
C = A @ B

CPU times: user 3.74 s, sys: 31.3 ms, total: 3.77 s
Wall time: 3.81 s


In [27]:
# Use as many worker processes as multiprocessing reports CPUs.
num_proc = cpu_count()

In [28]:
# Split the n rows into at most num_proc contiguous, non-empty blocks.
# block_size is ceil(n / num_proc), clamped to at least 1 so that the step
# below is valid even when num_proc > n.
block_size = max(1, -(-n // num_proc))
# Block boundaries: 0, block_size, 2 * block_size, ..., and finally n.
# Block k covers rows i_pos[k]:i_pos[k + 1]; only the last block may be short.
i_pos = list(range(0, n, block_size)) + [n]
num_blocks = len(i_pos) - 1

In [29]:
# Display the row boundaries of the blocks.
i_pos

[0, 667, 1334, 2001, 2668, 3335, 4000]

In [30]:
def matmul(A, B, d, i):
    """Multiply ``A`` by ``B`` and store the product in ``d`` under key ``i``.

    Nothing is returned; the result is delivered through the mapping ``d``.
    """
    block_product = A @ B
    d[i] = block_product

In [31]:
manager = Manager()
d = manager.dict()
# One process per row block: block i multiplies rows [start, stop) of A by B
# and stores its partial product in the managed dict under key i.
proc_list = [
    Process(target=matmul, args=(A[start:stop], B, d, i))
    for i, (start, stop) in enumerate(zip(i_pos, i_pos[1:]))
]

In [32]:
%%time

# The loop variable is deliberately not called `proc`: that name belongs to
# the ping-pong worker function, and rebinding it here would break any later
# re-run of the cell that passes `proc` as a Process target.
for worker in proc_list:
    worker.start()

for worker in proc_list:
    worker.join()

# Collect the partial products in block order and stack them row-wise.
res = [d[i] for i in range(len(d))]
C_p = np.vstack(res)

manager.shutdown()

CPU times: user 88.1 ms, sys: 205 ms, total: 293 ms
Wall time: 1.56 s


In [33]:
# Frobenius norm of the difference between the serial and parallel products.
np.linalg.norm(C - C_p)

8.761931539665933e-12

In [34]:
# Print the PID of each worker. The loop variable avoids the name `proc`,
# which is the ping-pong worker function defined earlier in the notebook.
for worker in proc_list:
    print(worker.pid)

11469
11471
11473
11475
11477
11479


# Matmul shared memory

In [35]:
# Fresh pair of random n x n matrices for the shared-memory variant
# (no seed is set, so values differ between runs).
n = 4_000
A = np.random.randn(n, n)
B = np.random.randn(n, n)

In [36]:
# Use as many worker processes as multiprocessing reports CPUs.
num_proc = cpu_count()

In [37]:
# Split the n rows into at most num_proc contiguous, non-empty blocks.
# block_size is ceil(n / num_proc), clamped to at least 1 so that the step
# below is valid even when num_proc > n.
block_size = max(1, -(-n // num_proc))
# Block boundaries: 0, block_size, 2 * block_size, ..., and finally n.
# Block k covers rows i_pos[k]:i_pos[k + 1]; only the last block may be short.
i_pos = list(range(0, n, block_size)) + [n]
num_blocks = len(i_pos) - 1

In [38]:
# Display the row boundaries of the blocks.
i_pos

[0, 667, 1334, 2001, 2668, 3335, 4000]

In [39]:
# Shared buffer holding n * n C doubles; lock=False requests the raw array
# without a synchronizing wrapper.
C_shared = Array(ctypes.c_double, n * n, lock=False)

In [40]:
# NumPy array backed by the shared buffer, viewed as an n x n matrix.
C = np.frombuffer(C_shared, dtype=np.float64).reshape(n, n)

In [41]:
def matmul_shared(C_shared, n, i1, i2):
    """Write rows ``i1:i2`` of the product ``A @ B`` into the shared buffer.

    ``C_shared`` is reinterpreted as an ``n`` x ``n`` float64 matrix and the
    selected rows are filled in place. ``A`` and ``B`` are not parameters;
    they are read from the module-level globals of the same names.
    """
    rows = slice(i1, i2)
    out = np.frombuffer(C_shared, dtype=np.float64).reshape(n, n)
    out[rows] = A[rows] @ B

In [42]:
# One process per row block; each is told which rows [start, stop) to fill.
proc_list = [
    Process(target=matmul_shared, args=(C_shared, n, start, stop))
    for start, stop in zip(i_pos, i_pos[1:])
]

In [43]:
%%time

# The loop variable is deliberately not called `proc`: that name belongs to
# the ping-pong worker function, and rebinding it here would break any later
# re-run of the cell that passes `proc` as a Process target.
for worker in proc_list:
    worker.start()

for worker in proc_list:
    worker.join()

CPU times: user 3.69 ms, sys: 40.3 ms, total: 44 ms
Wall time: 1.04 s


In [44]:
%%time

# Reference product computed directly in the parent process.
C_true = A @ B

CPU times: user 3.8 s, sys: 35 ms, total: 3.84 s
Wall time: 3.83 s


In [45]:
# Frobenius norm of the difference between the shared-memory result and the
# reference product.
np.linalg.norm(C - C_true)

8.777331769937788e-12

In [46]:
# Print the PID of each worker. The loop variable avoids the name `proc`,
# which is the ping-pong worker function defined earlier in the notebook.
for worker in proc_list:
    print(worker.pid)

11515
11516
11517
11518
11519
11520


In [47]:
def gcd(x, y):
    """Return the greatest common divisor of ``x`` and ``y`` (Euclid's algorithm).

    The pair is repeatedly replaced by ``(divisor, remainder)`` until the
    remainder is zero; the last divisor is the result. ``gcd(x, 0)`` is ``x``.
    """
    a, b = x, y
    while b:
        remainder = a % b
        a = b
        b = remainder
    return a

def phi(n):
    """Return Euler's totient of ``n`` by brute force.

    Counts the integers ``i`` with ``1 <= i <= n`` for which
    ``gcd(i, n) == 1``. For ``n < 1`` the range is empty and 0 is returned.

    The upper bound is inclusive so that ``phi(1) == 1``. For ``n > 1`` the
    extra candidate ``i == n`` has ``gcd(n, n) == n != 1`` and is never
    counted, so all other results are unchanged.
    """
    return sum(1 for i in range(1, n + 1) if gcd(i, n) == 1)

In [48]:
# 1000 random integers drawn from [1, 10**4) (the upper bound is exclusive).
a = np.random.randint(low=1, high=10**4, size=1000)

In [49]:
%%time

# Serial baseline: the totient of every sampled value, computed in this process.
[phi(n) for n in a]

CPU times: user 6.35 s, sys: 0 ns, total: 6.35 s
Wall time: 6.35 s


[4260,
 3744,
 2112,
 760,
 8100,
 960,
 80,
 4896,
 1392,
 2908,
 1600,
 2400,
 4452,
 4416,
 8352,
 2232,
 1920,
 6388,
 128,
 5620,
 1482,
 486,
 78,
 4372,
 4632,
 2784,
 2580,
 3318,
 3240,
 3164,
 32,
 5256,
 5912,
 2872,
 2086,
 2688,
 1680,
 5272,
 512,
 1380,
 3456,
 60,
 1152,
 3852,
 960,
 3960,
 5376,
 712,
 2628,
 2736,
 1344,
 432,
 8112,
 3168,
 5520,
 4608,
 8736,
 6240,
 8536,
 1280,
 1440,
 396,
 3200,
 3168,
 3240,
 1560,
 360,
 3360,
 3440,
 7200,
 3600,
 792,
 3240,
 2688,
 1280,
 636,
 2652,
 1116,
 4536,
 1536,
 7800,
 720,
 1920,
 2688,
 776,
 3984,
 3360,
 8886,
 3180,
 2560,
 3388,
 368,
 1824,
 580,
 1560,
 60,
 9216,
 3180,
 648,
 6720,
 5292,
 4260,
 2856,
 960,
 2904,
 2880,
 1440,
 48,
 4224,
 2904,
 2736,
 960,
 4032,
 2664,
 3280,
 2392,
 5200,
 1944,
 1260,
 2206,
 600,
 3132,
 2256,
 7152,
 2248,
 144,
 768,
 276,
 9160,
 1512,
 3490,
 3440,
 3912,
 4752,
 3088,
 3392,
 4956,
 5712,
 4456,
 3564,
 1300,
 4480,
 1536,
 4390,
 4544,
 480,
 7116,
 4752,


In [50]:
from multiprocessing import Pool

In [51]:
%%time

# Same computation spread over one worker process per reported CPU; leaving
# the `with` block shuts the pool down.
with Pool(processes=cpu_count()) as pool:
    res = pool.map(phi, a)

CPU times: user 98.4 ms, sys: 75.3 ms, total: 174 ms
Wall time: 1.27 s


In [53]:
# Display the totients returned by the pool (same order as the inputs in `a`).
res

[4260,
 3744,
 2112,
 760,
 8100,
 960,
 80,
 4896,
 1392,
 2908,
 1600,
 2400,
 4452,
 4416,
 8352,
 2232,
 1920,
 6388,
 128,
 5620,
 1482,
 486,
 78,
 4372,
 4632,
 2784,
 2580,
 3318,
 3240,
 3164,
 32,
 5256,
 5912,
 2872,
 2086,
 2688,
 1680,
 5272,
 512,
 1380,
 3456,
 60,
 1152,
 3852,
 960,
 3960,
 5376,
 712,
 2628,
 2736,
 1344,
 432,
 8112,
 3168,
 5520,
 4608,
 8736,
 6240,
 8536,
 1280,
 1440,
 396,
 3200,
 3168,
 3240,
 1560,
 360,
 3360,
 3440,
 7200,
 3600,
 792,
 3240,
 2688,
 1280,
 636,
 2652,
 1116,
 4536,
 1536,
 7800,
 720,
 1920,
 2688,
 776,
 3984,
 3360,
 8886,
 3180,
 2560,
 3388,
 368,
 1824,
 580,
 1560,
 60,
 9216,
 3180,
 648,
 6720,
 5292,
 4260,
 2856,
 960,
 2904,
 2880,
 1440,
 48,
 4224,
 2904,
 2736,
 960,
 4032,
 2664,
 3280,
 2392,
 5200,
 1944,
 1260,
 2206,
 600,
 3132,
 2256,
 7152,
 2248,
 144,
 768,
 276,
 9160,
 1512,
 3490,
 3440,
 3912,
 4752,
 3088,
 3392,
 4956,
 5712,
 4456,
 3564,
 1300,
 4480,
 1536,
 4390,
 4544,
 480,
 7116,
 4752,


In [56]:
from multiprocessing import Barrier

In [57]:
def func(idx, b, q):
    """Run ten rounds of "log, wait at the barrier, log again".

    In every round the participant with ``idx == 0`` first sleeps for five
    seconds. Each participant then appends a timestamped ``before`` entry to
    ``q``, calls ``b.wait()``, and appends a timestamped ``after`` entry.
    """
    is_slow_participant = idx == 0
    for _round in range(10):
        if is_slow_participant:
            time.sleep(5)

        q.append(f'before {idx!s} {time.asctime()}')
        b.wait()
        q.append(f'after {idx!s} {time.asctime()}')

In [58]:
num_proc = 3
# Size the barrier from num_proc rather than repeating the literal: every
# started process must reach the barrier for a round to complete, so the two
# numbers have to stay equal.
b = Barrier(num_proc)
m = Manager()
# Managed list used as a shared log by all participants.
q = m.list()
proc_list = [Process(target=func, args=[i, b, q]) for i in range(num_proc)]

In [59]:
# The loop variable is deliberately not called `proc`: that name belongs to
# the ping-pong worker function, and rebinding it here would break any later
# re-run of the cell that passes `proc` as a Process target.
for worker in proc_list:
    worker.start()

for worker in proc_list:
    worker.join()

# Copy the log out of the managed list before the manager is shut down.
q = list(q)
m.shutdown()

In [60]:
# Display the collected before/after log entries in the order they were appended.
q

['before 1 Thu Feb  9 16:08:29 2023',
 'before 2 Thu Feb  9 16:08:29 2023',
 'before 0 Thu Feb  9 16:08:34 2023',
 'after 0 Thu Feb  9 16:08:34 2023',
 'after 1 Thu Feb  9 16:08:34 2023',
 'after 2 Thu Feb  9 16:08:34 2023',
 'before 1 Thu Feb  9 16:08:34 2023',
 'before 2 Thu Feb  9 16:08:34 2023',
 'before 0 Thu Feb  9 16:08:39 2023',
 'after 0 Thu Feb  9 16:08:39 2023',
 'after 2 Thu Feb  9 16:08:39 2023',
 'after 1 Thu Feb  9 16:08:39 2023',
 'before 2 Thu Feb  9 16:08:39 2023',
 'before 1 Thu Feb  9 16:08:39 2023',
 'before 0 Thu Feb  9 16:08:44 2023',
 'after 0 Thu Feb  9 16:08:44 2023',
 'after 1 Thu Feb  9 16:08:44 2023',
 'after 2 Thu Feb  9 16:08:44 2023',
 'before 1 Thu Feb  9 16:08:44 2023',
 'before 2 Thu Feb  9 16:08:44 2023',
 'before 0 Thu Feb  9 16:08:49 2023',
 'after 0 Thu Feb  9 16:08:49 2023',
 'after 2 Thu Feb  9 16:08:49 2023',
 'before 2 Thu Feb  9 16:08:49 2023',
 'after 1 Thu Feb  9 16:08:49 2023',
 'before 1 Thu Feb  9 16:08:49 2023',
 'before 0 Thu Feb  9 16