In [1]:
import sys, os
import numpy as np
import pandas as pd
import matplotlib
from matplotlib import pyplot as plt

sys.path.insert(1, os.path.join(sys.path[0], '../../'))  # for importing local packages from src

In [189]:
class ExperimentDataset:
    """Wrapper class for handling a dataset from a single RC experiment run.

    The source table holds one row per record. Rows are split on the ``type``
    column into ``"INPUT"``, ``"OUTPUT"`` and ``"STATE"`` subsets. All subsets
    are grouped by ``run_id``; input/output rows are indexed by ``time`` when
    a target is requested, and state rows are grouped by ``state_id``.

    Columns starting with ``input_`` / ``output_`` are exposed as targets,
    columns starting with ``state_`` (except ``state_id`` and ``state_type``)
    as state variables.
    """

    def __init__(self, dataset_df=None, csv_path=None):
        """Load an experiment dataset from a DataFrame or from a csv file.

        Args:
            dataset_df: already loaded table; takes precedence over csv_path.
            csv_path: path handed to ``pd.read_csv`` when dataset_df is None.

        Raises:
            ValueError: if neither dataset_df nor csv_path is given.
        """
        self._inputs: pd.DataFrame = None
        self._outputs: pd.DataFrame = None
        self._state: pd.DataFrame = None

        self._input_keys = None
        self._output_keys = None
        self._state_keys = None

        # Filled by cache_state().
        self._state_nd: np.ndarray = None
        self._run_index: dict = {}

        if dataset_df is not None:
            self.load_data(dataset_df)
        elif csv_path is not None:
            self.load_dataframe(csv_path)
        else:
            # ValueError is still an Exception, so existing handlers keep working.
            raise ValueError('Must set kwarg "dataset_df" or "csv_path"')

    @staticmethod
    def _rows_of_type(dataset_df, row_type):
        """Return the rows of one record type without their all-NaN columns.

        The explicit copy guarantees that later column assignments act on an
        independent frame and never on a slice of the caller's DataFrame.
        """
        subset = dataset_df[dataset_df["type"] == row_type]
        return subset.dropna(how="all", axis=1).copy()

    def load_dataframe(self, csv_path):
        """Read the csv file at csv_path and load it via load_data()."""
        df = pd.read_csv(csv_path)
        self.load_data(df)

    def load_data(self, dataset_df):
        """Split dataset_df by record type, collect the keys and cache the state.

        Raises:
            AssertionError: if the INPUT and OUTPUT subsets differ in length.
        """
        self._inputs = self._rows_of_type(dataset_df, "INPUT")
        self._outputs = self._rows_of_type(dataset_df, "OUTPUT")
        self._state = self._rows_of_type(dataset_df, "STATE")
        self._state["state_id"] = self._state["state_id"].astype(int)
        assert len(self._inputs) == len(
            self._outputs
        ), "Input and output set have different lengths."

        self._input_keys = tuple(
            col for col in self._inputs.columns if col.startswith("input_")
        )
        self._output_keys = tuple(
            col for col in self._outputs.columns if col.startswith("output_")
        )
        # state_id / state_type describe the node, they are not state variables.
        reserved = ("state_id", "state_type")
        self._state_keys = tuple(
            col
            for col in self._state.columns
            if col.startswith("state_") and col not in reserved
        )

        self.cache_state()

    def cache_state(self) -> None:
        """Build the dense state array of shape (runs, steps, nodes, variables).

        Runs are ordered by sorted run_id and nodes by first appearance of
        their state_id. Run ids are mapped to array positions, so they do not
        have to be the integers 0..n_runs-1. Entries for which no state rows
        exist stay NaN.

        Raises:
            ValueError: if the state rows contain a run_id absent from the inputs.
        """
        states = self._state
        state_vars = list(self.get_state_variables())

        run_ids = sorted(self._inputs["run_id"].unique().tolist())
        self._run_index = {run_id: pos for pos, run_id in enumerate(run_ids)}

        node_ids = states["state_id"].unique().tolist()
        node_index = {node_id: pos for pos, node_id in enumerate(node_ids)}

        state_nd = np.full(
            (self.n_runs(), self.n_steps(), self.state_size(), len(state_vars)),
            np.nan,
        )
        for (run_id, node_id), group in states.groupby(["run_id", "state_id"]):
            if run_id not in self._run_index:
                raise ValueError(f"State rows reference unknown run_id {run_id!r}.")
            # (steps, variables) block of one node in one run.
            state_nd[self._run_index[run_id], :, node_index[node_id], :] = group[
                state_vars
            ].to_numpy()

        self._state_nd = state_nd

    def get_input_variables(self) -> tuple:
        """Get the input keys available."""
        return self._input_keys

    def get_output_variables(self) -> tuple:
        """Get the output keys available."""
        return self._output_keys

    def get_targets(self) -> tuple:
        """Get the target keys available (input keys followed by output keys)."""
        return (*self.get_input_variables(), *self.get_output_variables())

    def get_state_variables(self) -> tuple:
        """Get the state variables available."""
        return self._state_keys

    def n_runs(self) -> int:
        """Number of distinct run ids in the input rows."""
        return len(self._inputs.groupby("run_id"))

    def n_steps(self) -> int:
        """Number of input rows of the first run (0 for an empty dataset)."""
        run_lengths = self._inputs.groupby("run_id").size()
        # Positional access: the first run id is not necessarily the label 0.
        return int(run_lengths.iloc[0]) if len(run_lengths) else 0

    def state_size(self) -> int:
        """Number of distinct state ids in the state rows."""
        return len(self._state.groupby("state_id"))

    def get_target(self, target_key, run_id) -> pd.Series:
        "Get a target signal as pandas Series by the target key."
        assert (
            target_key in self.get_targets()
        ), f"{target_key} not in available targets."
        source = self._inputs if target_key.startswith("input_") else self._outputs
        run_rows = source.groupby("run_id").get_group(run_id)
        # set_index builds a new frame, so the stored rows keep their own index.
        return run_rows.set_index("time")[target_key]

    def get_state(self, state_key, run_id) -> np.ndarray:
        """Get the reservoir state of one variable for one run.

        Returns:
            Array of shape (steps, nodes) taken from the cached state array.

        Raises:
            AssertionError: if state_key is not an available state variable.
            KeyError: if run_id is not a run of this dataset.
        """
        assert (
            state_key in self.get_state_variables()
        ), f"{state_key} not in available state variables."
        if run_id not in self._run_index:
            raise KeyError(f"run_id {run_id!r} not in dataset.")

        i_state = self.get_state_variables().index(state_key)
        return self._state_nd[self._run_index[run_id], :, :, i_state]

    def __repr__(self) -> str:
        return (
            f"Dataset properties:\n"
            f"\tn_runs:     {self.n_runs():>3}\n"
            f"\tn_steps:    {self.n_steps():>3}\n"
            f"\tstate_size: {self.state_size():>3}\n"
            f"\nAvailable targets: \n\t{', '.join(self.get_targets())}\n"
            f"\nAvailable state variables: \n\t{', '.join(self.get_state_variables())}\n"
        )



