# 并行任务 -- 线程、进程对比

In [6]:
"""
CPU密集型任务 -- 线程测试

time python thread_worker.py

Result [50584212, 50486575, 50481798, 50460655, 50567586, 50463214, 50522043]
python thread_worker.py  5.75s user 0.08s system 100% cpu 5.830 total
"""

import threading
import random

results = []


def compute(count=1_000_000):
    """Sum ``count`` random integers in [1, 100] and append the total to ``results``.

    Args:
        count: Number of random integers to draw. The default of 1,000,000
            is the workload that used to be hard-coded here.
    """
    # Feed sum() from a generator so no temporary list of ``count``
    # elements is built for every call.
    results.append(sum(random.randint(1, 100) for _ in range(count)))

# Build eight threads that each run compute() once.
workers = []
for _ in range(8):
    workers.append(threading.Thread(target=compute))

# Start every thread before waiting on any of them.
for worker in workers:
    worker.start()

for worker in workers:
    worker.join()

print(f"Result {results}")

Result [50509111, 50518356, 50474609, 50493550, 50490025, 50439531, 50468961, 50501880]


In [None]:
"""
CPU密集型任务 -- 进程测试

time python process_worker.py

Result [50545553, 50493810, 50511435, 50514261, 50432686, 50449326, 50514232, 50545287]
python process_worker.py  9.88s user 0.08s system 716% cpu 1.391 total
"""

import multiprocessing
import random

def compute(n, count=1_000_000):
    """Return the sum of ``count`` random integers drawn from [1, 100].

    Args:
        n: Not used by the computation; it only absorbs the item that
            ``pool.map`` passes to each call.
        count: Number of random integers to draw. The default of 1,000,000
            is the workload that used to be hard-coded here.

    Returns:
        The integer total of the drawn values.
    """
    # A generator avoids materialising a ``count``-element list first.
    return sum(random.randint(1, 100) for _ in range(count))


if __name__ == "__main__":
    # The guard stops worker processes from re-running this code when they
    # import the main module (needed with the "spawn" start method).
    # The with-block shuts the six-process pool down once map() has returned.
    with multiprocessing.Pool(6) as pool:
        print(f"Result {pool.map(compute, range(8))}")

# 使用futures模块

1. concurrent.futures 模块有 ThreadPoolExecutor、ProcessPoolExecutor 两个类

2. 接口类似，不用关心太多细节。切换使用进程或者线程，代码无需做太多更改。


In [None]:
# thread_worker_with_future.py
import random
import threading
from concurrent import futures

def compute(count=1_000_000):
    """Return the sum of ``count`` random integers drawn from [1, 100].

    Args:
        count: Number of random integers to draw. The default of 1,000,000
            is the workload that used to be hard-coded here.

    Returns:
        The integer total of the drawn values.
    """
    # A generator avoids materialising a ``count``-element list first.
    return sum(random.randint(1, 100) for _ in range(count))

# Submit eight compute() calls to an eight-thread pool and gather each
# result as its future finishes, i.e. in completion order.
with futures.ThreadPoolExecutor(8) as executor:
    todo = []
    for _ in range(8):
        todo.append(executor.submit(compute))

    results = [finished.result() for finished in futures.as_completed(todo)]

print(f"Result {results}")

In [None]:
# process_worker_with_future.py
import random
import threading
from concurrent import futures

def compute(count=1_000_000):
    """Return the sum of ``count`` random integers drawn from [1, 100].

    Args:
        count: Number of random integers to draw. The default of 1,000,000
            is the workload that used to be hard-coded here.

    Returns:
        The integer total of the drawn values.
    """
    # A generator avoids materialising a ``count``-element list first.
    return sum(random.randint(1, 100) for _ in range(count))

if __name__ == "__main__":
    # Worker processes must be able to import the main module without
    # re-running the code that creates the pool, hence the guard.
    with futures.ProcessPoolExecutor(8) as executor:
        todo = [executor.submit(compute) for _ in range(8)]

        # as_completed() yields each future as it finishes, so the results
        # are collected in completion order.
        results = [future.result() for future in futures.as_completed(todo)]

    print(f"Result {results}")

# 同步问题

In [7]:
# 加锁
import time
from threading import Lock

class Account:
    """A balance whose updates are serialised by a per-account lock."""

    def __init__(self, money=100):
        # Every balance update takes this lock first.
        self._lock = Lock()
        self.money = money

    def _apply(self, delta, *, deposit):
        """Add ``delta`` to the balance when ``deposit`` is true, else subtract it."""
        with self._lock:
            if deposit:
                self.money += delta
            else:
                self.money -= delta

    def save(self, delta):
        """Increase the balance by ``delta`` while holding the lock."""
        self._apply(delta, deposit=True)

    def withdraw(self, delta):
        """Decrease the balance by ``delta`` while holding the lock."""
        self._apply(delta, deposit=False)
            
# Single account instance that change() operates on.
account = Account(500)


def change(n):
    """Deposit ``n`` into ``account`` and then withdraw the same amount."""
    deposit, withdraw = account.save, account.withdraw
    deposit(n)
    withdraw(n)
    
