<a href="https://colab.research.google.com/github/vadhri/hpc-notebook/blob/main/python_multi_processing.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

### Mutiprocessing

In [24]:
import multiprocessing as mp

def adder():
  a = 0
  for i in range(100):
    a += i
    if i%10==0:
      print (f"adder = {a}\n")

adder_process = mp.Process(target=adder, args=[])

def mul(__name__):
  a = 1
  for i in range(1,100):
    a = i*i
    if i%10==0:
      print (f"mul = {a} \n")

mul_process = mp.Process(target=mul, args=["mul_adder_process"])

adder_process.start()
mul_process.start()
adder_process.join()
mul_process.join()


adder = 0

adder = 55

adder = 210
mul = 100 

adder = 465

mul = 400 


adder = 820

adder = 1275
mul = 900 

adder = 1830



adder = 2485
adder = 3240

adder = 4095

mul = 1600 

mul = 2500 

mul = 3600 

mul = 4900 

mul = 6400 

mul = 8100 



### Logger using Queue

https://docs.python.org/3/howto/logging-cookbook.html#logging-to-a-single-file-from-multiple-processes

In [58]:
import multiprocessing as mp
import logging
import logging.handlers

def listener_configurer():
    root = logging.getLogger()
    h = logging.handlers.RotatingFileHandler('mptest.log', 'a', 300, 10)
    f = logging.Formatter('%(asctime)s %(processName)-10s %(name)s %(levelname)-8s %(message)s')
    h.setFormatter(f)
    root.addHandler(h)

# This is the listener process top-level loop: wait for logging events
# (LogRecords)on the queue and handle them, quit when you get a None for a
# LogRecord.
def listener_process(queue, configurer):
    configurer()
    while True:
        try:
            record = queue.get()
            if record is None:  # We send this as a sentinel to tell the listener to quit.
                break
            logger = logging.getLogger(record.name)
            logger.handle(record)  # No level or filter logic applied - just do it!
        except Exception:
            import sys, traceback
            print('Whoops! Problem:', file=sys.stderr)
            traceback.print_exc(file=sys.stderr)

def worker_configurer(queue):
    h = logging.handlers.QueueHandler(queue)  # Just the one handler needed
    root = logging.getLogger()
    root.addHandler(h)
    # send all messages, for demo; no other level or filter logic applied.
    root.setLevel(logging.DEBUG)

def adder(name, queue, configurer):
  configurer(queue)
  logger = logging.getLogger()

  a = 0
  for i in range(5):
    a += i
    if i:
      logger.log(logging.DEBUG, f"adder {a}")

    print('Worker finished: %s' % name)


queue = mp.Queue(-1)
listener = mp.Process(target=listener_process,
                                    args=(queue, listener_configurer))
listener.start()

adder_process = mp.Process(target=adder, args=["adder", queue, worker_configurer])

def mul(name, queue, configurer):
  configurer(queue)
  logger = logging.getLogger()

  a = 1
  for i in range(1,10):
    a = i*i
    if i:
      logger.log(logging.DEBUG, f"mul {a}")
    print('Worker finished: %s' % name)

mul_process = mp.Process(target=mul, args=["mul", queue, worker_configurer])

adder_process.start()
mul_process.start()
adder_process.join()
mul_process.join()
queue.put_nowait(None)
listener.join()

Worker finished: adder


DEBUG:root:adder 1
DEBUG:root:mul 1


Worker finished: adder


DEBUG:root:adder 3
DEBUG:root:adder 1


Worker finished: adder
Worker finished: mul

DEBUG:root:adder 3
DEBUG:root:adder 6
DEBUG:root:mul 1





DEBUG:root:mul 4
DEBUG:root:adder 6


Worker finished: mulWorker finished: adder


DEBUG:root:mul 4
DEBUG:root:adder 10





DEBUG:root:adder 10
DEBUG:root:mul 9


Worker finished: adder

DEBUG:root:mul 9


Worker finished: mul



DEBUG:root:mul 16


Worker finished: mul

DEBUG:root:mul 16





DEBUG:root:mul 25


Worker finished: mul

DEBUG:root:mul 25





DEBUG:root:mul 36
DEBUG:root:mul 36


Worker finished: mul


DEBUG:root:mul 49


Worker finished: mul

DEBUG:root:mul 49





DEBUG:root:mul 64
DEBUG:root:mul 64


Worker finished: mul


DEBUG:root:mul 81


Worker finished: mul


DEBUG:root:mul 81


In [61]:
!cat mptest* > mptest.total.log
!sort mptest.total.log

cat: mptest.total.log: input file is output file
2025-02-23 05:17:24,553 Process-126 root DEBUG    mul 1
2025-02-23 05:17:24,560 Process-125 root DEBUG    adder 1
2025-02-23 05:17:24,583 Process-125 root DEBUG    adder 3
2025-02-23 05:17:24,605 Process-125 root DEBUG    adder 6
2025-02-23 05:17:24,616 Process-126 root DEBUG    mul 4
2025-02-23 05:17:24,632 Process-125 root DEBUG    adder 10
2025-02-23 05:17:24,643 Process-126 root DEBUG    mul 9
2025-02-23 05:17:24,679 Process-126 root DEBUG    mul 16
2025-02-23 05:17:24,696 Process-126 root DEBUG    mul 25
2025-02-23 05:17:24,705 Process-126 root DEBUG    mul 36
2025-02-23 05:17:24,716 Process-126 root DEBUG    mul 49
2025-02-23 05:17:24,728 Process-126 root DEBUG    mul 64
2025-02-23 05:17:24,741 Process-126 root DEBUG    mul 81


### Shared array access across processes

In [69]:
# Run cells above before this.
import multiprocessing as mp
import logging
import logging.handlers
import ctypes

queue = mp.Queue(-1)
listener = mp.Process(target=listener_process,
                                    args=(queue, listener_configurer))
listener.start()

# create a shared Array.
shared_array = mp.Array(ctypes.c_int, 2)  # Shared array of integers
shared_array[0] = 0
shared_array[1] = 1

def adder(name, queue, configurer, a_shared):
  configurer(queue)
  logger = logging.getLogger()

  for i in range(5):
    a_shared[0] += i
    if i:
      logger.log(logging.DEBUG, f"adder {a_shared[0]}")


adder_process = mp.Process(target=adder, args=["adder", queue, worker_configurer, shared_array])

def mul(name, queue, configurer, a_shared):
  configurer(queue)
  logger = logging.getLogger()

  for i in range(1,6):
    a_shared[1] = i*i
    if i:
      logger.log(logging.DEBUG, f"mul {a_shared[1]}")

mul_process = mp.Process(target=mul, args=["mul", queue, worker_configurer, shared_array])

adder_process.start()
mul_process.start()
adder_process.join()
mul_process.join()
queue.put_nowait(None)
listener.join()

print("Updated shared array:", list(shared_array))


DEBUG:root:adder 1
DEBUG:root:adder 1
DEBUG:root:adder 3
DEBUG:root:adder 3
DEBUG:root:adder 6
DEBUG:root:adder 10
DEBUG:root:adder 6
DEBUG:root:mul 1
DEBUG:root:adder 10
DEBUG:root:mul 4
DEBUG:root:mul 1
DEBUG:root:mul 9
DEBUG:root:mul 16
DEBUG:root:mul 4
DEBUG:root:mul 25
DEBUG:root:mul 9
DEBUG:root:mul 16
DEBUG:root:mul 25


Updated shared array: [10, 25]
