Skip to content

Commit

Permalink
core - updated socket server to be able to execute device actions
Browse files Browse the repository at this point in the history
  • Loading branch information
superstes committed Sep 20, 2021
1 parent 979c900 commit 4922a7f
Show file tree
Hide file tree
Showing 9 changed files with 211 additions and 101 deletions.
28 changes: 24 additions & 4 deletions code/core/device/input/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,36 +16,56 @@ class Go:
SQL_DATA_COMMAND = DEVICE_DICT['input']['data']
SQL_TASK_COMMAND = DEVICE_DICT['task']

def __init__(self, instance):
def __init__(self, instance: (GaInputDevice, GaInputModel), manually: bool = False):
self.instance = instance
self.database = GaDataDb()
self.name = instance.name
self.manually = manually

def start(self):
def start(self) -> bool:
task_instance_list = Check(
instance=self.instance,
model_obj=GaInputModel,
device_obj=GaInputDevice,
).get()

results = []

for task_dict in task_instance_list:
device = task_dict['device']
task_name = device.name
task_id = device.object_id
device_log(f"Processing device instance: \"{device.__dict__}\"", add=self.name, level=7)

if 'downlink' in task_dict:
data = Process(instance=task_dict['downlink'], nested_instance=device, script_dir='connection').start()
data = Process(
instance=task_dict['downlink'],
nested_instance=device,
script_dir='connection',
manually=self.manually,
).start()

else:
data = Process(instance=device, script_dir='input').start()
data = Process(
instance=device,
script_dir='input',
manually=self.manually,
).start()

if data is None:
device_log(f"No data received for device \"{task_name}\"", add=self.name, level=3)
results.append(False)

elif data is False:
device_log(f"Device \"{task_name}\" is in fail-sleep", add=self.name, level=4)
results.append(False)

else:
self.database.put(command=self.SQL_DATA_COMMAND % (datetime.now(), data, task_id))
device_log(f"Processing of input-device \"{task_name}\" succeeded", add=self.name, level=7)
results.append(True)

if len(results) > 0:
return all(results)

return False
81 changes: 54 additions & 27 deletions code/core/device/output/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
from core.utils.threader import Loop as Thread
from core.device.output.condition.link import Go as GetGroupResult
from core.device.process import Go as Process
from core.device.log import device_logger
from core.utils.debug import device_log

from core.config.object.device.output import GaOutputDevice, GaOutputModel

Expand All @@ -19,28 +19,33 @@ class Go:
REVERSE_KEY_CONDITION = 'condition'
REVERSE_CONDITION_INTERVAL = 60

def __init__(self, instance):
def __init__(self, instance: (GaOutputModel, GaOutputDevice), action: str = None, manually: bool = True):
self.instance = instance
self.database = GaDataDb()
self.output_instance_list = []
self.processed_list = []
self.logger = device_logger(addition=instance.name)
self.action = action
self.name = instance.name
self.manually = manually

def start(self):
def start(self) -> bool:
condition_result = GetGroupResult(group=self.instance).go()
self._evaluate(condition_result=condition_result)
return self._evaluate(condition_result=condition_result)

def _evaluate(self, condition_result) -> None:
def _evaluate(self, condition_result) -> bool:
# todo: reverse type condition implementation => Ticket#11

if condition_result:
self.logger.write(f"Conditions for \"{self.instance.name}\" were met", level=6)

if self.manually or condition_result:
if condition_result:
device_log(f"Conditions for \"{self.instance.name}\" were met", add=self.name, level=6)

else:
device_log(f"{self.action.capitalize()}ing \"{self.instance.name}\" manually", add=self.name, level=5)

output_list = self.instance.output_object_list.copy()

self.logger.write(f"Output list of \"{self.instance.name}\": \"{output_list}\"", level=7)
device_log(f"Output list of \"{self.instance.name}\": \"{output_list}\"", add=self.name, level=7)
output_list.extend(self.instance.output_group_list)
self.logger.write(f"Output-group list of \"{self.instance.name}\": \"{self.instance.output_group_list}\"", level=7)
device_log(f"Output-group list of \"{self.instance.name}\": \"{self.instance.output_group_list}\"", add=self.name, level=7)

task_instance_list = []

Expand All @@ -54,35 +59,57 @@ def _evaluate(self, condition_result) -> None:
).get()
)

results = []

for task_dict in task_instance_list:
self._process(task_dict=task_dict)
results.append(self._process(task_dict=task_dict))

if len(results) > 0:
return all(results)

