# OSP Command Line Interface

> different functions for handling FMU or running co-simulations using cosim-cli
> (https://github.com/open-simulation-platform/cosim-cli)

In [None]:
#| default_exp osp_command_line

In [None]:
%load_ext autoreload
%autoreload 2

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


In [None]:
#| hide
from nbdev.showdoc import *

In [None]:
#| export
import datetime as dt
import io
import logging
import os
from dataclasses import dataclass
from enum import Enum
from subprocess import Popen, PIPE
from sys import platform
from typing import NamedTuple, List, Dict, Union, Tuple

import pandas
import yaml
from pyOSPParser.scenario import OSPScenario
from pyOSPParser.logging_configuration import OspLoggingConfiguration

In [None]:
#| export
try:
    _MODULE_PATH = os.path.dirname(os.path.abspath(__file__))
except NameError:
    _MODULE_PATH = os.path.dirname(os.path.abspath(""))

if platform.startswith("linux") or platform.startswith("darwin"):
    PATH_TO_COSIM = os.path.join(_MODULE_PATH, "..", "osp_cosim", "linux", "bin", "cosim")
else:
    PATH_TO_COSIM = os.path.join(_MODULE_PATH, "..", "osp_cosim", "win64", "bin", "cosim.exe")

@dataclass
class SimulationResult:
    """Simulation result"""
    result: Dict[str, pandas.DataFrame]
    log: str
    error: str = None


class SimulationError(Exception):
    """Exception for simulation error"""


class ModelVariables(NamedTuple):
    """ Representation of model variables from FMU's model description

    Attributes:
        parameters (List[Dict[str,str]], optional)
        inputs (List[Dict[str,str]], optional)
        outputs (List[Dict[str,str]], optional)
        others (List[Dict[str,str]], optional)
    """
    parameters: List[Dict[str, str]] = []
    inputs: List[Dict[str, str]] = []
    outputs: List[Dict[str, str]] = []
    others: List[Dict[str, str]] = []

    def get_parameters_names(self) -> List:
        """ Returns a list of the parameter names """
        return [variable['name'] for variable in self.parameters]

    def get_input_names(self) -> List:
        """ Returns a list of the parameter names """
        return [variable['name'] for variable in self.inputs]

    def get_output_names(self) -> List:
        """ Returns a list of the output names """
        return [variable['name'] for variable in self.outputs]

    def get_other_variable_names(self) -> List:
        """ Returns a list of the parameter names """
        return [variable['name'] for variable in self.others]


class FMUModelDescription(NamedTuple):
    """ Model description summary

    Model description summary used as a return type for get_model_description

    Attributes:
        name(str)
        uuid(str)
        model_variable (ModelVariables)
        description (str, optional)
        author (str, optional)
        version (str, optional)
    """
    name: str
    uuid: str
    model_variable: ModelVariables
    description: str = ''
    author: str = ''
    version: str = ''


class LoggingLevel(Enum):
    """Enum for logging level"""
    error = 40
    warning = 30
    info = 20
    debug = 10


def run_cli(args):
    """Run the command line """
    try:
        with Popen(args=args, shell=True, stdout=PIPE, stderr=PIPE) as proc:
            output = proc.stdout.read()
            log = proc.stderr.read()
    except OSError as exception:
        raise OSError(f'{output}, {log}, {exception}')

    # Catch errors

    return output.decode('utf-8'), log.decode('utf-8')


def run_single_fmu(
        path_to_fmu: str,
        initial_values: Dict[str, Union[float, bool]] = None,
        output_file_path: str = None,
        duration: float = None,
        step_size: float = None,
) -> SimulationResult:
    """Runs a single fmu simulation

    Args:
        path_to_fmu(str): file path to the target fmu
        initial_values(Dict[str, Union[float, bool]], optional): dictionary of initial values
        output_file_path(str, optional): file path for the output
        duration(float, optional): duration of simulation in seconds
        step_size(float, optional): duration
    Return:
        (tuple): tuple containing:
            result(pandas.DataFrame) simulation result
            log(str) simulation logging
    """
    fmu_name = os.path.splitext(os.path.basename(path_to_fmu))[0]
    delete_output = False
    if initial_values is None:
        initial_values = {}
    if output_file_path is None:
        # Delete output if the output file path is not given
        output_file_path = 'model-output.csv'
        delete_output = True
    mode = "run-single"

    assert os.path.isfile(PATH_TO_COSIM), f"The cosim CLI is not found: {PATH_TO_COSIM}"
    assert os.path.isfile(path_to_fmu), f"The fmu file is not found: {path_to_fmu}"

    # Create a list of initial values and set arguments for simulation
    args = [PATH_TO_COSIM, mode, path_to_fmu]
    args.extend('%s=%s' % (key, value) for key, value in initial_values.items())
    args.append('--output-file=%s' % output_file_path)
    if duration:
        args.append('-d%f' % duration)
    if step_size:
        args.append('-s%f' % step_size)

    #: Run the cosim to get the result in yaml format
    log, _ = run_cli(args)

    # Parse the output
    result_df = pandas.read_csv(output_file_path, index_col="Time")
    new_column_name = list(map(clean_header, result_df.columns))
    result_df.columns = new_column_name
    result = {fmu_name: result_df}
    if delete_output:
        os.remove(output_file_path)

    return SimulationResult(
        result=result,
        log=log
    )


def deploy_output_config(output_config: OspLoggingConfiguration, path: str):
    """Deploys a logging configiguration."""
    file_path = os.path.join(path, 'LogConfig.xml')

    xml_text = output_config.to_xml_str()

    with open(file_path, 'w+') as file:
        file.write(xml_text)


def deploy_scenario(scenario: OSPScenario, path: str):
    """Deploys a scenario"""
    file_path = os.path.join(path, scenario.get_file_name())

    with open(file_path, 'w+') as file:
        file.write(scenario.to_json())

    return file_path


def clean_header(header: str):
    """Clean header for simulation outputs"""
    if '[' in header:
        return header[0:header.rindex('[')-1]
    return header


def run_cosimulation(
        path_to_system_structure: str,
        # initial_values=None,
        logging_config: OspLoggingConfiguration = None,
        output_file_path: str = None,
        scenario: OSPScenario = None,
        duration: float = None,
        logging_level: LoggingLevel = LoggingLevel.warning,
        logging_stream: bool = False
) -> SimulationResult:
    """Runs a co-simulation

    Args:
        path_to_system_structure(str): The path to the system structure definition file/directory.
              If this is a file with .xml extension, or a directory that contains a file named
              OspSystemStructure.xml, it will be interpreted as a OSP system structure
              definition.
        logging_config(Dict[str, str], optional): dictionary of output configuration
        output_file_path(str, optional): file path for the output
        scenario(Dict[str, str], optional), dictionary of scenario
        duration(float, optional): duration of simulation in seconds
        logging_level(LoggingLevel, optional): Sets the detail/severity level of diagnostic output.
            Valid arguments are 'error', 'warning', 'info', and 'debug'. Default is 'warning'.
        logging_stream(bool, optional): logging will be returned as a string if True value is given.
            Otherwise, logging will be only displayed.
    Return:
        SimulationResult: object containing:
            result: simulation result
            log: simulation logging
            error: error from simulation
    """
    # Set loggers
    logger = logging.getLogger()
    if logging_stream:
        log_stream = io.StringIO()
        log_handler = logging.StreamHandler(log_stream)
        log_handler.setLevel(logging.INFO)
        logger.addHandler(log_handler)
    logger.setLevel(logging_level.value)

    # Set simulation parameters
    delete_output = False
    mode = "run"

    # Check if the cosim-cli exists and the system structure exists
    assert os.path.isfile(PATH_TO_COSIM), 'The cosim CLI is not found: %s' % PATH_TO_COSIM
    assert os.path.isdir(path_to_system_structure), \
        'The system structure directory is not found: %s' % path_to_system_structure
    path_to_osp_sys_structure = os.path.join(path_to_system_structure, 'OspSystemStructure.xml')
    assert os.path.isfile(path_to_osp_sys_structure), \
        'The system structure directory is not found: %s' % path_to_system_structure
    args = [PATH_TO_COSIM, mode, path_to_system_structure]

    if logging_config is not None:
        logger.info('Deploying the logging configuration.')
        deploy_output_config(logging_config, path_to_system_structure)
    if output_file_path is None:
        output_file_path = path_to_system_structure
        delete_output = True
    else:
        assert os.path.isdir(output_file_path), \
            f"The directory for the output does not exist: {output_file_path}."
        logger.info(
            'Output csv files will be saved in the following directory: %s', output_file_path
        )
    args.append('--output-dir=%s' % output_file_path)
    if scenario is not None:
        logger.info('Deploying the scenario.')
        scenario_file_path = deploy_scenario(scenario, path_to_system_structure)
        args.append('--scenario=%s' % scenario_file_path)
    if duration:
        logger.info('Simulation will run until {%s} seconds.', duration)
        args.append(f'--duration={duration}')
    args.append(f'--log-level={logging_level.name}')

    # Run simulation
    logger.info('Running simulation.')
    _, log = run_cli(args)
    logger.info(log)
    error = [  # Find a error in the lines of logging and gather with a line break in between
        line_with_break for line in log.split('\n') if line.startswith('error')
        for line_with_break in [line, '\n']
    ]
    if len(error) > 1:
        error = error[:-1]
    error = ''.join(error)

    # construct result from csvs that are created within last 30 seconds
    output_files = [
        file_name for file_name in os.listdir(output_file_path) if file_name.endswith('csv')
    ]
    ago = dt.datetime.now() - dt.timedelta(seconds=30)
    output_files = [
        file_name for file_name in output_files
        if dt.datetime.fromtimestamp(
            os.stat(os.path.join(output_file_path, file_name)).st_mtime
        ) > ago
    ]
    result = {}
    for file in output_files:
        simulator_name = file
        for _ in range(3):
            simulator_name = simulator_name[:simulator_name.rfind('_')]
        result[simulator_name] = pandas.read_csv(os.path.join(output_file_path, file), index_col="Time")
        result[simulator_name].drop(["StepCount"], axis=1, inplace=True)
        new_column_name = list(map(clean_header, result[simulator_name].columns))
        result[simulator_name].columns = new_column_name
    if delete_output:
        for file_name in output_files:
            os.remove(os.path.join(output_file_path, file_name))

    # Get logging data
    if logging_stream:
        # noinspection PyUnboundLocalVariable
        logger.removeHandler(log_handler)
        log_handler.flush()
        # noinspection PyUnboundLocalVariable
        log_stream.flush()
        log = log_stream.getvalue()
    else:
        log = ''

    return SimulationResult(result=result, log=log, error=error)


In [None]:
#| hide
import nbdev; nbdev.nbdev_export()

AssertionError: 