# Threading

In [19]:
import threading
import traceback
import time

# The function that performs the actual work
def sleep(i):
    """Block the calling thread for ``i`` seconds and return ``i``."""
    time.sleep(i)
    return i

output_dict = {}  # Results are keyed by task index because threads finish in no fixed order

# semaphore: the counter primitive built into threading
semaphore = threading.Semaphore(2)  # At most 2 threads may do the work at the same time

# Function executed by each worker thread
def func(i):
    """Run ``sleep(i)`` while holding ``semaphore`` and record the result.

    The result is stored in ``output_dict[i]``. If ``sleep`` raises, the
    traceback is printed and ``None`` is stored instead, so every task
    always leaves an entry behind.

    Args:
        i: Task index; also the number of seconds passed to ``sleep``.
    """
    # Announce the task before waiting on the semaphore, so all tasks report immediately.
    print(f"第{i}次")

    # Default result: keeps ``output`` bound even when ``sleep`` fails.
    output = None

    # ``with`` releases the permit exactly once, and only if it was acquired.
    with semaphore:
        try:
            output = sleep(i)
        except Exception:
            traceback.print_exc()
        output_dict[i] = output

# Start one thread per task. Every thread runs func, which limits how many
# of them can execute sleep() at the same time through the shared semaphore.
threads = []
for i in range(8):  # run 8 tasks
    t = threading.Thread(target=func, args=(i,))  # task i sleeps for i seconds
    t.start()
    threads.append(t)
# Wait until every task has finished before leaving the cell.
for thread in threads:
    thread.join()

第0次
第1次
第2次
第3次
第4次
第5次
第6次
第7次


In [20]:
# Show the collected results; each key is the task index that was passed to func.
output_dict

{0: 0, 1: 1, 2: 2, 3: 3, 4: 4, 5: 5, 6: 6, 7: 7}

In [None]:
# https://superfastpython.com/threading-in-python/

In [1]:
# ref: https://blog.gtwang.org/programming/python-threading-multithreaded-programming-tutorial/
import threading
import time

# Baseline without any extra threads: this cell never creates a Thread, so both
# loops below run one after the other in the main thread, including the loop
# whose messages are labelled "Child thread".
for i in range(3):
  print("Main thread:", i)
  print(threading.active_count())  # number of Thread objects currently alive
  time.sleep(1)

for i in range(5):
  print("Child thread:", i)
  print(threading.active_count())  # unchanged: no thread was started in this cell
  time.sleep(1)

print(threading.active_count())
print("Done.")

Main thread: 0
6
Main thread: 1
6
Main thread: 2
6
Child thread: 0
6
Child thread: 1
6
Child thread: 2
6
Child thread: 3
6
Child thread: 4
6
6
Done.


In [2]:
# ref: https://blog.gtwang.org/programming/python-threading-multithreaded-programming-tutorial/
## The output order looks irregular because print() calls from the two threads are not synchronized with a lock.
import threading
import time

# Work function executed by the child thread
def job():
  """Print "Child thread: n" for n = 0..4, sleeping one second after each line."""
  step = 0
  while step < 5:
    print("Child thread:", step)
    time.sleep(1)
    step += 1

# Create a child thread; it does not run until start() is called
t = threading.Thread(target = job)
print(threading.active_count())

# Start the child thread
t.start()
print(threading.active_count())

# The main thread carries on with its own work while the child thread runs
for i in range(3):
  print("Main thread:", i)
  time.sleep(1)

# Wait for child thread t to finish; only then does the main program continue
t.join()

print("Done.")

6
Child thread: 0
7
Main thread: 0
Child thread: 1
Main thread: 1
Main thread: 2
Child thread: 2
Child thread: 3
Child thread: 4
Done.


In [14]:
# multi-thread
import threading
import time

# Work function executed by each child thread
def job(num):
  """Print "Thread <num>" and then sleep for one second."""
  pause_seconds = 1
  print("Thread", num)
  time.sleep(pause_seconds)

# Create 5 child threads and start each one right after it is created
threads = []
for i in range(5):
  threads.append(threading.Thread(target = job, args = (i,)))
  threads[i].start()
  print(threading.active_count())  # number of Thread objects alive after this start

# The main thread carries on with its own work
for i in range(3):
  print("Main thread:", i)
  time.sleep(1)

# Wait for all child threads to finish
for i in range(5):
  threads[i].join()

print("Done.")

Thread 0
7
Thread 1
8
Thread 2
9
Thread 3
10
Thread 4
11
Main thread: 0
Main thread: 1
Main thread: 2
Done.


In [25]:
# Built-in OOP style: subclass threading.Thread and override run()
import threading
import time

# Child-thread class
class MyThread(threading.Thread):
  """Thread subclass that prints its number and then sleeps for one second."""

  def __init__(self, num):
    """Initialise the underlying Thread and remember ``num`` for ``run``."""
    super().__init__()  # same effect as threading.Thread.__init__(self)
    self.num = num

  def run(self):
    """Thread body: print "Thread <num>", then sleep for one second."""
    print("Thread", self.num)
    time.sleep(1)

# Create 5 child threads and start each one right after it is created
threads = []
for i in range(5):
  threads.append(MyThread(i))
  threads[i].start()

# The main thread carries on with its own work
for i in range(3):
  print("Main thread:", i)
  time.sleep(1)

# Wait for all child threads to finish
for i in range(5):
  threads[i].join()

print("Done.")

Thread 0
Thread 1
Thread 2
Thread 3
Thread 4
Main thread: 0
Main thread: 1
Main thread: 2
Done.


In [2]:
# Queue
import time
import threading
import queue

# Worker class: takes items from a shared queue and processes them
class Worker(threading.Thread):
  """Thread that drains a shared queue, printing each item it takes.

  Args:
    queue: The queue.Queue instance shared by all workers.
    num: Number used to identify this worker in the printed output.
  """

  def __init__(self, queue, num):
    # threading.Thread.__init__(self)
    super().__init__()
    self.queue = queue
    self.num = num

  def run(self):
    """Process items until the queue is empty, sleeping one second per item."""
    # Imported locally so the name is available regardless of how the cell binds `queue`.
    from queue import Empty

    while True:
      # Take the next item without blocking. Checking qsize() and then calling
      # a blocking get() is a race: another worker can take the last item in
      # between, leaving this thread blocked forever.
      try:
        msg = self.queue.get_nowait()
      except Empty:
        break

      # Process the item
      print("Worker %d: %s" % (self.num, msg))
      time.sleep(1)

# Create the queue
my_queue = queue.Queue()

# Put the data into the queue
for i in range(10):
  my_queue.put("Data %d" % i)

# Create two Workers; each one pulls its data from the shared queue by itself
my_worker1 = Worker(my_queue, 1)
my_worker2 = Worker(my_queue, 2)

# Let the Workers start processing the data
my_worker1.start() # start() arranges for run() to be called in a new thread
my_worker2.start() # start() arranges for run() to be called in a new thread
print(threading.active_count())

# Wait for all Workers to finish
my_worker1.join()
my_worker2.join()

print("Done.")

Worker 1: Data 0
Worker 2: Data 1
8
Worker 1: Data 2
Worker 2: Data 3
Worker 2: Data 4
Worker 1: Data 5
Worker 2: Data 6Worker 1: Data 7

Worker 2: Data 8
Worker 1: Data 9
Done.


In [3]:
# Lock
# Only one thread at a time can hold the lock and run the protected section.
import time
import threading
import queue

class Worker(threading.Thread):
  """Thread that drains a shared queue and processes each item under a lock.

  Args:
    queue: The queue.Queue instance shared by all workers.
    num: Number used to identify this worker in the printed output.
    lock: Lock shared by all workers; only its holder may process an item.
  """

  def __init__(self, queue, num, lock):
    threading.Thread.__init__(self)
    self.queue = queue
    self.num = num
    self.lock = lock

  def run(self):
    """Process items until the queue is empty, one worker at a time."""
    # Imported locally so the name is available regardless of how the cell binds `queue`.
    from queue import Empty

    while True:
      # Non-blocking take: checking qsize() and then calling a blocking get()
      # can hang when another worker removes the last item in between.
      try:
        msg = self.queue.get_nowait()
      except Empty:
        break

      # Acquire the lock; `with` guarantees it is released even if the work raises
      with self.lock:
        print("Lock acquired by Worker %d" % self.num)

        # Work that must not be done by several threads at the same time
        print("Worker %d: %s" % (self.num, msg))
        time.sleep(1)

        # The lock is released when this block is left
        print("Lock released by Worker %d" % self.num)

# Fill a queue with the items to process
my_queue = queue.Queue()
for i in range(5):
  my_queue.put("Data %d" % i)

# Create the lock shared by both workers
lock = threading.Lock()

my_worker1 = Worker(my_queue, 1, lock)
my_worker2 = Worker(my_queue, 2, lock)

# Start both workers
my_worker1.start()
my_worker2.start()

# Wait for both workers to finish
my_worker1.join()
my_worker2.join()

print("Done.")

Lock acquired by Worker 1
Worker 1: Data 0
Lock released by Worker 1
Lock acquired by Worker 2
Worker 2: Data 1
Lock released by Worker 2
Lock acquired by Worker 1
Worker 1: Data 2
Lock released by Worker 1
Lock acquired by Worker 2
Worker 2: Data 3
Lock released by Worker 2
Lock acquired by Worker 1
Worker 1: Data 4
Lock released by Worker 1
Done.


In [5]:
# Semaphore
# It allows multiple threads, but the number of threads is limited.
import time
import threading
import queue

class Worker(threading.Thread):
  """Thread that drains a shared queue and processes each item under a semaphore.

  Args:
    queue: The queue.Queue instance shared by all workers.
    num: Number used to identify this worker in the printed output.
    semaphore: Semaphore shared by all workers; it limits how many of them
      may process an item at the same time.
  """

  def __init__(self, queue, num, semaphore):
    threading.Thread.__init__(self)
    self.queue = queue
    self.num = num
    self.semaphore = semaphore

  def run(self):
    """Process items until the queue is empty, limited by the semaphore."""
    # Imported locally so the name is available regardless of how the cell binds `queue`.
    from queue import Empty

    while True:
      # Non-blocking take: checking qsize() and then calling a blocking get()
      # can hang when another worker removes the last item in between.
      try:
        msg = self.queue.get_nowait()
      except Empty:
        break

      # Acquire the semaphore handed to this worker (not a module-level global);
      # `with` guarantees it is released even if the work raises
      with self.semaphore:
        print("Semaphore acquired by Worker %d" % self.num)

        # Work that only a limited number of threads may do at the same time
        print("Worker %d: %s" % (self.num, msg))
        time.sleep(1)

        # The semaphore is released when this block is left
        print("Semaphore released by Worker %d" % self.num)

# Fill a queue with the items to process
my_queue = queue.Queue()
for i in range(10):
  my_queue.put("Data %d" % i)

# Create the semaphore: at most 3 workers may hold it at the same time
semaphore = threading.Semaphore(3)

# Five workers share the same queue and the same semaphore
my_worker1 = Worker(my_queue, 1, semaphore)
my_worker2 = Worker(my_queue, 2, semaphore)
my_worker3 = Worker(my_queue, 3, semaphore)
my_worker4 = Worker(my_queue, 4, semaphore)
my_worker5 = Worker(my_queue, 5, semaphore)

# Start all workers
my_worker1.start()
my_worker2.start()
my_worker3.start()
my_worker4.start()
my_worker5.start()

# Wait for all workers to finish
my_worker1.join()
my_worker2.join()
my_worker3.join()
my_worker4.join()
my_worker5.join()

print("Done.")

Semaphore acquired by Worker 1
Worker 1: Data 0
Semaphore acquired by Worker 2
Worker 2: Data 1
Semaphore acquired by Worker 3
Worker 3: Data 2
Semaphore released by Worker 1Semaphore released by Worker 3
Semaphore acquired by Worker 3
Worker 3: Data 5
Semaphore released by Worker 2
Semaphore acquired by Worker 2
Worker 2: Data 6

Semaphore acquired by Worker 1
Worker 1: Data 7
Semaphore released by Worker 3
Semaphore acquired by Worker 3
Worker 3: Data 8
Semaphore released by Worker 1
Semaphore acquired by Worker 1
Worker 1: Data 9
Semaphore released by Worker 2
Semaphore acquired by Worker 4
Worker 4: Data 3
Semaphore released by Worker 4
Semaphore released by Worker 3
Semaphore released by Worker 1
Semaphore acquired by Worker 5
Worker 5: Data 4
Semaphore released by Worker 5
Done.


In [None]:
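# # Create an RLock
# rlock = threading.RLock()

# # Acquire the rlock
# rlock.acquire()

# # Work that must not be entered by several threads at the same time...

# # Acquire the rlock again (re-entrant: the owning thread does not block)
# rlock.acquire()

# # Work that must not be entered by several threads at the same time...

# # Release the rlock
# rlock.release()

# # Work that must not be entered by several threads at the same time...

# # Release the rlock again (one release for every acquire)
# rlock.release()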

In [7]:
# RLock: Reentrant Lock
## ref: https://python-parallel-programmning-cookbook.readthedocs.io/zh_CN/latest/chapter2/07_Thread_synchronization_with_RLock.html
import threading
import time

class Box(object):
    """Item counter whose updates are guarded by a class-wide re-entrant lock.

    ``add`` and ``remove`` take the lock and then call ``execute``, which
    takes the same lock again. This nested acquisition by one thread is why
    an ``RLock`` is used here.
    """

    # One re-entrant lock shared by every Box instance.
    lock = threading.RLock()

    def __init__(self):
        self.total_items = 0

    def execute(self, n):
        """Add ``n`` to ``total_items`` while holding the lock."""
        # `with` releases the lock even if the update raises.
        with Box.lock:
            self.total_items += n

    def add(self):
        """Increase ``total_items`` by one."""
        with Box.lock:
            self.execute(1)

    def remove(self):
        """Decrease ``total_items`` by one."""
        with Box.lock:
            self.execute(-1)

## These two functions each run in a separate
## thread and call the Box's methods
def adder(box, items):
    """Call ``box.add()`` once per second until ``items`` additions were made.

    Args:
        box: Object providing an ``add()`` method.
        items: Number of additions to perform; nothing happens if it is not positive.
    """
    remaining = items
    while remaining > 0:
        print("adding 1 item in the box")
        box.add()
        # Pause between additions, then count this one as done.
        time.sleep(1)
        remaining -= 1

def remover(box, items):
    """Call ``box.remove()`` once per second until ``items`` removals were made.

    Args:
        box: Object providing a ``remove()`` method.
        items: Number of removals to perform; nothing happens if it is not positive.
    """
    remaining = items
    while remaining > 0:
        print("removing 1 item in the box")
        box.remove()
        # Pause between removals, then count this one as done.
        time.sleep(1)
        remaining -= 1

## The main program builds two threads that work on the same Box
## and then checks the final item count

items = 5
print("putting %s items in the box " % items)
box = Box()
# One thread adds `items` times and the other removes `items` times on the same box.
t1 = threading.Thread(target=adder, args=(box, items))
t2 = threading.Thread(target=remover, args=(box, items))
t1.start()
t2.start()

# Wait for both threads before reading the final count.
t1.join()
t2.join()
print("%s items still remain in the box " % box.total_items)

putting 5 items in the box 
adding 1 item in the box
removing 1 item in the box
adding 1 item in the box
removing 1 item in the box
removing 1 item in the box
adding 1 item in the box
removing 1 item in the box
adding 1 item in the box
adding 1 item in the boxremoving 1 item in the box

0 items still remain in the box 
