### Processes vs Threads

In [None]:
def do_something(thread_number):
  print(f'Working {thread_number} ...')
  time.sleep(0.15)
  print(f'Thread {thread_number} was finished.')

In [None]:
# Python GIL (Global Interpreter Lock): apenas um thread é executado por vez.
# Dessa forma, o desempenho do processo single-threaded e do processo multi-threaded será o mesmo em python e isso se deve ao GIL.
# Para contornar essa limitação, é preciso recorrer ao Jython, Cython, Iron Phyton, ou usar processos, ao invés de threads, visto que
# para processos não há essa limitação.
# Um detalhe importante: quando você faz a cópia de um processo (fork), além das instruções, 
# o pool de dados em memória é também copiado para o novo processo (não compartilham).
# Com threads é diferente, pois quando você cria threads elas compartilham o espaço de memória, ou seja, os dados são compartilhados entre elas.
# Essas diferenças tornam o uso de processos mais exigentes em memória.
import time
from threading import Thread

for i in range(5):
  t = Thread(target=do_something, args=(i,))
  t.start()

Working 0 ...
Working 1 ...
Working 2 ...Working 3 ...
Working 4 ...

Thread 0 was finished.
Thread 1 was finished.
Thread 3 was finished.
Thread 2 was finished.
Thread 4 was finished.


Ha 3 formas de iniciar um processo em Python: spawn (apenas Linux), fork e forkserver.
A diferença entre eles está na forma que a cópia do espaço de memória é feita.

- Fork: é uma simples cópia do espaço de memória (Default no Unix). 

Fork()-ing the parent processes and continuing with the same processes image in both parent and child. This method is fast, but potentially unreliable when parent state is complex

- Spawn: não copia tudo. Apenas o que for necessário. Isso permite economizar um pouco de memória. É um pouco mais lento para iniciar. (Default no Windows e MacOS)

- Forkserver: é um modo intermediário que tenta salvar um pouco de memória e sendo um pouco mais rápido. Quando o processo é iniciado é feito um forkserver que fica salvo em uma região da memória. A partir dessa região, são feitos forks. Então, apenas a memória é clonada.
When the program starts and selects the forkserver start method, a server process is started. 
From then on, whenever a new process is needed, the parent process connects to the server and requests that it fork a new process. Available on Unix platforms.
It consists of a separate Python server with that has a relatively simple state and which is fork()-ed when a new processes is needed. This method combines the speed of Fork()-ing with good reliability (because the parent being forked is in a simple state).
If you want something to be inherited by child processes from the parent, this must be specified in the forkserver state.
https://stackoverflow.com/questions/63424251/multiprocessing-in-python-what-gets-inherited-by-forkserver-process-from-paren

In [None]:
import multiprocessing
from multiprocessing import Process

In [None]:
if __name__ == '__main__':
  multiprocessing.set_start_method('spawn')
  for i in range(5):
    p = Process(target=do_something, args=(i,))
    p.start()

### IPC - Interprocess Communication

Can be done through messages and memory sharing.

### Memory Sharing

In [None]:
# No exemplo, a seguir, vamos testar o compartilhamento de memória entre threads.
import json
import urllib.request
import time

# global variable
finished_count = 0

def count_letters(url, frequency_table, thread_num, count=True):
  """Without mutex"""
  print(f'counting letters {thread_num}...')
  response = urllib.request.urlopen(url)
  content = str(response.read())
  for item in content:
    if item.lower() in frequency_table:
      frequency_table[item.lower()] += 1
  if count:
    global finished_count
    finished_count += 1

def count_letters_with_mutex(url, frequency_table, thread_num, mutex, count=True):
  """Using mutex to avoid race conditions"""
  print(f'counting letters with mutex {thread_num}...')
  response = urllib.request.urlopen(url)
  content = str(response.read())
  mutex.acquire()
  for item in content:
    if item.lower() in frequency_table:
      frequency_table[item.lower()] += 1
  if count:
    global finished_count
    finished_count += 1
  mutex.release()

