Skip to content

Commit

Permalink
refactor into node module
Browse files Browse the repository at this point in the history
  • Loading branch information
saibatizoku committed Apr 2, 2023
1 parent b6edf65 commit e0bdda1
Show file tree
Hide file tree
Showing 10 changed files with 449 additions and 315 deletions.
1 change: 1 addition & 0 deletions services/voting-node/voting_node/__init__.py
@@ -0,0 +1 @@
"""Service for deploying jormungandr nodes in voting events."""
22 changes: 12 additions & 10 deletions services/voting-node/voting_node/db.py
@@ -1,16 +1,17 @@
import asyncpg
import datetime
from typing import Any, List
from typing import Any

import asyncpg

from .logs import getLogger
from .models import Event, HostInfo, LeaderHostInfo, Proposal, Snapshot, VotePlan
from .utils import get_hostname, LEADER_REGEX
from .utils import LEADER_REGEX, get_hostname

# gets voting node logger
logger = getLogger()


class EventDb(object):
class EventDb:
conn: Any = None
db_url: str

Expand Down Expand Up @@ -65,11 +66,12 @@ async def insert_leader_host_info(self, host_info: HostInfo):
raise Exception(f"failed to insert '{h.hostname}' info to DB")
logger.debug(f"{h.hostname} info added: {result}")

