# Homework

#### Tier 2 - distributed system for storing artifacts, based on a distributed file system

In [1]:
import ray
import numpy as np
import warnings
from string import ascii_lowercase, digits
from random import randint

# actor classes and consts imports
from artifact import Artifact
from name_node import NameNode
from data_node import DataNode
from consts import *

warnings.filterwarnings('ignore', category=UserWarning)

# Address of the Ray client server this notebook connects to.
RAY_CLIENT_ADDRESS = 'ray://localhost:10001'

# Drop a connection left over from a previous run of this cell.
# `ray.is_initialized` has to be *called*: the bare function object is always
# truthy, so testing it without parentheses never checked anything.
if ray.is_initialized():
    ray.shutdown()

ray.init(address=RAY_CLIENT_ADDRESS, ignore_reinit_error=True)

2025-05-12 15:08:53,478	INFO util.py:154 -- Outdated packages:
  ipywidgets==7.6.5 found, needs ipywidgets>=8
Run `pip install -U ipywidgets`, then restart the notebook server for rich notebook output.
2025-05-12 15:08:54,403	INFO client_builder.py:244 -- Passing the following kwargs to ray.init() on the server: ignore_reinit_error, log_to_driver


0,1
Python version:,3.9.18
Ray version:,2.44.1
Dashboard:,http://172.63.0.2:8265


### Helper functions - creating artifact & merging chunks

In [2]:
# creating an artifact with generated data
def create_artifact(artifact_id):
    """Create an ``Artifact`` actor holding randomly generated character data.

    The data length is drawn with ``random.randint(MIN_LEN, MAX_LEN)`` (both
    bounds inclusive). The content is drawn with ``np.random.choice`` from the
    lowercase ASCII letters and decimal digits, which yields a numpy array of
    single-character strings.

    Args:
        artifact_id: identifier handed to the ``Artifact`` actor.

    Returns:
        The ``Artifact`` actor handle returned by ``Artifact.remote``.
    """
    alphabet = [*ascii_lowercase, *digits]
    length = randint(MIN_LEN, MAX_LEN)
    payload = np.random.choice(alphabet, size=length)
    return Artifact.remote(artifact_id, payload)

# merging split artifact data chunks to retrieve artifact
def merge_chunks(chunks):
    """Join artifact data chunks back into one string.

    Args:
        chunks: iterable of chunks, in order. Each chunk is an iterable of
            strings, e.g. a numpy array of characters as returned by
            ``Client.get_artifact_chunks``. Elements may be longer than one
            character; they are concatenated as-is.

    Returns:
        The concatenation of every element of every chunk, or ``""`` when
        ``chunks`` is empty.
    """
    # One join instead of repeated `+=`: repeated string concatenation is
    # quadratic in general (PEP 8 recommends ''.join).
    return "".join("".join(chunk) for chunk in chunks)

### Client class