In [191]:
# NOTE(review): `dataset_df` is not assigned in any cell visible in this notebook —
# confirm it is created before this cell on a fresh kernel, otherwise this raises NameError.
dataset = ExperimentDataset(dataset_df=dataset_df)

In [193]:
# Times one lookup in the cached state array. The recorded wall time of 0 ns means a
# single call is below the timer resolution; %timeit would average over many calls.
%time dataset.get_state('state_Tlc', 0)
pass

Wall time: 0 ns


In [138]:
%load_ext line_profiler

In [181]:
# Cell-level copy of the loop in ExperimentDataset.cache_state
# (presumably kept outside the class so it can be profiled line by line).
states = dataset._state

n_runs = dataset.n_runs()
n_steps = dataset.n_steps()
state_size = dataset.state_size()
state_vars = dataset.get_state_variables()
n_vars = len(state_vars)

# NaN-filled so a missing (run, node) pair shows up as NaN instead of uninitialised memory.
states_nd = np.full((n_runs, n_steps, state_size, n_vars), np.nan)

node_ids = states['state_id'].unique()
node_map = { node_id : i for (i, node_id) in enumerate(node_ids)}
# run ids are labels, not array positions: map them to 0 .. n_runs - 1 in sorted order.
run_map = { run_id : i for (i, run_id) in enumerate(sorted(states['run_id'].unique()))}

