Skip to content

Commit

Permalink
Implementing a callback using multiprocessing
Browse files Browse the repository at this point in the history
Implementing an iterator on the SimRunner for retrieving the results.
Fixes in all modules
Much better documentation
Going to be released as 4.0
  • Loading branch information
nunobrum committed Apr 23, 2023
1 parent e3b6dce commit 9958a26
Show file tree
Hide file tree
Showing 17 changed files with 466 additions and 118 deletions.
50 changes: 39 additions & 11 deletions PyLTSpice/client_server/sim_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,12 +23,19 @@
import io
import pathlib
import time
from collections import OrderedDict
from dataclasses import dataclass


class SimClientInvalidRunId(LookupError):
"""Raised when asking for a run_no that doesn't exist"""
...

@dataclass
class JobInformation:
"""Contains information about pending simulation jobs"""
run_number: int # The run id that is returned by the Server and which identifies the server
file_dir : pathlib.Path

# class RunIterator(object):
#
Expand All @@ -54,6 +61,10 @@ class SimClient(object):
The run() method will transfer the netlist for the server, execute a simulation and transfer the simulation results
back to the client.
Data is returned from the server inside a zipfie which is copied into the directory defined when the job was
created, /i.e./ run() method called.
Two lists are kept by this class:
* A list of started jobs (started_jobs) and,
Expand All @@ -62,6 +73,10 @@ class SimClient(object):
This distinction is important because the data is erased on the server side when the data is transferred.
This class implements an iterator that is to be used for retrieving the job. See the example below.
The iterator polls the server with a time interval defined by the attribute ``minimum_time_between_server_calls``.
This attribute is set to 0.2 seconds by default, but it can be overriden.
Usage:
.. code-block:: python
Expand Down Expand Up @@ -90,9 +105,11 @@ def __init__(self, host_address, port):
# print(self.server.system.listMethods())
self.session_id = self.server.start_session()
print("started ", self.session_id)
self.started_jobs = [] # This list keeps track of started jobs on the server
self.stored_jobs = [] # This list keeps track of finished simulations that haven't yet been transferred.
self.started_jobs = OrderedDict() # This list keeps track of started jobs on the server
self.stored_jobs = OrderedDict() # This list keeps track of finished simulations that haven't yet been transferred.
self.completed_jobs = 0
self.minimum_time_between_server_calls = 0.2 # Minimum time between server calls
self._last_server_call = time.clock()

def __del__(self):
print("closing session ", self.session_id)
Expand All @@ -107,9 +124,10 @@ def run(self, circuit):
:param circuit: path to the netlist file containing the simulation directives.
:type circuit: pathlib.Path or str
:returns: identifier on the server of the simulation.
:rtype int:
:rtype: int
"""
circuit_name = pathlib.Path(circuit).name
circuit_path = pathlib.Path(circuit)
circuit_name = circuit_path.name
if os.path.exists(circuit):
# Create a buffer to store the zip file in memory
zip_buffer = io.BytesIO()
Expand All @@ -125,22 +143,28 @@ def run(self, circuit):
zip_data = zip_buffer.read()

run_id = self.server.run(self.session_id, circuit_name, zip_data)
self.started_jobs.append(run_id)
job_info = JobInformation(run_number=run_id, file_dir=circuit_path.parent)
self.started_jobs[job_info] = job_info
return run_id
else:
print(f"Circuit {circuit} doesn't exit")
return -1

def get_runno_data(self, runno) -> str:
"""Returns the simulation output data inside a zip file name."""
"""
Returns the simulation output data inside a zip file name.
:rtype: str
"""
if runno not in self.stored_jobs:
raise SimClientInvalidRunId(f"Invalid Job id {runno}")

zip_filename, zipdata = self.server.get_files(self.session_id, runno)
self.stored_jobs.remove(runno)
job = self.stored_jobs.pop(runno) # Removes it from stored jobs
self.completed_jobs += 1
if zip_filename != '':
with open(zip_filename, 'wb') as f:
store_path = job.file_dir
with open(store_path / zip_filename, 'wb') as f:
f.write(zipdata.data)
return zip_filename

Expand All @@ -152,11 +176,15 @@ def __next__(self):
status = self.server.status(self.session_id)
if len(status) > 0:
runno = status.pop(0)
self.started_jobs.remove(runno) # Job is taken out of the started jobs list
self.stored_jobs.append(runno) # and is appended to the stored jobs
self.stored_jobs[runno] = self.started_jobs.pop(runno) # Job is taken out of the started jobs list and
# is added to the stored jobs
return runno
else:
time.sleep(0.2) # Go asleep for a sec
now = time.clock()
delta = self.minimum_time_between_server_calls - (now - self._last_server_call)
if delta > 0:
time.sleep(delta) # Go asleep for a sec
self._last_server_call = now

# when there are no pending jobs left, exit the iterator
raise StopIteration
Expand Down
6 changes: 3 additions & 3 deletions PyLTSpice/client_server/sim_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ def __init__(self, simulator, parallel_sims=4, output_folder='./temp1', port=900
)
self.server.register_introspection_functions()
self.server.register_instance(self)
self.sessions = {} # this will contain the session_id ids hashing their respective list of sim_taks
self.sessions = {} # this will contain the session_id ids hashing their respective list of sim_tasks
self.simulation_manager.start()
self.server_thread = threading.Thread(target=self.server.serve_forever, name="ServerThread")
self.server_thread.start()
Expand All @@ -64,7 +64,7 @@ def run(self, session_id, circuit_name, zip_data):
return runno

def start_session(self):
"""Returns an unique key that represents the session. It will be later used to sort the sim_taks belonging
"""Returns an unique key that represents the session. It will be later used to sort the sim_tasks belonging
to the session."""
session_id = str(uuid.uuid4()) # Needs to be a string, otherwise the rpc client can't handle it
print("Starting session ", session_id)
Expand Down Expand Up @@ -111,7 +111,7 @@ def get_files(self, session_id, runno) -> Tuple[str, Binary]:
return "", Binary(b'') # Returns and empty data

