# 11.1  並行処理

## 11.1.1  キュー

キューはFIFO(先入れ先出し)と呼ばれる  
ひとつの仕事を一人が担当していけば、一人ですべての仕事を担当するよりも早く終わる  
たとえそれが同期的(行列のように順番に続いていく場合)でも非同期的(タスクが独立してバラバラにある場合)でも早く終わることは変わらない

## 11.1.2  プロセス

キューの実装方法はたくさんあり、1台のマシンを使う場合、標準ライブラリのmultiprocessingモジュールのQueue関数を使う方法がある

In [None]:
# python3 - pyworks\dishes.py
import multiprocessing as mp

def washer(dishes, output):
    for dish in dishes:
        print(f"Washing {dish} dish")
        output.put(dish)
        
def dryer(input):
    while True:
        dish = input.get()
        print(f"Drying {dish} dish")
        input.task_done()
        
if __name__ == "__main__":
    dish_queue = mp.JoinableQueue()
    dryer_proc = mp.Process(target=dryer, args=(dish_queue,))
    dryer_proc.daemon = True 
    dryer_proc.start()

    dishes = ["salad","bread","entree","dessert"]
    washer(dishes, dish_queue)
    dish_queue.join()

Washing salad dish
Washing bread dish
Washing entree dish
Washing dessert dish


上のコードをdishes.pyとしてコマンドプロンプトで `$ python dishes.py`と実行すると  
`Washing salad dish
 Washing bread dish
 Washing entree dish
 Washing dessert dish
 Drying salad dish
 Drying bread dish
 Drying entree dish
 Drying dessert dish`  
と返ってくる

multiprocessモジュールの JoinableQueue 関数は、Queue 関数の子クラスでtask_doneメソッドやjoinメソッドが追加されたクラスになる  
JoinableQueue関数を使って Queue オブジェクトを作成  
Process 関数に引数のtargetにdryer関数、argsにQueueオブジェクト(のちのwasher関数のoutput.putから返される値が入る)を渡して、Processオブジェクトを作成  
ProcessオブジェクトのdaemonアトリビュートをTrueにしているがWindowsではどちらでもよい(Trueにすることでプロセスが終了するとき、そのプロセスはその子プロセスであるデーモンプロセスすべてを終了させようとする)  
Processオブジェクトをstartして処理を始めているが、すぐには開始せず、ほかのプロセスが終了するのを待つ  
 
washer関数にQueueオブジェクトを渡して、washer関数内の処理でQueueオブジェクトのputメソッドを使って値をQueueオブジェクト内に保存する  
すべての処理が完了したら、Queueオブジェクトのjoin関数がProcessオブジェクトの処理の処理を始めるように伝え、Processオブジェクトのdryer関数の処理が始まる  
dryer関数にはQueueオブジェクトが渡されていて、get関数を使うことで値を先入先出に従って取り出していく  
Queueオブジェクトのtask_doneメソッドで以前に渡された要素の処理が完了したことを示していて、すべてを取り出したら処理を終了し、whileループから抜け出す

## 11.1.3  スレッド

スレッドはプロセス内で実行され、プロセス内のすべてのものにアクセスできる

In [3]:
# python3 - pyworks\threads.py
import threading

def do_this(what):
    whoami(what)
    
def whoami(what):
    print(f"Thread {threading.current_thread()} says: {what}")
    
if __name__ == "__main__":
    whoami("I'm the main program")
    for n in range(4):
        p = threading.Thread(target=do_this, args=(f"I'm function {n}",))
        p.start()
! python pyworks\threads.py

Thread <_MainThread(MainThread, started 8728)> says: I'm the main program
Thread <Thread(Thread-12, started 14548)> says: I'm function 0
Thread <Thread(Thread-13, started 5700)> says: I'm function 1
Thread <Thread(Thread-14, started 16816)> says: I'm function 2
Thread <Thread(Thread-15, started 20632)> says: I'm function 3
Thread <_MainThread(MainThread, started 11596)> says: I'm the main program
Thread <Thread(Thread-1, started 13532)> says: I'm function 0
Thread <Thread(Thread-2, started 15188)> says: I'm function 1
Thread <Thread(Thread-3, started 16596)> says: I'm function 2
Thread <Thread(Thread-4, started 17356)> says: I'm function 3


threadingモジュールのTread関数を呼んでキーワード引数のtargetに関数、argsに引数を渡すことでThreadオブジェクトを作成できる  
Threadオブジェクトのstartメソッドを呼ぶことで複数スレッドを使って処理できる

In [4]:
# python3 - pyworks\thread_dishes.py
import threading, queue, time

def washer(dishes, dish_queue):
    for dish in dishes:
        time.sleep(5)
        print("Washing", dish)
        dish_queue.put(dish)
        
def dryer(dish_queue):
    while True:
        dish = dish_queue.get()
        time.sleep(8)
        print("Dring", dish)
        dish_queue.task_done()
        
dish_queue = queue.Queue()
for n in range(2):
    dryer_thread = threading.Thread(target=dryer, args=(dish_queue,))
    dryer_thread.start()
    
dishes = ["salad", "bread", "entree", "desert"]
washer(dishes, dish_queue)
dish_queue.join()

Washing salad
Washing bread
Dring salad
Washing entree
Dring bread
Washing desert
Dring entree
Dring desert


threadingモジュールとqueueモジュールを使うことでmultiprocessingモジュールを使わないでも同じような処理ができる  

しかし、threadingモジュールには terminate 関数がないため、問題が起きた時に強制終了する方法がないという違いがある

## 11.1.4  グリーンスレッドとgevent

別スレッドや別プロセスで実行する以外にイベント駆動型プログラミング(コンピュータプログラムが起動すると共にイベントを待機し、発生したイベントに従って受動的に処理を行うプログラミングパラダイムのこと)を使う方法がある  
イベント駆動プログラムは、中央でイベントループを実行し、仕事を少しずつ外部に分け与えてループを繰り返す  
gevent ライブラリはイベント駆動で、通常の命令型のコードを書くと、geventがオブジェクトをコルーチン(プログラミングの構造の一種で、サブルーチンがエントリーからリターンまでを一つの処理単位とするのに対し、コルーチンはいったん処理を中断した後、続きから処理を再開できる)に変換する  
geventは多くのPython標準オブジェクトを書き換え、geventのメカニズムを使うようにさせるが、一部のデータベースドライバのようにCで書かれたPython拡張コードは操作できない

