# How to perform experiments with damast

One of the main motivations of this library is to facilitate the development and evaluation of machine learning models.
To that end, 'damast' offers a mini-framework and API that simplify this process.
Using it imposes a (hopefully) minimal set of constraints, as envisioned by the authors of this library, so that researchers and newcomers to ML face a lower entry barrier to running machine learning experiments.

The following example walks through a minimal experiment.

In [1]:
# The list of modules used for this example
from collections import OrderedDict
from typing import List, Optional
from pathlib import Path

# For performance reasons the underlying data handling library is 'vaex'
import vaex

# The current development has focused on a keras+tensorflow based Machine Learning setup
import tensorflow as tf
import keras

import damast
# You can define custom units to annotate data, but otherwise astropy units will be used
from damast.core.units import units
# Data ranges can be defined as list, or marked with a lower-bound (min), upper-bound (max)
from damast.core.datarange import MinMax, CyclicMinMax
# The AnnotatedDataFrame combines a data specification and actual 'numeric' data
from damast.core.dataframe import AnnotatedDataFrame
# An AnnotatedDataFrame contains MetaData to describe the data
from damast.core.metadata import MetaData

# Data processing is centered around a DataProcessingPipeline which consists of multiple PipelineElement being run
# in sequence
from damast.core.dataprocessing import DataProcessingPipeline, PipelineElement


# To allow the machine learning process to be simplified, we offer a 'BaseModel' that should be inherited from
from damast.ml.models.base import BaseModel

# The experiment setup
from damast.ml.experiments import Experiment, LearningTask, ForecastTask, ModelInstanceDescription, TrainingParameters

# Generate synthetic data for this particular example, which uses data from the maritime domain
from damast.domains.maritime.ais.data_generator import AISTestData, AISTestDataSpec


To illustrate a full experiment, a data processing pipeline has to be set up. This pipeline extracts all features that are necessary to train the machine learning model(s). It does so by running transformations on the data, here provided by a LatLonTransformer.
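The LatLonTransformer defined in the next cell relies on a cyclic encoding. As a minimal sketch of the idea, and assuming the usual definition (a value with period `n` is projected onto the unit circle as `x = cos(2πv/n)`, `y = sin(2πv/n)`, which is how the vaex documentation describes `CycleTransformer`), the encoding can be written in plain Python. The helper name `cyclic_encode` is hypothetical and not part of damast or vaex.

```python
import math
from typing import Tuple


def cyclic_encode(value: float, period: float) -> Tuple[float, float]:
    """Project a periodic value onto the unit circle and return (x, y)."""
    angle = 2.0 * math.pi * value / period
    return math.cos(angle), math.sin(angle)


# Longitudes of -180 and 180 degrees describe the same meridian ...
west = cyclic_encode(-180.0, period=360.0)
east = cyclic_encode(180.0, period=360.0)

# ... and end up at (numerically) the same point on the unit circle.
assert math.isclose(west[0], east[0], abs_tol=1e-9)
assert math.isclose(west[1], east[1], abs_tol=1e-9)

# Every encoded component lies within [-1.0, 1.0], which matches the
# MinMax(-1.0, 1.0) value ranges declared for the output columns.
lon_x, lon_y = cyclic_encode(-156.102, period=360.0)
assert -1.0 <= lon_x <= 1.0 and -1.0 <= lon_y <= 1.0
```

The benefit of this encoding is that values at both ends of the range, which are neighbours in reality, also become neighbours in feature space.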

In [2]:
class LatLonTransformer(PipelineElement):
    """
    The LatLonTransformer will consume a lat(itude) and a lon(gitude) column and perform
    cyclic normalization. It will add four columns to a dataframe, namely lat_x, lat_y, lon_x, lon_y.
    """
    @damast.core.describe("Lat/Lon cyclic transformation")
    @damast.core.input({
        "lat": {"unit": units.deg},
        "lon": {"unit": units.deg}
    })
    @damast.core.output({
        "lat_x": {"value_range": MinMax(-1.0, 1.0)},
        "lat_y": {"value_range": MinMax(-1.0, 1.0)},
        "lon_x": {"value_range": MinMax(-1.0, 1.0)},
        "lon_y": {"value_range": MinMax(-1.0, 1.0)}
    })
    def transform(self, df: AnnotatedDataFrame) -> AnnotatedDataFrame:        
        lat_cyclic_transformer = vaex.ml.CycleTransformer(features=["lat"], n=180.0)
        lon_cyclic_transformer = vaex.ml.CycleTransformer(features=["lon"], n=360.0)

        _df = lat_cyclic_transformer.fit_transform(df=df)
        _df = lon_cyclic_transformer.fit_transform(df=_df)
        df._dataframe = _df
        return df


The example model selected here requires the features listed above as input and, for simplicity, produces an output of the same shape.
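To make the shapes concrete: the baseline model in the next cell flattens a window of `timeline_length` rows with one value per feature and feeds it into a single dense layer. The following sketch only does the arithmetic for that architecture; it does not use TensorFlow, the function name is hypothetical, and the timeline length of 5 is taken from the `sequence_length` used later in this example.

```python
from typing import Tuple


def baseline_dimensions(timeline_length: int,
                        features_width: int,
                        targets_width: int) -> Tuple[int, int]:
    """Return (flattened input size, trainable parameters) for Flatten + Dense."""
    # Flatten turns a (timeline_length, features_width) window into one vector
    flattened_inputs = timeline_length * features_width
    # A dense layer has one weight per (input, output) pair plus one bias per output
    weights = flattened_inputs * targets_width
    biases = targets_width
    return flattened_inputs, weights + biases


flattened, parameters = baseline_dimensions(timeline_length=5,
                                            features_width=4,
                                            targets_width=4)
print(flattened, parameters)  # 20 84
```

These numbers agree with the model summary printed when the experiment is run further down: an output shape of `(None, 20)` for the flatten layer and 84 trainable parameters for the dense layer.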

In [3]:
class Baseline(BaseModel):
    """
    This is a placeholder ML model that illustrates the minimal
    requirements.
    """
    input_specs = OrderedDict({
        "lat_x": {"length": 1},
        "lat_y": {"length": 1},
        "lon_x": {"length": 1},
        "lon_y": {"length": 1}
    })

    output_specs = OrderedDict({
        "lat_x": {"length": 1},
        "lat_y": {"length": 1},
        "lon_x": {"length": 1},
        "lon_y": {"length": 1}
    })

    def __init__(self,
                 name: str,
                 features: List[str],
                 timeline_length: int,
                 output_dir: Path,
                 targets: Optional[List[str]] = None):
        self.timeline_length = timeline_length

        super().__init__(name=name,
                         output_dir=output_dir,
                         features=features,
                         targets=targets)

    def _init_model(self):
        features_width = len(self.features)
        targets_width = len(self.targets)

        self.model = tf.keras.models.Sequential([
            keras.layers.Flatten(input_shape=[self.timeline_length, features_width]),
            keras.layers.Dense(targets_width)
        ])


class BaselineA(Baseline):
    """Placeholder Model to illustrate the use of multiple models"""
    pass


class BaselineB(Baseline):
    """Placeholder Model to illustrate the use of multiple models"""
    pass

This example operates on synthetic, i.e. automatically generated, data that is specific to the maritime domain.
You will see a preview of the first 10 rows when running the following cell.

In [4]:
import tempfile
import shutil

tmp_path = Path(tempfile.gettempdir()) / "test-output-ais_preparation"
if tmp_path.exists():
    shutil.rmtree(tmp_path)
tmp_path.mkdir(parents=True)

pipeline = DataProcessingPipeline(name="ais_preparation",
                                  base_dir=tmp_path) \
    .add("cyclic", LatLonTransformer())
features = ["lat_x", "lat_y", "lon_x", "lon_y"]

data = AISTestData(1000)
adf = AnnotatedDataFrame(dataframe=data.dataframe,
                         metadata=MetaData.from_dict(data=AISTestDataSpec.copy()))
dataset_filename = tmp_path / "test.hdf5"
adf.save(filename=dataset_filename)

adf.head(10)

#,mmsi,lon,lat,date_time_utc,sog,cog,true_heading,nav_status,rot,message_nr,source
0,1277812958,-156.102,66.8031,1983-11-14 09:14:28,0.787616,-0.174439,-0.11852,7,0,2,s
1,1277812958,-156.094,66.817,1983-11-14 09:15:20,1.25607,-0.588664,-0.546531,0,0,2,g
2,1277812958,-156.108,66.8583,1983-11-14 09:16:39,1.85139,-1.06371,-1.04555,1,0,2,g
3,1277812958,-156.127,66.8768,1983-11-14 09:18:31,1.49558,-1.14412,-1.13982,7,0,2,s
4,1277812958,-156.167,66.9099,1983-11-14 09:19:05,3.38864,-1.52297,-1.51993,1,0,2,s
5,1277812958,-156.195,66.8728,1983-11-14 09:19:38,2.51732,-1.50282,-1.42802,1,0,2,s
6,1277812958,-156.181,66.8417,1983-11-14 09:20:50,2.90567,-1.00414,-0.985169,1,0,2,s
7,1277812958,-156.169,66.8574,1983-11-14 09:22:16,1.41212,-0.658907,-0.646031,7,0,2,g
8,1277812958,-156.14,66.8957,1983-11-14 09:23:02,2.19664,-0.855472,-0.759963,7,0,2,s
9,1277812958,-156.156,66.904,1983-11-14 09:23:32,2.16217,-0.881218,-0.852941,0,0,2,s


A central idea of the experiment framework is to provide consistent inputs and outputs for performing experiments. Hence, define a LearningTask (here: a ForecastTask) that collects the learning parameters describing the task.
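The task definition below uses `group_column`, `sequence_length` and `forecast_length`. How damast builds training samples internally is not shown in this example, so the following is only a generic sliding-window sketch of what such parameters commonly mean: per group, `sequence_length` consecutive rows form the input and the next `forecast_length` rows form the target. The function `make_windows` and the toy data are hypothetical.

```python
from typing import Dict, List, Sequence, Tuple

Window = Tuple[List[int], List[int]]


def make_windows(rows: Sequence[int],
                 sequence_length: int,
                 forecast_length: int) -> List[Window]:
    """Split one group's rows into (input, target) sliding windows."""
    window_size = sequence_length + forecast_length
    windows: List[Window] = []
    # Groups shorter than one full window yield no samples at all
    for start in range(len(rows) - window_size + 1):
        inputs = list(rows[start:start + sequence_length])
        targets = list(rows[start + sequence_length:start + window_size])
        windows.append((inputs, targets))
    return windows


# Toy stand-in for rows grouped by a column such as "mmsi"
tracks: Dict[str, List[int]] = {
    "vessel-a": [0, 1, 2, 3, 4, 5, 6],
    "vessel-b": [10, 11, 12],
}

samples = {group: make_windows(rows, sequence_length=5, forecast_length=1)
           for group, rows in tracks.items()}

assert samples["vessel-a"] == [([0, 1, 2, 3, 4], [5]),
                               ([1, 2, 3, 4, 5], [6])]
assert samples["vessel-b"] == []  # too short for a window of 6 rows
```

Grouping before windowing keeps rows of different vessels from being mixed within a single sequence.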

In [5]:
forecast_task = ForecastTask(
    label="forecast-ais-short-sequence",
    pipeline=pipeline, features=features,
    models=[ModelInstanceDescription(BaselineA, {}),
            ModelInstanceDescription(BaselineB, {}),
            ],
    group_column="mmsi",
    sequence_length=5,
    forecast_length=1,
    training_parameters=TrainingParameters(epochs=1,
                                           validation_steps=1)
)

The actual experiment takes a single LearningTask as input. Running it returns the path to a report file, whose contents are printed here:

In [None]:
experiment = Experiment(learning_task=forecast_task,
                        input_data=dataset_filename,
                        output_directory=tmp_path)
report = experiment.run()
    
with open(report, "r") as f:
    print(f.read())        

INFO:damast:Recommended steps_per_epoch=53490.0
INFO:damast:Recommended steps_per_epoch=53490.0


Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 flatten (Flatten)           (None, 20)                0         
                                                                 
 dense (Dense)               (None, 4)                 84        
                                                                 
Total params: 84
Trainable params: 84
Non-trainable params: 0
_________________________________________________________________


INFO:damast:Recommended steps_per_epoch=53490.0
INFO:damast:Recommended steps_per_epoch=53490.0


Model: "sequential_1"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 flatten_1 (Flatten)         (None, 20)                0         
                                                                 
 dense_1 (Dense)             (None, 4)                 84        
                                                                 
Total params: 84
Trainable params: 84
Non-trainable params: 0
_________________________________________________________________


INFO:damast:Recommended steps_per_epoch=53490.0


batch_size: 2
evaluation:
  BaselineA-forecast-ais-short-sequence:
    loss: 0.020285729318857193
    mse: 0.020285729318857193
  BaselineB-forecast-ais-short-sequence:
    loss: 0.1729123294353485
    mse: 0.1729123294353485
evaluation_steps: 1
input_data: /tmp/test-output-ais_preparation/test.hdf5
label: damast-ml-experiment
learning_task:
  class_name: ForecastTask
  features: &id001
  - lat_x
  - lat_y
  - lon_x
  - lon_y
  forecast_length: 1
  group_column: mmsi
  label: forecast-ais-short-sequence
  models:
  - class_name: BaselineA
    module_name: __main__
    parameters: {}
  - class_name: BaselineB
    module_name: __main__
    parameters: {}
  module_name: damast.ml.experiments
  pipeline:
    base_dir: /tmp/test-output-ais_preparation
    name: ais_preparation
    steps:
    - - cyclic
      - class_name: LatLonTransformer
        module_name: __main__
        name_mappings: {}
  sequence_length: 5
  targets: *id001
  training_parameters:
    epochs: 1
    learning_rate: 0.

The outputs of an experiment are collected inside a dedicated (timestamped) folder. This folder also contains one subfolder for each of the parametrized models defined in the LearningTask.

In [None]:
last_experiments = sorted([str(f) for f in Path(experiment.output_directory).glob(pattern="*") if f.is_dir()])
print("Last experiment in: ", last_experiments[-1])

experiment_folder = sorted([str(f) for f in Path(last_experiments[-1]).glob(pattern="*")])
file_listing = '\n'.join(experiment_folder)
print("Contents:\n")
print(file_listing)


Last experiment in:  /tmp/test-output-ais_preparation/20230331-104103-damast-ml-experiment
Contents:

/tmp/test-output-ais_preparation/20230331-104103-damast-ml-experiment/BaselineA-forecast-ais-short-sequence
/tmp/test-output-ais_preparation/20230331-104103-damast-ml-experiment/BaselineB-forecast-ais-short-sequence
/tmp/test-output-ais_preparation/20230331-104103-damast-ml-experiment/experiment-report.yaml
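The folder names in the listing above start with a timestamp. Assuming the prefix always follows the pattern `YYYYMMDD-HHMMSS` (an inference from the output shown here, not a documented guarantee), the time of an experiment can be recovered from the folder name. The helper `parse_experiment_timestamp` is hypothetical.

```python
from datetime import datetime
from pathlib import Path


def parse_experiment_timestamp(folder: Path) -> datetime:
    """Extract the leading 'YYYYMMDD-HHMMSS' timestamp from a folder name."""
    # The first two dash-separated parts are assumed to be date and time
    date_part, time_part = folder.name.split("-")[:2]
    return datetime.strptime(f"{date_part}-{time_part}", "%Y%m%d-%H%M%S")


folder = Path("/tmp/test-output-ais_preparation/20230331-104103-damast-ml-experiment")
print(parse_experiment_timestamp(folder))  # 2023-03-31 10:41:03
```

Because such a prefix sorts lexicographically in chronological order, taking the last element of the sorted folder list, as done in the cell above, yields the most recent experiment.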
