# HyperShell "ipyparallel" cluster deployment

In [None]:
import json
import tempfile
from enum import Enum

class ShellTemplate:
    """Shell script text whose ``{KEY}`` placeholders are filled from a mapping."""

    def __init__(self, template):
        """Store the raw template text.

        Args:
            template: Script text containing ``{KEY}`` placeholders.
        """
        self.template = template

    def process(self, arguments):
        """Return the template with every ``{key}`` replaced by its value.

        Args:
            arguments: Mapping of placeholder name to replacement value.
                Values are passed through ``str()`` so non-string values
                (e.g. integer ports) can be substituted as well.

        Returns:
            The substituted text. Placeholders without a matching key are
            left untouched.
        """
        result = self.template
        for key, value in arguments.items():
            result = result.replace("{" + key + "}", str(value))
        return result

    def save(self, arguments):
        """Render the template and write it to a new temporary file.

        The file is created with ``delete=False``, so it still exists after
        this method returns; removing it is left to the caller.

        Args:
            arguments: Mapping forwarded to :meth:`process`.

        Returns:
            Path of the temporary file holding the rendered text.
        """
        result = self.process(arguments)

        # The temporary file is opened in binary mode, so the text has to be
        # encoded before it is written.
        with tempfile.NamedTemporaryFile(delete=False) as handle:
            handle.write(result.encode())

        return handle.name
    
class HyperShellTemplate(ShellTemplate):
    
    def __init__(self, template):
        super().__init__(template) 
    
    def process(self, arguments, clusterDefinition):
        
        result = super().process(arguments)
        
        file = tempfile.NamedTemporaryFile(delete=False)
        file.write(result.encode())
        file.close()
        
        !hysh $clusterDefinition $file.name
        
class TCPPortRange:
    """Hands out consecutive port numbers, beginning at a given port."""

    def __init__(self, start):
        # ``start`` always holds the next port that has not been handed out.
        self.start = start

    def next(self):
        """Return the current port as a string and advance to the next one."""
        port = self.start
        self.start += 1
        return str(port)

In [None]:
class HyperShellClusterOperationType(Enum):
    """Kind of operation submitted to ``HyperShellCluster.process``."""
    # Payload replaces the settings stored on the cluster.
    SETTINGS = 1
    # Payload is run against the "CONTROLLER_CLUSTER_DEFINITION" entry of the settings.
    CONTROLLER = 2
    # Payload is run against the "WORKER_CLUSTER_DEFINITION" entry of the settings.
    WORKER = 3

class HyperShellClusterOperationFormat(Enum):
    """Output format requested from ``hysh`` for a plain command payload."""
    # Passed to hysh as "--format stream+text".
    TEXT = 1
    # Passed to hysh as "--format stream+json".
    JSON = 2

class HyperShellCluster:
    """Keeps the current cluster settings and runs operations against them."""

    def __init__(self):
        # Stays None until a SETTINGS operation has been processed.
        self.settings = None

    def process(self, operation):
        """Apply one operation to the cluster.

        Args:
            operation: Object exposing ``operationType``, ``payload`` and a
                ``process(settings)`` method.

        Returns:
            None for a SETTINGS operation (the payload is stored as the new
            settings) and for an unrecognised operation type; otherwise
            whatever ``operation.process`` returns.

        Raises:
            RuntimeError: If a CONTROLLER or WORKER operation is submitted
                before any SETTINGS operation.
        """
        operationType = operation.operationType

        if HyperShellClusterOperationType.SETTINGS == operationType:
            self.settings = operation.payload
            return None

        runnable = (
            HyperShellClusterOperationType.CONTROLLER,
            HyperShellClusterOperationType.WORKER,
        )
        if operationType in runnable:
            # Fail with a clear message instead of an obscure error further
            # down when the settings were never provided.
            if self.settings is None:
                raise RuntimeError(
                    "Cluster settings are not set; submit a SETTINGS operation first"
                )
            return operation.process(self.settings)

        return None
        