In [5]:
import gevent
from gevent import socket
hosts = ["www.crappytaxidermy.com", "www.walterpottertaxidermy.com", "www.antique-taxidermy.com"]
jobs = [gevent.spawn(gevent.socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
    print(job.value)

66.6.44.4
104.27.172.75
None


In [6]:
gevent.socket.gethostbyname("www.crappytaxidermy.com")

'66.6.44.4'

geventライブラリの socket モジュールの gethostbyname 関数は渡されたドメイン名に対応するIPアドレスを返す  
普通の socket モジュールにも gethostbyname 関数はあるが同期的なので世界中のネームサーバーとをつかまえて解決しようと競うために時間がかかるが、geventライブラリのsocketモジュールのgethostbynameは非同期的なので複数のサイトを独立してバラバラに解決することができるの早く処理が終わる  
geventの spawn 関数は渡された関数を実行するためのグリーンスレッド(オペレーティングシステムではなく仮想マシン (VM) によってスケジュールされるスレッド)を作成する  
グリーンスレッドは通常のスレッドとは違い、ブロック処理(他のスレッドがその資源にアクセスできないようする処理で，排他処理や排他制御ともいう)をしない  
通常のスレッドをブロックしてしまうことが起きてもgeventはほかのグリーンスレッドのどれかに制御を切り替える  
geventの joinall 関数は派生させた処理が処理されるのを待つ  

geventのsocketモジュールではなく、モンキーパッチング関数を使う方法もあり、標準モジュールのsocketなどをグリーンスレッドを使うように書き換えることができる  
長いコードの最後のほうまでgeventを適用したいときに便利

In [7]:
from gevent import monkey
monkey.patch_socket()

プログラムの冒頭に上のコードを呼び出すことでできる  
geventライブラリの monkey モジュールの patch_socket 関数を呼ぶことで全ての箇所にgeventのsocketが挿入される

In [8]:
from gevent import monkey
monkey.patch_all()

True

  with loop.timer(seconds, ref=ref) as t:


上のようにmonkeyモジュールの patch_all 関数を呼ぶとさらに多くの標準ライブラリがモンキーパッチングされる  

In [9]:
import gevent, socket
from gevent import monkey; monkey.patch_all()

hosts = ["www.crappytaxidermy.com", "www.walterpottertaxidermy.com", "www.google.com"]
jobs = [gevent.spawn(socket.gethostbyname, host) for host in hosts]
gevent.joinall(jobs, timeout=5)
for job in jobs:
    print(job.value)

66.6.44.4
104.27.172.75
172.217.161.228
The history saving thread hit an unexpected error (This operation would block forever
	Hub: <Hub '' at 0x2841fac9b88 default pending=0 ref=0 thread_ident=0x58c8>
	Handles:
[HandleState(handle=<cdata 'struct uv_handle_s *' 0x000002841F68CA78>, type=b'check', watcher=<loop at 0x28420ee8888 default pending=0 ref=0>, ref=0, active=1, closing=0),
 HandleState(handle=<cdata 'struct uv_handle_s *' 0x000002841D1E2518>, type=b'timer', watcher=<loop at 0x28420ee8888 default pending=0 ref=0>, ref=0, active=1, closing=0),
 HandleState(handle=<cdata 'struct uv_handle_s *' 0x000002841F68E288>, type=b'prepare', watcher=<loop at 0x28420ee8888 default pending=0 ref=0>, ref=0, active=1, closing=0),
 HandleState(handle=<cdata 'struct uv_handle_s *' 0x000002841F68D628>, type=b'check', watcher=<loop at 0x28420ee8888 default pending=0 ref=0>, ref=1, active=0, closing=0)]).History will not be written to the database.


上のようにmonkeyモジュールのpatc_all関数を呼ぶことで、標準のsocketモジュールでもグリーンスレッドを呼び素早く処理することができる

## 11.1.5  twisted

twistedは非同期のイベント駆動型ネットワークフレームワークで、データ受信や接続切断といったイベントに関数を結びつけると、イベントが発生したときに関数が呼び出される(コールバック：コンピュータプログラム中で、ある関数などを呼び出す際に別の関数などを途中で実行するよう指定する手法のこと)

http://bit.ly/twisted-ex にサンプルコードがいくつかあるのでダウンロードしてみるとよい

## 11.1.6  asyncio

現在、asyncioはtwisted,geventなどの非同期メソッドと互換性のある共通イベントループを提供している  
目標はクリーンで標準的なパフォーマンスに優れた非同期的APIを提供することなので、今後注目

## 11.1.7  Redis

1台のマシンでもネットワークでも実行できるキューに対する別のアプローチがRedisを使うことでできる  
つまり、シングルマシンからマルチマシンによる並行処理への橋渡しができる

In [None]:
# python3 - pyworks\redis_washer.py
import redis
conn = redis.Redis()
print("Washer is starting")
deishes = ["salad","bread","entree","dessert"]
for dish in dishes:
    msg = dish.encode("utf-8")
    conn.rpush("dishes", msg)
    print("Washed", dish)
conn.rpush("dishes","quit")
print("Washer is done")

Washer is starting
Washed salad
Washed bread
Washed entree
Washed desert
Washer is done


Traceback (most recent call last):
  File "src/gevent/_abstract_linkable.py", line 134, in gevent.__abstract_linkable.AbstractLinkable._notify_links
greenlet.error: cannot switch to a different thread
2020-09-29T04:19:39Z (<built-in method switch of greenlet.greenlet object at 0x000002841F993468>, <gevent._event.Event object at 0x000002842109BE28>) failed with error



In [15]:
# python3 - pyworks\redis_dryer.py
import redis
conn = redis.Redis()
print("Dryer is starting")
while True:
    msg = conn.blpop("dishes")
    if not msg:
        break
    val = msg[1].decode("utf-8")
    if val == "quit":
        break
    print("Dried", val)
print("Dishes are dried")

Dryer is starting
Dried salad
Dried bread
Dried entree
Dried desert
Dishes are dried


コマンドプロンプトを起動して上のredis_dryer.pyを実行した後に、ほかのコマンドプロンプトを起動してredis_washer.pyを実行すると、待機していたredis_dryer.pyのほうも同時に処理が始まる

redis_washer.pyのコード内容としては、まず処理の開始をprintで合図、リストを作成、forループ内でmsg変数にリストの内容を保存、Redisのリストにdishesという名前を付けて文字列を右から追加していき、どの文字列の処理が終わったかをprintで伝える、すべてのループが終わったらdishesリストにquitという文字列を追加、printですべての処理が終わったことを伝える

redis_dryer.pyのコード内容としては、処理の開始されたことをprintで伝える、whileループでRedisのdishesというリスト名の左から値を取り出し変数msgに代入しリストから削除する、もしまだredis_dryer.pyが起動していない場合はblpopでリスト名が見つかるまで待機される、もし変数msgから何も返されなくなったらリストが空になった証拠なのでループから抜け出す、blpopを使って返されるのはリスト名と消したバイト文字列のタプルなのでmsg[ 1 ]でバイト文字列のみを指定して文字列にデコードしてval変数に代入する、もし変数valがquitならループから抜け出す、valの処理が終了したことを表示、全ての処理が終了したことを表示

In [16]:
# python3 - pyworks\redis_dryer2.py
def dryer():
    import redis, os, time
    conn = redis.Redis()
    pid = os.getpid()
    timeout = 20
    print(f"Dryer process {pid} is starting")
    while True:
        msg = conn.blpop("dishes", timeout)
        if not msg:
            break
        val = msg[1].decode("utf-8")
        if val == "quit":
            break
        print(f"{pid}: dried {val}")
        time.sleep(0.1)
    print(f"Dryer process {pid} is done")
    
import multiprocessing
DRYER=3
for num in range(DRYER):
    p = multiprocessing.Process(target=dryer)
    p.start()

redis_dryer.pyの代わりに上のredis_dryer2.pyを起動してみると、複数のスレッドを使って処理をすることができる  
DRYERの値を3にしてあるのでforループを3回繰り返しスレッドも3作られ作成したdryer関数を3つのスレッドで処理される  
試しに実行してみると  
`Dryer process 20368 is starting
 Dryer process 3368 is starting
 Dryer process 15008 is starting
 20368: dried salad
 3368: dried bread
 15008: dried entree
 Dryer process 20368 is done
 15008: dried dessert
 Dryer process 3368 is done
 Dryer process 15008 is done`  
とdryerのプロトコルでは表示される  
プロセスがまず3つ起動し、データがRedisのdishesリストに追加されるたびに別々のスレッドが処理してくれる  
しかし、実際にquitの文字列を受けっとって終了できるのは一つのスレッドのみなので、blpopメソッドの第二引数(キーワード引数はtimeout)に秒数を渡すことで、渡された秒数後にNoneを返すようにしている  
上ではtimeoutは20に設定されているので20秒後にbreak文に向かうことができ、処理を終了させることができる

## 11.1.8  キューを越えて

動く部品が多くなると混乱が生じる可能性が高くなる  
そういった問題に対処するためのテクニックがいくつかある  
- ファイア・アンド・フォーゲット  
仕事を渡したら、たとえ渡し先がなかったとしても無視して続ける行動のこと
- 要求/応答  
要求のひとつひつを確認してから渡す
- バックプレッシャまたはスロットリング  
下流の処理が追い付かなくなったときに、処理の早いところにペースを落とすよう指示する

実際のシステムではワーカーが需要についていけてるかに注意する必要がある  
新しいタスクは保留リストに追加、なんらかのワーカープロセスがメッセージをポップして作業中リストに追加、処理が終了したら作業中リストから取り除いて完了済みリストに追加する  
こうすればどのタスクがエラーを起こしたか、時間がかかりすぎているかがわかる  
Pythonベースのキューパッケージの中にはこのような管理レベルを追加したものがある  
- celery  
multiprocessing、geventなどを使って同期的でも非同期的でもタスク分散できる
- thoonk  
Redisを基礎としていてジョブキューとパブサブを提供している
- rq  
ジョブキューのためのPythonライブラリで、Redisを基礎としている
- Queues  
このサイトはPythonベースのものやその他のものを含めてのキューイングソフトウェアについての議論の場になっている

# 11.2  ネットワーク

以降ではネットワーキング、すなわち空間的に広がった分散コンピューティングを扱う

## 11.2.1  パターン

ネットワークアプリケーションは、いくつかの基本パターンから作ることができる  
もっとも一般的なパターンは、要求/応答(クライアント/サーバー)というものになる  
このパターンは同期的で、クライアントはサーバーが応答を返してくれるまで待つ  
例えばウェブブラウザもクライアントであり、ウェブサーバーに対してHTTP要求をを送ることでサーバーは応答を返してくれる  

プロセスプール内の態勢が整ったワーカーにデータを送るプッシュ(ファンアウト)もよく見られるパターンである  
例えばロードバランサーの先にあるウェブサーバーがそれにあたる  

プッシュの逆はプル(ファンイン)で、一つ以上のソースからデータを受け付ける  
例えば複数のプロセスからテキストのメッセージを受け取り、単一のログファイルに書き込むロガーがそれにあたる  

パブリッシュ/サブスクライブ(パブサブ)という、ラジオやテレビの放送とよく似たパターンがある  
このパターンでは、パブリッシャがデータを送り出す  
単純なパブサブシステムでは、すべてのサブスクライバがコピーを受け取る  
プッシュパターンとは異なり、複数のサブスクライバが同じデータを受け取る可能性がある

## 11.2.2  パブリッシュ/サブスクライブモデル

パブリッシュ/サブスクライブはキューではなくブロードキャスト(放送という意味の英単語で、通信・ネットワークの分野ではネットワークに参加するすべての機器に同時に信号やデータを送信すること)だ  
一つ以上のプロセスがメッセージをパブリッシュ(発行)する  
個々のサブスクライバ(購読者)プロセスは、どのようなタイプのメッセージを受け取りたいかを指定する  
指定されたタイプに合致するメッセージのコピーが個々のサブスクライバに送られる  
つまり、メッセージは一度処理されるかもしれないかもしれないし、複数回処理されるかもしれないし、全く処理されない可能性もある

### 11.2.2.1  Redis

Redisを使うことで、手っ取り早くパブサブシステムを作れる  
パブリッシャはトピック値を持つメッセージを送る  
サブスクライバはどのトピックを受信したいかを指定する

In [None]:
# python3 - pyworks\redis_pub.py
import redis, random
conn = redis.Redis()
cats = ["siamese", "persian", "maine coon", "norwegian forest"]
hats = ["stovepipe", "bowler", "tam-o-shanter", "fedora"]
for msg in range(10):
    cat = random.choice(cats)
    hat = random.choice(hats)
    print(f"Publish: {cat} wears a {hat}")
    conn.publish(cat,hat)

In [None]:
# python3 - pyworks\redis_sub.py
import redis
conn = redis.Redis()
topics = ["main coon", "persian"]
sub = conn.pubsub()
sub.subscribe(topics)
for msg in sub.listen():
    if msg["type"] == "message":
        cat = msg["channel"]
        hat = msg["data"]
        print(f"Subscribe: {cat} wears a {hat}")

コマンドプロンプトでredis_sub.pyを起動し待機させる、もう一つのコマンドプロンプトでredis_pub.pyを起動する  
するとredis_pubのほうでは10通りのメッセージが表示され、redis_subのほうではmain coonとpersianのメッセージのみが表示される  

redis_pub.pyのほうの内容としては、redisオブジェクトのpublish関数にchannnelとmessage(data)の2つを渡すことでパブリッシャ(発行)の機能を行う  

redis_sub.pyのほうの内容としては、まずredisオブジェクトのpubsub関数を呼んでpubsubオブジェクトを作成している  
pubsubオブジェクトの subscribe メソッドに取得したいchannelのリストを渡して設定している(上ではmain coonとpersian)  
pubsubオブジェクトの listen メソッドでchannelを取得するまで待機して、取得したらtype、pettern、channel、dataがキーの辞書になって返ってくる  
辞書の中のtypeの値がmessageの時は内容を受信したときで、channelの値にchannel名、dataの値にmessage(data)が入っている

### 11.2.2.2  ZeroMQ

ZeroMQは中央のサーバーという存在がないので、個々のパブリッシャがすべてのサブスクライバに書き込みをする  

In [None]:
# python3 - pyworks\zmq_pub.py
import zmq, random, time
host = "*"
port = 6789
ctx = zmq.Context()
pub = ctx.socket(zmq.PUB)
pub.bind(f"tcp://{host}:{port}")
cats = ["siamese", "persian", "maine coon", "norwegian forest"]
hats = ["stovepipe", "bowler", "tam-o-shanter", "fedora"]
time.sleep(1)
for msg in range(10):
    cat = random.choice(cats)
    cat_bytes = cat.encode("utf-8")
    hat = random.choice(hats)
    hat_bytes = hat.encode("utf-8")
    print(f"Publish: {cat} wears a {hat}")
    pub.send_multipart([cat_bytes, hat_bytes])

In [None]:
# pytohn3 - pyworks\zmq_sub.py
import zmq
host = "127.0.0.1"
port = 6789
ctx = zmq.Context()
sub = ctx.socket(zmq.SUB)
sub.connect(f"tcp://{host}:{port}")
topics = ["maine coon", "persian"]
for topic in topics:
    sub.setsockopt(zmq.SUBSCRIBE, topic.encode("utf-8"))
while True:
    cat_bytes, hat_bytes = sub.recv_multipart()
    cat = cat_bytes.decode("utf-8")
    hat = hat_bytes.decode("utf-8")
    print(f"Subscribe: {cat} weras a {hat}")

コマンドプロンプトでzmq_sub.pyを実行後、別のコマンドプロンプトでzmq_pub.pyを実行すると、
zmq_pub.pyでは  
`Publish: persian wears a bowler
 Publish: maine coon wears a tam-o-shanter
 Publish: persian wears a stovepipe
 Publish: maine coon wears a stovepipe
 Publish: norwegian forest wears a bowler
 Publish: norwegian forest wears a stovepipe
 Publish: maine coon wears a fedora
 Publish: persian wears a tam-o-shanter
 Publish: persian wears a fedora
 Publish: siamese wears a fedora`  
のように表示され、待機中だったzmq_sub.pyでは  
`Subscribe: persian wears a bowler
 Subscribe: maine coon wears a tam-o-shanter
 Subscribe: persian wears a stovepipe
 Subscribe: maine coon wears a stovepipe
 Subscribe: maine coon wears a fedora
 Subscribe: persian wears a tam-o-shanter
 Subscribe: persian wears a fedora`  
 と表示される

zmq_pub.pyの内容は、zmqモジュールの Context 関数を呼び出してcontextオブジェクトを作成  
contextオブジェクトの socket メソッドにsocket_type(zmq.PUB==1)を渡してsocketオブジェクト(pubモード)を作成  
socketオブジェクトの bind メソッドにTCPのIPアドレスを渡すことで、そのアドレスと結びつける  
socketオブジェクトの send_multipart メソッドにサブスクライバ(recv_multipart)に送りたいトピックを渡す

zmq_sub.pyの内容は、zmqモジュールの Context 関数を呼び出してcontextオブジェクトを作成  
contextオブジェクトの socket メソッドにsocket_type(zmq.SUB==2)を渡してsocketオブジェクト(subモード)を作成  
socketオブジェクトの connect メソッドにTCPのIPアドレスを渡して、そのアドレスと接続する  
socketオブジェクトの setsockopt メソッドにオプション(zmq.SUBSCRIBE==6)と設定したい文字列を渡して、取得したい情報を設定  
whileループの中でsocketオブジェクトの recv_multipart メソッドを呼ぶことでパブリッシャ(send_multipart)から送られてきた文字列をリストとして取得することができる

### 11.2.2.3  その他のパブサブツール 

- RabbitMQ
有名なメッセージングシステムで、pikaというPython APIがある

## 11.2.3  TCP/IP

インターネットは、接続の開設、データの交換、接続の切断、タイムアウトの処理などをどのようにすべきを決めた規則を基礎としている  
これらをプロトコルと呼ばれ、レイヤ(階層)に分けられている  
レイヤ化されているのは、イノベーション(新しい技術の発明)を促し、同じことをする別の方法を作れるようにするため  
プロトコルに従って上下のレイヤとのやり取りを行う限り、どんなことでもできる  

もっとも下のレイヤは電気信号などを規定している  
上位レイヤは下位レイヤを基礎として作られている  
中間にはIP(Internet Protocol)レイヤがあり、ネットワーク内での位置のアドレッシングの方法とデータのパケット(チャンク)の流し方を規定している  
IPのすぐ上のレイヤには、位置間でバイトを移動する方法を記述する以下の2つのプロトコルがある  
- UDP(User Datagram Protocol)  
短いデータの交換に使われる。データグラムはハガキに書かれたコメントのように単発で送られる小さなメッセージのこと
- TCP(Transmission Control Protocol)  
このプロトコルは、UDPよりも寿命の長い接続のために使われる。バイトのストリーム(データの送受信や処理を連続的に行うこと)を送り、重複なく順番にデータが届くことを保証する

UDPメッセージには受信確認がないので、相手先に届いたかどうかがわからない  
TCPは、送信側と受信側の間で密かにつながっており、確実な接続を保証する  
インターネットで操作するほとんどのもの(ウェブやデータベースサーバーなど)はIPプロトコルの上で実行されるTCPプロトコルを基礎としている  
TCP/IPはこれを簡潔に言ったものである

## 11.2.4  ソケット

ネットワークプログラミングの最下層は、C言語とUnixオペレーティングシステムから借用してソケットを使うことになる  
ソケットレベルのコーディングは面倒だが、ネットワーキングエラーが起きた時にソケットについてのメッセージが現れることがある  

クライアントサーバーの非常に単純なやり取りを書いてみる  
クライアントは、サーバーにUDPデータグラムで文字列を送る  
サーバーは文字列を格納するデータパケットを送る  
サーバーは特定のアドレスの特定のポートでリスン(データの受信待ち)をしなければならない  
クライアントはメッセージを送ったり、応答を受け取ったりするためにはアドレスとポートの値を知っていなければならない  

次のクライアントとサーバーのコードでは、アドレスは(address, port)のタプルで、addressは文字列であり、名前("localhost")でもIPアドレス(127.0.0.1)でもよい

例として、まず片方のプロセスからもう片方のプロセスに小さなデータを送り、小さな返事を送り返そう  
第1のプログラムがサーバーで第2のプログラムがクライアントになる  
どちらのプログラムでも時刻を表示してからソケットを開く  
サーバーは自分のソケットに対する接続をリスンし、クライアントは自分のソケットに書き込みをする  
すると、クライアントのソケットはサーバーにメッセージを送る

In [None]:
# python3 - pyworks\udp_server.py
import socket
from datetime import datetime
server_address = ("localhost", 6789)
max_size = 4096
print("Starting the server at", datetime.now())
print("Waiting for a client to call")
server = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
server.bind(server_address)
data, client = server.recvfrom(max_size)
print(f"At {datetime.now()} {client} said {data}")
server.sendto(b"Are you talking to me?", client)
server.close()

In [None]:
# python3 - pyworks\udp_client.py
import socket
from datetime import datetime
server_address = ("localhost", 6789)
max_size = 4096
print("Starting the client at", datetime.now())
client = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
client.sendto(b"Hey!", server_address)
data, server = client.recvfrom(max_size)
print(f"At {datetime.now()} {server} said {data}")
client.close()

まず上のudp_server.pyを起動すると  
`Starting the server as 2020-07-14 00:12:59.557508
 Waiting for a client to call`  
とリスン(受信待ち)状態に入る  
次にudp_client.pyを起動すると  
`Starting the client at 2020-07-14 00:13:22.652083
 At 2020-07-14 00:13:22.667723 ('127.0.0.1', 6789) said b'Are you talking to me?'`  
とデータ(メッセージ)を送った後に相手からの返信を受信し、表示している  
udp_server.pyのほうも、追加で  
`At 2020-07-14 00:13:22.667723 ('127.0.0.1', 53717) said b'Hey!'`  
とデータ(メッセージ)を受信した後にclient側に返信を行っている  

udp_server.pyの内容は、  
socketモジュールの socket 関数にfamilyとtypeを渡してsocketオブジェクトを作成している  
familyに渡してるsocket.AF_INET(==2)はインターネット(IP)ソケットを作るという意味で、  
typeに渡してるsocket.SOCK_DGRAM(==2)はデータグラムを送受信するという意味で、要するにUDPを使うということになる  
socketオブジェクトの bind メソッドにアドレスとポート番号のタプルを渡して、そのアドレスに届いたデータを受け取るためにリスン状態に入る  
データを受け取ると次に進み  
socketオブジェクトの recvfrom メソッドにデータの最大受信サイズを示す整数を渡すことで、受信したデータと相手のアドレスが入ったタプルを返す  
socketオブジェクトの sendto メソッドに送りたいデータと相手のアドレスとポート番号のタプルを渡すことで、データを相手のアドレスに送る  
最後にsocketオブジェクトの close メソッドで切断

udp_client.pyの内容は、
udp_serverの順番を変えただけで、  
socketオブジェクトの sendto メソッドを使って先にデータを送信した後に、  
socketオブジェクトの recvfrom メソッドを使ってデータを受信して、closeで終了している

クライアントはサーバーのアドレスとポート番号を知っていなければいけないが、自分自身のポート番号は自動的に割り当てられる(上の場合は53717)

UDPはひとつのチャンクでデータを送るため、データサイズによっては到達できなかったり、順番がバラバラで送られたりする  
高速で軽かったり、事前の経路確保のやり取りなどを必要としない(コネクションレス)だが、信頼性に欠ける  
そこでTCPの出番となる

TCPは、ウェブなどのUDPよりも寿命の長い接続などで使われる  
TCPは、送信側が送った順序でデータを送り届け、データに問題があれば再送を試みる

In [None]:
# python3 - pyworks\tcp_client.py
import socket
from datetime import datetime
address = ("localhost", 6789)
max_size = 1000
print("Starting the client at", datetime.now())
client = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
client.connect(address)
client.sendall(b"Hey!")
data = client.recv(max_size)
print(f"At {datetime.now()} someone replied {data}")
client.close()

In [None]:
# python3 - pyworks\tcp_server.py
import socket
from datetime import datetime
address = ("localhost", 6789)
max_size = 1000
print("Starting the server at", datetime.now())
print("Waiting for a client to call")
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(address)
server.listen(5)
client, addr = server.acccept()
data = client.recv(max_size)
print(f"At {datetime.now()} {client} said {data}")
client.sendall(b"Are you talking to me?")
client.close()
server.close()

まずtcp_server.pyを起動すると  
`Starting the server at 2020-07-14 11:35:46.995028
 Waiting for a client to call`  
と表示されリスン状態に入る  
次にtcp_client.pyを起動すると  
`Starting the client at 2020-07-14 11:35:50.340154
 At 2020-07-14 11:35:50.350046 someone replied b'Are you talking to me?'`  
と表示され、tcp_server.pyのほうも  
`At 2020-07-14 11:35:50.350046 <socket.socket fd=568, family=AddressFamily.AF_INET, type=SocketKind.SOCK_STREAM, proto=0, 
 laddr=('127.0.0.1', 6789), raddr=('127.0.0.1', 50758)> said b'Hey!'`  
と追加される

tcp_client.pyがUDPの時と違うところは、  
socketオブジェクトを作るときの socket メソッドにsocket.SOCK_STREAMを渡して、ストリーミングプロトコルであるTCPを使うことを知らせている  
socketオブジェクトの connect メソッドにアドレスを渡してストリームの接続をしている  
connect メソッドですでにアドレス先との接続は完了しているので、socketオブジェクトの sendto メソッドを使って送り先を指定する必要がなく、  
代わりにsocketオブジェクトの sendall メソッドを使って送りたいデータのみを渡している  
データを受け取るときもsocketオブジェクトの recv メソッドを使って受け取って、相手のアドレスは受け取らないようにしている

tcp_server.pyがUDPの時と違うところは、  
socketオブジェクトの socket メソッドにsocket.SOCK_STREAMを渡していること  
socketオブジェクトの listen メソッドに整数を渡して、5個のクライアント接続がたまったら新しい接続を拒否するように設定している  
socketオブジェクトの accept メソッドは、最初のデータが届いたときにデータの情報(相手のsocketオブジェクト)とアドレスを取り出すことができる  
相手のsocketオブジェクトを使って recv メソッドを呼び出し、データを受け取り、データの送信もしている  
socketオブジェクトを2つとも閉じる

ソケットレベルで対処しなければならない問題の一部を挙げる  
- UDPはメッセージを送るが、サイズが限られており、デスティネーション(相手)に届く保証がない  
- TCPはメッセージではなくバイトストリームを送る。毎回の呼び出しで何バイト送受信することになるかはわからない  
- TCPで大きなメッセージを交換するときには、セグメント(細分化された一部)からメッセージ全体を再構築するための新たな情報が必要になる  
固定メッセージサイズ(バイト)、メッセージ全体のサイズ、なんらかの区切り文字といったものを決める必要がある  
- メッセージはバイトでありUnicode文字列ではないので、Pythonのbytes型を使わなければならない

## 11.2.5  ZeroMQ

ZeroMQは強化版ソケットと呼ばれることがあり、ZeroMQソケットは普通のソケットがしてくれないこともしてくれる  
- メッセージ全体の交換  
- 接続の再試行  
- 送信側と受信側のタイミングが合わないときにデータを守るためのデータのバッファリング(処理速度や転送速度の差を補ったり、送受信データを一時的に保存しておくこと)

ZeroMQはソケットタイプの組み合わせによってさまざまなものを作ることができる  
以下はソケットタイプの一覧になる  
- REQ(同期要求)
- REP(同期応答)
- DEALER(非同期要求)
- ROUTERS(非同期応答)
- PUB(パブリッシュ)
- SUB(サブスクライブ)
- PUSH(プッシュ)
- PULL(プル)

もっとも簡単なパターンは要求/応答のペアで、同期的なので片方のソケットが要求を発行すると、反対側が応答する

In [None]:
# python3 - pyworks\zmq_server.py
import zmq
host = "127.0.0.1"
port = 6789
context = zmq.Context()
server = context.socket(zmq.REP)
server.bind(f"tcp://{host}:{port}")
while True:
    request_str = server.recv().decode("utf-8")
    print(f"That voice in my head says: {request_str}")
    reply_bytes = f"Stop saying: {request_str}".encode("utf-8")
    server.send(reply_bytes)

In [None]:
# python3 - pyworks\zmq_client.py
import zmq
host = "127.0.0.1"
port = 6789
context = zmq.Context()
client = context.socket(zmq.REQ)
client.connect(f"tcp://{host}:{port}")
for num in range(1,6):
    request_str = "message {num}"
    request_bytes = request_str.encode("utf-8")
    client.send(request_bytes)
    reply_str = client.recv().decode("utf-8")
    print(f"Sent {request_str}, received {reply_str}")

zmq_server.pyを起動した後にzmq_client.pyを起動すると  
zmq_server,pyは  
`That voice in my head says: message 1
 That voice in my head says: message 2
 That voice in my head says: message 3
 That voice in my head says: message 4
 That voice in my head says: message 5`  
を表示し、zmq_client.pyは  
`Send message 1, received Stop saying: message 1
 Send message 2, received Stop saying: message 2
 Send message 3, received Stop saying: message 3
 Send message 4, received Stop saying: message 4
 Send message 5, received Stop saying: message 5`  
を表示する

zmq_server.pyの内容は、  
zmqモジュールの Context メソッドを呼び出しContextオブジェクトを作成している  
Contextオブジェクトの socket メソッドにタイプを示すためのzmq.REP(同期応答)を渡して、socketオブジェクト(REP)を作成している  
socketオブジェクトの bind メソッドに特定のIPアドレスとポートを渡して、そのIPアドレスとポートでリスンするようにしている  
whileループの中で、socketオブジェクトの recv メソッドを使って内容を受信する  
その内容に応じて、socketオブジェクトの send メソッドにバイト文字列を渡して送信している

zmq_client.pyの内容は、  
zmqモジュールの Context メソッドを呼び出しContextオブジェクトを作成している  
Contextオブジェクトの socket メソッドにタイプを示すためのzmq.REQ(同期要求)を渡して、socketオブジェクト(REQ)を作成している  
client側ではsocketオブジェクトのbindではなく、connect メソッドを使ってIPアドレスとポートに接続する  
あとは同じで、socketオブジェクトの send で送り、recv で受け取っている

上のコードだと単純な文字列を使っているのでエンコードとデコードをすれば十分だったが、ほかのデータ型も含む場合は、MessagePackのようなライブラリを使うとよい  
サーバーは一度にひとつずつ同期的に要求を処理していくが、処理中に届いたほかの要求をなくしたりせず、ZeroMQは指定された上限まではメッセージをバッファリングする  
ZeroMQのMはMessageで、QはQueue、Zeroはブローカー(仲介者)不要という意味になる

ZeroMQは中央のブローカーを使うことを強制しないが、必要ならブローカーを作ることができる  
例えば、複数のREQソケットがひとつのROUTERに接続し、ROUTERは個々の要求をDEALERに渡す  
DEALERは自分に接続されているREPソケットとやり取りをする  
これは、ウェブサーバーファームの手前のプロキシサーバーとやり取りしている一連のブラウザという形とよく似ていて、複数のクライアントサーバーを必要なだけ追加できる  

ベンチレーターと呼ばれるネットワーキングパターンでは、PUSHソケットを使って非同期のタスクを分散し、PULLソケットを使って結果を集める

ZeroMQは、ソケット作成時にソケットの接続タイプを変更するだけでスケールアップとスケールダウンの両方を実現できる  
- tcpは、ひとつ以上のマシンのプロセス間の通信  
- ipcは、同じマシンのプロセス間の通信  
- inprocは、同じプロセスのスレッド間の通信

## 11.2.7  インターネットサービス

Pythonには、守備範囲の広いネットワーキングツールセットが備わっている  
以下の節では最もよく使われているインターネットサービスの一部の自動化の方法を見ていく

### 11.2.7.1  DNS

コンピュータは、85.2.101.94のような数値によるIPアドレスを持っているが、人間は数値よりも名前のほうが覚えやすい  
DNS(Domain Name System)は、分散データベースを使ってIPアドレスと名前を相互変換するインターネットサービスで、極めて重要になる  
DNS関数の一部は低水準のsocketモジュールに含まれている  

In [22]:
import socket

In [23]:
socket.gethostbyname("www.crappytaxidermy.com")

'66.6.44.4'

In [24]:
socket.gethostbyname_ex("www.crappytaxidermy.com")

('crappytaxidermy.com', ['www.crappytaxidermy.com'], ['66.6.44.4'])

socketモジュールの gethostbyname 関数にドメイン名を渡すことで、対応するIPアドレスを返してくれる  
拡張版の gethostbyname_ex 関数はドメイン名を渡すことで、引数の名前と代替名のリスト、IPアドレスのリストを返す

In [25]:
socket.getaddrinfo("www.crappytaxidermy.com", 80)

[(<AddressFamily.AF_INET: 2>, 0, 0, '', ('66.6.44.4', 80))]

socketモジュールの getaddrinfo 関数にドメイン名とポート番号を渡すことで、IPアドレスだけでなく、そこに接続するソケットを作るために必要な情報も返してくれる  
ポート番号の80は"http"と同じで、80の代わりに"http"を渡すことで同じ結果を得ることができる

In [26]:
socket.getaddrinfo("www.crappytaxidermy.com", 80, socket.AF_INET, socket.SOCK_STREAM)

[(<AddressFamily.AF_INET: 2>,
  <SocketKind.SOCK_STREAM: 1>,
  0,
  '',
  ('66.6.44.4', 80))]

上のようにキーワード引数のfamilyとtypeを指定して渡すこともできる

In [27]:
socket.getservbyname("http")

80

In [28]:
socket.getservbyport(80)

Traceback (most recent call last):
  File "src/gevent/_abstract_linkable.py", line 134, in gevent.__abstract_linkable.AbstractLinkable._notify_links
greenlet.error: cannot switch to a different thread
2020-09-29T04:50:10Z (<built-in method switch of greenlet.greenlet object at 0x000002841F993468>, <gevent._event.Event object at 0x00000284229B7528>) failed with error



'http'

一部のTCP、UDP、ポート番号はIANAによってサービス名と対応付けられている  
socketモジュールの getservbyname 関数にサービス名を渡すことで対応するポート番号を返してくれる  
逆に getservbyport 関数にポート番号を渡すことで対応するサービス名を返してくれる

### 11.2.7.2  Pythonの電子メールモジュール

標準ライブラリには以下のような電子メールモジュールが含まれている  
- smtplib  
SMTP(Simple Mail Transfer Protocol)で電子メールメッセージを送信する  
- email  
電子メールメッセージを作成、構文解析をする  
- poplib  
POP3(Post Office Protocol 3)で電子メールメッセージを読み出す  
- imaplib  
IMAP(Internet Message Access Protocol)で電子メールメッセージを読み出す

### 11.2.7.3  その他のプロトコル

標準の ftplib モジュールを使えば、FTP(File Transfer Protocol)を使ってバイトをプッシュできる

## 11.2.8  ウェブサービスとAPI

情報プロバイダは、必ずウェブサイトを持っているが、それは人間相手に見やすくしたもので、オートメーション(自動的に作業を行う仕組み)のためのものではない  
データがウェブサイトの形でしか公開されていなければ、データにアクセスして構造化したいユーザーはスクレイバーを作り、ページの書式が変わるたびに書き直さなければいけなくなる  
それに対し、ウェブサイトがデータにアクセスするためのAPIを提供していれば、クライアントプログラムが直接データにアクセスできる  
APIはウェブページほど頻繁に更新されないので、クライアントが書き直しをする回数も減る  

もっとも簡単なAPIは、ウェブインターフェースでありながら、JSONやXMLなどの構造化された形式でデータを提供してくれる  
そのようなAPIは必要最低限のものかもしれないし、本格的なRESTful APIかもしれないが、いずれにしてもバイト列の代わりに別の取り出し口を提供してくれる  

APIはTwitter、Facebook、LinkedInなどの有名なSNSサイトをマイニング(大量のデータを解析して、有用な情報を抽出すること)したいときに役立つ  
これらのサイトはすべて自由に使えるAPIを提供しているが、どれもユーザー登録して接続時に使うキーを手に入れる必要がある  
サイトは、キーによって誰がデータにアクセスしているかを知ることができたり、サーバーに対する要求トラフィックを制限するための手段としても使える

## 11.2.9  リモート処理

Pythonの表現のおかげでローカルマシンと同じようにほかのマシンのコードを呼び出すこともできる  
高度な設定の下では、1台のマシンのスペースを使い切ってしまったときに、ほかのマシンにまでスペースを広げることができる  
マシンのネットワークを相手にすれば、もっと多くのプロセス、スレッドにアクセスできる

### 11.2.9.1  RPC

RPC(Remote Procedure Calls)は、ネットワークを越えてリモートマシンで実行される  
URLや要求本体にエンコードされた引数を使ってRESTful APIを呼び出すのではなく、ローカルマシンのRCP関数を呼び出しすのである  
RPCクライアントで水面下で行われていることは以下のようなことになる  
1. RPC関数は関数への引数をバイト列に変換する(マーシャリング、シリアライズ、直列化、エンコードなどと呼ばれる)
2. エンコードされたバイト列をリモートマシンに送る  
リモートマシンでは以下のようなことが行われている  
1. エンコードされた要求バイトを受信する  
2. バイト列を受信し終えたら、RCPクライアントはバイト列をデコードして元のデータ構造を復元する  
3. ローカル関数を見つけて、デコードされたデータを渡して呼び出す  
4. 関数の実行結果をエンコードする  
5. エンコードされたバイトを呼び出し元に送り返す  
最後に呼び出しを送ったマシンがバイト列をデコードして値を返す

RPCは人気の高いテクニックで、さまざまな方法で実装されている  
サーバー側では、サーバープログラムを起動し、サーバーとなんらかのバイトトランスポート、エンコード/デコードメソッドをつなぎ、サービス関数を定義し、RPCが稼働中というサインを点灯する  
クライアントはサーバーに接続し、RPCを介して関数のどれかを呼び出す  

標準ライブラリには、XMLを交換形式とするRPCの実装が一つあり、 xmlrpc を使うことでできる  
サーバー側で関数を定義して登録すると、クライアントはまるでインポートされたもののようにその関数を呼び出すことができる

In [None]:
# python3 - pyworks\xmlrpc_server.py
from xmlrpc.server import SimpleXMLRPCServer
def double(num):
    return num * 2
server = SimpleXMLRPCServer(("localhost", 6789))
server.register_function(double, "double")
server.serve_forever()

In [None]:
# python3 - pyworks\xmlrpc_client.py
from xmlrpc.client import ServerProxy
proxy = ServerProxy("http://localhost:6789/")
num = 7
result = proxy.double(num)
print(f"Double {num} is {result}")

xmlrpc_server.pyを実行した後にxmlrpc_client.pyを実行すると  
xmlrpc_client.pyは  
`Double 7 is 14`  
と表示され、xmlrpc_serverは  
`127.0.0.1 - - [15/Jul/2020 15:35:46] "POST / HTTP/1.1" 200 -`  
と表示される

xmlrpc_server側の内容は  
xmlrpc.server モジュールの SimpleXMLRPCServer 関数をインポートし  
SimpleXMLRPCServer関数にアドレス名とポート番号のタプルを渡してサーバーと接続し server オブジェクトを取得する  
serverオブジェクトの register_function メソッドに関数オブジェクトと、名前を渡すことで関数をサーバーに渡す  
serverオブジェクトの serve_forever を呼ぶことでサーバを接続し続ける

xmlrpc_client.pyの内容は  
xmlrpc.client モジュールの ServerProxy 関数をインポートし  
ServerProxy関数にIPアドレスを渡してサーバーと接続し proxy オブジェクトを作成する  
proxyオブジェクトの関数として、serverにあげた関数名を呼ぶことができ、結果の値を取得できる

よく使われるトランスポートの手段はHTTPとZroMQで、よく使われるエンコーディングはXML、JSON、プロトコルバッファ、メッセージパックなどになる  
メッセージパック自身が用意しているPython RPC実装もある  
pip install msgpack-rpc-python でインストールできる

### 11.2.9.2  fabric

fabricパッケージは、リモート、ローカルコマンドを実行し、ファイルをアップロード、ダウンロードするために使われ、特権ユーザーのもとで使われる  
fabricパッケージは、リモートマシンでプログラムを実行するためにSSH(ネットワークを介して別のコンピュータにログインして操作するためのソフトウェアの一つ)を使う  
関数はfabricファイルにPython言語で書き、それをローカル、リモートのどちらで実行するかを指示する  
fabricプログラムを実行するときに、どのリモートマシンを使って関数を呼び出すか支持をする

現在、通常のfabricはPython3に対応していないので pip install fabric3 をインストールする

In [2]:
# python3 - pyworks\fab1.py
def iso():
    from datetime import date
    print(date.today().isoformat())

コマンドプロトコルで `$ fab -f fab1.py -H localhost iso` を実行すると  
`[localhost] Executing task 'iso'
 2020-07-15
 Done.`  
と表示される

`fab`でfabricの実行、`-f fab1.py`のオプション部分でデフォルトのfabfile.pyではなくfab1.pyファイルを呼び出すように指示、`-H localhost`のオプション部分でローカルマシンで実行するように指示、`iso`でファイル内の実行するべき関数を教えている

ローカル、リモートマシンで外部プログラムを実行するには、SSHサーバーが実行されていなければならない  
windowsでは組み込みSSHサポートはないので http://bit.ly/putty-ssh をインストールするとよい

In [3]:
# python3 - pyworks\fab2.py
from fabric.api import local
def iso():
    local("date")

`$ fab -f fab2.py -H localhost iso`で実行すると  
`[localhost] Executing task 'iso'
[localhost] local: date
現在の日付: 2020/07/15
新しい日付を入力してください: (年-月-日)
Done.`  
と表示される

上のように fabric.api モジュールの local 関数を関数定義内で使い文字列を渡すことで、ローカルで文字列を実行することができる  

### 11.2.9.3  Salt

Saltは、リモート実行を実現するための方法としてスタートしたが、現在では本格的なシステム管理プラットフォームに成長している  
SSHではなくZeroMQを使うことによって、数千台のサーバーにスケールアップにすることができる

## 11.2.10  ビックデータとMapReduce

Googleなどのインターネット企業は、成長して大企業になる過程で、従来のコンピューティングソリューションにはスケーラビリティがないことに気づいた  
数十台のマシンで動作するソフトウェイでも、数千台のマシン上で実行すると手に負えなくなる

データベースやファイルを格納するハードディスクは、ディスクヘッドの機械的な移動を必要とするシークがあまりにも多い(音楽レコードのようなもの)  
しかし、ディスクの連続した領域をストリーミング(通信ネットワークを介して動画や音声などを受信して再生する際に、データを受信しながら同時に再生を行う方式)すればはるかに高速に動作する

デベロッパーたちは、独立したマシンを使うよりも、ネットワーク化された多数のマシンにデータを分散させて分析したほうが高速だということに気づいていた  
MapReduceはそのようなもののひとつで、多くのマシンに計算をばらまき、結果を集める

Googleが結果を論文として公開すると、YahooもJavaベースのHadoopというオープンソースパッケージを公開した  
ビッグデータという言葉はこのようなところで使われる  
ディスク、メモリ、CPU時間、あるいはそれらすべてよりも大きいデータということだ  
Hadoopは、多数のマシンにデータをコピーし、MapReduceプログラムで処理して、各ステップで結果をディスクに保存する  
このパッチ処理は遅くなる危険性がある  
それに対し、Hadoopストリーミングという方法では、各ステップごとにディスク書き込みをせずにプログラム郡にデータをストリーミングしていく  
Hadoopストリーミングプログラムは、Pythonを含む任意の言語で書くことができる  

音楽のストリーミングで知られているSpotifyは、Hadoopストリーミング用に開発したPythonコンポーネント、Luigiをオープンソース化した  
Sparkというライバルは、Hadoopよりも10倍から100倍高速に実行されるように設計されている  
Sparkは、Hadoopデータソースと、フォーマットをすべて読み出し、処理することができ、また、Pythonなどの言語を対象とするAPIが含まれている  
もう一つのライバルシステム、DiscoはMapReduce処理にPython、通信にErlangを使っているが、pipではインストールできない

## 11.2.11  クラウドでの処理

分散システムを構築しなくても、クラウドサーバーを借りることができる  
このモデルを採用すると、メンテナンスは他人の問題になり、自分のサービスなどの世界に見せたいものに集中できる  
ウェブ経由で利用できるダッシュボードとAPIを使えば、必要な設定を持つサーバーを素早く簡単に立ち上げることができる  
クラウドは弾力的で、状態を監視しなんらかの指標が限界値を越えたらアラームを受けることができる  
以下ではクラウドを操作する人気の高い方法を見ていく

### 11.2.11.1  Google

Googleは社内でかなりPythonを使っており、App Engineのサイトに行き、Choose a LanguageでPythonボックスをクリックすると、Cloud PlaygroundにPythonコードを入力して結果を見ることができる  
そのあとすぐには、Python SDKをダウンロードするためのリンクと説明がある  
このSDKを使うと自分のハードウェア上でGoogleのクラウドAPIを呼び出すコードを開発できる  
そのあとでは、アプリケーションをAppEngineにデプロイする方法が説明されている  
Googleのメインクラウドページに行くと以下のようなGoogleクラウドサービスの説明を見ることができる  
- App Engine  
flask、django、などのPythonツールも含まれる高水準プラットフォーム  
- Compute Engine  
大規模な分散コンピューティングタスクのために仮想マシンクラスタを作る  
- Cloud Storage  
オブジェクトのストレージ  
- Cloud Datastore  
大規模なNoSQLデータベース  
- Cloud SQL  
大規模なSQLデータベース  
- Cloud Endpoints  
アプリケーションへのRESTfulによるアクセス  
- BigQuery  
Hadoop風のビッグデータ

### 11.2.11.2  Amazon

Amazonは、さまざまなソリューションを借用したり作りだしたりして、市場を支配しているAWS(Amazon Web Services)を作った  
もっとも重要なサービスは以下のようなものだ  
- Elastic Beanstalk  
高水準アプリケーションプラットフォーム  
- EC2(Elastic Computer)  
分散コンピューティング  
- S3(Simple Storage Service)  
オブジェクトのストレージ  
- RDS  
リレーショナルデータベース(MySQL,PostgreSQL,Oracle,MSSQL)  
- DynamoDB  
NoSQLデータベース  
- Redshift  
データウェアハウス  
- EMR  
Hadoop

### 11.2.11.3  OpenStack

OpenStackは、公開、非公開、ハイブリッドクラウドを作るために自由に使えるオープンソースプラットフォームで、6か月ごとに新リリースがある  
OpenStackは、CERN、PayPalを含む多くの企業、組織で本番用に使われている  
OpenStackのメインAPIはRESTfulで、プログラムインタフェースを提供するPythonモジュールとシェルオートメーションのためのコマンドラインPythonプログラムがある  
現在の標準サービスの一部を以下に載せる  
- Keystone  
認証、権限付与、サービスディスカバリなどの機能を提供するアイデンティティサービス  
- Nova  
ネットワーク化されたサーバーに作業を分散させる計算サービス  
- Swift  
AmazonのS3のようなオブジェクトストレージ  
- Glance  
中間水準のイメージストレージサービス  
- Cinder  
低水準のブロックストレージサービス  
- Horizon  
すべてのサービスを対象とするウェブベースのダッシュボード  
- Neutron  
ネットワーク管理サービス  
- Heat  
オーケストレーション(マルチクラウド)サービス  
- Ceilometer  
遠隔測定(メトリクス管理、モニタリング、検針)サービス