## Python 并发

In [None]:
import gmaps

In [14]:
import os

import googlemaps
from datetime import datetime

# The API key is a secret: read it from the environment instead of committing
# it to the notebook. A key that has already been committed should be revoked.
# NOTE: this rebinds the name `gmaps`; from here on it is the googlemaps
# client object that later cells call methods on.
gmaps = googlemaps.Client(key=os.environ['GOOGLE_MAPS_API_KEY'])

# Geocoding an address
geocode_result = gmaps.geocode('1600 Amphitheatre Parkway, Mountain View, CA')
print(geocode_result)


TransportError: HTTPSConnectionPool(host='maps.googleapis.com', port=443): Max retries exceeded with url: /maps/api/geocode/json?address=1600+Amphitheatre+Parkway%2C+Mountain+View%2C+CA&key=AIzaSyCVGBTl7MVOMV0ABcego34cD9-uC-y2gqM (Caused by NewConnectionError('<requests.packages.urllib3.connection.VerifiedHTTPSConnection object at 0x1043fecc0>: Failed to establish a new connection: [Errno 65] No route to host',))

In [None]:
# Reverse geocoding: look up the address for a (latitude, longitude) pair.
reverse_geocode_result = gmaps.reverse_geocode((40.714224, -73.961452))

# Public-transit directions between two places, departing at the current time.
now = datetime.now()
directions_result = gmaps.directions(
    "Sydney Town Hall",
    "Parramatta, NSW",
    mode="transit",
    departure_time=now,
)

In [16]:
from gmaps import Geocoding
# An API key can be supplied via Geocoding(api_key=...); keep the key itself
# out of the notebook (e.g. read it from an environment variable).
api = Geocoding()
# Only the first geocoding match is used.
geocoded = api.geocode('Warsaw')[0]

# Print the formatted address right-aligned to 25 characters, followed by the
# latitude and longitude with two decimals each.
print("{:>25s}, {:6.2f}, {:6.2f}".format(
    geocoded['formatted_address'],
    geocoded['geometry']['location']['lat'],
    geocoded['geometry']['location']['lng'],
))




ConnectionError: HTTPSConnectionPool(host='maps.googleapis.com', port=443): Max retries exceeded with url: /maps/api/geocode/json?address=Warsaw&sensor=false (Caused by NewConnectionError('<requests.packages.urllib3.connection.VerifiedHTTPSConnection object at 0x1044036d8>: Failed to establish a new connection: [Errno 65] No route to host',))

In [None]:
# gmaps 简单地将地址或者地点转换成坐标

In [None]:
# 不使用线程 直接遍历

In [None]:
import time
from gmaps import Geocoding
# An API key can be supplied via Geocoding(api_key=...); keep the key itself
# out of the notebook (e.g. read it from an environment variable).
api = Geocoding()

# Places to geocode in the examples that follow.
# NOTE(review): every entry is an empty string — these look like placeholders;
# fill in real place names before running the examples.
PLACES = ('', '', '', '', '')



In [None]:
def fetch_place(place):
    """Geocode ``place`` and print its address and coordinates.

    Only the first match returned by ``api.geocode`` is used. The printed
    line holds the formatted address right-aligned to 25 characters, then
    the latitude and longitude with two decimals each. Returns ``None``.
    """
    best_match = api.geocode(place)[0]

    address = best_match['formatted_address']
    location = best_match['geometry']['location']
    summary = "{:>25s}, {:6.2f}, {:6.2f}".format(
        address,
        location['lat'],
        location['lng'],
    )
    print(summary)
    
def main():
    """Geocode every entry of ``PLACES`` one after another, in order."""
    for place_name in PLACES:
        fetch_place(place_name)

In [None]:
if __name__ == "__main__":
    # perf_counter() is a monotonic, high-resolution clock meant for measuring
    # intervals; time.time() can jump when the system clock is adjusted.
    started = time.perf_counter()
    main()
    elapsed = time.perf_counter() - started

    print()
    print("time elapsed: {:.2f}s".format(elapsed))

In [None]:
# 可以看到时间大概在3s 左右

In [None]:
# 每一项使用一个线程

In [None]:
from threading import Thread

def main():
    """Geocode all ``PLACES`` concurrently, using one thread per place.

    Each thread is started as soon as it is created; the function returns
    only after every thread has finished.
    """
    running = []
    for place in PLACES:
        fetcher = Thread(target=fetch_place, args=[place])
        fetcher.start()
        running.append(fetcher)

    # Wait for all threads, most recently started first.
    for fetcher in reversed(running):
        fetcher.join()

In [17]:
# 线程是有益的，可以看到时间是 1s 左右

In [18]:
# 使用线程池 构建一个严格定义大小的工作线程池，处理所有的并行工作

In [19]:
from queue import Queue, Empty
from threading import Thread

In [20]:
# Number of worker threads started by main().
THREAD_POOL_SIZE = 4


