In [7]:
import multiprocessing
class MyFancyClass(object):
    """Named object used to pass a single message to a single worker."""

    def __init__(self, name):
        """Remember the label that ``do_something`` reports."""
        self.name = name

    def do_something(self):
        """Print one line naming the current process and this object's name."""
        current = multiprocessing.current_process()
        message = 'Doing someting fancy in {} for {}'.format(current.name, self.name)
        print(message)

def worker(q):
    """Take one object off ``q`` and invoke its ``do_something`` method."""
    q.get().do_something()

# Demo: hand one MyFancyClass instance to a single worker process via a Queue.
if __name__=='__main__':
    queue = multiprocessing.Queue()
    print('queue:', queue)
    # The worker process receives the queue as its only argument.
    p = multiprocessing.Process(target=worker,args=(queue,))
    print('p', p)
    p.start()
    # Send the single message; worker() picks it up with q.get().
    queue.put(MyFancyClass('Fancy Dan'))
    # Shut the queue down on this side, then wait for the worker to finish.
    print('queue:', queue)
    # NOTE(review): per the multiprocessing docs, close() declares that this
    # process will put no more data, and join_thread() waits for the queue's
    # background feeder thread to flush what was already put.
    queue.close()
    queue.join_thread()
    print('queue', queue)
    p.join() # block until the worker process has exited

queue: <multiprocessing.queues.Queue object at 0x110061128>
p <Process(Process-7, initial)>
Doing someting fancy in Process-7 for Fancy Dan
queue: <multiprocessing.queues.Queue object at 0x110061128>
queue <multiprocessing.queues.Queue object at 0x110061128>


In [10]:
# 从JoinableQueue使用数据并将多个数据传递回父进程
import  multiprocessing
import time
class Consumer(multiprocessing.Process):
    """Process that runs callables taken from a task queue.

    Each item pulled from ``task_queue`` is either a callable task or ``None``.
    A task is printed, called, acknowledged with ``task_done()`` and its return
    value is put on ``result_queue``. ``None`` is the poison pill: it is
    acknowledged and ends the loop.
    """

    def __init__(self, task_queue, result_queue):
        """Store the queue tasks are read from and the queue answers go to."""
        super().__init__()
        self.task_queue, self.result_queue = task_queue, result_queue

    def run(self):
        """Process tasks until a ``None`` poison pill is received."""
        label = self.name
        pending = self.task_queue
        finished = False
        while not finished:
            job = pending.get()  # blocks until an item is available
            finished = job is None
            if finished:
                # Poison pill: acknowledge it so a join() on the queue can return.
                print(f'{label} Existing')
                pending.task_done()
            else:
                print(f"{label}:{job}")
                outcome = job()
                # The task is acknowledged before its result is queued.
                pending.task_done()
                self.result_queue.put(outcome)

class Task(object):
    """Callable unit of work that multiplies two operands after a short delay."""

    def __init__(self, a, b):
        """Keep the two operands."""
        self.a, self.b = a, b

    def __call__(self):
        """Sleep 0.1 s to simulate work, then return ``"a * b = product"``."""
        time.sleep(0.1)  # stand-in for real work
        left, right = self.a, self.b
        return f"{left} * {right} = {left * right}"

    def __str__(self):
        """Return the expression text ``"a * b"`` without the result."""
        left, right = self.a, self.b
        return f"{left} * {right}"



In [11]:
# Driver: fan Task objects out to a pool of Consumer processes through a
# JoinableQueue and collect their answers from a plain Queue.
tasks = multiprocessing.JoinableQueue()  # establish communication queues
results = multiprocessing.Queue()

# Start consumers (two per reported CPU)
num_consumers = multiprocessing.cpu_count() * 2
print(f"Creating {num_consumers} consumers")
consumers  = [Consumer(tasks, results) for i in range(num_consumers)]
for w in consumers:
    w.start()

# Enqueue jobs
num_jobs = 10
for i in range(num_jobs):
    tasks.put(Task(i, i))

# Add a poison pill for each consumer so every run() loop terminates
for i  in range(num_consumers):
    tasks.put(None)

# Wait until every queued item (jobs and pills) has been acknowledged
tasks.join()

# Start printing results; each job yields exactly one result
while num_jobs:
    result = results.get()
    print("Result", result)
    num_jobs -= 1

# Reap the consumer processes. Without this the finished children are never
# joined and linger as zombies for as long as the notebook kernel lives. The
# join is done only after `results` has been drained, so no child can still be
# blocked flushing its answers into the queue.
for w in consumers:
    w.join()

