# Hazelcast HW



## 1. Встановити і налаштувати Hazelcast

## 2. Сконфігурувати і запустити 3 ноди (інстанси), об'єднані в кластер, або як частину Java-застосування, або як окремі застосування

I ran the three nodes as three separate Docker containers, all joined to the `hazelcast-cluster` cluster:
```
docker run -it --name first-hazelcast-node  --rm -e HZ_NETWORK_PUBLICADDRESS=172.17.0.1:5701 -e HZ_CLUSTERNAME=hazelcast-cluster -p 5701:5701 hazelcast/hazelcast:5.3.6
docker run -it --name second-hazelcast-node --rm -e HZ_NETWORK_PUBLICADDRESS=172.17.0.1:5702 -e HZ_CLUSTERNAME=hazelcast-cluster -p 5702:5701 hazelcast/hazelcast:5.3.6
docker run -it --name third-hazelcast-node  --rm -e HZ_NETWORK_PUBLICADDRESS=172.17.0.1:5703 -e HZ_CLUSTERNAME=hazelcast-cluster -p 5703:5701 hazelcast/hazelcast:5.3.6
```

You can start all three nodes at once with `run.sh`.

## 3. Продемонструйте роботу Distributed Map

### Використовуючи API створіть Distributed Map + запишіть в неї 1000 значень з ключем від 0 до 1к

In [1]:
import hazelcast
from tqdm import tqdm

client = hazelcast.HazelcastClient(
    cluster_name="hazelcast-cluster",
    cluster_members=[
        "172.17.0.1:5701",
        "172.17.0.1:5702",
        "172.17.0.1:5703",
    ],
    lifecycle_listeners=[
        lambda state: print("Lifecycle event >>>", state),
    ]
)
try:
    # Step 1: Create (or get) the Distributed Map. `.blocking()` makes every
    # call wait for the cluster's answer instead of returning a Future.
    distributed_map = client.get_map("my-distributed-map").blocking()

    # Step 2: Write 1000 values (keys 0..999) to the Distributed Map.
    for i in tqdm(range(1000)):
        distributed_map.put(i, f"value-{i}")
finally:
    # Disconnect even if a put fails, so the notebook kernel is not left
    # with a dangling client connected to the cluster.
    client.shutdown()

Lifecycle event >>> STARTING
Lifecycle event >>> STARTED
Lifecycle event >>> CONNECTED


100%|██████████| 1000/1000 [00:00<00:00, 1899.77it/s]

Lifecycle event >>> SHUTTING_DOWN
Lifecycle event >>> DISCONNECTED
Lifecycle event >>> SHUTDOWN





### За допомогою Management Center подивіться на розподіл значень по нодах
[image  ]

### Як зміниться розподіл даних по нодах

- якщо відключити одну ноду

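Дані перерозподіляться між двома нодами, що залишились. За замовчуванням кожна партиція Map має одну резервну копію (`backup-count = 1`) на іншій ноді. Тому партиції вимкненої ноди відновлюються з резервних копій, і втрати даних немає.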
[image]

- відключити дві ноди.

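Якщо вимикати ноди по черзі й дочекатися завершення міграції партицій після першого вимкнення, дані збережуться. Якщо вимкнути дві ноди одночасно, частина даних втратиться: це партиції, основна та резервна копії яких були саме на цих двох нодах.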
[image]

- Чи буде втрата даних?

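Так, якщо одночасно відключиться більше вузлів, ніж налаштовано резервних копій (за замовчуванням — одна). Тоді для частини партицій не залишиться ні основної, ні резервної копії, і ці дані неможливо буде відновити.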

- Яким чином зробити щоб не було втрати даних?

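Щоб уникнути втрати даних, потрібно збільшити кількість резервних копій (`backup-count`, максимум 6) у конфігурації Map. Наприклад, для кластера з 3 нод можна задати `backup-count: 2`.
Тоді кожна партиція зберігається на всіх трьох нодах, і навіть якщо дві ноди одночасно вийдуть з ладу, дані можна буде відновити з копії на ноді, що залишилась.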

## 4. Продемонструйте роботу Distributed Map with locks

In [2]:
# helpers for testing the locking

import multiprocessing
from hazelcast import HazelcastClient
from tqdm import tqdm

# When True, the worker helpers below print client lifecycle events, show a
# tqdm progress bar and print the final value; when False they stay silent.
verbose = False

# =============================================================================

def increment_value():
    """Increment ``"key"`` in the shared map 10000 times WITHOUT any locking.

    Every iteration is a separate ``get`` followed by a ``put``. Concurrent
    processes can therefore read the same value and overwrite each other's
    increments (lost updates). This is the intentionally unsafe baseline for
    the locking comparison.

    Assumes ``"key"`` was seeded with an int beforehand (e.g. by ``test_prep``);
    otherwise ``get`` returns None and ``+= 1`` raises TypeError.

    Returns:
        The value of ``"key"`` read once after the loop, as a string.
    """
    client = HazelcastClient(
        cluster_name="hazelcast-cluster",
        cluster_members=[
            "172.17.0.1:5701",
            "172.17.0.1:5702",
            "172.17.0.1:5703",
        ],
        lifecycle_listeners=[
            lambda state: print("Lifecycle event >>>", state) if verbose else None,
        ]
    )
    try:
        shared_map = client.get_map("my-distributed-map").blocking()

        for _ in tqdm(range(10000), disable=(not verbose)):
            value = shared_map.get("key")
            value += 1
            shared_map.put("key", value)

        # Read once so the printed and the returned value are the same.
        final_value = shared_map.get("key")
        if verbose:
            print(final_value)
        return str(final_value)
    finally:
        # Disconnect cleanly instead of letting the process exit with the
        # connections still open.
        client.shutdown()

def increment_value_pl():  # pl = pessimistic lock
    """Increment ``"key"`` 10000 times, holding a map lock around each update.

    ``lock``/``unlock`` on the key make the get-increment-put sequence
    exclusive with respect to other clients that also lock the key. As a
    result, no increments are lost.

    Returns:
        The last value this process wrote (0 if the loop did not run).
    """
    client = HazelcastClient(
        cluster_name="hazelcast-cluster",
        cluster_members=[
            "172.17.0.1:5701",
            "172.17.0.1:5702",
            "172.17.0.1:5703",
        ],
        lifecycle_listeners=[
            lambda state: print("Lifecycle event >>>", state) if verbose else None,
        ]
    )
    try:
        shared_map = client.get_map("my-distributed-map").blocking()
        last_value = 0
        for _ in tqdm(range(10000), disable=(not verbose)):
            shared_map.lock("key")
            try:
                value = shared_map.get("key")
                value += 1
                shared_map.put("key", value)
                last_value = value
            finally:
                # Release even if get/put fails; otherwise the other
                # processes would keep waiting in lock().
                shared_map.unlock("key")
        if verbose:
            print(last_value)
        return last_value
    finally:
        client.shutdown()
    
def increment_value_ol():  # ol = optimistic lock
    """Increment ``"key"`` 10000 times using compare-and-set retries.

    Each increment reads the current value and calls
    ``replace_if_same(key, old, old + 1)``. If another process changed the
    value in between, the replace fails and the read is retried. No lock is
    held between retries.

    Returns:
        The last value this process successfully wrote (0 if the loop did
        not run).
    """
    client = HazelcastClient(
        cluster_name="hazelcast-cluster",
        cluster_members=[
            "172.17.0.1:5701",
            "172.17.0.1:5702",
            "172.17.0.1:5703",
        ],
        lifecycle_listeners=[
            lambda state: print("Lifecycle event >>>", state) if verbose else None,
        ]
    )
    try:
        shared_map = client.get_map("my-distributed-map").blocking()
        last_value = 0

        for _ in tqdm(range(10000), disable=(not verbose)):
            while True:
                current = shared_map.get("key")
                new_value = current + 1
                if shared_map.replace_if_same("key", current, new_value):
                    # Only record values that were actually written.
                    last_value = new_value
                    break
        if verbose:
            print(last_value)
        return last_value
    finally:
        client.shutdown()

# =============================================================================

def test_func(func, num_processes=3):
    processes = []
    for _ in range(num_processes):
        p = multiprocessing.Process(target=func)
        p.start()
        processes.append(p)
    for p in processes:
        p.join()

def test_increment_value():
    """Run the unlocked ``increment_value`` worker in parallel processes."""
    test_func(func=increment_value)

def test_increment_value_pl():
    """Run the pessimistic-lock ``increment_value_pl`` worker in parallel."""
    test_func(func=increment_value_pl)
    
def test_increment_value_ol():
    """Run the optimistic-lock ``increment_value_ol`` worker in parallel."""
    test_func(func=increment_value_ol)

In [3]:
client = hazelcast.HazelcastClient(
    cluster_name="hazelcast-cluster",
    cluster_members=[
        "172.17.0.1:5701",
        "172.17.0.1:5702",
        "172.17.0.1:5703",
    ],
    lifecycle_listeners=[
        lambda state: print("Lifecycle event >>>", state),
    ]
)


def test_prep():
    """Empty the shared map and seed ``"key"`` with 0 before a test run.

    Returns the non-blocking map proxy; callers read results with
    ``.get(...).result()``.
    """
    my_map = client.get_map("my-distributed-map")
    my_map.blocking().clear()
    # Wait for the seed write to finish. The worker processes start right
    # after this returns, and if the write were still in flight they could
    # read None and crash on ``+= 1``.
    my_map.put_if_absent("key", 0).result()
    return my_map


try:
    my_map = test_prep()
    test_increment_value()
    print("Test 1, unlocked increment finished. Final key value:", my_map.get("key").result())

    my_map = test_prep()
    test_increment_value_pl()
    print("Test 2, pessimistic locking increment finished. Final key value:", my_map.get("key").result())

    my_map = test_prep()
    test_increment_value_ol()
    print("Test 3, optimistic locking increment finished. Final key value:", my_map.get("key").result())
finally:
    # Disconnect even if a test run fails.
    client.shutdown()

Lifecycle event >>> STARTING
Lifecycle event >>> STARTED
Lifecycle event >>> CONNECTED
Test 1, unlocked increment finished. Final key value: 11963
Test 2, pessimistic locking increment finished. Final key value: 30000
Test 3, optimistic locking increment finished. Final key value: 30000
Lifecycle event >>> SHUTTING_DOWN
Lifecycle event >>> DISCONNECTED
Lifecycle event >>> SHUTDOWN


### Порівняйте результати кожного з запусків
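У реалізації без блокувань спостерігається втрата оновлень (lost updates). Процеси одночасно читають те саме значення й перезаписують інкременти один одного, тому підсумок значно менший за очікувані 30000 (3 процеси × 10000 інкрементів). Реалізації з песимістичним та оптимістичним блокуванням, як і очікувалося, дають однаковий і правильний результат — 30000.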

```
Test 1, unlocked increment finished. Final key value: ~13000
Test 2, pessimistic locking increment finished. Final key value: 30000
Test 3, optimistic locking increment finished. Final key value: 30000
```

## 5. Робота з Bounded queue

In [4]:
# # import threading, hazelcast
# # client = hazelcast.HazelcastClient(
# #     cluster_name="hazelcast-cluster",
# #     cluster_members=[
# #         "172.17.0.1:5701",
# #         "172.17.0.1:5702",
# #         "172.17.0.1:5703",
# #     ],
# #     lifecycle_listeners=[
# #         lambda state: print("Lifecycle event >>>", state),
# #     ]
# # )

# # queue = client.get_queue("queue") 

# # def produce():
# #     for i in range(100):
# #         queue.offer(i)


# # producer_thread = threading.Thread(target=produce)

# # producer_thread.start()

# # producer_thread.join()


# # def consume():
# #     consumed_count = 0
# #     while consumed_count < 50: 
# #         head = queue.take().result()
# #         print("Consuming {}".format(head))
# #         consumed_count += 1


# # consumer_thread = threading.Thread(target=consume)

# # consumer_thread.start()

# # second_consumer_thread = threading.Thread(target=consume)

# # second_consumer_thread.start()


# # consumer_thread.join()
# # second_consumer_thread.join()

# # client.shutdown()

# import hazelcast
# import concurrent.futures

# def producer():
#     # Connect to Hazelcast cluster
#     client = hazelcast.HazelcastClient(cluster_name="hazelcast-cluster",
#     cluster_members=[
#         "172.17.0.1:5701",
#         "172.17.0.1:5702",
#         "172.17.0.1:5703",
#     ],
#     lifecycle_listeners=[
#         lambda state: print("Prod Lifecycle event >>>", state),
#     ])

#     # Create or get the bounded queue
#     queue = client.get_queue("bounded-queue")

#     # Write values 1 to 100 to the queue
#     for i in range(1, 101):
#         queue.put(i)

#     # Close the client connection
#     client.shutdown()

# def consumer(client_id):
#     # Connect to Hazelcast cluster
#     client = hazelcast.HazelcastClient(cluster_name="hazelcast-cluster",
#     cluster_members=[
#         "172.17.0.1:5701",
#         "172.17.0.1:5702",
#         "172.17.0.1:5703",
#     ],
#     lifecycle_listeners=[
#         lambda state: print("Cons Lifecycle event >>>", state),
#     ])

#     # Create or get the bounded queue
#     queue = client.get_queue("bounded-queue")

#     # Read values from the queue
#     for _ in range(150):
#         try:
#             value = queue.take()
#             print(f"Client {client_id} - Received: {value.result()}")
#         except hazelcast.errors.HazelcastQueueError:
#             print(f"Client {client_id} - Queue is empty. Waiting for items...")

# # Run producer and consumers simultaneously
# with concurrent.futures.ThreadPoolExecutor(max_workers=3) as executor:
#     # Start producer
#     producer_future = executor.submit(producer)

#     # Start two consumers
#     consumer_futures = [executor.submit(consumer, i) for i in range(1, 3)]

#     # Wait for all tasks to complete
#     concurrent.futures.wait([producer_future] + consumer_futures)
import hazelcast

def main():
    """Connect to the cluster and put the integers 1..100 into ``boundedQueue``.

    NOTE(review): a Hazelcast queue is bounded only if the members configure
    a ``max-size`` for it; the docker commands in this notebook pass no such
    configuration, so confirm the member config. If the queue is bounded and
    nothing consumes from it, ``put`` blocks once the queue is full.
    """
    client = hazelcast.HazelcastClient(
        cluster_name="hazelcast-cluster",
        cluster_members=[
            "172.17.0.1:5701",
        ],
        lifecycle_listeners=[
            lambda state: print("P Lifecycle event >>>", state),
        ],
    )
    try:
        # `.blocking()` makes every put wait until the item is stored. With
        # the plain async proxy the puts return Futures that were never
        # awaited, so shutdown() could run before they finished.
        queue = client.get_queue("boundedQueue").blocking()

        for i in range(1, 101):
            queue.put(i)
    finally:
        client.shutdown()


# In a Jupyter kernel __name__ is "__main__", so this still runs in the notebook.
if __name__ == "__main__":
    main()


P Lifecycle event >>> STARTING
P Lifecycle event >>> STARTED
P Lifecycle event >>> CONNECTED
P Lifecycle event >>> SHUTTING_DOWN
P Lifecycle event >>> DISCONNECTED
P Lifecycle event >>> SHUTDOWN