def main():
  frequency_table = {}
  for letter in 'abcdefghijklmnopqrstuvxzwy':
    frequency_table[letter] = 0

  start = time.time()
  for i in range(1000, 1020):
    count_letters(f'http://www.rfc-editor.org/rfc/rfc{i}.txt', frequency_table, i, False)
  end = time.time()

  print(json.dumps(frequency_table, indent=4), end - start)

main()

counting letters 1000...
counting letters 1001...
counting letters 1002...
counting letters 1003...
counting letters 1004...
counting letters 1005...
counting letters 1006...
counting letters 1007...
counting letters 1008...
counting letters 1009...
counting letters 1010...
counting letters 1011...
counting letters 1012...
counting letters 1013...
counting letters 1014...
counting letters 1015...
counting letters 1016...
counting letters 1017...
counting letters 1018...
counting letters 1019...
{
    "a": 80014,
    "b": 16998,
    "c": 48003,
    "d": 40501,
    "e": 140093,
    "f": 26074,
    "g": 19010,
    "h": 36316,
    "i": 79913,
    "j": 2170,
    "k": 6614,
    "l": 38305,
    "m": 31176,
    "n": 135371,
    "o": 84258,
    "p": 32270,
    "q": 2835,
    "r": 75326,
    "s": 79790,
    "t": 103557,
    "u": 27572,
    "v": 10580,
    "x": 4719,
    "z": 1115,
    "w": 14195,
    "y": 13914
} 21.361895322799683


In [None]:
# with thread
# Quando as threads compartilham a memória e escrevem em conjunto um mesmo recurso, podem ocorrer inconsistências nos resultados devido a race conditions.
# É preciso implementar sincronização entre elas para que isso não ocorra.
# Veja como o tempo foi expressivamente menor usando threads!

from threading import Thread

def main_with_thread():
  frequency_table = {}

  for letter in 'abcdefghijklmnopqrstuvxzwy':
    frequency_table[letter] = 0

  start = time.time()
  for i in range(1000, 1020):
    t = Thread(target=count_letters, args=(f'http://www.rfc-editor.org/rfc/rfc{i}.txt', frequency_table, i)).start()
  
  # wait all threads to finish
  print('finished_count', finished_count)
  while finished_count < 20:
    print('finished_count', finished_count)
    time.sleep(0.5)

  end = time.time()

  time.sleep(1)
  print(json.dumps(frequency_table, indent=4), end - start)

main_with_thread()


counting letters 1000...
counting letters 1001...
counting letters 1002...
counting letters 1003...
counting letters 1004...
counting letters 1005...
counting letters 1006...
counting letters 1007...counting letters 1008...
counting letters 1009...

counting letters 1010...
counting letters 1011...
counting letters 1012...
counting letters 1013...
counting letters 1014...counting letters 1015...

counting letters 1016...counting letters 1017...
counting letters 1018...counting letters 1019...finished_count 0
finished_count 0