Creating 8 consumers
Consumer-16:0 * 0
Consumer-17:1 * 1
Consumer-19:3 * 3
Consumer-18:2 * 2
Consumer-20:4 * 4
Consumer-21:5 * 5
Consumer-22:6 * 6
Consumer-23:7 * 7
Consumer-16:8 * 8
Consumer-17:9 * 9
Consumer-18 Existing
Consumer-19 Existing
Consumer-20 Existing
Consumer-21 Existing
Consumer-22 Existing
Consumer-23 Existing
Consumer-17 Existing
Consumer-16 Existing
Result 0 * 0 = 0
Result 1 * 1 = 1
Result 2 * 2 = 4
Result 3 * 3 = 9
Result 6 * 6 = 36
Result 7 * 7 = 49
Result 5 * 5 = 25
Result 4 * 4 = 16
Result 9 * 9 = 81
Result 8 * 8 = 64


In [None]:
# 将logging中不同级别的日志记录到不同文件中
import logging
import os

# Package logger with one file per severity threshold: each handler writes
# records at its own level and above to /tmp/level-<level>.log.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)  # let every record reach the handlers

levels = ("DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL")

# Files already attached to this logger. Re-running this cell (or reloading
# the module) would otherwise stack a second set of handlers on the same
# logger and write every record twice.
_attached = {
    h.baseFilename for h in logger.handlers if isinstance(h, logging.FileHandler)
}
for level in levels:
    # FileHandler stores its target as an absolute path in ``baseFilename``,
    # so compare against the absolute form.
    _path = os.path.abspath(f"/tmp/level-{level.lower()}.log")
    if _path in _attached:
        continue
    handler = logging.FileHandler(_path)
    handler.setLevel(getattr(logging, level))
    logger.addHandler(handler)

def add_module_handler(logger, level=logging.DEBUG, log_dir="/tmp"):
    """Attach a per-module log file to ``logger``.

    The file is ``<log_dir>/module-<name>.log`` where ``<name>`` is the
    logger's name with dots replaced by dashes.

    Calling this again for the same logger and file does not add a second
    handler (which would write every record twice); the existing handler is
    kept and its level is set to ``level``.

    Args:
        logger: The ``logging.Logger`` to attach the handler to.
        level: Threshold for the file handler. Defaults to ``logging.DEBUG``.
        log_dir: Directory that receives the log file. Defaults to ``/tmp``.

    Returns:
        None.
    """
    # FileHandler keeps an absolute path in ``baseFilename``; normalize the
    # same way so the duplicate check compares like with like.
    path = os.path.abspath(
        os.path.join(log_dir, f"module-{logger.name.replace('.', '-')}.log")
    )
    for existing in logger.handlers:
        if isinstance(existing, logging.FileHandler) and existing.baseFilename == path:
            existing.setLevel(level)
            return

    handler = logging.FileHandler(path)
    handler.setLevel(level)
    logger.addHandler(handler)

In [None]:
# 在其他模块中调用
# utils.py
import logging

from project import add_module_handler

# Module-level logger named after this module (``__name__``).
logger = logging.getLogger(__name__)
# Give this module its own log file via the helper imported from ``project``.
add_module_handler(logger)

def func2():
    """Log one DEBUG and one CRITICAL message through the module logger."""
    debug_text = "debug called from utils.func2()"
    critical_text = "critical called from utils.func2()"
    logger.debug(debug_text)
    logger.critical(critical_text)
    
# base.py
import logging

from project import add_module_handler

# Module-level logger named after this module (``__name__``).
logger = logging.getLogger(__name__)
# Give this module its own log file via the helper imported from ``project``.
add_module_handler(logger)

def func1():
    """Log one DEBUG and one CRITICAL message through the module logger."""
    debug_text = "debug called from base.func1()"
    critical_text = "critical called from base.func1()"
    logger.debug(debug_text)
    logger.critical(critical_text)

In [None]:
>>> from pprint import pprint
>>> import project
>>> from project import base, utils

>>> project.logger
<Logger project (DEBUG)>
>>> base.logger, utils.logger
(<Logger project.base (DEBUG)>, <Logger project.utils (DEBUG)>)
>>> base.logger.handlers
[<FileHandler /tmp/module-project-base.log (DEBUG)>]
>>> pprint(base.logger.parent.handlers)
[<FileHandler /tmp/level-debug.log (DEBUG)>,
 <FileHandler /tmp/level-info.log (INFO)>,
 <FileHandler /tmp/level-warning.log (WARNING)>,
 <FileHandler /tmp/level-error.log (ERROR)>,
 <FileHandler /tmp/level-critical.log (CRITICAL)>]
>>> base.func1()
>>> utils.func2()  

$ cat /tmp/level-debug.log 
debug called from base.func1()
critical called from base.func1()
debug called from utils.func2()
critical called from utils.func2()

$ cat /tmp/level-critical.log 
critical called from base.func1()
critical called from utils.func2()

$ cat /tmp/module-project-base.log
debug called from base.func1()
critical called from base.func1()

$ cat /tmp/module-project-utils.log 
debug called from utils.func2()
critical called from utils.func2()