runs_df = states.groupby(['run_id', 'state_id'])
for (i_run, i_node), run_df in runs_df:
  for i_var, var in enumerate(state_vars):
    states_nd[run_map[i_run], :, node_map[i_node], i_var] = run_df[var].to_numpy()

# states.groupby(['run_id']).apply(lambda x : x.pivot(index="time", columns=["state_id"], values='state_Tlc'))

# for dataset.

In [203]:
def direct_target_generator(dataset: "ExperimentDataset", target: str, run_ids: "list[int]"):
  """Generator yielding the target signal of each requested run.

  Args:
    dataset: object providing get_targets(), n_steps() and get_target(target, run_id).
    target: key of the target signal; must be one of dataset.get_targets().
    run_ids: ids of the runs to yield, in the order they should be produced.

  Yields:
    One 1-D NumPy array of length dataset.n_steps() per entry of run_ids.

  Raises:
    AssertionError: if target is not available; like the rest of the body this
      runs on the first next() call, because this is a generator function.
  """
  assert target in dataset.get_targets(), f"{target} not available in dataset."

  # preload data in numpy array for performance reasons
  data = np.empty((len(run_ids), dataset.n_steps()))
  # Rows follow the position in run_ids, so run_ids may be any subset or order of ids.
  for i_run, run_id in enumerate(run_ids):
    data[i_run, :] = dataset.get_target(target, run_id).to_numpy()

  yield from data


def direct_reservoir_generator(dataset: "ExperimentDataset", state_var: str, run_ids: "list[int]"):
  """Generator yielding the reservoir state of each requested run.

  Args:
    dataset: object providing get_state_variables() and get_state(state_var, run_id).
    state_var: key of the state variable; must be one of dataset.get_state_variables().
    run_ids: ids of the runs to yield, in the order they should be produced.

  Yields:
    Whatever dataset.get_state(state_var, run_id) returns, once per entry of run_ids.

  Raises:
    AssertionError: if state_var is not available; raised on the first next()
      call, because this is a generator function.
  """
  assert state_var in dataset.get_state_variables(), f"{state_var} not available in dataset."

  for run_id in run_ids:
    yield dataset.get_state(state_var, run_id)


# NOTE(review): TARGET, STATE_VAR and RUN_IDS are only assigned in a cell further down in
# this notebook, so this cell fails on a top-to-bottom run unless they are defined elsewhere.
target_generator = direct_target_generator(dataset, TARGET, RUN_IDS)
reservoir_generator = direct_reservoir_generator(dataset, STATE_VAR, RUN_IDS)

# Each next() consumes the first run of its generator; build fresh generators before
# passing them to code that expects one item per run.
print(next(target_generator).shape)
print(next(reservoir_generator).shape)

(168,)
(168, 360)


In [28]:
def get_state_random_subset(state: pd.DataFrame, state_size: int) -> pd.DataFrame:
  """Return `state_size` distinct, randomly drawn columns of `state`.

  Column positions are drawn without replacement from NumPy's global RNG and
  the columns are returned in the drawn order.
  """
  n_columns = state.shape[1]
  picked_positions = np.random.choice(n_columns, size=state_size, replace=False)
  return state.take(picked_positions, axis=1)


def _fill_runs(dest, generator, label, columns=None):
  """Copy one generator item per run into dest and verify the item count matches."""
  n_filled = 0
  for item in generator:
    if n_filled >= len(dest):
      raise ValueError(f"{label} generator yielded more items than there are run_ids.")
    values = np.asarray(item)
    dest[n_filled] = values if columns is None else values[:, columns]
    n_filled += 1
  if n_filled != len(dest):
    # Without this check the remaining rows would keep np.empty's uninitialised values.
    raise ValueError(
      f"{label} generator yielded {n_filled} items, expected {len(dest)} (one per run_id)."
    )