async def fetch_sorted_leaders_host_info(self) -> List[LeaderHostInfo]:
async def fetch_sorted_leaders_host_info(self) -> list[LeaderHostInfo]:
"""Fetch host information for leader nodes.
Returns a list of leader host information.
Raises exceptions if the DB fails to return a list of records, or
if the list is empty."""
if the list is empty.
"""
where = f"WHERE hostname != $1 AND hostname ~ '{LEADER_REGEX}'"
order_by = "ORDER BY hostname ASC"
query = f"SELECT (hostname, pubkey) FROM voting_node {where} {order_by}"
Expand All @@ -88,7 +90,7 @@ def extract_leader_info(leader):

return list(map(extract_leader_info, leaders))

async def fetch_proposals(self) -> List[Proposal]:
async def fetch_proposals(self) -> list[Proposal]:
query = "SELECT * FROM proposal ORDER BY id ASC"
result = await self.conn.fetch(query)
if result is None:
Expand All @@ -101,7 +103,7 @@ async def fetch_proposals(self) -> List[Proposal]:
raise Exception("no proposals found in DB")
case [*proposals]:
logger.debug(f"proposals retrieved from DB: {len(proposals)}")
return list(map(lambda r: Proposal(**dict(r)), proposals))
return [Proposal(**dict(r)) for r in proposals]

async def check_if_snapshot_is_final(self, event_id: int) -> bool:
query = "SELECT final FROM snapshot WHERE event = $1"
Expand Down Expand Up @@ -132,7 +134,7 @@ async def fetch_snapshot(self, event_id: int) -> Snapshot:
logger.debug(f"snapshot retrieved from DB: {snapshot}")
return snapshot

async def fetch_voteplans(self, event_id: int) -> List[VotePlan]:
async def fetch_voteplans(self, event_id: int) -> list[VotePlan]:
# fetch the voteplans
query = "SELECT * FROM voteplan WHERE event_id = $1 ORDER BY id ASC"
result = await self.conn.fetch(query, event_id)
Expand All @@ -146,7 +148,7 @@ async def fetch_voteplans(self, event_id: int) -> List[VotePlan]:
raise Exception("no voteplans found in DB")
case [*voteplans]:
logger.debug(f"voteplans retrieved from DB: {len(voteplans)}")
return list(map(lambda r: VotePlan(**dict(r)), voteplans))
return [VotePlan(**dict(r)) for r in voteplans]

async def insert_block0_info(self, event_row_id: int, block0_bytes: bytes, block0_hash: str):
# insert the hostname row into the voting_node table
Expand Down
13 changes: 7 additions & 6 deletions services/voting-node/voting_node/jcli.py
Expand Up @@ -2,10 +2,10 @@
from pathlib import Path


class JCli(object):
class JCli:
"""Wrapper type for the jcli command-line."""

def __init__(self, jcli_exec: str):
def __init__(self, jcli_exec: str) -> None:
self.jcli_exec = jcli_exec

async def key_generate(self, secret_type: str = "ed25519") -> str:
Expand Down Expand Up @@ -75,7 +75,7 @@ async def key_to_bytes(self, key: str) -> str:
return key

async def votes_committee_communication_key_generate(self) -> str:
"""Run 'jcli genesis encode' to make block0 from genesis.yaml"""
"""Run 'jcli genesis encode' to make block0 from genesis.yaml."""
proc_args = (
"votes",
"committee",
Expand All @@ -97,7 +97,8 @@ async def votes_committee_communication_key_generate(self) -> str:

async def votes_committee_communication_key_to_public(self, input_key: str) -> str:
"""Run 'jcli vote committee communication-key to-public [INPUT]' to return
the public communication key."""
the public communication key.
"""
proc_args = (
"votes",
"committee",
Expand All @@ -122,7 +123,7 @@ async def vote_committee_member_key_generate(self, comm_pub_keys: list[str], thr
...

async def genesis_encode(self, block0_bin: Path, genesis_yaml: Path):
"""Run 'jcli genesis encode' to make block0 from genesis.yaml"""
"""Run 'jcli genesis encode' to make block0 from genesis.yaml."""
proc = await asyncio.create_subprocess_exec(
self.jcli_exec,
"genesis",
Expand All @@ -140,7 +141,7 @@ async def genesis_encode(self, block0_bin: Path, genesis_yaml: Path):
raise Exception("failed to generate block0")

async def genesis_hash(self, block0_bin: Path) -> str:
"""Run 'jcli genesis hash' to get the hash from block0.bin"""
"""Run 'jcli genesis hash' to get the hash from block0.bin."""
proc = await asyncio.create_subprocess_exec(
self.jcli_exec,
"genesis",
Expand Down
49 changes: 47 additions & 2 deletions services/voting-node/voting_node/jormungandr.py
@@ -1,5 +1,50 @@
class Jormungandr(object):
import asyncio

from .logs import getLogger

# gets voting node logger
logger = getLogger()


class Jormungandr:
"""Wrapper type for the jormungandr command-line."""

def __init__(self, jorm_exec: str):
def __init__(self, jorm_exec: str) -> None:
self.jorm_exec = jorm_exec

# keeps on launching jormungandr until `stop_schedule()` is called
async def run_jorm_node(self):
jorm_task = asyncio.create_task(self.start_jormungandr())
try:
logger.debug("jorm task starting")
await jorm_task
logger.debug("jorm task is finished")
except Exception as e:
logger.debug(f"jorm failed to start: {e}")

async def start_jormungandr(self):
try:
await self.jormungandr_subprocess_exec()
except Exception as e:
f"jorm error: {e}"
raise e

async def jormungandr_subprocess_exec(self):
try:
proc = await asyncio.create_subprocess_exec(
self.jorm_exec, # "--help",
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE,
)
stdout, stderr = await proc.communicate()

if stdout:
logger.info(f"[stdout]\n{stdout.decode()}")
if stderr:
logger.warning(f"[stderr]\n{stderr.decode()}")

if proc.returncode != 0:
raise Exception(f"jormungandr exited with non-zero status: {proc.returncode}")
except Exception as e:
logger.warning(f"jorm node error: {e}")
raise e
98 changes: 71 additions & 27 deletions services/voting-node/voting_node/main.py
@@ -1,38 +1,59 @@
from typing import Final

import click
import uvicorn

from . import api, logs, service
from .models import ServiceSettings

# Environment variables
VOTING_HOST: Final = "VOTING_HOST"
VOTING_PORT: Final = "VOTING_PORT"
VOTING_LOG_LEVEL: Final = "VOTING_LOG_LEVEL"
VOTING_NODE_STORAGE: Final = "VOTING_NODE_STORAGE"
IS_NODE_RELOADABLE: Final = "IS_NODE_RELOADABLE"
EVENTDB_URL: Final = "EVENTDB_URL"
JORM_PATH: Final = "JORM_PATH"
JORM_PORT_REST: Final = "JORM_PORT_REST"
JORM_PORT_JRPC: Final = "JORM_PORT_JRPC"
JORM_PORT_P2P: Final = "JORM_PORT_P2P"


@click.group()
@click.option("--debug/--no-debug", default=False)
@click.option("--hot-reload/--no-hot-reload", default=False)
def cli(debug, hot_reload):
click.echo(f"debug-mode={debug}")
click.echo(f"hot-reload-mode={hot_reload}")
def cli():
"""Main CLI entry point."""


@click.command()
@click.option(
"--reloadable",
is_flag=True,
envvar="IS_NODE_RELOADABLE",
help="""\
Enables the node to reload when it detects changes to the current Voting Event.\
If not set, the node will still detect changes to the Voting Event, but will use\
the configuration it has, emitting warnings in the logs.
""",
)
@click.option(
"--host",
envvar="VOTING_HOST",
envvar=VOTING_HOST,
default="0.0.0.0",
help="""\
Host for the voting node API. If left unset it will look for VOTING_HOST.\
If no host is found, the default value is: 0.0.0.0""",
)
@click.option(
"--port",
envvar="VOTING_PORT",
envvar=VOTING_PORT,
default=8000,
help="""\
Port for the voting node API. If left unset it will look for VOTING_PORT.\
If no port is found, the default value is: 8000""",
)
@click.option(
"--log-level",
envvar="VOTING_LOG_LEVEL",
envvar=VOTING_LOG_LEVEL,
default="info",
type=click.Choice(["info", "debug", "warn", "error", "trace"]),
help="""\
Expand All @@ -41,65 +62,88 @@ def cli(debug, hot_reload):
)
@click.option(
"--database-url",
envvar="DATABASE_URL",
envvar=EVENTDB_URL,
default="postgres://localhost/CatalystEventDev",
help="""\
Sets the URL for the database. Default: postgres://localhost/CatalystEventDev""",
)
@click.option(
"--node-storage",
envvar="VOTING_NODE_STORAGE",
envvar=VOTING_NODE_STORAGE,
default="./node_storage",
help="Sets the location for the voting node's storage",
help="Sets the path to the voting node storage directory",
)
@click.option(
"--jorm-path",
envvar="JORM_PATH",
envvar=JORM_PATH,
default="jormungandr",
help="""\
Path to the 'jormungandr' executable.
""",
)
@click.option(
"--jorm-rest-port",
envvar="JORM_REST_PORT",
"--jorm-port-rest",
envvar=JORM_PORT_REST,
default=10080,
help="""\
jormungandr REST listening port
""",
)
@click.option(
"--jorm-jrpc-port",
envvar="JORM_JRPC_PORT",
"--jorm-port-jrpc",
envvar=JORM_PORT_JRPC,
default=10085,
help="""\
jormungandr JRPC listening port
""",
)
@click.option(
"--jorm-p2p-port",
envvar="JORM_P2P_PORT",
"--jorm-port-p2p",
envvar=JORM_PORT_P2P,
default=10090,
help="""\
jormungandr P2P listening port
""",
)
@click.option(
"--jcli-path",
envvar="JCLI_PATH",
default="jcli",
help="""\
Path to the 'jcli' executable.
""",
)
@click.option("--jcli-path", envvar="JCLI_PATH", default="jcli")
def start(
reloadable,
host,
port,
log_level,
database_url,
node_storage,
jorm_path,
jcli_path,
jorm_rest_port,
jorm_jrpc_port,
jorm_p2p_port,
jorm_port_rest,
jorm_port_jrpc,
jorm_port_p2p,
):
"""Starts the voting node."""
logs.configLogger(log_level)
click.echo(f"reloadable={reloadable}")

api_config = uvicorn.Config(api.app, host=host, port=port, log_level=log_level)
settings = ServiceSettings(
jorm_rest_port,
jorm_jrpc_port,
jorm_p2p_port,
jorm_port_rest,
jorm_port_jrpc,
jorm_port_p2p,
node_storage,
jcli_path,
jorm_path,
database_url,
reloadable,
)

voting_node = service.VotingNode(api_config, settings, database_url)
voting_node.start()
voting = service.VotingService(api_config, settings)
voting.start()


# this groups commands in the main 'cli' group
Expand Down

0 comments on commit e0bdda1

Please sign in to comment.