def worker(work_queue):
    """Drain ``work_queue``, calling ``fetch_place`` on each item.

    Returns once the queue is empty. ``task_done()`` is called after each
    item has been processed; if ``fetch_place`` raises, the exception
    propagates and ``task_done()`` is not called for that item.
    """
    while not work_queue.empty():
        try:
            place = work_queue.get_nowait()
        except Empty:
            # The queue was emptied between the empty() check and the get.
            return
        fetch_place(place)
        work_queue.task_done()
            
def main():
    """Geocode all ``PLACES`` with a pool of ``THREAD_POOL_SIZE`` threads.

    Every place is put on a work queue first; the pool threads then run
    ``worker`` against that queue. Returns after the queue reports all
    tasks done and every thread has been joined.
    """
    work_queue = Queue()
    for place in PLACES:
        work_queue.put(place)

    pool = []
    for _ in range(THREAD_POOL_SIZE):
        pool.append(Thread(target=worker, args=(work_queue,)))

    for member in pool:
        member.start()

    # Block until task_done() has been called for every queued item.
    work_queue.join()

    for member in reversed(pool):
        member.join()

In [21]:
#  需要调整 参数 获得更好的资源 时间的平衡 耗时大约1.2

In [22]:
# 使用双向队列 将结果的展示抽离出来

In [24]:
def fetch_place(place):
    """Return the first geocoding match for ``place`` without printing it.

    From this cell on, workers put the return value of ``fetch_place`` on a
    results queue and ``present_result`` prints it afterwards, so fetching
    has to hand the data back. The earlier printing version returns ``None``,
    which ``present_result`` cannot index.
    """
    return api.geocode(place)[0]


def present_result(geocoded):
    """Print one geocoding result as ``address, latitude, longitude``.

    ``geocoded`` is a mapping with a ``'formatted_address'`` entry and a
    nested ``['geometry']['location']`` mapping holding ``'lat'`` and
    ``'lng'``. The address is right-aligned to 25 characters and both
    coordinates are printed with two decimals. Returns ``None``.
    """
    address = geocoded['formatted_address']
    location = geocoded['geometry']['location']
    print("{:>25s}, {:6.2f}, {:6.2f}".format(
        address,
        location['lat'],
        location['lng'],
    ))
    
def worker(work_queue, result_queue):
    """Drain ``work_queue`` and put each ``fetch_place`` result on ``result_queue``.

    Returns once the work queue is empty. ``task_done()`` is called after
    each result has been queued; if ``fetch_place`` raises, the exception
    propagates and ``task_done()`` is not called for that item.
    """
    while not work_queue.empty():
        try:
            place = work_queue.get_nowait()
        except Empty:
            # The queue was emptied between the empty() check and the get.
            return
        fetched = fetch_place(place)
        result_queue.put(fetched)
        work_queue.task_done()
            
