In [1]:
import ray
import os
import random
from collections import defaultdict

In [2]:
### CONSTANTS ###
# Number of StorageNode actors each NameNode creates in its constructor.
NUM_STORAGE_NODES = 4
# Number of distinct storage nodes every chunk is written to. It is used as the
# sample size for random.sample over the storage-node ids, so it must not
# exceed NUM_STORAGE_NODES (random.sample raises ValueError otherwise).
NUM_COPIES = 2
# Step used when slicing content into chunks; the last chunk may be shorter.
CHUNK_SIZE = 10

In [3]:
@ray.remote
class StorageNode:
    """Ray actor that keeps chunk payloads in memory, grouped by artifact name."""

    def __init__(self) -> None:
        # artifact name -> {chunk_id -> chunk}. The defaultdict lets save_chunk
        # create the per-artifact dict on first write; every read path below
        # uses .get() so that a lookup never creates an empty entry.
        self.chunks = defaultdict(dict)

    def save_chunk(self, artifact_name, chunk_id, chunk):
        """Store (or overwrite) one chunk of an artifact."""
        self.chunks[artifact_name][chunk_id] = chunk

    def modify_chunk(self, artifact_name, chunk_id, new_chunk):
        """Overwrite an existing chunk; print a notice if this node lacks it."""
        stored = self.chunks.get(artifact_name)
        if stored is not None and chunk_id in stored:
            stored[chunk_id] = new_chunk
        else:
            print("That node does not store this chunk!")

    def remove_chunk(self, artifact_name, chunk_id):
        """Delete one chunk; print a notice if this node lacks it."""
        stored = self.chunks.get(artifact_name)
        if stored is not None and chunk_id in stored:
            del stored[chunk_id]
            if not stored:
                # Drop the now-empty bucket so list_chunks does not report an
                # artifact this node holds nothing of.
                del self.chunks[artifact_name]
        else:
            print("That node does not store this chunk!")

    def remove_artifact(self, artifact_name):
        """Delete every chunk of an artifact; a no-op if none are stored here.

        ``del`` on a missing key raises KeyError even on a defaultdict, so
        ``pop`` with a default is used: callers may broadcast the removal to
        nodes that never held the artifact.
        """
        self.chunks.pop(artifact_name, None)

    def get_chunk(self, artifact_name, chunk_id):
        """Return the stored chunk, or None if this node does not hold it."""
        return self.chunks.get(artifact_name, {}).get(chunk_id)

    def list_chunks(self):
        """Return ``(pid, {artifact_name: {chunk_id: chunk}})`` for this node."""
        return os.getpid(), dict(self.chunks)

