In [1]:
import logging
import threading
import random
import sys

from multiprocessing import Process
from time import sleep
from typing import Optional

# Install the default root log handler before kazoo is imported
# (presumably so that kazoo's own log messages become visible -- confirm).
logging.basicConfig()

from kazoo.client import KazooClient
from kazoo.recipe.watchers import DataWatch

# Payloads stored in a client's znode: a client writes one of the first two as
# its vote, and the coordinator overwrites it with the final decision.
ACTION_COMMIT =   b'commit'
ACTION_ROLLBACK = b'rollback'
# Written by the coordinator to every remaining client once it notices that a
# previously seen client has disappeared.
ACTION_DEAD = b'kill_yourself'

# How long each Client process sleeps after registering before it disconnects.
WAIT_HARD_WORK_SECONDS = 15
# Filled by Coordinator.main(): the KazooClient handle and the Client process
# created for each client, appended in creation order and later indexed by the
# integer client id.
zk_list = list()
p_list = list()

In [2]:
class Client(Process):
    """Transaction participant that runs in its own OS process.

    Each client registers an ephemeral znode ``{root}/{id}`` holding its vote
    (``ACTION_COMMIT`` or ``ACTION_ROLLBACK``, chosen at random), watches that
    znode for the value written back by the coordinator, and disconnects after
    ``WAIT_HARD_WORK_SECONDS``.

    Args:
        root: Path of the parent znode under which the client registers.
        id: Integer client id; also the name of the client's znode.
        zk: A not-yet-started ``KazooClient``; it is started inside ``run``,
            i.e. in the child process.
    """

    def __init__(self, root: str, id: int, zk):
        super().__init__()
        self.url = f'{root}/{id}'
        self.root = root
        self.id = id
        self.zk = zk

    def run(self):
        """Vote, watch the own znode for the decision, then disconnect."""

        def watch_myself(data, stat):
            # Kazoo invokes a DataWatch callback with (None, None) when the
            # znode does not exist (e.g. it was deleted from outside); there
            # is nothing to report then and `stat.version` would raise.
            if data is None or stat is None:
                return

            if data == ACTION_DEAD:
                print(f'Client {self.id} exit')
                return

            if stat.version == 1:
                sleep(1)
            print(f'Client {self.id} triggered {stat.version}')
            # Version 0 is the client's own vote; later versions carry the
            # value written by the coordinator.
            if stat.version != 0:
                print(f'Client {self.id} do {data.decode()}')
            print(f'Client {self.id} run')

        self.zk.start()
        try:
            value = ACTION_COMMIT if random.random() > 0.5 else ACTION_ROLLBACK
            print(f'Client {self.id} request {value.decode()}')
            self.zk.create(self.url, value, ephemeral=True)
            print(f'Client {self.id} create')
            # Constructing the DataWatch registers the callback; the instance
            # itself does not need to be kept.
            DataWatch(self.zk, self.url, watch_myself)

            sleep(WAIT_HARD_WORK_SECONDS)
        finally:
            # Always release the session, even if registration failed.
            self.zk.stop()
            print(f'Client {self.id} stop')
            self.zk.close()