finished_count 0
finished_count 5
finished_count 19
{
    "a": 80014,
    "b": 16998,
    "c": 44441,
    "d": 40501,
    "e": 137747,
    "f": 26074,
    "g": 19010,
    "h": 35984,
    "i": 79913,
    "j": 2170,
    "k": 6614,
    "l": 37732,
    "m": 31176,
    "n": 112290,
    "o": 84258,
    "p": 32270,
    "q": 2835,
    "r": 75326,
    "s": 79790,
    "t": 103557,
    "u": 27572,
    "v": 10580,
    "x": 4719,
    "z": 1115,
    "w": 14195,
    "y": 13914

##### Sincronização de Threads com Mutexes Lock

In [None]:
# Mutexes são como cadeados. Quando uma thread estiver operando, há um bloqueio temporário que impede outras threads de executarem. Quando o bloqueio é liberado, outra thread entra em ação.
# É garantido que apenas uma thread pode operar a cada momento. As demais threads ficam em estado de sleep até o lock ser liberado.

import time
from threading import Thread, Lock

class StingySpendy:
  money = 100
  mutex = Lock()

  def stingy(self):
    for i in range(1000000):
      self.mutex.acquire()
      try:
        self.money += 10
      finally:
        self.mutex.release()
    print('Stingy done')

  def spendy(self):
    for i in range(1000000):
      self.mutex.acquire()
      try:
        self.money -= 10
      finally:
        self.mutex.release()
    print('Spendy done')

ss = StingySpendy()
Thread(target=ss.stingy, args=()).start()
Thread(target=ss.spendy, args=()).start()
time.sleep(10)
print('Money in the end:', ss.money)

Stingy done
Spendy done
Money in the end: 100


In [None]:
# Add mutex to count_letters with threads.

from threading import Thread, Lock

def main_with_thread_and_mutex():
  frequency_table = {}
  mutex = Lock()

  for letter in 'abcdefghijklmnopqrstuvxzwy':
    frequency_table[letter] = 0

  start = time.time()
  for i in range(1000, 1020):
    t = Thread(target=count_letters_with_mutex, args=(f'http://www.rfc-editor.org/rfc/rfc{i}.txt', frequency_table, i, mutex)).start()
  
  # wait all threads to finish
  while True:
    mutex.acquire()
    if finished_count == 20:
      print('Finished')
      break
    mutex.release()
    time.sleep(10)
    print('finished_count', finished_count)

  end = time.time()
  print(json.dumps(frequency_table, indent=4), end - start)

main_with_thread_and_mutex()


counting letters with mutex 1000...counting letters with mutex 1001...

counting letters with mutex 1002...
counting letters with mutex 1003...
counting letters with mutex 1004...
counting letters with mutex 1005...
counting letters with mutex 1006...
counting letters with mutex 1007...
counting letters with mutex 1008...
counting letters with mutex 1009...counting letters with mutex 1010...
counting letters with mutex 1011...

counting letters with mutex 1012...
counting letters with mutex 1013...
counting letters with mutex 1014...
counting letters with mutex 1015...counting letters with mutex 1016...

counting letters with mutex 1017...
counting letters with mutex 1018...
counting letters with mutex 1019...
finished_count 20
Finished
{
    "a": 80014,
    "b": 16998,
    "c": 48003,
    "d": 40501,
    "e": 140093,
    "f": 26074,
    "g": 19010,
    "h": 36316,
    "i": 79913,
    "j": 2170,
    "k": 6614,
    "l": 38305,
    "m": 31176,
    "n": 135371,
    "o": 84258,
    "p": 32

##### Joins
É uma outra forma de sincronização de threads que possibilita criar uma relação de dependência entre threads.
Assim, uma thread parent pode entrar em modo de espera até suas threads filhas terminarem de processar alguma task.

In [None]:
import time
from threading import Thread

def child():
  print('Child doing something...')
  time.sleep(5)
  print('child thread is done.')

def parent():
  t = Thread(target=child, args=([]))
  t.start()
  print('Parent thread is waiting using join')
  t.join(timeout=10)
  print('Parent thread can continue')

parent()

Child doing something...Parent thread is waiting using join

child thread is done.
Parent thread can continue


In [None]:
# other exemple: concurrent directory search

import os
from os.path import isdir, join
from threading import Thread, Lock

mutex = Lock()
matches = []

def file_search(root, filename):
  print('Searching in', root)
  child_threads = []
  for file in os.listdir(root):
    full_path = join(root, file)
    if filename in file:
      mutex.acquire()
      matches.append(full_path)
      mutex.release()
    if isdir(full_path):
      #file_search(full_path, filename)
      t = Thread(target=file_search, args=([full_path, filename]))
      t.start()
      child_threads.append(t)
  for t in child_threads:
    t.join()

def main():
  t = Thread(target=file_search, args=(['/content', 'findme.txt']))
  t.start()
  t.join()
  print('matches: ', set(matches))
main()

Searching in /content
Searching in /content/.config
Searching in /content/.config/configurations
Searching in /content/.config/configurationsSearching in /content/.config/logs
Searching in /content/.config/logs/2023.02.09
Searching in
 /content/.config/logs/2023.02.09
Searching in /content/.config/logs
Searching in /content/.config/logs/2023.02.09
Searching in /content/.config/logs/2023.02.09
Searching in /content/.config
Searching in /content/sample_data
Searching in /content/sample_data/minha_pasta
Searching inSearching in /content/sample_data/minha_pasta/.ipynb_checkpoints
 Searching in/content/.config/configurations
 Searching in /content/.config/configurations
/content/sample_data/minha_pasta/.ipynb_checkpoints
Searching in /content/.config/logs
Searching in /content/sample_data/minha_pasta
Searching in /content/sample_data/.ipynb_checkpoints
Searching in /content/sample_data/minha_pasta/.ipynb_checkpoints
Searching in /content/sample_data/.ipynb_checkpoints
Searching in /content/

#### Condition variables (notify and wait)

In [2]:
# Nesse exemplo vamos acrescentar uma variavel para verificar se spendy pode gastar dinheiro ou se tem que esperar o stingy acrescentar dinheiro, pois o saldo não poderá entrar no negativo
# Quando não tiver dinheiro, spendy liberará o lock e esperará pela próxima oportunidade para gastar.
# Para isso usamos Condition. Quando a condição não for atendida, usamos o método wait() para aguardar o sinal de liberação e
# notify() para dar o sinal que permite tentar novamente.

import time
from threading import Thread, Condition

class StingySpendy:
  money = 100
  condition = Condition()

  def stingy(self):
    for i in range(1000000):
      self.condition.acquire()
      try:

        self.money += 10
        # notify (signal) that money was added
        self.condition.notify()

      finally:
        self.condition.release()
    print('Stingy done')

  def spendy(self):
    for i in range(500000):
      self.condition.acquire()
      try:

        # release te lock and wait for more money (signal)
        while self.money < 20:
          self.condition.wait()

        self.money -= 20
        if self.money < 0:
          print('No money!', self.money)
      finally:
        self.condition.release()
    print('Spendy done')

ss = StingySpendy()
Thread(target=ss.stingy, args=()).start()
Thread(target=ss.spendy, args=()).start()
time.sleep(10)
print('Money in the end:', ss.money)

Stingy done
Spendy done
Money in the end: 100


In [12]:
# Another exemple: concurrent directory search using threads waiting group
# Wait Groups (wait and notify_all())

from threading import Thread, Condition, Lock
import os
from os.path import isdir, join

mutex = Lock()
matches = []

class WaitGroup():
  """Class to control a group of threads based on a condition"""

  wait_count = 0 # count waiting threads
  cv = Condition()

  def add(self, count):
    # add thread 
    self.cv.acquire()
    self.wait_count += count
    self.cv.release()
  
  def done(self):
    self.cv.acquire()
    if self.wait_count > 0:
      self.wait_count -= 1
    # check if all threads have finished
    if self.wait_count == 0:
      self.cv.notify_all()
    self.cv.release()

  def wait(self):
    self.cv.acquire()
    # wait threads to finish
    while self.wait_count > 0:
      self.cv.wait()
    self.cv.release()

In [16]:
def file_search(root, filename, wait_group):
  print('Searching in', root)
  for file in os.listdir(root):
    full_path = join(root, file)
    if filename in file:
      mutex.acquire()
      matches.append(full_path)
      mutex.release()
    if isdir(full_path):
      # add thread to group
      wait_group.add(1)
      t = Thread(target=file_search, args=([full_path, filename, wait_group])).start()
  # send signal to notify to waiters that the process was finished
  wait_group.done()


def main():
  wait_group = WaitGroup()
  # add thread to group
  wait_group.add(1)
  t = Thread(target=file_search, args=(['/content', 'README.md', wait_group])).start()
  # waiting to done signal before proceed
  wait_group.wait()
  print('matches: ', set(matches))
main()

Searching in /content
Searching inSearching in /content/sample_data
 /content/.config
Searching in /content/.config/configurations
Searching in /content/.config/logs
Searching in /content/.config/logs/2023.02.09
matches:  {'/content/sample_data/README.md'}
