Skip to content

Commit

Permalink
Merge tag 'main' into docs
Browse files Browse the repository at this point in the history
  • Loading branch information
jmxpearson committed Jan 15, 2024
2 parents d40fc9e + 0626a77 commit 5c2cf8b
Show file tree
Hide file tree
Showing 5 changed files with 153 additions and 49 deletions.
27 changes: 17 additions & 10 deletions improv/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,15 +19,6 @@ def __init__(self, configFile):
# Reading config from other yaml file
self.configFile = configFile

self.actors = {}
self.connections = {}
self.hasGUI = False

def createConfig(self):
"""Read yaml config file and create config for Nexus
TODO: check for config file compliance, error handle it
beyond what we have below.
"""
with open(self.configFile, "r") as ymlfile:
cfg = yaml.safe_load(ymlfile)

Expand All @@ -36,7 +27,10 @@ def createConfig(self):
self.settings = cfg["settings"]
else:
self.settings = {}
self.settings["use_watcher"] = None

if "use_watcher" not in self.settings:
self.settings["use_watcher"] = False

except TypeError:
if cfg is None:
logger.error("Error: The config file is empty")
Expand All @@ -45,6 +39,19 @@ def createConfig(self):
logger.error("Error: The config file is not in dictionary format")
raise TypeError

self.config = cfg

self.actors = {}
self.connections = {}
self.hasGUI = False

def createConfig(self):
"""Read yaml config file and create config for Nexus
TODO: check for config file compliance, error handle it
beyond what we have below.
"""
cfg = self.config