class HyperShellClusterOperation:

    def __init__(self, operationType, payload, outputFormat):
        self.operationType = operationType
        self.payload = payload
        self.outputFormat = outputFormat

    def process(self, settings):     

        outputFormat = "stream+text"
        if HyperShellClusterOperationFormat.JSON == self.outputFormat:
            outputFormat = "stream+json"

        if isinstance(self.payload, HyperShellTemplate):
            if HyperShellClusterOperationType.CONTROLLER == self.operationType:
                return self.payload.process(settings, settings["CONTROLLER_CLUSTER_DEFINITION"])
            elif HyperShellClusterOperationType.WORKER == self.operationType:
                return self.payload.process(settings, settings["WORKER_CLUSTER_DEFINITION"])
        else:
            if HyperShellClusterOperationType.CONTROLLER == self.operationType:
                path = settings["CONTROLLER_CLUSTER_DEFINITION"]
                command = f"\"{self.payload}\""
                response=!hysh $path $command --format $outputFormat
                return response
            elif HyperShellClusterOperationType.WORKER == self.operationType:
                path = settings["WORKER_CLUSTER_DEFINITION"]
                command = f"\"{self.payload}\""
                response=!hysh $path $command --format $outputFormat
                return response

class HyperShellClusterCLI:
    """Short-named convenience wrappers around a single HyperShellCluster.

    ``c``/``w`` run a plain command on controllers/workers, ``ct``/``wt`` run
    a template, ``cj`` runs a controller command and decodes JSON output, and
    ``s`` stores the settings.
    """

    def __init__(self):
        self.cluster = HyperShellCluster()

    def _run(self, operationType, payload, outputFormat):
        """Wrap the payload in an operation and submit it to the cluster."""
        operation = HyperShellClusterOperation(operationType, payload, outputFormat)
        return self.cluster.process(operation)

    def c(self, payload):
        """Run a command on the controllers with text output."""
        return self._run(
            HyperShellClusterOperationType.CONTROLLER,
            payload,
            HyperShellClusterOperationFormat.TEXT,
        )

    def cj(self, payload):
        """Run a command on the controllers and decode its JSON output.

        Returns:
            The parsed JSON found in the "output" field of the first element
            of the response.
        """
        response = self._run(
            HyperShellClusterOperationType.CONTROLLER,
            payload,
            HyperShellClusterOperationFormat.JSON,
        )
        # Take the first element of the response and drop the trailing comma
        # of the JSON stream before decoding it.
        envelope = json.loads(response[0].rstrip(","))
        # The transaction output is itself a JSON document stored as text.
        return json.loads(envelope["output"])

    def ct(self, payload):
        """Run a template on the controllers."""
        return self._run(
            HyperShellClusterOperationType.CONTROLLER,
            HyperShellTemplate(payload),
            HyperShellClusterOperationFormat.TEXT,
        )

    def s(self, payload):
        """Store ``payload`` as the cluster settings."""
        return self._run(
            HyperShellClusterOperationType.SETTINGS,
            payload,
            HyperShellClusterOperationFormat.TEXT,
        )

    def w(self, payload):
        """Run a command on the workers with text output."""
        return self._run(
            HyperShellClusterOperationType.WORKER,
            payload,
            HyperShellClusterOperationFormat.TEXT,
        )

    def wt(self, payload):
        """Run a template on the workers."""
        return self._run(
            HyperShellClusterOperationType.WORKER,
            HyperShellTemplate(payload),
            HyperShellClusterOperationFormat.TEXT,
        )

## Cluster settings

In [None]:
# In the firewall rule, we can reserve 10 * 11 = 110 ports
# to be able to re-start the controller 10 times.
# Ports are handed out sequentially starting at 9801, so 110 ports
# span 9801 to 9910.
# NOTE(review): this comment previously said "from 8081 to 8910", which does
# not match the start port below — confirm against the actual firewall rule.
portRange = TCPPortRange(9801)

In [None]:
# 11 ports for each controller instance
settings = {
    "CONTROLLER_CLUSTER_DEFINITION": "~/controllers.cluster.json",
    "WORKER_CLUSTER_DEFINITION": "~/workers.cluster.json",
    "BIND_IP_ADDRESS": "0.0.0.0",
    "LOCATION_IP_ADDRESS": "34.239.11.167",
}
# Ports are drawn from the range in exactly this order, one per name.
settings.update({
    portName: portRange.next()
    for portName in (
        "REGISTRATION_PORT",
        "CONTROL_PORT_1",
        "CONTROL_PORT_2",
        "MUX_PORT_1",
        "MUX_PORT_2",
        "HB_PING_PORT",
        "HB_PONG_PORT",
        "TASK_PORT_1",
        "TASK_PORT_2",
        "IOPUB_PORT_1",
        "IOPUB_PORT_2",
        # "NOTIFICATION_PORT",
    )
})