else:
self.logger.write(f"Conditions for \"{self.instance.name}\" were not met", level=3)
device_log(f"Conditions for \"{self.instance.name}\" were not met", add=self.name, level=3)

return False

def _process(self, task_dict: dict, reverse=False) -> bool:
device = task_dict['device']
self.logger.write(f"Processing device instance: \"{device.__dict__}\"", level=7)

if self.action == 'stop':
reverse = True

device_log(f"Processing device instance: \"{device.__dict__}\"", add=self.name, level=7)

if 'downlink' in task_dict:
result = Process(instance=task_dict['downlink'], nested_instance=device, script_dir='connection', reverse=reverse).start()
result = Process(
instance=task_dict['downlink'],
nested_instance=device,
script_dir='connection',
reverse=reverse,
manually=self.manually,
).start()

else:
result = Process(instance=device, script_dir='output', reverse=reverse).start()
result = Process(
instance=device,
script_dir='output',
reverse=reverse,
manually=self.manually,
).start()

if result is None:
self.logger.write(f"Processing of output-device \"{device.name}\" failed", level=3)
device_log(f"Processing of output-device \"{device.name}\" failed", add=self.name, level=3)
return False

elif result is False:
self.logger.write(f"Device \"{device.name}\" is in fail-sleep", level=4)
device_log(f"Device \"{device.name}\" is in fail-sleep", add=self.name, level=4)
return False

else:
self.logger.write(f"Processing of output \"{device.name}\" succeeded", level=7)
device_log(f"Processing of output \"{device.name}\" succeeded", add=self.name, level=7)

self.logger.write(f"Checking device \"{device.name}\" for reversion: reversible - {device.reverse}, active - {device.active}, "
f"reverse-type - {device.reverse_type}={self.REVERSE_KEY_TIME}", level=7)
device_log(f"Checking device \"{device.name}\" for reversion: reversible - {device.reverse}, active - {device.active}, "
f"reverse-type - {device.reverse_type}={self.REVERSE_KEY_TIME}", add=self.name, level=7)

if device.reverse == 1 and device.active:
# todo: write state to database => a service restart MUST NOT keep devices running
Expand Down Expand Up @@ -114,7 +141,7 @@ def _condition_members(self) -> dict:

def _reverse_timer(self, task_dict: dict) -> None:
device = task_dict['device']
self.logger.write(f"Entering wait timer ({device.reverse_type_data} secs) for output-device \"{device.name}\"", level=6)
device_log(f"Entering wait timer ({device.reverse_type_data} secs) for output-device \"{device.name}\"", add=self.name, level=6)

thread = Thread()

Expand All @@ -126,14 +153,14 @@ def _reverse_timer(self, task_dict: dict) -> None:
)
def thread_task(data):
self._process(task_dict=data, reverse=True)
self.logger.write(f"Reversing of device \"{device.name}\" finished", level=6)
device_log(f"Reversing of device \"{device.name}\" finished", add=self.name, level=6)
thread.stop_thread(description=device.name)

thread.start()

def _reverse_condition(self, task_dict: dict) -> None:
device = task_dict['device']
self.logger.write(f"Entering reverse-condition loop for output-device \"{device.name}\"", level=6)
device_log(f"Entering reverse-condition loop for output-device \"{device.name}\"", add=self.name, level=6)

thread = Thread()

Expand All @@ -145,10 +172,10 @@ def _reverse_condition(self, task_dict: dict) -> None:
)
def thread_task(data):
while not self._process(task_dict=data, reverse=True):
self.logger.write(f"Reversing of device \"{device.name}\" continues", level=8)
device_log(f"Reversing of device \"{device.name}\" continues", add=self.name, level=8)
sleep(self.REVERSE_CONDITION_INTERVAL)

self.logger.write(f"Reversing of device \"{device.name}\" finished", level=6)
device_log(f"Reversing of device \"{device.name}\" finished", add=self.name, level=6)
thread.stop_thread(description=device.name)

thread.start()
5 changes: 3 additions & 2 deletions code/core/device/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,13 @@ class Go:
INPUT_DATA_KEY = 'data'
SCRIPT_SUBPATH = "device/%s"

def __init__(self, instance, script_dir: str, reverse: bool = False, nested_instance=None):
def __init__(self, instance, script_dir: str, reverse: bool = False, nested_instance=None, manually: bool = False):
self.instance = instance
self.reverse = reverse
self.nested_instance = nested_instance
self.name = instance.name
self.script_dir = script_dir
self.manually = manually