for name, actor in cfg["actors"].items():
if name in self.actors.keys():
raise RepeatedActorError(name)
Expand Down
73 changes: 49 additions & 24 deletions improv/nexus.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ def createNexus(
self,
file=None,
use_hdd=False,
use_watcher=False,
store_size=10000000,
use_watcher=None,
store_size=10_000_000,
control_port=0,
output_port=0,
):
Expand All @@ -64,19 +64,42 @@ def createNexus(
curr_dt = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
logger.info(f"************ new improv server session {curr_dt} ************")

if file is None:
logger.exception("Need a config file!")
raise Exception # TODO
else:
logger.info(f"Loading configuration file {file}:")
self.loadConfig(file=file)
with open(file, "r") as f: # write config file to log
logger.info(f.read())

# set config options loaded from file
# in Python 3.9, can just merge dictionaries using precedence
cfg = self.config.settings
if "use_hdd" not in cfg:
cfg["use_hdd"] = use_hdd
if "use_watcher" not in cfg:
cfg["use_watcher"] = use_watcher
if "store_size" not in cfg:
cfg["store_size"] = store_size
if "control_port" not in cfg or control_port != 0:
cfg["control_port"] = control_port
if "output_port" not in cfg or output_port != 0:
cfg["output_port"] = output_port

# set up socket in lieu of printing to stdout
self.zmq_context = zmq.Context()
self.out_socket = self.zmq_context.socket(PUB)
self.out_socket.bind("tcp://*:%s" % output_port)
self.out_socket.bind("tcp://*:%s" % cfg["output_port"])
out_port_string = self.out_socket.getsockopt_string(SocketOption.LAST_ENDPOINT)
output_port = int(out_port_string.split(":")[-1])
cfg["output_port"] = int(out_port_string.split(":")[-1])

self.in_socket = self.zmq_context.socket(REP)
self.in_socket.bind("tcp://*:%s" % control_port)
self.in_socket.bind("tcp://*:%s" % cfg["control_port"])
in_port_string = self.in_socket.getsockopt_string(SocketOption.LAST_ENDPOINT)
control_port = int(in_port_string.split(":")[-1])
cfg["control_port"] = int(in_port_string.split(":")[-1])

# default size should be system-dependent; this is 40 GB
# default size should be system-dependent
self._startStoreInterface(store_size)
self.out_socket.send_string("StoreInterface started")

Expand All @@ -86,14 +109,14 @@ def createNexus(
self.store.subscribe()

# LMDB storage
self.use_hdd = use_hdd
self.use_hdd = cfg["use_hdd"]
if self.use_hdd:
self.lmdb_name = f'lmdb_{datetime.now().strftime("%Y%m%d_%H%M%S")}'
self.store_dict = dict()

# TODO: Better logic/flow for using watcher as an option
self.p_watch = None
if use_watcher:
if cfg["use_watcher"]:
self.startWatcher()

# Create dicts for reading config and creating actors
Expand All @@ -104,19 +127,21 @@ def createNexus(
self.flags = {}
self.processes = []

if file is None:
logger.exception("Need a config file!")
raise Exception # TODO
else:
self.loadConfig(file=file)
self.initConfig()

self.flags.update({"quit": False, "run": False, "load": False})
self.allowStart = False
self.stopped = False

return (control_port, output_port)
return (cfg["control_port"], cfg["output_port"])

def loadConfig(self, file):
"""Load configuration file.
file: a YAML configuration file name
"""
self.config = Config(configFile=file)

def initConfig(self):
"""For each connection:
create a Link with a name (purpose), start, and end
Start links to one actor's name, end to the other.
Expand All @@ -134,7 +159,6 @@ def loadConfig(self, file):
"""
# TODO load from file or user input, as in dialogue through FrontEnd?

self.config = Config(configFile=file)
flag = self.config.createConfig()
if flag == -1:
logger.error(
Expand Down Expand Up @@ -195,7 +219,7 @@ def loadConfig(self, file):
for name, link in self.data_queues.items():
self.assignLink(name, link)

if self.config.settings["use_watcher"] is not None:
if self.config.settings["use_watcher"]:
watchin = []
for name in self.config.settings["use_watcher"]:
watch_link = Link(name + "_watch", name, "Watcher")
Expand Down Expand Up @@ -266,13 +290,14 @@ def destroyNexus(self):
logger.warning("Destroying Nexus")
self._closeStoreInterface()

try:
os.remove(self.store_loc)
except FileNotFoundError:
logger.warning(
"StoreInterface file {} is already deleted".format(self.store_loc)
)
logger.warning("Delete the store at location {0}".format(self.store_loc))
if hasattr(self, "store_loc"):
try:
os.remove(self.store_loc)
except FileNotFoundError:
logger.warning(
"StoreInterface file {} is already deleted".format(self.store_loc)
)
logger.warning("Delete the store at location {0}".format(self.store_loc))

async def pollQueues(self):
"""
Expand Down
20 changes: 20 additions & 0 deletions test/configs/minimal_with_settings.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
settings:
store_size: 20_000_000
not_relevant: for testing purposes
control_port: 6000
output_port: 6001
logging_port: 6002
use_hdd: false
use_watcher: false

actors:
Generator:
package: actors.sample_generator
class: Generator

Processor:
package: actors.sample_processor
class: Processor

connections:
Generator.q_out: [Processor.q_in]
35 changes: 20 additions & 15 deletions test/test_config.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
# from importlib import import_module

# from improv.config import RepeatedActorError
from improv.config import Config as config
from improv.config import Config
from improv.utils import checks

import logging
Expand All @@ -33,7 +33,7 @@ def test_init(test_input, set_configdir):
Whether config has the correct config file.
"""

cfg = config(test_input)
cfg = Config(test_input)
assert cfg.configFile == test_input


Expand Down Expand Up @@ -70,9 +70,9 @@ def test_createConfig_settings(set_configdir):
If the default setting is the dictionary {"use_watcher": "None"}
"""

cfg = config("good_config.yaml")
cfg = Config("good_config.yaml")
cfg.createConfig()
assert cfg.settings == {"use_watcher": None}
assert cfg.settings == {"use_watcher": False}


# File with syntax error cannot pass the format check
Expand All @@ -95,7 +95,7 @@ def test_createConfig_wrong_import(set_configdir):
If createConfig raise any errors.
"""

cfg = config("minimal_wrong_import.yaml")
cfg = Config("minimal_wrong_import.yaml")
res = cfg.createConfig()
assert res == -1

Expand All @@ -107,7 +107,7 @@ def test_createConfig_clean(set_configdir):
If createConfig does not raise any errors.
"""

cfg = config("good_config.yaml")
cfg = Config("good_config.yaml")
try:
cfg.createConfig()
except Exception as exc:
Expand All @@ -117,49 +117,47 @@ def test_createConfig_clean(set_configdir):
def test_createConfig_noActor(set_configdir):
"""Tests if AttributeError is raised when there are no actors."""

cfg = config("no_actor.yaml")
cfg = Config("no_actor.yaml")
with pytest.raises(AttributeError):
cfg.createConfig()


def test_createConfig_ModuleNotFound(set_configdir):
"""Tests if an error is raised when the package can"t be found."""

cfg = config("bad_package.yaml")
cfg = Config("bad_package.yaml")
res = cfg.createConfig()
assert res == -1


def test_createConfig_class_ImportError(set_configdir):
"""Tests if an error is raised when the class name is invalid."""

cfg = config("bad_class.yaml")
cfg = Config("bad_class.yaml")
res = cfg.createConfig()
assert res == -1


def test_createConfig_AttributeError(set_configdir):
"""Tests if AttributeError is raised."""

cfg = config("bad_class.yaml")
cfg = Config("bad_class.yaml")
res = cfg.createConfig()
assert res == -1


def test_createConfig_blank_file(set_configdir):
"""Tests if a blank config file raises an error."""

cfg = config("blank_file.yaml")
with pytest.raises(TypeError):
cfg.createConfig()
Config("blank_file.yaml")


def test_createConfig_nonsense_file(set_configdir, caplog):
"""Tests if an improperly formatted config raises an error."""

cfg = config("nonsense.yaml")
with pytest.raises(TypeError):
cfg.createConfig()
Config("nonsense.yaml")


def test_acyclic_graph(set_configdir):
Expand All @@ -175,7 +173,7 @@ def test_cyclic_graph(set_configdir):
def test_saveActors_clean(set_configdir):
"""Compares internal actor representation to what was saved in the file."""

cfg = config("good_config.yaml")
cfg = Config("good_config.yaml")
cfg.createConfig()
cfg.saveActors()

Expand All @@ -186,3 +184,10 @@ def test_saveActors_clean(set_configdir):
originalKeys = len(cfg.actors.keys())

assert savedKeys == originalKeys


def test_config_settings_read(set_configdir):
cfg = Config("minimal_with_settings.yaml")
cfg.createConfig()

assert "store_size" in cfg.settings
Loading

0 comments on commit 5c2cf8b

Please sign in to comment.