In [4]:
@ray.remote
class NameNode:
    """Ray actor that owns artifact metadata and coordinates the storage nodes.

    Content is split into CHUNK_SIZE pieces; each piece is written to
    NUM_COPIES randomly chosen StorageNode actors. ``self.artifacts`` records,
    per artifact and chunk id, which storage-node ids hold a copy.
    """

    # Once more than this many fire-and-forget ObjectRefs are being tracked,
    # already-finished ones are pruned so the list cannot grow without bound.
    _MAX_TRACKED_TASKS = 1024

    def __init__(self) -> None:
        # storage_id -> StorageNode actor handle
        self.storage_nodes = {
            storage_id: StorageNode.remote() for storage_id in range(NUM_STORAGE_NODES)
        }
        # artifact name -> {chunk_id -> [storage_id, ...]}
        self.artifacts = {}
        # ObjectRefs of storage operations whose results are never fetched;
        # kept only so cancel_tasks has something concrete to cancel.
        self._pending_tasks = []

    def upload(self, name, content):
        """Store a new artifact.

        Returns -1 (after printing a notice) if ``name`` already exists,
        otherwise None.
        """
        if name in self.artifacts:
            print("Can not upload existed element!")
            return -1

        self.artifacts[name] = {}

        for chunk_id, chunk in enumerate(self.split_content(content)):
            self.artifacts[name][chunk_id] = []
            self._store_chunks_in_random_storage_nodes(name, chunk_id, chunk)

    def update(self, name, content):
        """Replace the content of an existing artifact.

        Chunk ids that already exist are overwritten on the nodes that hold
        them, new chunk ids are placed on freshly sampled nodes, and chunk ids
        beyond the new length are removed. Returns -1 (after printing a
        notice) if ``name`` is unknown, otherwise None.
        """
        if name not in self.artifacts:
            print("Can not update non existed artifact!")
            return -1

        artifact = self.artifacts[name]

        chunks = self.split_content(content)
        for chunk_id, chunk in enumerate(chunks):
            if chunk_id not in artifact:
                artifact[chunk_id] = []
                self._store_chunks_in_random_storage_nodes(name, chunk_id, chunk)
            else:
                for storage_node_id in artifact[chunk_id]:
                    storage_node = self.storage_nodes[storage_node_id]
                    self._track(storage_node.modify_chunk.remote(name, chunk_id, chunk))

        # Collect first, then mutate: entries cannot be popped from ``artifact``
        # while iterating over it.
        stale_chunk_ids = [chunk_id for chunk_id in artifact if chunk_id >= len(chunks)]
        for chunk_id in stale_chunk_ids:
            for storage_node_id in artifact[chunk_id]:
                storage_node = self.storage_nodes[storage_node_id]
                self._track(storage_node.remove_chunk.remote(name, chunk_id))
            artifact.pop(chunk_id)

    def delete(self, name):
        """Remove an artifact from the metadata and from the storage nodes."""
        if name not in self.artifacts:
            # Nothing was ever stored under this name, so there is nothing to
            # ask the storage nodes to remove.
            return
        for storage_node in self.storage_nodes.values():
            self._track(storage_node.remove_artifact.remote(name))
        self.artifacts.pop(name)

    def get(self, name):
        """Return the reassembled content of an artifact, or None if unknown.

        Each chunk is read from the first storage node recorded for it.
        """
        if name not in self.artifacts:
            return None
        artifact = self.artifacts[name]
        # Submit every chunk fetch before blocking, then resolve them with a
        # single ray.get; calling ray.get inside the loop would wait for each
        # chunk before even requesting the next one.
        chunk_refs = [
            self.storage_nodes[artifact[chunk_id][0]].get_chunk.remote(name, chunk_id)
            for chunk_id in sorted(artifact)
        ]
        return "".join(ray.get(chunk_refs))

    def split_content(self, content):
        """Slice ``content`` into consecutive pieces of at most CHUNK_SIZE items."""
        return [content[i:i + CHUNK_SIZE] for i in range(0, len(content), CHUNK_SIZE)]

    def get_nodes_status(self):
        """Return the resolved ``list_chunks`` result of every storage node."""
        status_refs = [
            storage_node.list_chunks.remote() for storage_node in self.storage_nodes.values()
        ]
        return ray.get(status_refs)

    def get_cluster_status(self, storage_id):
        """Return the ``list_chunks`` ObjectRef of one storage node.

        Returns None when ``storage_id`` is unknown.
        NOTE(review): unlike get_nodes_status this hands back the unresolved
        ObjectRef rather than the value; kept as-is for backward compatibility.
        """
        if storage_id in self.storage_nodes:
            return self.storage_nodes[storage_id].list_chunks.remote()

    def get_artifacts_list(self):
        """Return one ``name : {chunk_id: [storage_ids]}`` line per artifact."""
        return '\n'.join([f"{artifact_name} : {chunks}" for artifact_name, chunks in self.artifacts.items()])

    def _store_chunks_in_random_storage_nodes(self, name, chunk_id, chunk):
        """Write one chunk to NUM_COPIES distinct random nodes and record them."""
        random_storage_nodes = random.sample(sorted(self.storage_nodes.keys()), NUM_COPIES)
        for storage_id in random_storage_nodes:
            storage_node = self.storage_nodes.get(storage_id)
            self._track(storage_node.save_chunk.remote(name, chunk_id, chunk))
            self.artifacts[name][chunk_id].append(storage_id)

    def _track(self, object_ref):
        """Remember the ObjectRef of a fire-and-forget storage operation."""
        self._pending_tasks.append(object_ref)
        if len(self._pending_tasks) > self._MAX_TRACKED_TASKS:
            self._pending_tasks = self._unfinished_tasks()
        return object_ref

    def _unfinished_tasks(self):
        """Return the tracked ObjectRefs whose tasks have not completed yet."""
        if not self._pending_tasks:
            return []
        # timeout=0 makes this a non-blocking poll; fetch_local=False because
        # only completion matters here, not the result values.
        _, unfinished = ray.wait(
            self._pending_tasks,
            num_returns=len(self._pending_tasks),
            timeout=0,
            fetch_local=False,
        )
        return unfinished

    def cancel_tasks(self):
        """Request cancellation of storage operations that have not finished.

        Actor methods cannot be invoked without ``.remote()`` and ``ray.cancel``
        takes a single ObjectRef, so the refs recorded by ``_track`` are
        cancelled one at a time. Returns the number of tasks for which
        cancellation was requested.
        NOTE(review): relies on ray.cancel supporting actor tasks in the
        installed Ray version - confirm for versions other than the one used here.
        """
        unfinished = self._unfinished_tasks()
        for object_ref in unfinished:
            ray.cancel(object_ref)
        self._pending_tasks = []
        return len(unfinished)

