# Реализуйте двуфазный коммит протокол для high-available регистра (каждый регистр - отдельный процесс в системе)

In [7]:
import logging
import threading
import random
import sys

from multiprocessing import Process
from time import sleep
from typing import Optional

logging.basicConfig()

from kazoo.client import KazooClient
from kazoo.recipe.watchers import DataWatch

ACTION_COMMIT =   b'commit'
ACTION_ROLLBACK = b'rollback'
ACTION_DEAD = b'kill_yourself'
COUNT_CLIENTS = 3
DURATION = 0.5


WAIT_HARD_WORK_SEC = 15

zks = list()
processes = list()

In [8]:
class Client(Process):
    def __init__(self, root: str, id: int, zk):
        super().__init__()
        self.url = f'{root}/{id}'
        self.root = root
        self.id = id
        self.zk = zk
             
    def run(self):
        def watch_myself(data, stat):
            if data != ACTION_DEAD:
                if(stat.version == 1):
                    sleep(1)
                if stat.version != 0:
                    print(f'Клиент {self.id} получил решение от координатора {data.decode()}')
            else:
                print(f'Клиент {self.id} знает, что один из участников вышел')
            
        self.zk.start()
        
        value = ACTION_COMMIT if random.random() > 0.5 else ACTION_ROLLBACK
        print(f'Клиент {self.id} просит {value.decode()}')
        self.zk.create(self.url, value, ephemeral=True)
        print(f'Клиент {self.id} создан')
        datawatcher = DataWatch(self.zk, self.url, watch_myself)
        
        sleep(WAIT_HARD_WORK_SEC)
        self.zk.stop()
        print(f'Клиент {self.id} отключился')
        self.zk.close()


class Coordinator:

    timer: Optional[threading.Timer] = None

    @staticmethod
    def main(number_of_clients = COUNT_CLIENTS, duration = DURATION):
        Coordinator.session_logs = [False] * number_of_clients
        coordinator = KazooClient()
        coordinator.start()

        if coordinator.exists('/task_2'):
            coordinator.delete('/task_2', recursive=True)

        coordinator.create('/task_2')
        coordinator.create('/task_2/transaction')

        Coordinator.timer = None
        
        def select_action():
            tr_clients = coordinator.get_children('/task_2/transaction')
            commit_counter = 0
            abort_counter = 0
            for client in tr_clients:
                commit_counter += int(coordinator.get(f'/task_2/transaction/{client}')[0] == ACTION_COMMIT)
                abort_counter +=  int(coordinator.get(f'/task_2/transaction/{client}')[0] == ACTION_ROLLBACK)

            final_action = ACTION_COMMIT if commit_counter == number_of_clients else ACTION_ROLLBACK
            for client in tr_clients:
                coordinator.set(f'/task_2/transaction/{client}', final_action)
                
        def check_clients():
            tr_clients = coordinator.get_children('/task_2/transaction')
            for i in range(len(Coordinator.session_logs)):
                if Coordinator.session_logs[i] is True and str(i) not in tr_clients:
                    print("Кто-то вышел, все уведомлены")
                    Coordinator.timer.cancel()
                    for client in tr_clients:
                        coordinator.set(f'/task_2/transaction/{client}', ACTION_DEAD)
                    sleep(0.5)
                    for client in tr_clients:
                        zks[int(client)].stop()
                        zks[int(client)].close()
                        processes[int(client)].kill()
                    sys.exit()

        @coordinator.ChildrenWatch('/task_2/transaction')
        def watch_clients(clients):
            for client in clients:
                Coordinator.session_logs[int(client)] = True
                
            if len(clients) == 0:
                if Coordinator.timer is not None:
                    Coordinator.timer.cancel()
            else:
                if Coordinator.timer is not None:
                    Coordinator.timer.cancel()
                Coordinator.timer = threading.Timer(duration, check_clients)
                Coordinator.timer.daemon = True
                Coordinator.timer.start()

            if len(clients) < number_of_clients:
                print(f'Ожидание остальных. Клиенты={clients}')
            elif len(clients) == number_of_clients:
                select_action()

        root = '/task_2/transaction'
        
        for i in range(number_of_clients):
            zks.append(KazooClient())
            process = Client(root, i, zks[-1])
            processes.append(process)
            process.start()
            sleep(5)

In [9]:
Coordinator.main()

Waiting for the others. clients=[]
Client 0 request rollback
Client 0 create
Client 0 triggered 0
Client 0 run
Waiting for the others. clients=['0']
Client 1 request rollback
Client 1 create
Client 1 triggered 0
Client 1 run
Waiting for the others. clients=['0', '1']
Client 2 request rollback
Client 2 create
Client 2 triggered 0
Client 2 run
Client 0 triggered 1Client 1 triggered 1

Client 0 do rollbackClient 2 triggered 1Client 1 do rollback


Client 1 runClient 0 run
Client 2 do rollback

Client 2 run
Client 0 stop
Waiting for the others. clients=['1', '2']
Client 1 exitClient 2 exit

Switching off
Waiting for the others. clients=[]
