In [1]:
# キューによる並列処理制御
# FIFO
# ボトルネックが存在
# キューはメッセージを送る. 
# 洗浄担当プロセス＋乾燥担当プロセス
import multiprocessing as mp

def washer(dishes, output):
    for dish in dishes:
        print('Washing', dish, 'dish')
        output.put(dish)
        
def dryer(input):
    while True:
        dish = input.get()
        print('Drying', dish, 'dish')
        input.task_done()

dish_queue = mp.JoinableQueue()
dryer_proc = mp.Process(target=dryer, args=(dish_queue,))
dryer_proc.daemon = True
dryer_proc.start()

dishes = ['salad', 'bread', 'entree', 'dessert!!!!']
washer(dishes, dish_queue)
dish_queue.join() # Washerはすべての皿が乾燥されたことを知ることができる

Drying salad dish
Drying bread dish
Drying entree dish
Drying dessert!!!! dish
Washing salad dish
Washing bread dish
Washing entree dish
Washing dessert!!!! dish


Process Process-1:
Traceback (most recent call last):
  File "/Users/inoueshinichi/.pyenv/versions/anaconda3-5.0.1/lib/python3.6/multiprocessing/process.py", line 258, in _bootstrap
    self.run()
  File "/Users/inoueshinichi/.pyenv/versions/anaconda3-5.0.1/lib/python3.6/multiprocessing/process.py", line 93, in run
    self._target(*self._args, **self._kwargs)
  File "<ipython-input-1-ef8c99043680>", line 15, in dryer
    dish = input.get()
  File "/Users/inoueshinichi/.pyenv/versions/anaconda3-5.0.1/lib/python3.6/multiprocessing/queues.py", line 94, in get
    res = self._recv_bytes()
  File "/Users/inoueshinichi/.pyenv/versions/anaconda3-5.0.1/lib/python3.6/multiprocessing/connection.py", line 216, in recv_bytes
    buf = self._recv_bytes(maxlength)
  File "/Users/inoueshinichi/.pyenv/versions/anaconda3-5.0.1/lib/python3.6/multiprocessing/connection.py", line 407, in _recv_bytes
    buf = self._recv(4)
  File "/Users/inoueshinichi/.pyenv/versions/anaconda3-5.0.1/lib/python3.6/multipr

In [2]:
# スレッド
import threading

def do_this(what):
    whoami(what)
    
def whoami(what):
    print("Thread %s says: %s\n" % (threading.current_thread(), what))
    
whoami("I'm the main program")
for n in range(4):
    p = threading.Thread(target=do_this, args=("I'm function %s" % n,))
    p.start()

Thread <_MainThread(MainThread, started 140736187483072)> says: I'm the main program

Thread <Thread(Thread-4, started 123145389277184)> says: I'm function 0

Thread <Thread(Thread-5, started 123145389277184)> says: I'm function 1

Thread <Thread(Thread-6, started 123145389277184)> says: I'm function 2

Thread <Thread(Thread-7, started 123145394532352)> says: I'm function 3



In [3]:
# プロセスベースのwasher, dryer関数もスレッドで処理できる
import threading, queue
import time

def washer(dishes, dish_queue):
    for dish in dishes:
        print('Washing', dish)
        time.sleep(5)
        dish_queue.put(dish)
        
def dryer(dish_queue):
    while True:
        dish = dish_queue.get()
        print('Drying', dish)
        time.sleep(10)
        dish_queue.task_done()

dish_queue = queue.Queue()
for n in range(2):
    dryer_thread = threading.Thread(target=dryer, args=(dish_queue,))
    dryer_thread.start()
    
dishes = ['salad', 'bread', 'entree', 'dessert!']
washer(dishes, dish_queue)
dish_queue.join()



Washing salad
DryingWashing  saladbread

Washing Dryingentree
 bread
DryingWashing  dessert!entree

Drying dessert!


In [15]:
# イベント駆動型並列処理 gevent
# 中央でイベントループを回して、イベントが発生したら個々のコルーチン(job)に処理を渡す
# gevent.spawn():グリーンスレッド グリーンスレッドは通常のスレッドと違ってブロックしない
import gevent
from gevent import socket
hosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com', 'www.antique-taxidermy.com']
jobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5) # 派生させたすべてのジョブが終了するのを待つ
for job in jobs:
    print(job.value)