In [5]:
# Shut down any Ray session already initialised in this kernel so the cell can
# be re-run, then connect to the cluster at the hard-coded ray:// address.
if ray.is_initialized():
    ray.shutdown()
ray.init(address="ray://localhost:10001")

0,1
Python version:,3.10.0
Ray version:,2.10.0
Dashboard:,http://172.63.0.2:8265


In [6]:
# Create the NameNode actor; its constructor in turn creates NUM_STORAGE_NODES
# StorageNode actors.
name_node = NameNode.remote()

In [7]:
def print_status(result):
    """Print a human-readable report of what every storage node holds.

    ``result`` is an iterable of ``(pid, {artifact_name: {chunk_id: chunk}})``
    pairs. One line is printed per node, per artifact and per chunk, with
    increasing indentation.
    """

    def report_lines():
        # Yield lines lazily so output is produced in the same order, and at
        # the same points, as walking the structure with nested loops.
        for node_pid, stored_artifacts in result:
            yield f"Storage Node PID: {node_pid}"
            for artifact_name, artifact_chunks in stored_artifacts.items():
                yield f"  Artifact: {artifact_name}"
                for chunk_id, chunk in artifact_chunks.items():
                    yield f"    Chunk ID: {chunk_id} | Chunk Content: {chunk}"

    for line in report_lines():
        print(line)


In [8]:
# Upload "art-1", then print what every storage node holds. The ObjectRef
# returned by upload.remote() is discarded, so an exception raised inside
# upload would not surface in this cell.
name_node.upload.remote("art-1", "wKWSU8umg9cL7mDG7UhM8tex11BI5dFNCfzcszpXE5PB2KGrkomaHJXAMPZ")
result = ray.get(name_node.get_nodes_status.remote())
print_status(result)

Storage Node PID: 159
  Artifact: art-1
    Chunk ID: 2 | Chunk Content: 8tex11BI5d
    Chunk ID: 3 | Chunk Content: FNCfzcszpX
    Chunk ID: 5 | Chunk Content: maHJXAMPZ
Storage Node PID: 159
  Artifact: art-1
    Chunk ID: 1 | Chunk Content: cL7mDG7UhM
    Chunk ID: 2 | Chunk Content: 8tex11BI5d
    Chunk ID: 4 | Chunk Content: E5PB2KGrko
Storage Node PID: 160
  Artifact: art-1
    Chunk ID: 0 | Chunk Content: wKWSU8umg9
    Chunk ID: 1 | Chunk Content: cL7mDG7UhM
    Chunk ID: 4 | Chunk Content: E5PB2KGrko
    Chunk ID: 5 | Chunk Content: maHJXAMPZ
Storage Node PID: 160
  Artifact: art-1
    Chunk ID: 0 | Chunk Content: wKWSU8umg9
    Chunk ID: 3 | Chunk Content: FNCfzcszpX


In [9]:
# Upload a second artifact, "art-2", then print what every storage node holds.
# As above, the upload's ObjectRef is discarded rather than fetched.
name_node.upload.remote("art-2", "ezCgKuZx8qxt55WfOdQHfdr5rvsYupf9Fakx9mdIY9q7XbqIeRyQSKCwNeiRcMi1022d9MK29UseUpxvw6eAhfSV")
result = ray.get(name_node.get_nodes_status.remote())
print_status(result)

Storage Node PID: 159
  Artifact: art-1
    Chunk ID: 2 | Chunk Content: 8tex11BI5d
    Chunk ID: 3 | Chunk Content: FNCfzcszpX
    Chunk ID: 5 | Chunk Content: maHJXAMPZ
  Artifact: art-2
    Chunk ID: 0 | Chunk Content: ezCgKuZx8q
    Chunk ID: 1 | Chunk Content: xt55WfOdQH
    Chunk ID: 2 | Chunk Content: fdr5rvsYup
    Chunk ID: 3 | Chunk Content: f9Fakx9mdI
    Chunk ID: 4 | Chunk Content: Y9q7XbqIeR
    Chunk ID: 5 | Chunk Content: yQSKCwNeiR
    Chunk ID: 6 | Chunk Content: cMi1022d9M
    Chunk ID: 8 | Chunk Content: w6eAhfSV
