In [5]:
import helix
from helix.client import Query
from helix.types import Payload
from typing import List
from instance import Instance

## Check Configs

In [6]:
import json
import os
from pathlib import Path

CONFIG_PATH = "helixdb-cfg"
CONFIG_FILE = "config.hx.json"
QUERIES_FILE = "queries.hx"
SCHEMA_FILE = "schema.hx"
DEFAULT_CONFIG = {
    "vector_config": {
        "m": 16,
        "ef_construction": 128,
        "ef_search": 768
    },
    "graph_config": {
        "secondary_indices": []
    },
    "db_max_size_gb": 10
}

helix_dir = Path(os.path.dirname(os.path.curdir)).resolve()
os.makedirs(os.path.join(helix_dir, CONFIG_PATH), exist_ok=True)

if not Path(os.path.join(helix_dir, CONFIG_PATH, CONFIG_FILE)).exists():
    configs = DEFAULT_CONFIG
    path = os.path.join(helix_dir, CONFIG_PATH, CONFIG_FILE)
    with open(path, "w") as f:
        json.dump(configs, f, indent=4)
    print("Config file created")
else:
    path = os.path.join(helix_dir, CONFIG_PATH, CONFIG_FILE)
    with open(path, "r") as f:
        configs = json.load(f)
    print("Config file loaded")

Config file loaded


## Create Schema

In [7]:
from typing import Dict

nonnumeric_types = [
    "ID",
    "Date",
    "String",
    "Boolean"
]

F_types = [32, 64]
I_types = [8, 16, 32, 64]
U_types = [8, 16, 32, 64, 128]

def check_type(type: str):
    if type in nonnumeric_types:
        return True

    if type[0] == "[" and type[-1] == "]":
        return check_type(type[1:-1])
    
    if type[0] == "F":
        bits = int(type[1:])
        if bits in F_types:
            return True

    if type[0] == "I":
        bits = int(type[1:])
        if bits in I_types:
            return True

    if type[0] == "U":
        bits = int(type[1:])
        if bits in U_types:
            return True

    return False

In [8]:
def create_node(node_type:str, properties:Dict[str, str] = {}) -> str:
    if not isinstance(node_type, str):
        raise TypeError(f"Node type must be a string, got {type(node_type).__name__}")
    if len(node_type) < 1:
        raise ValueError(f"Node type is empty")
    if not isinstance(properties, dict):
        raise TypeError(f"Properties must be a dictionary, got {type(properties).__name__}")
    if len(properties) < 1:
        raise ValueError(f"Properties dictionary is empty")
    output = ""
    output += "N::" + node_type + " {\n"
    for key, value in properties.items():
        if not isinstance(key, str):
            raise TypeError(f"Key '{key}' must be a string, got {type(key).__name__}")
        if not isinstance(value, str):
            raise TypeError(f"Value for key '{key}' must be a string, got {type(value).__name__}")
        if not check_type(value):
            raise ValueError(f"Value for key '{key}' must be a valid type, got {value}")
        
        output += "    " + key + ": " + value + "\n"
    output += "}\n"
    return output

print(create_node("User", {"name": "String", "age": "U32"}))
print(create_node("test", {"test": "U16"}))

N::User {
    name: String
    age: U32
}

N::test {
    test: U16
}



In [None]:
def create_edge(edge_type:str, from_node:str, to_node:str, properties:Dict[str, str] = {}) -> str:
    if not isinstance(edge_type, str):
        raise TypeError(f"Edge type must be a string, got {type(edge_type).__name__}")
    if not isinstance(from_node, str):
        raise TypeError(f"From node must be a string, got {type(from_node).__name__}")
    if not isinstance(to_node, str):
        raise TypeError(f"To node must be a string, got {type(to_node).__name__}")
    if not isinstance(properties, dict):
        raise TypeError(f"Properties must be a dictionary, got {type(properties).__name__}")
    
    output = ""
    output += "E::" + edge_type + " {\n"
    output += "    From: " + from_node + "\n"
    output += "    To: " + to_node + "\n"
    output += "    Properties: {\n"
    for key, value in properties.items():
        output += "        " + str(key) + ": " + str(value) + "\n"
    output += "    }\n"
    output += "}\n"
    return output

print(create_edge("Follows", "User", "User", {"since": "I32"}))