def start(self) -> (str, None, bool):
# output:
Expand Down Expand Up @@ -61,7 +62,7 @@ def start(self) -> (str, None, bool):
return None

def _fail_check(self):
if self.instance.fail_sleep is not None:
if not self.manually and self.instance.fail_sleep is not None: # skip fail-sleep if executed manually
if datetime.now() < self.instance.fail_sleep:
device_log(f"Skipping execution of device \"{self.instance.name}\" since it has reached the max error threshold", add=self.name, level=4)
device_log(f"Device \"{self.instance.name}\" will be skipped until \"{self.instance.fail_sleep.strftime('%Y-%m-%d %H:%M:%S:%f')}\"", add=self.name, level=6)
Expand Down
38 changes: 12 additions & 26 deletions code/core/service/decision.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,35 +8,21 @@
from core.utils.debug import log


class Go:
def __init__(self, instance):
self.instance = instance

def start(self):
if isinstance(self.instance, (GaInputDevice, GaInputModel)):
self._device_input()

elif isinstance(self.instance, GaConditionGroup):
self._device_output()

elif isinstance(self.instance, GaTaskDevice):
self._core_timer()

elif isinstance(self.instance, SystemTask):
self.instance.execute()

else:
log(f"Service could not find a matching decision for instance: '{self.instance}'", level=5)

def _device_input(self):
def start(instance):
if isinstance(instance, (GaInputDevice, GaInputModel)):
from core.device.input.main import Go
Go(instance=self.instance).start()
Go(instance=instance).start()

def _device_output(self):
elif isinstance(instance, GaConditionGroup):
from core.device.output.main import Go
Go(instance=self.instance).start()
Go(instance=instance).start()

@staticmethod
def _core_timer():
elif isinstance(instance, GaTaskDevice):
log('Core timers are not yet implemented', level=3)
pass

elif isinstance(instance, SystemTask):
instance.execute()

else:
log(f"Service could not find a matching decision for instance: '{instance}'", level=5)
18 changes: 9 additions & 9 deletions code/core/service/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@
from core.service.timer import get as get_timer
from core.config import shared as shared_vars
from core.utils.debug import fns_log
from core.service.decision import Go as Decision
from core.service.decision import start as decision
from core.config.object.data.file import GaDataFile
from core.factory import config as factory_config

Expand Down Expand Up @@ -62,7 +62,7 @@ def __init__(self):
self.timer_list = get_timer(config_dict=self.CONFIG)
self.CONFIG_FILE = GaDataFile()
self._update_config_file()
self.THREAD = Thread()
self.THREADER = Thread()

def start(self):
try:
Expand All @@ -71,7 +71,7 @@ def start(self):
for instance in self.timer_list:
self._thread(instance=instance)

self.THREAD.start()
self.THREADER.start()
fns_log('Start - finished starting threads.')
self._status()

Expand Down Expand Up @@ -101,8 +101,8 @@ def reload(self, signum=None, stack=None):
# re-create the list of possible timers
self.timer_list = get_timer(config_dict=self.CONFIG)
# stop and reset all current threads
self.THREAD.stop()
self.THREAD.jobs = []
self.THREADER.stop()
self.THREADER.jobs = []
self._wait(seconds=self.WAIT_TIME)
# re-create all the threads
self.start()
Expand All @@ -121,7 +121,7 @@ def stop(self, signum=None, stack=None):
fns_log('Stopping service.')
self._signum_log(signum=signum)
fns_log('Stopping timer threads', level=6)
self.THREAD.stop()
self.THREADER.stop()
self._wait(seconds=self.WAIT_TIME)
fns_log('Service stopped.')
self._exit()
Expand All @@ -146,13 +146,13 @@ def _update_config_file(self):
self.CONFIG_FILE.update()

def _thread(self, instance):
@self.THREAD.thread(
@self.THREADER.thread(
sleep_time=int(instance.timer),
thread_data=instance,
description=instance.name,
)
def thread_task(data):
Decision(instance=data).start()
decision(instance=data)

@staticmethod
def _wait(seconds: int):
Expand All @@ -178,7 +178,7 @@ def _signum_log(signum):
fns_log(f"Service received signal {signum}", level=3)

def _status(self):
thread_list = self.THREAD.list()
thread_list = self.THREADER.list()
detailed_thread_list = '\n'.join([str(thread.__dict__) for thread in thread_list])
simple_thread_list = [thread.name for thread in thread_list]
fns_log(f"Status - threads running: {simple_thread_list}")
Expand Down

0 comments on commit 4922a7f

Please sign in to comment.