Storage Node PID: 159
  Artifact: art-1
    Chunk ID: 1 | Chunk Content: cL7mDG7UhM
    Chunk ID: 2 | Chunk Content: 8tex11BI5d
    Chunk ID: 4 | Chunk Content: E5PB2KGrko
  Artifact: art-2
    Chunk ID: 1 | Chunk Content: xt55WfOdQH
    Chunk ID: 4 | Chunk Content: Y9q7XbqIeR
    Chunk ID: 7 | Chunk Content: K29UseUpxv
Storage Node PID: 160
  Artifact: art-1
    Chunk ID: 0 | Chunk Content: wKWSU8umg9
    Chunk ID: 1 | Chunk Content: cL7mDG7UhM
    Chunk I

In [10]:
# Replace the content of "art-2" with a shorter string: update overwrites the
# chunk ids that still exist and removes the chunk ids past the new length.
name_node.update.remote("art-2", "updatedArtifact2WorkingCorrectlyabcdbaksdjahdkjcdADLKJADLKJ")
result = ray.get(name_node.get_nodes_status.remote())
print_status(result)

Storage Node PID: 159
  Artifact: art-1
    Chunk ID: 2 | Chunk Content: 8tex11BI5d
    Chunk ID: 3 | Chunk Content: FNCfzcszpX
    Chunk ID: 5 | Chunk Content: maHJXAMPZ
  Artifact: art-2
    Chunk ID: 0 | Chunk Content: updatedArt
    Chunk ID: 1 | Chunk Content: ifact2Work
    Chunk ID: 2 | Chunk Content: ingCorrect
    Chunk ID: 3 | Chunk Content: lyabcdbaks
    Chunk ID: 4 | Chunk Content: djahdkjcdA
    Chunk ID: 5 | Chunk Content: DLKJADLKJ
Storage Node PID: 159
  Artifact: art-1
    Chunk ID: 1 | Chunk Content: cL7mDG7UhM
    Chunk ID: 2 | Chunk Content: 8tex11BI5d
    Chunk ID: 4 | Chunk Content: E5PB2KGrko
  Artifact: art-2
    Chunk ID: 1 | Chunk Content: ifact2Work
    Chunk ID: 4 | Chunk Content: djahdkjcdA
Storage Node PID: 160
  Artifact: art-1
    Chunk ID: 0 | Chunk Content: wKWSU8umg9
    Chunk ID: 1 | Chunk Content: cL7mDG7UhM
    Chunk ID: 4 | Chunk Content: E5PB2KGrko
    Chunk ID: 5 | Chunk Content: maHJXAMPZ
  Artifact: art-2
    Chunk ID: 2 | Chunk Content: ingC

In [11]:
# Delete "art-1" from the name node's metadata and from the storage nodes,
# then print what every storage node still holds.
name_node.delete.remote("art-1")
result = ray.get(name_node.get_nodes_status.remote())
print_status(result)

Storage Node PID: 159
  Artifact: art-2
    Chunk ID: 0 | Chunk Content: updatedArt
    Chunk ID: 1 | Chunk Content: ifact2Work
    Chunk ID: 2 | Chunk Content: ingCorrect
    Chunk ID: 3 | Chunk Content: lyabcdbaks
    Chunk ID: 4 | Chunk Content: djahdkjcdA
    Chunk ID: 5 | Chunk Content: DLKJADLKJ
Storage Node PID: 159
  Artifact: art-2
    Chunk ID: 1 | Chunk Content: ifact2Work
    Chunk ID: 4 | Chunk Content: djahdkjcdA
Storage Node PID: 160
  Artifact: art-2
    Chunk ID: 2 | Chunk Content: ingCorrect
    Chunk ID: 3 | Chunk Content: lyabcdbaks
Storage Node PID: 160
  Artifact: art-2
    Chunk ID: 0 | Chunk Content: updatedArt
    Chunk ID: 5 | Chunk Content: DLKJADLKJ


In [12]:
# Reassemble "art-2" from its stored chunks and print the result.
print(ray.get(name_node.get.remote("art-2")))

updatedArtifact2WorkingCorrectlyabcdbaksdjahdkjcdADLKJADLKJ


In [13]:
# "art-1" was deleted in an earlier cell, so get() finds no such artifact and
# this prints None.
print(ray.get(name_node.get.remote("art-1")))

None


In [14]:
# Fire-and-forget call: the returned ObjectRef is displayed as the cell output
# but never fetched, so an exception raised inside cancel_tasks is not
# surfaced here.
name_node.cancel_tasks.remote()

ClientObjectRef(1e8ff6d23613278489a72d008480465bdc13d3cb0100000001000000)