66.6.44.4
216.58.197.211
None


In [1]:
# モンキーパッチング関数を使う
# これはgevent専用のモジュールではなく、soketなどの標準モジュールにグリーンスレッドを対応させて、それを使う
"""
from gevent import monkey
monkey.patch_socket() # 標準モジュールのsoketにgevent版socketが挿入される, Pythonコードオンリーな使い方
# monkey.patch_all() # 対応する標準モジュールすべてを書き換える

import gevent
from gevent import monkey; monkey.patch_all()
hosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com', 'www.antique-taxidermy.com']
jobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5) # 派生させたすべてのジョブが終了するのを待つ
for job in jobs:
    print(job.value)
"""
# geventは使用方法に注意。用法、用量を守ろう

"\nfrom gevent import monkey\nmonkey.patch_socket() # 標準モジュールのsoketにgevent版socketが挿入される, Pythonコードオンリーな使い方\n# monkey.patch_all() # 対応する標準モジュールすべてを書き換える\n\nimport gevent\nfrom gevent import monkey; monkey.patch_all()\nhosts = ['www.crappytaxidermy.com', 'www.walterpottertaxidermy.com', 'www.antique-taxidermy.com']\njobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts]\ngevent.joinall(jobs, timeout=5) # 派生させたすべてのジョブが終了するのを待つ\nfor job in jobs:\n    print(job.value)\n"

In [6]:
# twisted 非同期のイベント駆動型ネットワーキングフレーム
# イベントが発生したときに、事前に結び付けれらていた関数が呼びだされる。コールバック
# TCP, UDP上の様々なインターネットプロトコルをサポートする大規模パッケージ

# ノックサーバー
from twisted.internet import protocol, reactor

class Knock(protocol.Protocol):
    def dataReceived(self, data):
        print("Client:", data)
        if data.startswith("Knock knock"):
            response = "Who's there ?"
        else:
            response = data + " who?"
        print("Server:", response)
        self.transport.write(response)
        
class KnockFactory(protocol.Factory):
    def buildProtocol(self, addr):
        return Knock()
    
reactor.listenTCP(8000, KnockFactory())
reactor.run()

ReactorNotRestartable: 

In [5]:
# ノッククライアント
from twisted.internet import reactor, protocol

class KnockClient(protocol.Protocol):
    def connectionMade(self):
        self.transport.write('Knock knock')
        
    def dataReceived(self, data):
        if data.startswith("Who's there?"):
            response = "Disappearing client"
            self.transport.write(response)
        else:
            self.transport.loseConnection()
            reactor.stop()
            
class KnockFactory(protocol.ClientFactory):
    protocol = KnockClient
    
def main():
    f =KnockFactory()
    reactor.connectTCP("localhost", 8000, f)
    reactor.run()
    
main()        

ReactorNotRestartable: 

In [1]:
# Redisをキュー代わりにつかう
""" redis_washer.py
import redis
conn = redis.Redis()
print('Washer is starting')
dishes = ['salad', 'bread', 'entree', 'dessert']
for dish in dishes:
    msg = dish.encode('utf-8')
    conn.rpush('dishes', msg)
    print('Washed', dish)
conn.rpush('dishes', 'quit')
print('Washer is done')
"""


"\nimport redis\nconn = redis.Redis()\nprint('Washer is starting')\ndishes = ['salad', 'bread', 'entree', 'dessert']\nfor dish in dishes:\n    msg = dish.encode('utf-8')\n    conn.rpush('dishes', msg)\n    print('Washed', dish)\nconn.rpush('dishes', 'quit')\nprint('Washer is done')\n"

In [2]:
""" redis_dryer.py
import redis
conn = redis.Redis()
print('Dryer is starting')
while True:
    msg = conn.blpop('dishes') # キューリスト名'dishes'からデータをポップする
    if not msg:
        break
    val = msg[1].decode('utf-8')
    if val == 'quit':
        break
    print('Dried', val)
print('Dishes are dried')
"""