In [3]:
# client class
class Client:
    """Driver-side facade over one name-node actor and a list of data-node actors.

    The name node decides which data nodes hold which chunk of an artifact,
    and the client then talks to those data nodes directly. Chunk indices are
    0-based when passed to the name node and 1-based when passed to data nodes.
    After each data-node operation the node's state is queried, and a repair is
    requested if it reports FAULTY (see ``fix_node``).

    Chunk writes (add / update / delete) and repair requests are submitted
    without waiting for their results.
    """

    def __init__(self, name_node, data_nodes):
        # name_node: actor handle. data_nodes: list of actor handles, indexed by
        # the data-node indices the name node hands back.
        self.name_node = name_node
        self.data_nodes = data_nodes

    def _nodes_holding(self, artifact_id, chunk_idx):
        """Return the indices of the data nodes that store one chunk (0-based ``chunk_idx``).

        The name node's reply is resolved twice. Presumably the remote method
        itself returns an ObjectRef (TODO confirm against NameNode).
        """
        reply = self.name_node.get_data_nodes_for_chunk.remote(artifact_id, chunk_idx)
        return ray.get(ray.get(reply))

    def add_artifact(self, new_artifact):
        """Split ``new_artifact`` into chunks and store each on the data nodes the name node assigns."""
        # NOTE: this is an ObjectRef, not the id value itself. It is passed as a
        # top-level argument to the remote calls below, where Ray dereferences it.
        id_ref = new_artifact.get_id.remote()
        pieces = ray.get(new_artifact.split_into_chunks.remote())
        placement = ray.get(
            self.name_node.distribute_new_artifact_to_data_nodes.remote(id_ref)
        )

        # Each placement entry is a (data node index, 0-based chunk index) pair.
        for target_idx, piece_idx in placement:
            target = self.data_nodes[target_idx]
            target.add_chunk.remote(id_ref, piece_idx + 1, pieces[piece_idx])
            self.fix_node(target)

    def update_artifact(self, updated_artifact_chunks, update_flags, artifact_id):
        """Rewrite every chunk whose flag is truthy on all data nodes that store it.

        ``updated_artifact_chunks`` and ``update_flags`` are walked in parallel
        with ``zip``, so the shorter of the two bounds the walk. Positions are
        0-based chunk indices.
        """
        changed = [
            (position, content)
            for position, (content, dirty) in enumerate(zip(updated_artifact_chunks, update_flags))
            if dirty
        ]
        for position, content in changed:
            for target_idx in self._nodes_holding(artifact_id, position):
                target = self.data_nodes[target_idx]
                target.update_chunk.remote(artifact_id, position + 1, content)
                self.fix_node(target)

    def delete_artifact(self, artifact_id):
        """Delete ``artifact_id`` via the name node, then drop its chunks from the data nodes.

        The name node's delete call returns (data node index, 0-based chunk
        index) pairs describing where the chunks were stored.
        """
        placement = ray.get(self.name_node.delete_distributed_artifact.remote(artifact_id))
        for target_idx, piece_idx in placement:
            target = self.data_nodes[target_idx]
            target.delete_chunk.remote(artifact_id, piece_idx + 1)
            self.fix_node(target)

    def _read_chunk(self, artifact_id, chunk_idx):
        """Fetch one chunk (0-based ``chunk_idx``) from the first data node listed for it."""
        holders = self._nodes_holding(artifact_id, chunk_idx)
        source = self.data_nodes[holders[0]]  # one replica is enough for a read
        content = ray.get(source.get_chunk.remote(artifact_id, chunk_idx + 1))
        self.fix_node(source)
        return content

    def get_artifact_chunks(self, artifact_id):
        """Return the ``CHUNKS`` chunks of ``artifact_id`` as a list, in chunk order."""
        return [self._read_chunk(artifact_id, chunk_idx) for chunk_idx in range(CHUNKS)]

    def fix_node(self, node):
        """Request a repair of ``node`` if it currently reports the FAULTY state.

        The state query is awaited; the repair request is not.
        """
        state = ray.get(node.get_state.remote())
        if state == FAULTY:
            node.fix.remote()

    def list_name_node(self):
        """Print the name node's listing of the chunks it tracks."""
        listing = ray.get(self.name_node.list_chunks.remote())
        print(listing)

    def list_data_nodes(self):
        """Print each data node's listing of stored chunks, one node at a time."""
        for data_node in self.data_nodes:
            listing = ray.get(data_node.list_chunks.remote())
            print(listing)

### Preparing artifacts and initializing system

In [4]:
# nodes and artifacts creation
# NODES data-node actors named DN1..DN<NODES>, each given its 0-based index,
# plus a single name node that receives the full list of data-node handles.
data_nodes = [DataNode.remote(f"DN{number}", number - 1) for number in range(1, NODES + 1)]
name_node = NameNode.remote("NN1", data_nodes)

# Three artifacts with random content, identified as A1, A2 and A3.
init_artifacts = [create_artifact(f"A{number}") for number in range(1, 4)]

# The client drives every operation against the name node and the data nodes.
client = Client(name_node, data_nodes)

# Nothing has been stored yet, so both listings below should be empty.
client.list_name_node()
client.list_data_nodes()

--- Data chunks that are tracked in name node 'NN1' ---


--- Data chunks that are stored in data node 'DN1', state: READY ---


--- Data chunks that are stored in data node 'DN2', state: READY ---


--- Data chunks that are stored in data node 'DN3', state: READY ---


--- Data chunks that are stored in data node 'DN4', state: READY ---


--- Data chunks that are stored in data node 'DN5', state: READY ---


--- Data chunks that are stored in data node 'DN6', state: READY ---


--- Data chunks that are stored in data node 'DN7', state: READY ---


--- Data chunks that are stored in data node 'DN8', state: READY ---




As expected at this stage, every listing is empty: no artifacts have been stored in the system yet.

## Testing different features

---

### Adding artifacts

In [5]:
# let's see what changes after adding each artifact, one by one
# store A1; the name node chooses which data nodes receive each of its chunks
client.add_artifact(init_artifacts[0])
# name-node placement map, a blank separator line, then every data node's contents
client.list_name_node()
print()
client.list_data_nodes()

--- Data chunks that are tracked in name node 'NN1' ---

Artifact A1:
Chunk 1:  DN1 DN2 DN3
Chunk 2:  DN4 DN5 DN6
Chunk 3:  DN7 DN8 DN1



--- Data chunks that are stored in data node 'DN1', state: READY ---
Artifact A1, chunk 1 out of 3
Artifact A1, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN2', state: READY ---
Artifact A1, chunk 1 out of 3