E::Follows {
    From: User
    To: User
    Properties: {
        since: I32
    }
}



In [13]:
def create_vector(vector_type:str, properties:Dict[str, str] = {}) -> str:
    output = ""
    output += "V::" + vector_type + " {\n"
    for key, value in properties.items():
        output += "    " + str(key) + ": " + str(value) + "\n"
    output += "}\n"
    return output

print(create_vector("User", {"name": "String", "age": "U32"}))

V::User {
    name: String
    age: U32
}



In [15]:
schema_text = ""

schema_text += create_node("User", {"name":"String", "age":"U32", "email":"String", "created_at":"I32", "updated_at":"I32"})
schema_text += create_node("Post", {"content":"String", "created_at":"I32", "updated_at":"I32"})
schema_text += create_edge("Follows", "User", "User", {"since":"I32"})
schema_text += create_edge("Created", "User", "Post", {"created_at":"I32"})

print(schema_text)

N::User {
    name: String
    6: U32
    email: String
    created_at: I32
    updated_at: I32
}
N::Post {
    content: String
    created_at: I32
    updated_at: I32
}
E::Follows {
    From: User
    To: User
    Properties: {
        since: I32
    }
}
E::Created {
    From: User
    To: Post
    Properties: {
        created_at: I32
    }
}



### Scan schema

In [165]:
if not Path(os.path.join(helix_dir, CONFIG_PATH, SCHEMA_FILE)).exists():
    path = os.path.join(helix_dir, CONFIG_PATH, SCHEMA_FILE)
    with open(path, "w") as f:
        json.dump("", f, indent=4)
    print("Schema file created")
else:
    path = os.path.join(helix_dir, CONFIG_PATH, SCHEMA_FILE)
    with open(path, "r") as f:
        schema = f.read()
        schema = schema.split('\n\n')
    print("Schema file loaded")

print(schema)

Schema file loaded
['N::User {\n    name: String,\n    age: U32,\n    email: String,\n    created_at: I32,\n    updated_at: I32,\n}', 'N::Post {\n    content: String,\n    created_at: I32,\n    updated_at: I32,\n}', 'E::Follows {\n    From: User,\n    To: User,\n    Properties: {\n        since: I32,\n    }\n}', 'E::Created {\n    From: User,\n    To: Post,\n    Properties: {\n        created_at: I32,\n    }\n}']