class Coordinator:
    """Coordinator of the commit/rollback demo.

    Class attributes:
        timer: The pending delayed liveness check, if any. It is replaced on
            every change of the set of registered clients.
        session_logs: Assigned in ``main``; ``session_logs[i]`` is True once
            client ``i`` has been seen under the transaction znode.
    """

    timer: Optional[threading.Timer] = None
    session_logs: list

    @staticmethod
    def main(number_of_clients = 3, duration = 0.5):
        """Run one round of the protocol with freshly spawned clients.

        Recreates ``/task_2/transaction``, watches its children, and starts
        ``number_of_clients`` ``Client`` processes five seconds apart. Once
        all clients are registered, the decision (commit only if every client
        voted commit, otherwise rollback) is written to every client znode.
        If a client that was seen before is missing when the delayed check
        runs, all remaining clients are told to die and their processes are
        killed.

        Args:
            number_of_clients: How many client processes to start.
            duration: Delay in seconds between a change in the set of
                registered clients and the check for vanished clients.
        """
        base = '/task_2'
        # Defined before the watchers: ChildrenWatch calls its callback
        # immediately on registration, so the closures must already see it.
        root = f'{base}/transaction'

        # The registries are indexed by client id. Drop entries left over from
        # an earlier call (e.g. a re-run notebook cell), otherwise the ids of
        # this run would resolve to the previous run's objects.
        zk_list.clear()
        p_list.clear()

        # Tracks whether each client has already joined.
        Coordinator.session_logs = [False] * number_of_clients
        coordinator = KazooClient()
        coordinator.start()

        if coordinator.exists(base):
            coordinator.delete(base, recursive=True)

        coordinator.create(base)
        coordinator.create(root)

        Coordinator.timer = None

        def make_decision():
            """Count the votes and broadcast the final action."""
            print("check_clients()")
            tr_clients = coordinator.get_children(root)
            # One read per client.
            commit_counter = sum(
                coordinator.get(f'{root}/{client}')[0] == ACTION_COMMIT
                for client in tr_clients
            )

            # Commit is accepted only unanimously.
            final_action = ACTION_COMMIT if commit_counter == number_of_clients else ACTION_ROLLBACK
            for client in tr_clients:
                # Broadcast the result.
                coordinator.set(f'{root}/{client}', final_action)

        def true_check_clients():
            """Shut every client down if a previously seen client vanished."""
            tr_clients = coordinator.get_children(root)
            someone_left = any(
                seen and str(i) not in tr_clients
                for i, seen in enumerate(Coordinator.session_logs)
            )
            if not someone_left:
                return

            print("Один отключился, отключайся!!!")
            if Coordinator.timer is not None:
                Coordinator.timer.cancel()
            for client in tr_clients:
                coordinator.set(f'{root}/{client}', ACTION_DEAD)
            # Presumably gives the clients time to observe the kill message
            # before their processes are terminated.
            sleep(0.5)
            for client in tr_clients:
                zk_list[int(client)].stop()
                zk_list[int(client)].close()
                p_list[int(client)].kill()
            # This runs in the Timer's thread. A plain return ends it; calling
            # sys.exit() here would only raise SystemExit in this thread and
            # would not stop the interpreter.

        @coordinator.ChildrenWatch(root)
        def watch_clients(clients):
            for client in clients:
                Coordinator.session_logs[int(client)] = True
            print(Coordinator.session_logs)

            # Any change in membership invalidates the pending check.
            if Coordinator.timer is not None:
                Coordinator.timer.cancel()
            if clients:
                # Check, after `duration` seconds, whether a client disconnected.
                Coordinator.timer = threading.Timer(duration, true_check_clients)
                Coordinator.timer.daemon = True
                Coordinator.timer.start()

            if len(clients) < number_of_clients:
                print(f'Waiting for the others. clients={clients}')
            elif len(clients) == number_of_clients:
                print('Check clients')
                make_decision()

        for i in range(number_of_clients):
            zk = KazooClient()
            zk_list.append(zk)
            p = Client(root, i, zk)
            p_list.append(p)
            p.start()
            # Stagger the clients so that they register one after another.
            sleep(5)

        # NOTE(review): the coordinator session is not stopped here, so its
        # watch keeps firing after main() returns. Confirm that this is
        # intended before adding a stop()/close().

In [3]:
# Run the demo with the defaults: 3 clients and a 0.5 s delay before the
# check for disconnected clients.
Coordinator.main()

[False, False, False]
Waiting for the others. clients=[]
Client 0 request commit
Client 0 create
Client 0 triggered 0
Client 0 run
[True, False, False]
Waiting for the others. clients=['0']
Client 1 request rollback
Client 1 create
Client 1 triggered 0
Client 1 run
[True, True, False]
Waiting for the others. clients=['0', '1']
Client 2 request rollback
Client 2 create
Client 2 triggered 0
Client 2 run
[True, True, True]
Check clients
check_clients()
Client 0 triggered 1Client 1 triggered 1Client 2 triggered 1


Client 0 do rollbackClient 1 do rollbackClient 2 do rollback


Client 0 runClient 1 runClient 2 run


Client 0 stop
[True, True, True]
Waiting for the others. clients=['1', '2']
Client 1 exitClient 2 exit

Один отключился, отключайся!!!
[True, True, True]
Waiting for the others. clients=[]