" redis_dryer.py\nimport redis\nconn = redis.Redis()\nprint('Dryer is starting')\nwhile True:\n    msg = conn.blpop('dishes') # キューリスト名'dishes'からデータをポップする\n    if not msg:\n        break\n    val = msg[1].decode('utf-8')\n    if val == 'quit':\n        break\n    print('Dried', val)\nprint('Dishes are dried')\n"

In [3]:
# パブリッシュ/サブスクライバ　モデル
# キューではなく、ブロードキャストである
# Redisを使って、パブサブ
""" redis_pub.py
import redis
import random

conn = redis.Redis()
cats = ['siamese', 'persian', 'maine coon', 'norwegian forest']
hats = ['stovepipe', 'bowler', 'tam-o-shanter', 'fedora']
for msg in range(10):
    cat = random.choice(cats)
    hat = random.choice(hats)
    print('Publish: %s wears a %s' % (cat, hat))
    conn.publish(cat, hat)
"""

"\nimport redis\nimport random\n\nconn = redis.Redis()\ncats = ['siamese', 'persian', 'maine coon', 'norwegian forest']\nhats = ['stovepipe', 'bowler', 'tam-o-shanter', 'fedora']\nfor msg in range(10):\n    cat = random.choice(cats)\n    hat = random.choice(hats)\n    print('Publish: % wears a %s' % (cat, hat))\n    conn.publish(cat, hat)\n"

In [4]:
"""redis_sub.py
import redis
conn = redis.Redis()

topic = ['maine coon', 'persian']
sub = conn.pubsub()
sub.subscribe(topic)
for msg in sub.listen():
    if msg['type'] == 'message':
        cat = msg['channel']
        hat = msg['data']
        print('Subscribe: %s wears a %s' % (cat, hat))
"""

"redis_sub.py\nimport redis\nconn = redis.Redis()\n\ntopic = ['maine coon', 'persian']\nsub = conn.pubsub()\nsub.subscribe(topic)\nfor msg in sub.listen():\n    if msg['type'] == 'message':\n        cat = msg['channel']\n        hat = msg['data']\n        print('Subscribe: %s wears a %s' % (cat, hat))\n"

In [5]:
# ZeroMQによるパブサブ
""" zmq_pub.py
import zmq
import random
import time
host = '*'
port = 6789
ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.bind('tcp://%s:%s' % (host, port))
cats = ['siamese', 'persian', 'maine coon', 'norwegian forest']
hats = ['stovepipe', 'bowler', 'tam-o-shanter', 'fedora']
time.sleep(1)
for msg in range(10):
    cat = random.choice(cats)
    cat_bytes = cat.encode('utf-8')
    hat = random.choice(hats)
    hat_bytes = hat.encode('utf-8')
    print('Publish: %s wears a %s' % (cat, hat))
    pub.send_multipart([cat_bytes, hat_bytes])
"""

" zmq_pub.py\nimport zmq\nimport random\nimport time\nhost = '*'\nport = 6789\nctx = zmq.Context()\npub = ctx.socket(zmq.PUB)\npub.bind('tcp://%s:%s' % (host, port))\ncats = ['siamese', 'persian', 'maine coon', 'norwegian forest']\nhats = ['stovepipe', 'bowler', 'tam-o-shanter', 'fedora']\ntime.sleep(1)\nfor msg in range(10):\n    cat = random.choice(cats)\n    cat_bytes = cat.encode('utf-8')\n    hat = random.choice(hats)\n    hat_bytes = hat.encode('utf-8')\n    print('Publish: %s wears a %s' % (cat, hat))\n    pub.send_multipart([cat_bytes, hat_bytes])\n"

In [6]:
""" zmq_sub.py
import zmq
host = '127.0.0.1'
port = 6789
ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.connect('tcp://%s:%s' % (host, port))
topics = ['maine coon', 'persian']
for topic in topics:
    sub.setsockopt(zmq.SUBSCRIBE, topic.encode('utf-8'))
while True:
    cat_bytes, hat_bytes = sub.recv_multipart()
    cat = cat_bytes.decode('utf-8')
    hat = hat_bytes.decode('utf-8')
    print('Subscribe: %s wears a %s' % (cat, hat))"""