def close_session(self, session_id):
"""Cleans all the pending sim_taks with """
"""Cleans all the pending sim_tasks with """
for runno in self.sessions[session_id]:
self.simulation_manager.erase_files_of_runno(runno)
return True # Needs to return always something. None is not supported
Expand Down
8 changes: 4 additions & 4 deletions PyLTSpice/client_server/srv_sim_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
# |___/ |_|
#
# Name: srv_sim_runner.py
# Purpose: Manager of the simulation sim_taks on the server side
# Purpose: Manager of the simulation sim_tasks on the server side
#
# Author: Nuno Brum (nuno.brum@gmail.com)
#
Expand Down Expand Up @@ -56,8 +56,8 @@ def run(self) -> None:
"""This function makes a direct manipulation of the structures of SimRunner. This option is """
while True:
i = 0
while i < len(self.runner.sim_taks):
task = self.runner.sim_taks[i]
while i < len(self.runner.sim_tasks):
task = self.runner.sim_tasks[i]
if task.is_alive():
i += 1
else:
Expand All @@ -72,7 +72,7 @@ def run(self) -> None:
'stop': task.stop_time,
})
print(task, "is finished")
del self.runner.sim_taks[i]
del self.runner.sim_tasks[i]
print(self.completed_tasks[-1])
print(len(self.completed_tasks))

Expand Down
48 changes: 48 additions & 0 deletions PyLTSpice/sim/process_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
#!/usr/bin/env python
# coding=utf-8

# -------------------------------------------------------------------------------
# ____ _ _____ ____ _
# | _ \ _ _| | |_ _/ ___| _ __ (_) ___ ___
# | |_) | | | | | | | \___ \| '_ \| |/ __/ _ \
# | __/| |_| | |___| | ___) | |_) | | (_| __/
# |_| \__, |_____|_| |____/| .__/|_|\___\___|
# |___/ |_|
#
# Name: process_callback.py
# Purpose: Being able to execute callback in a separate process
#
# Author: Nuno Brum (nuno.brum@gmail.com)
#
# Created: 23-04-2023
# Licence: refer to the LICENSE file
# -------------------------------------------------------------------------------
"""
"""
from __future__ import annotations

from multiprocessing import Process, Queue
from typing import Any


class ProcessCallback(Process):
"""
Wrapper for the callback function
"""
def __init__(self, raw, log, group=None, name=None, *, daemon: bool | None = ...) -> None:
super().__init__(group=group, name=name, daemon=daemon)
self.queue = Queue()
self.raw_file = raw
self.log_file = log

def run(self):
ret = self.callback(self.raw_file, self.log_file)
if ret is None:
ret = "Callback doesn't return anything"
self.queue.put(ret)

@staticmethod
def callback(raw_file, log_file) -> Any:
"""This function needs to be overriden"""
...
47 changes: 36 additions & 11 deletions PyLTSpice/sim/run_task.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,8 @@
import time
import traceback
from time import sleep
from typing import Callable, Union, Any, Tuple
from typing import Callable, Union, Any, Tuple, Type
from .process_callback import ProcessCallback