def main():
    """Geocode all ``PLACES`` with a thread pool, then print the results.

    Fetching and presenting are separated: pool threads run ``worker``, which
    moves items from the work queue to the result queue, and the results are
    passed to ``present_result`` from this thread once all workers are done.
    """
    # Both queues are local to this run; nothing else defines them.
    work_queue = Queue()
    result_queue = Queue()

    for place in PLACES:
        work_queue.put(place)

    threads = [
        Thread(target=worker, args=(work_queue, result_queue))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    # Block until task_done() has been called for every queued item.
    work_queue.join()

    while threads:
        threads.pop().join()

    # All producers have been joined, so empty() is reliable here.
    while not result_queue.empty():
        present_result(result_queue.get())
        

In [25]:
# 时间大约是 1.3

In [26]:
# 处理错误与速率限制 改进worker 和 main 函数

In [None]:
def worker(work_queue, results_queue):
    """Drain ``work_queue``, queueing each result or the exception it raised.

    For every item, the return value of ``fetch_place`` is put on
    ``results_queue``; if ``fetch_place`` raises an ``Exception``, the
    exception object is queued instead. ``task_done()`` is always called for
    a fetched item. Returns once the work queue is empty.
    """
    while True:
        try:
            item = work_queue.get_nowait()
        except Empty:
            return

        try:
            try:
                outcome = fetch_place(item)
            except Exception as err:
                # Hand the failure to the consumer instead of killing the thread.
                outcome = err
            results_queue.put(outcome)
        finally:
            work_queue.task_done()

In [None]:
def main():
    """Geocode all ``PLACES`` with a thread pool and print the results.

    Workers put either a result or the exception they caught on the results
    queue. After all workers have finished, results are presented in queue
    order.

    Raises:
        Exception: the first exception object found on the results queue is
            re-raised in the calling thread.
    """
    work_queue = Queue()
    results_queue = Queue()

    for place in PLACES:
        work_queue.put(place)

    threads = [
        Thread(target=worker, args=(work_queue, results_queue))
        for _ in range(THREAD_POOL_SIZE)
    ]

    for thread in threads:
        thread.start()

    # Block until task_done() has been called for every queued item.
    work_queue.join()

    while threads:
        threads.pop().join()

    # All producers have been joined, so empty() is reliable here.
    while not results_queue.empty():
        result = results_queue.get()

        if isinstance(result, Exception):
            raise result

        # Present the item just taken; a second get() would skip results.
        present_result(result)
        

In [27]:
# 由于提供的服务有速率的限制 因此 改进 使用 令牌桶

In [29]:
from threading import Lock

class Throttle:
    """Token-bucket rate limiter that can be shared between threads.

    Tokens accumulate at ``rate`` per second (measured with ``time.time()``)
    up to a maximum of ``rate``. ``consume()`` does all of its bookkeeping
    while holding an internal lock, so concurrent callers cannot interleave
    updates to ``tokens`` and ``last``.
    """

    def __init__(self, rate):
        """Create an empty bucket that refills at ``rate`` tokens per second."""
        self._consume_lock = Lock()
        self.rate = rate
        self.tokens = 0
        # Timestamp of the last refill; 0 means "no request seen yet".
        self.last = 0

    def consume(self, amount=1):
        """Try to take ``amount`` tokens from the bucket.

        Returns:
            ``amount`` if that many tokens were available (and have now been
            removed), otherwise ``0`` with the bucket left untouched.
        """
        # The whole read-modify-write sequence must sit inside the lock;
        # guarding only the clock read would leave the token count racy.
        with self._consume_lock:
            now = time.time()

            # Time measurement is initialised on the first token request to
            # avoid an initial burst.
            if self.last == 0:
                self.last = now

            elapsed = now - self.last

            # Only whole tokens are credited; `last` advances only when at
            # least one token was added, so fractional time is not lost.
            new_tokens = int(elapsed * self.rate)
            if new_tokens:
                self.tokens += new_tokens
                self.last = now

            # The bucket never holds more than `rate` tokens.
            if self.tokens > self.rate:
                self.tokens = self.rate

            if self.tokens >= amount:
                self.tokens -= amount
            else:
                amount = 0

            return amount
    
            
        

In [30]:
# Throttle 的实例化很简单：直接 Throttle(10) 即可。worker 在处理每个项目之前会一直等待，直到 throttle 释放出一个新的令牌为止。

In [31]:
def worker(work_queue, results_queue, throttle):
    """Drain ``work_queue`` at the pace allowed by ``throttle``.

    Before each fetch the worker polls ``throttle.consume()`` until it
    returns a truthy value. The return value of ``fetch_place`` — or the
    ``Exception`` it raised — is then put on ``results_queue``, and
    ``task_done()`` is always called for the fetched item. Returns once the
    work queue is empty.
    """
    while True:
        try:
            item = work_queue.get_nowait()
        except Empty:
            return

        # Spin until the throttle hands out a token.
        while not throttle.consume():
            continue

        try:
            try:
                outcome = fetch_place(item)
            except Exception as err:
                # Hand the failure to the consumer instead of killing the thread.
                outcome = err
            results_queue.put(outcome)
        finally:
            work_queue.task_done()
    

# 多进程

使用线程时，必须设置线程池和通信队列，处理来自线程的异常，在提供速率限制功能时还需要考虑线程安全。此外，由于 GIL 的限制，线程只适用于执行 I/O 密集型任务。

多进程：彼此独立的 Python 进程不受 GIL 的限制，可以更好地利用资源，适合用来处理 CPU 密集型任务。

In [34]:
from multiprocessing import Process, Value, Array


def f(n, a):
    """Set ``n.value`` to 3.1415927 and negate every element of ``a`` in place."""
    n.value = 3.1415927
    for index in range(len(a)):
        a[index] = -a[index]
        
        
if __name__ == "__main__":
    # A shared value ('d') and a shared array ('i') that are handed to the
    # child process and read back here afterwards.
    num, arr = Value('d', 0.0), Array('i', range(10))

    # Run f in a separate process and wait for it to finish.
    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]


multiprocessing.dummy 模块复制了 multiprocessing 的 API，但使用多个线程，而不是派生（fork）或产生（spawn）新进程。

In [35]:
from multiprocessing import Pool as ProcessPool
from multiprocessing.dummy import Pool as ThreadPool


# Number of workers in the pool, whether it holds threads or processes.
POOL_SIZE = 4


def main(use_threads=False):
    """Geocode all ``PLACES`` through a pool and print the results.

    Args:
        use_threads: when true, use the thread-backed pool
            (``multiprocessing.dummy.Pool``); otherwise use a process pool.

    ``fetch_place`` is mapped over ``PLACES`` inside the pool and each
    returned result is then passed to ``present_result`` in this process.
    NOTE(review): this relies on ``fetch_place`` returning the geocoding
    result rather than printing it — confirm which definition is in scope.
    """
    pool_cls = ThreadPool if use_threads else ProcessPool

    with pool_cls(POOL_SIZE) as pool:
        results = pool.map(fetch_place, PLACES)

    for result in results:
        present_result(result)

In [None]:
# 后续还有异步编程……