## Create the cluster

In [None]:
# Create the CLI wrapper and store the settings on its cluster; every later
# controller/worker call reads the cluster definitions from these settings.
cluster = HyperShellClusterCLI()
cluster.s(settings)

## Install "ipyparallel"

In [None]:
# Run the same pip command on the controllers first, then on the workers,
# and print both captured responses.
controllersResponse = cluster.c("pip install ipyparallel")
workersResponse = cluster.w("pip install ipyparallel")
print(controllersResponse)
print(workersResponse)

## Start the controller

In [None]:
# Fill the {PLACEHOLDER} names below from the settings and run the resulting
# ipcontroller command on the controllers. The trailing backslashes sit in a
# regular (non-raw) string, so Python joins the continued lines.
cluster.ct("""
ipcontroller \
    --log-level=DEBUG \
    --log-to-file=True \
    --ip={BIND_IP_ADDRESS} \
    --location={LOCATION_IP_ADDRESS} \
    --HubFactory.regport={REGISTRATION_PORT} \
    --HubFactory.hb={HB_PING_PORT},{HB_PONG_PORT} \
    --HubFactory.control={CONTROL_PORT_1},{CONTROL_PORT_2} \
    --HubFactory.mux={MUX_PORT_1},{MUX_PORT_2} \
    --HubFactory.iopub={IOPUB_PORT_1},{IOPUB_PORT_2} \
    --HubFactory.task={TASK_PORT_1},{TASK_PORT_2} | tee
""")

In [None]:
# Read the engine connection file from the controller as JSON, copy its "key"
# entry into the settings as KEY, and store the updated settings again.
remoteSettings = cluster.cj("cat ~/.ipython/profile_default/security/ipcontroller-engine.json | tee")
settings["KEY"] = remoteSettings["key"]
cluster.s(settings)

In [None]:
# Show the settings currently stored on the cluster (now including KEY).
print(cluster.cluster.settings)

## Write engine settings

In [None]:
# On the workers: create the security directory and write
# ipcontroller-engine.json with the {PLACEHOLDER} names filled from the
# settings. In this regular string, \\" becomes \" in the script text.
cluster.wt("""
mkdir -p ~/.ipython/profile_default/security
echo "{ \
  \\"ssh\\": \\"\\", \
  \\"interface\\": \\"tcp://{LOCATION_IP_ADDRESS}\\", \
  \\"registration\\": {REGISTRATION_PORT}, \
  \\"control\\": {CONTROL_PORT_2}, \
  \\"mux\\": {MUX_PORT_2}, \
  \\"hb_ping\\": {HB_PING_PORT}, \
  \\"hb_pong\\": {HB_PONG_PORT}, \
  \\"task\\": {TASK_PORT_2}, \
  \\"iopub\\": {IOPUB_PORT_2}, \
  \\"key\\": \\"{KEY}\\", \
  \\"location\\": \\"{LOCATION_IP_ADDRESS}\\", \
  \\"pack\\": \\"json\\", \
  \\"unpack\\": \\"json\\", \
  \\"signature_scheme\\": \\"hmac-sha256\\" \
}" > ~/.ipython/profile_default/security/ipcontroller-engine.json
""")

In [None]:
# Print the connection file back from the workers to check what was written.
cluster.w("cat ~/.ipython/profile_default/security/ipcontroller-engine.json")

## Start engines

In [None]:
# Start ipengine on the workers; the trailing "&" backgrounds it in the remote shell.
cluster.w("ipengine --log-level=DEBUG --log-to-file=True --profile-dir=~/.ipython/profile_default/ &")

## Check engines

In [None]:
# List processes on the controllers.
cluster.c("ps")

In [None]:
# List processes on the workers.
cluster.w("ps")

In [None]:
# Dump every file in the profile's log directory on the controllers.
!hysh ~/controllers.cluster.json "cat ~/.ipython/profile_default/log/*"

In [None]:
# Dump the ipengine log files on the workers.
!hysh ~/workers.cluster.json "cat ~/.ipython/profile_default/log/ipengine*"