" zmq_sub.py\nimport zmq\nhost = '127.0.0.1'\nport = 6789\nctx = zmq.Context()\nsub = ctx.socket(zmq.SUB)\nsub.connect('tcp://%s:%s' % (host, port))\ntopics = ['maine coon', 'persian']\nfor topic in topics:\n    sub.setsockopt(zmq.SUBSCRIBE, topic.encode('utf-8'))\nwhile True:\n    cat_bytes, hat_bytes = sub.recv_multipart()\n    cat = cat_bytes.decode('utf-8')\n    hat = hat_bytes.decode('utf-8')\n    print('Subscribe: %s wears a %s' % (cat, hat))"

In [9]:
# サーバー/クライアント
# UDP
# udp_server.py
"""
from datetime import datetime
import socket

server_address = ('localhost', 6789)
max_size = 4096

print('Starting the server at', datetime.now())
print('Waiting for a client to call.')
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(server_address)

data, client = server.recvfrom(max_size)

print('At', datetime.now(), client, 'said', data)
server.sendto(b'Are you talking to me?', client)
server.close()
"""

"\nfrom datetime import datetime\nimport socket\n\nserver_address = ('localhost', 6789)\nmax_size = 4096\n\nprint('Starting the server at', datetime.now())\nprint('Waiting for a client to call.')\nserver = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\nserver.bind(server_address)\n\ndata, client = server.recvfrom(max_size)\n\nprint('At', datetime.now(), client, 'said', data)\nserver.sendto(b'Are you talking to me?', client)\nserver.close()\n"

In [10]:
# udp_client.py
"""
import socket
from datetime import datetime

server_address = ('localhost', 6789)
max_size = 4096

print('Starting the client at', datetime.now())
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.sendto(b'Hey!', server_address)
data, server = client.recvfrom(max_size)
print('At', datetime.now(), server, 'said', data)
client.close()
"""

"\nimport socket\nfrom datetime import datetime\n\nserver_address = ('localhost', 6789)\nmax_size = 4096\n\nprint('Starting the client at', datetime.now())\nclient = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)\nclient.sendto(b'Hey!', server_address)\ndata, server = client.recvfrom(max_size)\nprint('At', datetime.now(), server, 'said', data)\nclient.close()\n"

In [11]:
# TCP
""" tcp_client.py
import socket
from datetime import datetime

address = ('localhost', 7000)
max_size = 1000

print('Starting the client at', datetime.now())
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(address)
client.sendall(b'Hey!')
data = client.recv(max_size)
print('At', datetime.now(), 'someone replied', data)
client.close()
"""

"\nimport socket\nfrom datetime import datetime\n\naddress = ('localhost', 7000)\nmax_size = 1000\n\nprint('Starting the client at', datetime.now())\nclient = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\nclient.connect(address)\nclient.sendall(b'Hey!')\ndata = client.recv(max_size)\nprint('At', datetime.now(), 'someone replied', data)\nclient.close()\n"

In [12]:
# TCP
""" tcp_server.py
from datetime import datetime
import socket

address = ('localhost', 7000)
max_size = 1000

print('Starting the server at', datetime.now())
print('Waiting for a client to call.')
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(server_address)
server.listen(5)

client, addr = server.accept()
data = client.recv(max_size)

print('At', datetime.now(), client, 'said', data)
client.sendall(b'Are you talking to me?')
client.close()
server.close()
"""

" tcp_server.py\nfrom datetime import datetime\nimport socket\n\naddress = ('localhost', 7000)\nmax_size = 1000\n\nprint('Starting the server at', datetime.now())\nprint('Waiting for a client to call.')\nserver = socket.socket(socket.AF_INET, socket.SOCK_STREAM)\nserver.bind(server_address)\nserver.listen(5)\n\nclient, addr = server.accept()\ndata = client.recv(max_size)\n\nprint('At', datetime.now(), client, 'said', data)\nclient.sendall(b'Are you talking to me?')\nclient.close()\nserver.close()\n"

In [None]:
# ZeroMQ 強化版ソケット
# 標準のsocketがやってほしかったことをやってくれる
