# Apache Zookeeper

Координационный сервис для распределенных приложений. Логически представляет из себя двевовидную структуру, наподобие файловой системы, к каждому узлу которой можно присвоить значение. 

Запускаем его в `Docker` вместе с `Kafka`:
```bash
docker run --name kafka_sandbox -v `pwd`:/course -p 2181:2181 -p 9092:9092 spotify/kafka
```

### Подключение

In [1]:
from kazoo.client import KazooClient

zk = KazooClient(hosts='127.0.0.1:2181')
zk.start()

### Создание элемента

In [2]:
zk.ensure_path('/node/a')
zk.set('/node/a', b'hello')

ZnodeStat(czxid=29, mzxid=30, ctime=1636632798620, mtime=1636632798622, version=1, cversion=0, aversion=0, ephemeralOwner=0, dataLength=5, numChildren=0, pzxid=29)

In [3]:
zk.ensure_path('/database2/host')
zk.set('/database2/host', b'192.168.1.2')

ZnodeStat(czxid=32, mzxid=33, ctime=1636632800059, mtime=1636632800062, version=1, cversion=0, aversion=0, ephemeralOwner=0, dataLength=11, numChildren=0, pzxid=32)

In [4]:
zk.get('/database2/host')

(b'192.168.1.2',
 ZnodeStat(czxid=32, mzxid=33, ctime=1636632800059, mtime=1636632800062, version=1, cversion=0, aversion=0, ephemeralOwner=0, dataLength=11, numChildren=0, pzxid=32))

можно иначе

In [5]:
zk.create('/node/b', b'hello2')

'/node/b'

### Отслеживание изменений

In [6]:
def callback(p):    
    print('Event detected: ', p)
    
zk.get('/node/a', callback)
zk.set('/node/a', b'hello2')

Event detected:  WatchedEvent(type='CHANGED', state='CONNECTED', path='/node/a')


ZnodeStat(czxid=29, mzxid=35, ctime=1636632798620, mtime=1636632803567, version=2, cversion=0, aversion=0, ephemeralOwner=0, dataLength=6, numChildren=0, pzxid=29)

### Транзакции

In [7]:
with zk.transaction() as t:
    t.create('/node/c', b'c value')
    t.create('/node/d', b'd value')
zk.get('/node/c')    

(b'c value',
 ZnodeStat(czxid=36, mzxid=36, ctime=1636632805734, mtime=1636632805734, version=0, cversion=0, aversion=0, ephemeralOwner=0, dataLength=7, numChildren=0, pzxid=36))

### Блокировки

In [8]:
import threading
import time

def thread_func(num):
    with zk.Lock('/node/a') as lock:
        print('Thread #{} lock'.format(num))
        zk.ensure_path('/node/a/c')
        zk.set('/node/a/c', b'hello')
        time.sleep(2)
        print('Thread #{} unlock'.format(num))

for i in range(3):
    threading.Thread(target=thread_func, args=(i,)).start()

Thread #0 lock
Thread #0 unlock
Thread #1 lock
Thread #1 unlock
Thread #2 lock


### Election

In [9]:
def thread_func(num):
    election = zk.Election("/node")
    
    def election_func():
        print('Election won: {}'.format(num))
    
    election.run(election_func)


for i in range(3):
    threading.Thread(target=thread_func, args=(i,)).start()

Election won: 0
Election won: 2
Election won: 1
Thread #2 unlock


### Очереди

In [10]:
if zk.exists('/queue'):
    zk.delete('/queue', recursive=True)
    
q = zk.Queue('/queue')
q.put(b'1', 10)
q.put(b'2', 0)

In [11]:
q.get()

b'2'

### Счетчики

In [12]:
if zk.exists('/counter'):
    zk.delete('/counter', recursive=True)
    
counter = zk.Counter('/counter')
counter += 20
counter -= 5
counter.value

15

In [13]:
counter += 100

In [14]:
zk.get('/counter')

(b'115',
 ZnodeStat(czxid=57, mzxid=60, ctime=1636632817262, mtime=1636632817398, version=3, cversion=0, aversion=0, ephemeralOwner=0, dataLength=3, numChildren=0, pzxid=57))

Connection dropped: socket connection broken
Transition to CONNECTING
Session has expired