In [None]:
scanned_node = schema[1]
print(f"{scanned_node}")
scanned_node = scanned_node.replace("\n", "")
print(f"Type: {scanned_node.split("::")[0]}")
print(f"Name: {scanned_node.split("::")[1].split("{")[0].strip()}")
scanned_node_props = {}
for item in scanned_node.split("{")[1].split("}")[0].split(", "):
    item = item.replace(",", "").split(":")
    scanned_node_props[item[0].strip()] = item[1].strip()

print(f"Prop: {scanned_node_props}")

N::Post {
    content: String,
    created_at: I32,
    updated_at: I32,
}
Type: N
Name: Post
Prop: {'content': 'String', 'created_at': 'I32', 'updated_at': 'I32'}


In [72]:
def read_vector(vector_str:str):
    vector_str = vector_str.replace("\n", "")
    vector_type = vector_str.split("::")[1].split("{")[0].strip()
    properties = {}
    for item in vector_str.split("{")[1].split("}")[0].split(", "):
        item = item.replace(",", "").split(":")
        properties[item[0].strip()] = item[1].strip()
    return vector_type, properties
sample_vector = """
V::Embedding {
    vec: [F64]
}
"""
vector_type, properties = read_vector(sample_vector)
print(vector_type)
print(properties)
print(type(properties))

Embedding
{'vec': '[F64]'}
<class 'dict'>


In [128]:
scanned_node = schema[3]
edge_str = scanned_node.replace("\n", "")
edge_type = edge_str.split("::")[1].split("{")[0].strip()
properties = {}
for item in ''.join(edge_str.split("{")[1:]).split("}")[0].strip(",").split(", "):
    key = item.replace(",", "").split(":")[0].strip()
    value = ":".join(item.replace(",", "").split(":")[1:]).strip()
    if key == "Properties":
        properties[key] = {}
        for prop in value.split(", "):
            prop_key = prop.split(":")[0].strip()
            prop_value = ":".join(prop.split(":")[1:]).strip()
            properties[key][prop_key] = prop_value
    else:
        properties[key] = value

print(f"Type: {edge_type}")
print(f"Properties: {properties}")

Type: Created
Properties: {'From': 'User', 'To': 'Post', 'Properties': {'created_at': 'I32'}, '': ''}


In [130]:
def read_edge(edge_str:str):
    edge_str = edge_str.replace("\n", "")
    edge_type = edge_str.split("::")[1].split("{")[0].strip()
    properties = {}
    for item in ''.join(edge_str.split("{")[1:]).split("}")[0].strip(",").split(", "):
        key = item.replace(",", "").split(":")[0].strip()
        value = ":".join(item.replace(",", "").split(":")[1:]).strip()
        if key == "Properties":
            properties[key] = {}
            for prop in value.split(", "):
                prop_key = prop.split(":")[0].strip()
                prop_value = ":".join(prop.split(":")[1:]).strip()
                properties[key][prop_key] = prop_value
        else:
            properties[key] = value

    from_node = properties.get("From")
    to_node = properties.get("To")
    prop_str = properties.get("Properties", {})
    properties = {}

    for key, value in prop_str.items():
        properties[key.strip()] = value.strip()

    return edge_type, from_node, to_node, properties
edge_type, from_node, to_node, properties = read_edge(schema[3])
print(schema[3])
print(f"Edge Type: {edge_type}")
print(f"From Node: {from_node}")
print(f"To Node: {to_node}")
print(f"Properties: {properties}")

E::Created {
    From: User,
    To: Post,
    Properties: {
        created_at: I32,
    }
}
Edge Type: Created
From Node: User
To Node: Post
Properties: {'created_at': 'I32'}


In [108]:
def read_node(node_str:str):
    node_str = node_str.replace("\n", "")
    node_type = node_str.split("::")[1].split("{")[0].strip()
    properties = {}
    for item in node_str.split("{")[1].split("}")[0].strip(",").split(", "):
        item = item.replace(",", "").split(":")
        properties[item[0].strip()] = item[1].strip()
    return node_type, properties
node_type, properties = read_node(schema[0])
print(f"Node Type: {node_type}")
print(f"Properties: {properties}")
print(type(properties))

Node Type: User
Properties: {'name': 'String', 'age': 'U32', 'email': 'String', 'created_at': 'I32', 'updated_at': 'I32'}
<class 'dict'>


In [None]:
def read_schema(schema):
    for element in schema:
        element_type = element.split("::")[0]
        inputs = {}
        match element_type:
            case "N":
                node_type, properties = read_node(element)
                inputs["node_type"] = node_type
                inputs["properties"] = properties 
            case "E":
                edge_type, from_node, to_node, properties = read_edge(element)
                inputs["edge_type"] = edge_type
                inputs["from_node"] = from_node
                inputs["to_node"] = to_node
                inputs["properties"] = properties
            case "V":
                vector_type, properties = read_vector(element)
                inputs["vector_type"] = vector_type
                inputs["properties"] = properties
            case _:
                node_type, properties = read_node(element)
                inputs["node_type"] = node_type
                inputs["properties"] = properties
        print(element)
        print(inputs, "\n")

N::User {
    name: String,
    age: U32,
    email: String,
    created_at: I32,
    updated_at: I32,
}
{'node_type': 'User', 'properties': {'name': 'String', 'age': 'U32', 'email': 'String', 'created_at': 'I32', 'updated_at': 'I32'}} 

N::Post {
    content: String,
    created_at: I32,
    updated_at: I32,
}
{'node_type': 'Post', 'properties': {'content': 'String', 'created_at': 'I32', 'updated_at': 'I32'}} 

E::Follows {
    From: User,
    To: User,
    Properties: {
        since: I32,
    }
}
{'edge_type': 'Follows', 'from_node': 'User', 'to_node': 'User', 'properties': ''} 

E::Created {
    From: User,
    To: Post,
    Properties: {
        created_at: I32,
    }
}
{'edge_type': 'Created', 'from_node': 'User', 'to_node': 'Post', 'properties': ''} 



### Schema class

In [None]:
import json
import os
from pathlib import Path
from typing import Dict, List

class Schema:
    nonnumeric_types = [
        "ID",
        "Date",
        "String",
        "Boolean"
    ]

    F_types = [32, 64]
    I_types = [8, 16, 32, 64]
    U_types = [8, 16, 32, 64, 128]

    schema_file = "schema.hx"

    def __init__(self, config_path:str="helixdb-cfg"):
        self.config_path = config_path

        self.nodes = {}
        self.edges = {}
        self.vectors = {}
        self.output = ""

        self.helix_dir = Path(os.path.dirname(os.path.curdir)).resolve()
        os.makedirs(os.path.join(self.helix_dir, self.config_path), exist_ok=True)

        if not Path(os.path.join(self.helix_dir, self.config_path, Schema.schema_file)).exists():
            self.schema_path = os.path.join(self.helix_dir, self.config_path, Schema.schema_file)
            with open(self.schema_path, "w") as f:
                json.dump("", f, indent=4)
            print("Schema file created")
        else:
            self.schema_path = os.path.join(self.helix_dir, self.config_path, Schema.schema_file)
            with open(self.schema_path, "r") as f:
                schema = f.read()
                schema = schema.split('\n\n')
                print(schema)
                self._read_schema(schema)
                self._compile()
            print("Schema file loaded")

    def create_node(self, node_type:str, properties:Dict[str, str] = {}):
        if not isinstance(node_type, str):
            raise TypeError(f"Node type must be a string, got {type(node_type).__name__}")
        if len(node_type) < 1:
            raise ValueError(f"Node type is empty")
        if not isinstance(properties, dict):
            raise TypeError(f"Properties must be a dictionary, got {type(properties).__name__}")
        if len(properties) < 1:
            raise ValueError(f"Properties dictionary is empty")

        self.nodes[node_type] = {"properties": properties}

    def update_node(self, node_type:str, properties:Dict[str, str] = {}):
        self.create_node(node_type, properties)

    def get_nodes(self):
        return self.nodes

    def get_node(self, node_type:str):
        return self.nodes.get(node_type)

    def delete_node(self, node_type:str):
        if node_type in self.nodes:
            del self.nodes[node_type]
        else:
            raise ValueError(f"Node type {node_type} does not exist")
    
    def delete_nodes(self, node_types:List[str]):
        for node_type in node_types:
            self.delete_node(node_type)

    def create_edge(self, edge_type:str, from_node:str, to_node:str, properties:Dict[str, str] = {}):
        if not isinstance(edge_type, str):
            raise TypeError(f"Edge type must be a string, got {type(edge_type).__name__}")
        if len(edge_type) < 1:
            raise ValueError(f"Edge type is empty")
        if not isinstance(from_node, str):
            raise TypeError(f"From node must be a string, got {type(from_node).__name__}")
        if len(from_node) < 1:
            raise ValueError(f"From node is empty")
        if not isinstance(to_node, str):
            raise TypeError(f"To node must be a string, got {type(to_node).__name__}")
        if len(to_node) < 1:
            raise ValueError(f"To node is empty")
        if not isinstance(properties, dict):
            raise TypeError(f"Properties must be a dictionary, got {type(properties).__name__}")

        self.edges[edge_type] = {"from": from_node, "to": to_node, "properties": properties}

    def update_edge(self, edge_type:str, from_node:str, to_node:str, properties:Dict[str, str] = {}):
        self.create_edge(edge_type, from_node, to_node, properties)

    def get_edges(self):
        return self.edges

    def get_edge(self, edge_type:str):
        return self.edges.get(edge_type)

    def delete_edge(self, edge_type:str):
        if edge_type in self.edges:
            del self.edges[edge_type]
        else:
            raise ValueError(f"Edge type {edge_type} does not exist")
    
    def delete_edges(self, edge_types:List[str]):
        for edge_type in edge_types:
            self.delete_edge(edge_type)

    def create_vector(self, vector_type:str, properties:Dict[str, str] = {}):
        if not isinstance(vector_type, str):
            raise ValueError("Vector type must be a string")
        if len(vector_type) < 1:
            raise ValueError("Vector type must be at least 1 character long")
        if not isinstance(properties, dict):
            raise ValueError("Properties must be a dictionary")

        self.vectors[vector_type] = {"properties": properties}

    def update_vector(self, vector_type:str, properties:Dict[str, str] = {}):
        self.create_vector(vector_type, properties)

    def get_vectors(self):
        return self.vectors

    def get_vector(self, vector_type:str):
        return self.vectors.get(vector_type)

    def delete_vector(self, vector_type:str):
        if vector_type in self.vectors:
            del self.vectors[vector_type]
        else:
            raise ValueError(f"Vector type {vector_type} does not exist")
    
    def delete_vectors(self, vector_types:List[str]):
        for vector_type in vector_types:
            self.delete_vector(vector_type)

    def save(self):
        self._compile()
        with open(self.schema_path, "w") as f:
            f.write(self.output)

    def show_schema(self):
        self._compile()
        print(self.output)
        return self.output
    
    def _read_schema(self, schema:str):
        for element in schema:
            if len(element) < 1:
                continue
            element_type = element.split("::")[0]
            match element_type:
                case "N":
                    self._read_node(element)
                case "E":
                    self._read_edge(element)
                case "V":
                    self._read_vector(element)
                case _:
                    self._read_node(element)

    def _check_type(type: str):
        if type in Schema.nonnumeric_types:
            return True

        if type[0] == "[" and type[-1] == "]":
            return Schema._check_type(type[1:-1])
        
        if type[0] == "F":
            try:
                bits = int(type[1:])
            except ValueError:
                return False
            if bits in Schema.F_types:
                return True

        if type[0] == "I":
            try:
                bits = int(type[1:])
            except ValueError:
                return False
            if bits in Schema.I_types:
                return True

        if type[0] == "U":
            try:
                bits = int(type[1:])
            except ValueError:
                return False
            if bits in Schema.U_types:
                return True

        return False

    def _check_valid_property(key, value):
        if not isinstance(key, str):
            raise TypeError(f"Property key {key} must be a string, got {type(key).__name__}")
        if len(key) < 1:
            raise ValueError(f"Property key for {value} is empty")
        if not isinstance(value, str):
            raise TypeError(f"Property value for {key} must be a string, got {type(value).__name__}")
        if len(value) < 1:
            raise ValueError(f"Property value for {key} is empty")
        if not Schema._check_type(value):
            raise ValueError(f"Property {key}: {value} is not a valid type")

    def _read_node(self, node_str:str):
        node_str = node_str.replace("\n", "")
        node_type = node_str.split("::")[1].split("{")[0].strip()
        
        properties = {}
        for item in node_str.split("{")[1].split("}")[0].strip(",").split(", "):
            item = item.replace(",", "").split(":")
            properties[item[0].strip()] = item[1].strip()
        
        self.create_node(node_type, properties)

    def _compile_node(self, node_type, properties):
        properties = properties.get("properties", {})
        output = ""
        output += "N::" + node_type + " {\n"
        for key, value in properties.items():
            Schema._check_valid_property(key, value)
            output += "    " + key + ": " + value + ",\n"
        output += "}\n\n"

        self.output += output

    def _read_edge(self, edge_str:str):
        edge_str = edge_str.replace("\n", "")
        edge_type = edge_str.split("::")[1].split("{")[0].strip()
        properties = {}
        for item in ''.join(edge_str.split("{")[1:]).split("}")[0].strip(",").split(", "):
            key = item.replace(",", "").split(":")[0].strip()
            value = ":".join(item.replace(",", "").split(":")[1:]).strip()
            if key == "Properties":
                properties[key] = {}
                for prop in value.split(", "):
                    prop_key = prop.split(":")[0].strip()
                    prop_value = ":".join(prop.split(":")[1:]).strip()
                    properties[key][prop_key] = prop_value
            else:
                properties[key] = value

        from_node = properties.get("From")
        to_node = properties.get("To")
        prop_str = properties.get("Properties", {})
        properties = {}

        for key, value in prop_str.items():
            properties[key.strip()] = value.strip()

        self.create_edge(edge_type, from_node, to_node, properties)

    def _compile_edge(self, edge_type, properties):
        from_node = properties.get("from")
        to_node = properties.get("to")
        properties = properties.get("properties", {})

        if from_node not in self.nodes:
            raise ValueError(f"From node {from_node} does not exist")
        if to_node not in self.nodes:
            raise ValueError(f"To node {to_node} does not exist")
        
        output = ""
        output += "E::" + edge_type + " {\n"
        output += "    From: " + from_node + ",\n"
        output += "    To: " + to_node + ",\n"
        output += "    Properties: {\n"
        for key, value in properties.items():
            Schema._check_valid_property(key, value)
            output += "        " + str(key) + ": " + str(value) + ",\n"
        output += "    }\n"
        output += "}\n\n"

        self.output += output

    def _read_vector(self, vector_str:str):
        vector_str = vector_str.replace("\n", "")
        vector_type = vector_str.split("::")[1].split("{")[0].strip(",").strip()
        properties = {}
        for item in vector_str.split("{")[1].split("}")[0].split(", "):
            item = item.replace(",", "").split(":")
            properties[item[0].strip()] = item[1].strip()
        
        self.create_vector(vector_type, properties)

    def _compile_vector(self, vector_type:str, properties:Dict[str, str] = {}):
        properties = properties.get("properties", {})
        output = ""
        output += "V::" + vector_type + " {\n"
        for key, value in properties.items():
            Schema._check_valid_property(key, value)
            output += "    " + str(key) + ": " + str(value) + ",\n"
        output += "}\n\n"
    
        self.output += output

    def _compile(self):
        self.output = ""
        for node_type, properties in self.nodes.items():
            self._compile_node(node_type, properties)
        for edge_type, properties in self.edges.items():
            self._compile_edge(edge_type, properties)
        for vector_type, properties in self.vectors.items():
            self._compile_vector(vector_type, properties)

In [164]:
schema = Schema()
print(schema.output)
schema.update_node("User", {"name":"String", "age":"U64", "email":"String", "created_at":"I32", "updated_at":"I32"})
# schema.create_node("Post", {"content":"String", "created_at":"I32", "updated_at":"I32"})
# schema.create_edge("Follows", "User", "User", {"since":"I32"})
# schema.create_edge("Created", "User", "Post", {"created_at":"I32"})
schema.get_edges()

Schema file loaded
N::User {
    name: String
    age: U32
    email: String
    created_at: I32
    updated_at: I32
}
N::Post {
    content: String
    created_at: I32
    updated_at: I32
}
E::Follows {
    From: User
    To: User
    Properties: {
        since: I32
    }
}
E::Created {
    From: User
    To: Post
    Properties: {
        created_at: I32
    }
}



{'Follows': {'from': 'User', 'to': 'User', 'properties': {'since': 'I32'}},
 'Created': {'from': 'User',
  'to': 'Post',
  'properties': {'created_at': 'I32'}}}

## Helix CLI

### Deploy

In [18]:
import subprocess
import re
import pexpect

# Run command in background
deploy_db = subprocess.Popen(
    ['helix', 'deploy', '--path', 'helixdb-cfg', '--port', '6969'],
    stdout=subprocess.PIPE,
    text=True
)

output = []

# Read output line by line
for line in deploy_db.stdout:
    process_line = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    line = process_line.sub('', line)
    output.append(line.strip())
    print(line.strip())

# Wait for process to complete
deploy_db.wait()

if "error" in "\n".join(output).lower():
    raise Exception(f"Failed to deploy Helix instance")

instance_id = [out for out in output if out.startswith("Instance ID:")][0].split("Instance ID: ")[1].split(" (running)")[0]
# print('\n'.join(output))
print(f"Instance ID: {instance_id}")

[1m[32mSuccessfully compiled[0m[0m 2 [1m[32mquery files[0m[0m
⢺ Building Helix

Successfully transpiled queries
Successfully wrote queries file


[1m[32mSuccessfully built Helix[0m[0m
⢹ Starting Helix instance

Instance ID: 091fd66f-c09a-413b-8377-ea0e2f28b3a6 (running)
└── Label:
└── Port: 6969
└── Available endpoints:
└── /get_followed_users_posts
└── /get_posts_by_user
└── /get_followed_users
└── /create_follow
└── /create_post
└── /get_users
└── /get_posts
└── /create_user
Instance ID: 091fd66f-c09a-413b-8377-ea0e2f28b3a6


[1m[32mSuccessfully started Helix instance[0m[0m


### Redeploy

In [8]:
redeploy_db = subprocess.Popen(
    ['helix', 'redeploy', '--path', 'helixdb-cfg', instance_id],
    stdout=subprocess.PIPE,
    text=True
)

output = []

# Read output line by line
for line in redeploy_db.stdout:
    process_line = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    line = process_line.sub('', line)
    output.append(line.strip())
    print(line.strip())

# Wait for process to complete
redeploy_db.wait()

if "error" in "\n".join(output).lower():
    print('\n'.join(output))
    raise Exception(f"Failed to redeploy Helix instance")

# instance_id = [out for out in output if out.startswith("Instance ID:")][0].split("Instance ID: ")[1].split(" (running)")[0]
# print('\n'.join(output))
# print(f"Instance ID: {instance_id}")

[1m[32mSuccessfully compiled[0m[0m 2 [1m[32mquery files[0m[0m
⢺ Building Helix

Helix instance found!
Successfully wrote queries file


⢹ Building Helix

Instance ID: 9198b99e-c28d-4bb1-ad27-1c739f78e8f2 (running)
└── Label:
└── Port: 6969
└── Available endpoints:
└── /get_followed_users
└── /get_posts
└── /get_posts_by_user
└── /get_users
└── /create_follow
└── /create_post
└── /get_followed_users_posts
└── /create_user


[1m[32mSuccessfully built Helix[0m[0m
[1m[32mSuccessfully started Helix instance[0m[0m


### Stop

In [9]:
stop_db = subprocess.Popen(
    ['helix', 'stop', instance_id],
    stdout=subprocess.PIPE,
    text=True
)

output = []

for line in stop_db.stdout:
    process_line = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    line = process_line.sub('', line)
    output.append(line.strip())

stop_db.wait()

print('\n'.join(output))

Stopped instance 9a642d47-7c00-435c-bfda-c800380e7c94


### Start

In [12]:
start_db = subprocess.Popen(
    ['helix', 'start', instance_id],
    stdout=subprocess.PIPE,
    text=True
)

output = []

# Read output line by line
for line in start_db.stdout:
    process_line = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    line = process_line.sub('', line)
    output.append(line.strip())

# Wait for process to complete
start_db.wait()

print('\n'.join(output))

Instance ID: 9a642d47-7c00-435c-bfda-c800380e7c94 (running)
└── Label:
└── Port: 6969
└── Available endpoints:
└── /create_user
└── /create_post
└── /get_users
└── /get_followed_users
└── /get_followed_users_posts
└── /create_follow
└── /get_posts_by_user
└── /get_posts


[1m[32mSuccessfully started Helix instance[0m[0m


### Delete

In [20]:
import subprocess
import re

# Run the command and capture the output
result = subprocess.run(
    ['helix', 'delete', instance_id],
    input="y\n",  # Send 'y' and newline as input
    text=True,
    capture_output=True
)

output = result.stdout.split('\n')

output = [line for line in output if not line.startswith("Are you sure you want to delete")]

# Process and print the output
for line in output:
    # Remove ANSI escape sequences
    line = re.sub(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])', '', line)
    print(line)

# Print any errors if they occurred
if result.stderr:
    print("Errors:", result.stderr)

Helix instance found!
Stopped instance 091fd66f-c09a-413b-8377-ea0e2f28b3a6
Deleted Helix instance data



In [7]:
# delete_db = pexpect.spawn(
#     ' '.join(['helix', 'delete', instance_id]),
# )

# delete_db.expect("Are you sure you want to delete", timeout=10)
# delete_db.sendline("y")
# delete_db.expect(pexpect.EOF, timeout=10)

# del_out = delete_db.before.decode('utf-8').split('\n')

# for line in del_out:
#     process_line = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
#     line = process_line.sub('', line)
#     print(line)

### View Instances

In [19]:
status_db = subprocess.Popen(
    ['helix', 'instances'],
    stdout=subprocess.PIPE,
    text=True
)

output = []

# Read output line by line
for line in status_db.stdout:
    process_line = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')
    line = process_line.sub('', line)
    output.append(line.strip())

# Wait for process to complete
status_db.wait()

print('\n'.join(output))

Instance ID: 091fd66f-c09a-413b-8377-ea0e2f28b3a6 (running)
└── Label:
└── Port: 6969
└── Available endpoints:
└── /get_followed_users_posts
└── /get_posts_by_user
└── /get_followed_users
└── /create_follow
└── /create_post
└── /get_users
└── /get_posts
└── /create_user



### Check Instances

In [1]:
from instance import Instance
h_instance1 = Instance("helixdb-cfg", "6969")
print(h_instance1.deploy())

[1m[32mSuccessfully compiled[0m[0m 2 [1m[32mquery files[0m[0m
⣸ Building Helix

Successfully transpiled queries
Successfully wrote queries file
Instance ID: 7462e524-ce82-4181-80c3-5b23f02dbe7b (running)
└── Label:
└── Port: 6969
└── Available endpoints:
└── /create_post
└── /get_posts
└── /get_followed_users
└── /get_users
└── /get_followed_users_posts
└── /get_posts_by_user
└── /create_user
└── /create_follow


[1m[32mSuccessfully built Helix[0m[0m
[1m[32mSuccessfully started Helix instance[0m[0m


In [2]:
print(h_instance1.status())

Instance ID: 7462e524-ce82-4181-80c3-5b23f02dbe7b (running)
└── Label:
└── Port: 6969
└── Available endpoints:
└── /create_post
└── /get_posts
└── /get_followed_users
└── /get_users
└── /get_followed_users_posts
└── /get_posts_by_user
└── /create_user
└── /create_follow



In [4]:
import subprocess
import re
process_line = re.compile(r'\x1B(?:[@-Z\\-_]|\[[0-?]*[ -/]*[@-~])')


cmd = ['helix', 'instances']
process = subprocess.Popen(
    cmd,
    stdout=subprocess.PIPE,
    text=True
)

output = []

for line in process.stdout:
    line = process_line.sub('', line)
    output.append(line.strip())

process.wait()

print(output)

['Instance ID: 7462e524-ce82-4181-80c3-5b23f02dbe7b (running)', '└── Label:', '└── Port: 6969', '└── Available endpoints:', '└── /create_post', '└── /get_posts', '└── /get_followed_users', '└── /get_users', '└── /get_followed_users_posts', '└── /get_posts_by_user', '└── /create_user', '└── /create_follow', '']


In [11]:
print(len(output) > 1 and output[0].startswith("Instance ID"))
print(output[0].removeprefix("Instance ID: ").removesuffix(" (running)").removesuffix(" (not running)"))
print(output[0].split(" ")[-1])

True
7462e524-ce82-4181-80c3-5b23f02dbe7b
(running)


In [26]:
ports = []
ids = []
running = []

for line in output:
    if line.startswith("Instance ID: "):
        ids.append(line.removeprefix("Instance ID: ").removesuffix(" (running)").removesuffix(" (not running)"))
        running.append(line.split(" ")[-1] == "(running)")
    elif line.startswith("└── Port: "):
        ports.append(line.removeprefix("└── Port: "))

port_ids = dict(zip(ports, ids))
ids_running = dict(zip(ids, running))
print(port_ids)
print(ids_running)

{'6969': '7462e524-ce82-4181-80c3-5b23f02dbe7b'}
{'7462e524-ce82-4181-80c3-5b23f02dbe7b': True}


### Instance class

In [2]:
from instance import Instance
helix_instance = Instance("helixdb-cfg", "6969", verbose=True)
helix_instance.status()

[32m[HELIX][0m Helix instances status:
No running Helix instances
[32m[HELIX][0m Ports: {}
[32m[HELIX][0m Instances Running: {}


'No running Helix instances'

In [9]:
helix_instance.deploy()

[32m[HELIX][0m Instance is running - redeploying
[32m[HELIX][0m Redeploying Helix instance: c51fb4ec-5f5a-4460-8c49-88ead12e230e
Helix instance found!
[1m[32mSuccessfully compiled[0m[0m 2 [1m[32mquery files[0m[0m
Successfully wrote queries file
[1m[32mSuccessfully built Helix[0m[0m
⢺ Starting Helix instanceInstance ID: c51fb4ec-5f5a-4460-8c49-88ead12e230e (running)
└── Label:
└── Port: 6969
└── Available endpoints:
└── /get_users
└── /get_followed_users
└── /create_user
└── /create_post
└── /get_posts
└── /create_follow
└── /get_followed_users_posts
└── /get_posts_by_user
[1m[32mSuccessfully started Helix instance[0m[0m


'Helix instance found!\nSuccessfully wrote queries file\nInstance ID: c51fb4ec-5f5a-4460-8c49-88ead12e230e (running)\n└── Label:\n└── Port: 6969\n└── Available endpoints:\n└── /get_users\n└── /get_followed_users\n└── /create_user\n└── /create_post\n└── /get_posts\n└── /create_follow\n└── /get_followed_users_posts\n└── /get_posts_by_user'

## Create RAG Query