def preprocess_data(dataset, run_ids, target_generator, reservoir_generator,
                    state_size=32, warmup_steps=0, day_mask=None):
  """Build a normalised (X, y) pair from per-run reservoir states and targets.

  Args:
    dataset: object providing state_size() and n_steps().
    run_ids: sized collection of run ids; only its length is used here.
    target_generator: iterable yielding one 1-D target of n_steps values per run.
    reservoir_generator: iterable yielding one (n_steps, nodes) state per run.
    state_size: number of nodes drawn at random (NumPy global RNG, no replacement).
    warmup_steps: number of leading time steps removed from every run.
    day_mask: optional per-step mask of one period; truthy entries are kept. It is
      tiled over the run, so n_steps must be a multiple of its length.

  Returns:
    X of shape (runs, kept_steps, state_size) and y of shape (runs, kept_steps),
    each shifted and scaled by its own global mean and standard deviation.

  Raises:
    ValueError: if a generator does not yield exactly len(run_ids) items.
    AssertionError: if n_steps is not a multiple of len(day_mask).
  """
  n_runs = len(run_ids)
  n_steps = dataset.n_steps()

  # 1. Take a random subsample of observation nodes
  state_choice = np.random.choice(dataset.state_size(), size=state_size, replace=False)

  # 2. Cast target and reservoir state into NumPy ndarrays.
  X = np.empty((n_runs, n_steps, state_size))  # shape (runs, time_steps, nodes)
  y = np.empty((n_runs, n_steps))              # shape (runs, time_steps)
  _fill_runs(X, reservoir_generator, "reservoir", columns=state_choice)
  _fill_runs(y, target_generator, "target")

  # 3. Masks are applied.
  if day_mask is None:
    time_mask = np.ones(n_steps, dtype=bool)
  else:
    assert n_steps % len(day_mask) == 0, "Dataset time steps must be multiple of day mask."
    # Force bool: an integer 0/1 mask would select steps 0 and 1 by position
    # instead of masking. np.tile copies, so the caller's mask is not modified.
    time_mask = np.tile(np.asarray(day_mask, dtype=bool), n_steps // len(day_mask))

  time_mask[:warmup_steps] = False
  X = X[:, time_mask, :]
  y = y[:, time_mask]

  # 4. Normalize target and reservoir states
  X = (X - X.mean()) / X.std()
  y = (y - y.mean()) / y.std()

  return X, y

### Import dataset

In [194]:
from src.learning.preprocessing import generate_mask

# Dataset location, relative to the directory the notebook is run from.
csv_path = '../datasets/hydroshoot_large_trimmed.csv'
dataset = ExperimentDataset(csv_path=csv_path)

# One id per run, 0 .. n_runs - 1.
RUN_IDS = np.arange(dataset.n_runs())
# Leading time steps dropped from every run by preprocess_data
# (presumably 4 days of hourly steps — TODO confirm the step length).
WARMUP_STEPS = 4 * 24
# Passed to preprocess_data as `day_mask`; the meaning of (5, 21) is defined by
# generate_mask — TODO confirm (looks like a start/end hour of the day).
DAY_MASK = generate_mask(5, 21)

# Number of state nodes randomly sampled by preprocess_data.
STATE_SIZE = 16

# Target signal and state variable used in the cells below.
TARGET = 'input_Tac'
STATE_VAR = 'state_An'

In [204]:
# Generators are single-use: build a fresh pair right before each preprocess_data call.
target_generator = direct_target_generator(dataset, TARGET, RUN_IDS)
reservoir_generator = direct_reservoir_generator(dataset, STATE_VAR, RUN_IDS)

In [208]:
# Seed NumPy's global RNG so the random node subset drawn inside preprocess_data is repeatable.
np.random.seed(42)

# Consumes both generators created in the previous cell.
X, y = preprocess_data(dataset, RUN_IDS, target_generator, reservoir_generator, state_size=STATE_SIZE,  warmup_steps=WARMUP_STEPS, day_mask=DAY_MASK)

# Expected layout: X is (runs, kept_steps, nodes), y is (runs, kept_steps).
print(X.shape)
print(y.shape)   

(84, 48, 16)
(84, 48)


  X = (X - X.mean()) / X.std()
  y = (y - y.mean()) / y.std()
  y = (y - y.mean()) / y.std()