from .simulator import Simulator
Expand All @@ -48,7 +49,7 @@
class RunTask(threading.Thread):
"""This is an internal Class and should not be used directly by the User."""

def __init__(self, simulator: Simulator, runno, netlist_file: Path, callback: Callable[[Path, Path], Any],
def __init__(self, simulator: Simulator, runno, netlist_file: Path, callback: Union[Type[ProcessCallback], Callable[[Path, Path], Any]],
switches, timeout=None, verbose=True):
super().__init__(name=f"RunTask#{runno}")
self.start_time = None
Expand Down Expand Up @@ -95,12 +96,21 @@ def run(self):

if self.raw_file.exists() and self.log_file.exists():
if self.callback:
self.print_info(logger.info, "Simulation Finished. Calling...{}(rawfile, logfile)".format(self.callback.__name__))
self.print_info(logger.info, "Simulation Finished. Calling...{}(rawfile, logfile)".format(
self.callback.__name__))
try:
self.callback_return = self.callback(self.raw_file, self.log_file)
return_or_process = self.callback(self.raw_file, self.log_file)
except Exception as err:
error = traceback.format_tb()
error = traceback.format_tb(err.__traceback__)
self.print_info(logger.error, error)

if isinstance(return_or_process, ProcessCallback):
proc = return_or_process
proc.start()
self.callback_return = proc.queue.get()
proc.join()
else:
self.callback_return = return_or_process
else:
self.print_info(logger.info, 'Simulation Finished. No Callback function given')
else:
Expand All @@ -111,17 +121,32 @@ def run(self):
if self.log_file.exists():
self.log_file = self.log_file.replace(self.log_file.with_suffix('.fail'))

def wait_results(self) -> Tuple[str, str]:
def get_results(self) -> Union[None, Any, Tuple[str, str]]:
"""
Returns the simulation outputs if the simulation and callback function has already finished.
If the simulation is not finished, it simply returns None. If no callback function is defined, then
it returns a tuple with (raw_file, log_file). If a callback function is defined, it returns whatever
the callback function is returning.
"""
if self.is_alive():
return None

if self.retcode == 0: # All finished OK
if self.callback:
return self.callback_return
else:
return self.raw_file, self.log_file
else:
return '', ''

def wait_results(self) -> Union[Any, Tuple[str, str]]:
"""
Waits for the completion of the task and returns a tuple with the raw and log files.
:returns: Tupple with the path to the raw file and the path to the log file
:returns: Tuple with the path to the raw file and the path to the log file
:rtype: tuple(str, str)
"""
while self.is_alive() or self.retcode == -1:
sleep(0.1)
if self.retcode == 0: # All finished OK
return self.raw_file, self.log_file
else:
return '', ''
return self.get_results()


10 changes: 6 additions & 4 deletions PyLTSpice/sim/sim_batch.py
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ def __init__(self, netlist_file: Union[str, Path], parallel_sims: int = 4, timeo

def setLTspiceRunCommand(self, spice_tool: Union[str, Simulator]) -> None:
"""
*(Deprecated)*
Manually setting the LTSpice run command.
:param spice_tool: String containing the path to the spice tool to be used, or alternatively the Simulator
Expand All @@ -146,10 +147,11 @@ def setLTspiceRunCommand(self, spice_tool: Union[str, Simulator]) -> None:
:return: Nothing
:rtype: None
"""
self.runner.setRunCommand(spice_tool)
self.runner.set_run_command(spice_tool)

def add_LTspiceRunCmdLineSwitches(self, *args) -> None:
"""
*(Deprecated)*
Used to add an extra command line argument such as -I<path> to add symbol search path or -FastAccess
to convert the raw file into Fast Access.
The arguments is a list of strings as is defined in the LTSpice command line documentation.
Expand Down Expand Up @@ -182,17 +184,17 @@ def wait_completion(self, timeout=None, abort_all_on_timeout=False) -> bool:

@property
def runno(self):
"""Legacy property"""
"""*(Deprecated)* Legacy property"""
return self.runner.runno

@property
def okSim(self):
"""Legacy property"""
"""*(Deprecated)* Legacy property"""
return self.runner.okSim

@property
def failSim(self):
"""Legacy property"""
"""*(Deprecated)* Legacy property"""
return self.runner.failSim


Expand Down
Loading

0 comments on commit 9958a26

Please sign in to comment.