--- Data chunks that are stored in data node 'DN3', state: READY ---
Artifact A1, chunk 1 out of 3


--- Data chunks that are stored in data node 'DN4', state: READY ---
Artifact A1, chunk 2 out of 3


--- Data chunks that are stored in data node 'DN5', state: READY ---
Artifact A1, chunk 2 out of 3


--- Data chunks that are stored in data node 'DN6', state: READY ---
Artifact A1, chunk 2 out of 3


--- Data chunks that are stored in data node 'DN7', state: READY ---
Artifact A1, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN8', state: READY ---
Artifact A1, chunk 3 out of 3




In [6]:
# store A2 next to A1; A1's placement should be unchanged
client.add_artifact(init_artifacts[1])
# name-node placement map, a blank separator line, then every data node's contents
client.list_name_node()
print()
client.list_data_nodes()

--- Data chunks that are tracked in name node 'NN1' ---

Artifact A1:
Chunk 1:  DN1 DN2 DN3
Chunk 2:  DN4 DN5 DN6
Chunk 3:  DN7 DN8 DN1

Artifact A2:
Chunk 1:  DN2 DN3 DN4
Chunk 2:  DN5 DN6 DN7
Chunk 3:  DN8 DN1 DN2



--- Data chunks that are stored in data node 'DN1', state: READY ---
Artifact A1, chunk 1 out of 3
Artifact A1, chunk 3 out of 3
Artifact A2, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN2', state: READY ---
Artifact A1, chunk 1 out of 3
Artifact A2, chunk 1 out of 3
Artifact A2, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN3', state: READY ---
Artifact A1, chunk 1 out of 3
Artifact A2, chunk 1 out of 3


--- Data chunks that are stored in data node 'DN4', state: READY ---
Artifact A1, chunk 2 out of 3
Artifact A2, chunk 1 out of 3


--- Data chunks that are stored in data node 'DN5', state: READY ---
Artifact A1, chunk 2 out of 3
Artifact A2, chunk 2 out of 3


--- Data chunks that are stored in data node 'DN6', state: READY -

In [7]:
# store A3; all three initial artifacts are now in the system
client.add_artifact(init_artifacts[2])
# name-node placement map, a blank separator line, then every data node's contents
client.list_name_node()
print()
client.list_data_nodes()

--- Data chunks that are tracked in name node 'NN1' ---

Artifact A1:
Chunk 1:  DN1 DN2 DN3
Chunk 2:  DN4 DN5 DN6
Chunk 3:  DN7 DN8 DN1

Artifact A2:
Chunk 1:  DN2 DN3 DN4
Chunk 2:  DN5 DN6 DN7
Chunk 3:  DN8 DN1 DN2

Artifact A3:
Chunk 1:  DN3 DN4 DN5
Chunk 2:  DN6 DN7 DN8
Chunk 3:  DN1 DN2 DN3



--- Data chunks that are stored in data node 'DN1', state: READY ---
Artifact A1, chunk 1 out of 3
Artifact A1, chunk 3 out of 3
Artifact A2, chunk 3 out of 3
Artifact A3, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN2', state: READY ---
Artifact A1, chunk 1 out of 3
Artifact A2, chunk 1 out of 3
Artifact A2, chunk 3 out of 3
Artifact A3, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN3', state: READY ---
Artifact A1, chunk 1 out of 3
Artifact A2, chunk 1 out of 3
Artifact A3, chunk 1 out of 3
Artifact A3, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN4', state: READY ---
Artifact A1, chunk 2 out of 3
Artifact A2, chunk 1 out of 3

### Getting an artifact & updating it

In [8]:
# before applying changes, retrieve artifact data
artifact_chunks = client.get_artifact_chunks("A2")
artifact_data = merge_chunks(artifact_chunks)
print(f"Data for artifact 'A2' before update: {artifact_data[:15]}...{artifact_data[-15:]}")

# let's update only 1st and 3rd chunk: overwrite the first characters of chunk 1
# with PREFIX and the last characters of chunk 3 with SUFFIX.
# The edit is done on the joined text, which is then turned back into an array of
# single characters (the layout create_artifact produces). Splicing the word in
# as ONE array element, e.g. np.hstack(("Wojciech", chunk[8:])), made element
# offsets stop matching character offsets, so re-running this cell silently
# dropped 7 characters from each edited chunk every time.
PREFIX, SUFFIX = "Wojciech", "Michaluk"
first_chunk_text = "".join(artifact_chunks[0])
last_chunk_text = "".join(artifact_chunks[2])
artifact_chunks[0] = np.array(list(PREFIX + first_chunk_text[len(PREFIX):]))
artifact_chunks[2] = np.array(list(last_chunk_text[:-len(SUFFIX)] + SUFFIX))
update_flags = [True, False, True]
client.update_artifact(artifact_chunks, update_flags, "A2")