def task(n, rounds=10000):
    """Call ``change(n)`` ``rounds`` times in a row.

    Args:
        n: Amount handed to ``change`` on every iteration.
        rounds: Number of iterations. The default of 10000 is the count
            that used to be hard-coded here.
    """
    for _ in range(rounds):
        change(n)

# One thread per amount; each thread runs task() with that amount.
tasks = [threading.Thread(target=task, args=(money, )) for money in [300, 400, 500, 600]]

# The loop variable is deliberately not called ``task``: reusing that name
# would rebind the task() function to a Thread object and break any later
# use of it (for example re-running the line above).
for worker in tasks:
    worker.start()
for worker in tasks:
    worker.join()

# Every deposit is paired with an equal withdrawal under the lock, so the
# balance is expected to be back at its starting value.
account.money

500

# 解决死锁 --> 一种解决方案, 获取锁的时候按顺序获取

In [57]:
class Acquire:
    """Context manager that acquires several locks in one fixed order.

    The locks are ordered by ``id()``, so any two ``Acquire`` instances that
    share locks take them in the same sequence regardless of the order in
    which they were passed. On exit the locks are released in reverse order.
    """

    def __init__(self, *locks):
        # Keying by id() drops duplicates: acquiring the same plain
        # threading.Lock twice in a row would block forever.
        unique = {id(lock): lock for lock in locks}
        self._locks = [unique[key] for key in sorted(unique)]

    def __enter__(self):
        for lock in self._locks:
            lock.acquire()
        # Returning self lets callers write ``with Acquire(...) as held``.
        return self

    def __exit__(self, exe_type, exe_value, trace):
        # Release in the opposite order of acquisition.
        for lock in reversed(self._locks):
            lock.release()
            
# One solution to the dining philosophers problem
def philosopher(left, right):
    """Loop forever, printing a message while holding both chopsticks.

    Args:
        left: Lock for one of the two chopsticks.
        right: Lock for the other chopstick.

    Both locks are taken through ``Acquire``. This function never returns.
    """
    while True:
        with Acquire(left, right):
            # current_thread() replaces the deprecated currentThread() alias.
            print(f"Thread {threading.current_thread()} is eating...")
            
# Five chopsticks; philosopher i uses chopsticks i and (i + 1) % 5.
chopsticks = [threading.Lock() for _ in range(5)]
philosophers = [
    threading.Thread(target=philosopher, args=(chopsticks[i], chopsticks[(i + 1) % 5]))
    for i in range(5)
]

# Start all five threads before joining any of them: philosopher() never
# returns, so joining inside the start loop would block on the first thread
# and the other four would never be started.
for t in philosophers:
    t.start()
for t in philosophers:
    t.join()

# Celery

In [None]:
from celery import Celery
import time


# Redis URL that tasks are sent through.
broker = 'redis://172.17.0.6:6379'
# Redis URL where task results are stored.
backend = 'redis://172.17.0.6:6379'

app = Celery('task_demo', backend=backend, broker=broker)


@app.task
def add(x, y):
    """Sleep for 10 seconds, then return ``x + y``."""
    time.sleep(10)
    total = x + y
    return total

# Dispatch the tasks, then fetch their results.
# The list is not named ``futures`` so that it does not shadow the
# ``concurrent.futures`` module imported earlier in this file.
async_results = [add.delay(x, y) for x, y in zip(range(10), range(10, 20))]
for async_result in async_results:
    print(async_result.ready())

# get() blocks until the task has finished. Reading ``.result`` right after
# dispatch would give None for every task that is still running.
results = [async_result.get() for async_result in async_results]

# 分布式爬虫架构


![爬虫架构](./分布式爬虫.png)

In [None]:
"""
反爬
1. 代理
2. User-Agent

3. 验证码 （相应的包。）（深度学习 -> 验证码识别器） JS， Python
4. 加密 （js混淆）
5. 模拟浏览器获取动态数据
"""

# Ray 分布式实战文档

"""

1.数据准备

- [wiki文档下载链接](https://dumps.wikimedia.org/enwiki/latest/enwiki-latest-pages-articles-multistream27.xml-p53163464p54663464.bz2)

- [wiki数据抽取工具](https://github.com/attardi/wikiextractor)


2.环境配置

- [Miniconda快速搭建Python环境](https://docs.conda.io/en/latest/miniconda.html)

- 安装Ray: pip install ray

3.集群搭建

准备：

虚拟机三台: Node1(172.17.0.1)、Node2(172.17.0.2)、Node3(172.17.0.3)

redis服务：172.17.0.1:6379



- 选择Node1为Head Node： 

 ray start --head --redis-port=6379

- Node2, Node3

 ray start --redis-address 172.17.0.1:6379


4.运行程序

wiki 数据解析到 Node1节点的/data目录

python mapreduce_parell.py 

"""

# Ray相关补充资料

- [加速Pandas - modin](https://github.com/modin-project/modin)

- [超参调整 - Tune](https://ray.readthedocs.io/en/latest/tune.html)