In [1]:
import ray
import time
import numpy as np
import os
import uuid
import random  

In [None]:
if ray.is_initialized:
    ray.shutdown()
ray.init(ignore_reinit_error=True)

In [None]:
# content - string of length 64 
# it is always split into 4 blocks of size 16 each 
class Artefact:
    def __init__(self, name, content):
        self.name = name
        self.content = content 
# end Artefact class 

class Block:
    def __init__(self, blockID, blockContent, randomDataNodes):
        self.blockID = blockID
        self.blockContent = blockContent
        self.dataNodes = randomDataNodes
        
    def saveContentToNodes(self):
        for dataNode in self.dataNodes:
            dataNode.saveBlockContent.remote(self.blockID, self.blockContent)
    
    def getContentFromNode(self):
        n = len(self.dataNodes)

        # random data node - load balancing 
        idx = np.random.randint(0, n)

        return ray.get(self.dataNodes[idx].getBlockContent.remote(self.blockID))
# end class Block

In [None]:
NUM_OF_BLOCK_COPIES = 3 

@ray.remote 
class NameNode:
    def __init__(self, dataNodes):
        self.files = {} # filename -> list of instances of Block class 
        self.dataNodes = dataNodes # list of data nodes
    
    def addNewArtefact(self, artefact: Artefact):
        blocks = self.splitArtefactIntoBlocks(artefact)
        self.files[artefact.name] = blocks 
    #

    def splitArtefactIntoBlocks(self, artefact: Artefact):
        blocks = [] 
        for a in range(4):
            blockContent = artefact.content[16 * a : 16 * (a + 1)]
            blockID = uuid.uuid4()
            randomDataNodes = random.sample(self.dataNodes, min(len(self.dataNodes), NUM_OF_BLOCK_COPIES))

            block = Block.remote(blockID, blockContent, randomDataNodes)
            block.saveContentToNodes()

            blocks.append(block)
        #
        return blocks 
    #

    def restoreArtefact(self, filename):
        blocks = self.files[filename]
        contents = []
        for block in blocks:
            content = block.getContentFromNode()
            contents.append(ray.get(content))
        #
        return "".join(contents)
    #
# end NameNode class 

In [None]:
@ray.remote 
class DataNode:
    def __init__(self):
        self.dataNodeID = uuid.uuid4()
        self.data = {} # block ID -> block content 
    
    def saveBlockContent(self, blockID, blockContent):
        self.data[blockID] = blockContent

    def getBlockContent(self, blockID):
        return self.data[blockID]
# end DataNode class 

In [None]:
dataNodes = [DataNode.remote() for _ in range(10)]
nameNode = NameNode.remote(dataNodes)

In [None]:
text = "Filip i Karol weszli razem na polanke by poogladac piekna przyrode i razem odpoczac"

artefact = Artefact("password", text[:64])
nameNode.addNewArtefact.remote(artefact)

ObjectRef(cd571e6fa427b0660b4572d8321d92cee2d5cf9e0100000001000000)

In [None]:
content = nameNode.restoreArtefact.remote(artefact.name)
content = ray.get(content)
print(content)
print(len(content))

Filip i Karol weszli razem na polanke by poogladac piekna przyro
64