# let's see what changes
artifact_chunks = client.get_artifact_chunks("A2")
artifact_data = merge_chunks(artifact_chunks)
print(f"Data for artifact 'A2' after update: {artifact_data[:15]}...{artifact_data[-15:]}")

Data for artifact 'A2' before update: r0ehk2pprhxe9b8...r9rut34uipayewy
Data for artifact 'A2' after update: Wojciechrhxe9b8...r9rut34Michaluk


In [9]:
# nodes distribution should stay the same
# compare with the listing taken before the update: only chunk contents were rewritten
client.list_name_node()
print()
client.list_data_nodes()

--- Data chunks that are tracked in name node 'NN1' ---

Artifact A1:
Chunk 1:  DN1 DN2 DN3
Chunk 2:  DN4 DN5 DN6
Chunk 3:  DN7 DN8 DN1

Artifact A2:
Chunk 1:  DN2 DN3 DN4
Chunk 2:  DN5 DN6 DN7
Chunk 3:  DN8 DN1 DN2

Artifact A3:
Chunk 1:  DN3 DN4 DN5
Chunk 2:  DN6 DN7 DN8
Chunk 3:  DN1 DN2 DN3



--- Data chunks that are stored in data node 'DN1', state: READY ---
Artifact A1, chunk 1 out of 3
Artifact A1, chunk 3 out of 3
Artifact A2, chunk 3 out of 3
Artifact A3, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN2', state: READY ---
Artifact A1, chunk 1 out of 3
Artifact A2, chunk 1 out of 3
Artifact A2, chunk 3 out of 3
Artifact A3, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN3', state: READY ---
Artifact A1, chunk 1 out of 3
Artifact A2, chunk 1 out of 3
Artifact A3, chunk 1 out of 3
Artifact A3, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN4', state: READY ---
Artifact A1, chunk 2 out of 3
Artifact A2, chunk 1 out of 3

### Deleting artifacts

In [10]:
# delete one artifact
client.delete_artifact("A1")

# let's see what changes: A1 should be gone from the name node and from every data node
client.list_name_node()
print()
client.list_data_nodes()

--- Data chunks that are tracked in name node 'NN1' ---

Artifact A2:
Chunk 1:  DN2 DN3 DN4
Chunk 2:  DN5 DN6 DN7
Chunk 3:  DN8 DN1 DN2

Artifact A3:
Chunk 1:  DN3 DN4 DN5
Chunk 2:  DN6 DN7 DN8
Chunk 3:  DN1 DN2 DN3



--- Data chunks that are stored in data node 'DN1', state: READY ---
Artifact A2, chunk 3 out of 3
Artifact A3, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN2', state: READY ---
Artifact A2, chunk 1 out of 3
Artifact A2, chunk 3 out of 3
Artifact A3, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN3', state: READY ---
Artifact A2, chunk 1 out of 3
Artifact A3, chunk 1 out of 3
Artifact A3, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN4', state: READY ---
Artifact A2, chunk 1 out of 3
Artifact A3, chunk 1 out of 3


--- Data chunks that are stored in data node 'DN5', state: READY ---
Artifact A2, chunk 2 out of 3
Artifact A3, chunk 1 out of 3


--- Data chunks that are stored in data node 'DN6', state: READY -

In [11]:
# delete another artifact
client.delete_artifact("A2")

# let's see what changes: only A3 should remain tracked and stored
client.list_name_node()
print()
client.list_data_nodes()

--- Data chunks that are tracked in name node 'NN1' ---

Artifact A3:
Chunk 1:  DN3 DN4 DN5
Chunk 2:  DN6 DN7 DN8
Chunk 3:  DN1 DN2 DN3



--- Data chunks that are stored in data node 'DN1', state: READY ---
Artifact A3, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN2', state: READY ---
Artifact A3, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN3', state: READY ---
Artifact A3, chunk 1 out of 3
Artifact A3, chunk 3 out of 3


--- Data chunks that are stored in data node 'DN4', state: READY ---
Artifact A3, chunk 1 out of 3


--- Data chunks that are stored in data node 'DN5', state: READY ---
Artifact A3, chunk 1 out of 3


--- Data chunks that are stored in data node 'DN6', state: READY ---
Artifact A3, chunk 2 out of 3


--- Data chunks that are stored in data node 'DN7', state: READY ---
Artifact A3, chunk 2 out of 3


--- Data chunks that are stored in data node 'DN8', state: READY ---
Artifact A3, chunk 2 out of 3




### The End

In [12]:
# clean up the environment
ray.shutdown()  # disconnect this notebook from the Ray cluster it connected to at the top