# 1. Loading partitioned dataset

In [1]:
# Load the partitioned dataset. Each value is a loader callable; calling it
# materialises one partition (as done in the next cell).
# NOTE(review): `catalog` is not defined in this notebook; presumably it is
# injected by the kedro Jupyter session — confirm.
sgs_dataset = catalog.load('sgs_dataset')

2021-02-23 23:48:24,672 - kedro.io.data_catalog - INFO - Loading data from `sgs_dataset` (PartitionedDataSet)...


  and should_run_async(code)


In [2]:
# Preview one raw partition: the value is a loader, so it must be called first.
sgs_dataset['22_0531_pos.csv']().head()

Unnamed: 0,x,y,z,xx,yy,zz,speed_x,speed_y,speed_z,speed_xx,...,rupture_time,session_id,swell_dir,swell_hs,swell_tp,wave_dir,wave_hs,wave_tp,win_dir,wind_speed
0,-0.3,15.7,-16.0,0.0,0.0,244.5,0.0,0.0,0.0,0.0,...,0.0,22,93.0,0.64,3.86,201.2,2.71,13.3,85.7,5.37
1,-0.316316,15.708154,-16.020592,-0.061946,0.011578,244.50244,0.005226,-0.00177,-0.039725,-0.120705,...,0.0,22,93.0,0.64,3.86,201.2,2.71,13.3,85.7,5.37
2,-0.35929,15.730342,-16.075703,-0.231323,0.03553,244.50719,0.004281,0.000331,-0.067912,-0.212014,...,0.0,22,93.0,0.64,3.86,201.2,2.71,13.3,85.7,5.37
3,-0.416484,15.761158,-16.14907,-0.471763,0.053906,244.50937,-0.00048,0.003851,-0.075733,-0.261404,...,0.0,22,93.0,0.64,3.86,201.2,2.71,13.3,85.7,5.37
4,-0.478907,15.794098,-16.218199,-0.738104,0.057024,244.50912,-0.002962,0.003428,-0.060947,-0.263505,...,0.0,22,93.0,0.64,3.86,201.2,2.71,13.3,85.7,5.37


# 2. Transforming Positions

In [3]:
import math as mt
import numpy as np
import pandas as pd
from tqdm import tqdm
from typing import Tuple, Dict, Callable, Any, List, Union


def transform_positions(
        partitioned_input: Dict[str, Callable[[], Any]],
    ) -> Dict[str, pd.DataFrame]:
    '''
    Rotates every partition of a kedro partitioned dataset into local
    coordinates.

    Parameters:
        partitioned_input (Dict[str, Callable[[], Any]]): partition key ->
            loader callable; each loader returns a frame with the columns
            x, y, z, xx, yy, zz.

    Returns:
        (Dict[str, pd.DataFrame]): partition key -> output of
            apply_rotation_matrix for that partition, processed in sorted
            key order.
    '''
    position_columns = ('x', 'y', 'z', 'xx', 'yy', 'zz')
    transformed = {}

    for key, loader in tqdm(sorted(partitioned_input.items())):
        # Calling the loader materialises the partition.
        frame = loader()
        arrays = [frame[name].values for name in position_columns]
        transformed[key] = apply_rotation_matrix(*arrays)

    return transformed


def apply_rotation_matrix(
        X: np.ndarray,
        Y: np.ndarray,
        Z: np.ndarray,
        XX: np.ndarray,
        YY: np.ndarray,
        ZZ: np.ndarray,
        ignore_size: int = 500
    ) -> pd.DataFrame:
    '''
    Applies the rotation matrix to transform absolute coordinates to
    local coordinates.

    The horizontal position (X, Y) is rotated in the x-y plane by the yaw
    angle ZZ. Z is passed through unchanged. The three angles are returned
    converted from degrees to radians. Roll (XX) and pitch (YY) are not
    used in the rotation itself.

        Parameters:
            X (np.ndarray): X in absolute coordinates
            Y (np.ndarray): Y in absolute coordinates
            Z (np.ndarray): Z in absolute coordinates
            XX (np.ndarray): roll angle, in degrees
            YY (np.ndarray): pitch angle, in degrees
            ZZ (np.ndarray): yaw angle, in degrees
            ignore_size (:obj:`int`, optional): Steps to ignore in the
                beginning of the series to remove transient effects.
                Default value is 500 points

        Returns:
            rotated (pd.DataFrame): columns x, y, z (local coordinates) and
                roll, pitch, yaw (radians), with the first ``ignore_size``
                samples dropped
    '''
    # Transform roll, pitch, and yaw from degrees to radians
    roll = (XX*mt.pi)/180
    pitch = (YY*mt.pi)/180
    yaw = (ZZ*mt.pi)/180

    # Express (X, Y) in axes rotated by yaw, i.e. apply R(yaw)^T:
    #   [x]   [ cos  sin] [X]
    #   [y] = [-sin  cos] [Y]
    x = X * np.cos(yaw) + Y * np.sin(yaw)
    y = -X * np.sin(yaw) + Y * np.cos(yaw)

    # Ignore the first points due to transient effects
    return pd.DataFrame({
        'x': x[ignore_size:],
        'y': y[ignore_size:],
        'z': Z[ignore_size:],
        'roll': roll[ignore_size:],
        'pitch': pitch[ignore_size:],
        'yaw': yaw[ignore_size:],
    })


  and should_run_async(code)


In [4]:
# Rotate every partition into local coordinates. The result maps each
# partition key to an in-memory DataFrame (not a loader callable).
transformed_sgs_dataset = transform_positions(
    partitioned_input = sgs_dataset
)

100%|██████████| 100/100 [00:08<00:00, 11.47it/s]


In [5]:
# Inspect one transformed partition. The first `ignore_size` (default 500)
# samples have already been dropped by apply_rotation_matrix.
transformed_sgs_dataset['22_0641_pos.csv']

Unnamed: 0,x,y,z,roll,pitch,yaw
0,-13.766946,-8.285960,-16.280176,-0.013512,0.000118,4.271540
1,-13.746646,-8.279071,-16.282722,-0.015320,0.000157,4.271894
2,-13.726923,-8.256329,-16.266628,-0.016726,0.000487,4.272355
3,-13.706932,-8.220344,-16.240454,-0.017374,0.000828,4.272714
4,-13.686217,-8.176853,-16.211172,-0.017050,0.000886,4.272765
...,...,...,...,...,...,...
10296,-13.345962,-7.450245,-16.820381,-0.019447,0.000266,4.272202
10297,-13.212179,-7.291303,-16.704054,-0.013375,-0.004215,4.270268
10298,-13.128090,-7.172471,-16.438196,-0.006742,-0.007402,4.267761
10299,-13.115679,-7.122371,-16.095242,-0.001158,-0.008409,4.265383


# 3. Creating a Node to Generate Master Data

In [6]:
from kedro_mlflow_tutorial.utils.estimator import estimate_natural_period

def generate_master_data(
        partitioned_input: Dict[str, Union[Callable[[], Any], pd.DataFrame]],
        expected_tp: float,
        target_column: str,
        delta: float,
        repetitions: int,
        window_size: int,
    ) -> pd.DataFrame:
    '''
    Generates the master table for training the regressor model, given
    a partitioned dataset of time series. For each partition it estimates
    the natural period of one time series with estimate_natural_period
    (Welch's method) and calculates position statistics.

    Parameters:
        partitioned_input (Dict[str, Union[Callable[[], Any], pd.DataFrame]]):
            kedro partitioned dataset: partition key -> loader callable, or
            partition key -> already-loaded DataFrame.
        expected_tp (float): expected value for the natural period.
        target_column (str): name of the output column for the estimated
            period. A leading 'tp_' prefix is stripped to obtain the input
            time-series column (e.g. 'tp_x' reads column 'x').
        delta (float): the size of the segment used to filter
            around the given center [center-delta, center+delta].
        repetitions (int): number of repetitions to apply to
            each window extracted from the time series.
        window_size (int): size of the window extracted from
            the time series.

    Returns:
        (pd.DataFrame): one row per partition (sorted by key) with the
            partition key, the statistics from calculate_position_statistics
            and the estimated period under ``target_column``.
    '''
    # Only strip 'tp_' when it is a prefix; str.replace would also remove it
    # from the middle of a column name.
    target_prefix = 'tp_'
    if target_column.startswith(target_prefix):
        source_column = target_column[len(target_prefix):]
    else:
        source_column = target_column

    result = []

    for partition_key, partition_load_func in tqdm(
            sorted(partitioned_input.items())
        ):
        # Values may be lazy loaders (raw partitioned dataset) or DataFrames
        # that were already materialised (e.g. output of transform_positions).
        if isinstance(partition_load_func, pd.DataFrame):
            partition_data = partition_load_func
        else:
            partition_data = partition_load_func()

        # Row for this partition: key first, then statistics, then target.
        master_data = {'partition_key': partition_key}
        master_data.update(calculate_position_statistics(partition_data))

        # estimate_natural_period returns a 5-tuple; only its first element
        # is stored, under target_column.
        master_data[target_column], _, _, _, _ = estimate_natural_period(
            time_serie = partition_data[source_column].values,
            expected_tp = expected_tp,
            delta = delta,
            repetitions = repetitions,
            window_size = window_size,
        )

        result.append(master_data)

    return pd.DataFrame(result)


def calculate_position_statistics(
        data: pd.DataFrame,
    ):
    '''
    Summarises a position time series by the mean and (population) standard
    deviation of each channel.

    Parameters:
        data (pd.DataFrame): must contain the columns x, y, z, roll, pitch
            and yaw.

    Returns:
        (dict): 'off_<channel>' -> mean for every channel, followed by
            'std_<channel>' -> standard deviation for every channel.
    '''
    channels = ('x', 'y', 'z', 'roll', 'pitch', 'yaw')
    summary = {}

    # All means first, then all standard deviations, to keep the key order.
    for prefix, reducer in (('off_', np.mean), ('std_', np.std)):
        for channel in channels:
            summary[prefix + channel] = reducer(data[channel].values)

    return summary

  and should_run_async(code)


In [7]:
# Build the master table (one row per partition) from the transformed
# partitions, using the estimator parameters stored in the kedro catalog.
master_dataset = generate_master_data(
    partitioned_input = transformed_sgs_dataset,
    expected_tp = catalog.load("params:estimator.expected_tp"),
    target_column = catalog.load("params:estimator.target_column"),
    delta = catalog.load("params:estimator.delta"),
    repetitions = catalog.load("params:estimator.repetitions"),
    window_size = catalog.load("params:estimator.window_size"),
)

master_dataset.head()

2021-02-23 23:48:35,025 - kedro.io.data_catalog - INFO - Loading data from `params:estimator.expected_tp` (MemoryDataSet)...
2021-02-23 23:48:35,029 - kedro.io.data_catalog - INFO - Loading data from `params:estimator.target_column` (MemoryDataSet)...
2021-02-23 23:48:35,031 - kedro.io.data_catalog - INFO - Loading data from `params:estimator.delta` (MemoryDataSet)...
2021-02-23 23:48:35,036 - kedro.io.data_catalog - INFO - Loading data from `params:estimator.repetitions` (MemoryDataSet)...
2021-02-23 23:48:35,038 - kedro.io.data_catalog - INFO - Loading data from `params:estimator.window_size` (MemoryDataSet)...


  and should_run_async(code)
100%|██████████| 100/100 [00:21<00:00,  4.61it/s]


Unnamed: 0,partition_key,off_x,off_y,off_z,off_roll,off_pitch,off_yaw,std_x,std_y,std_z,std_roll,std_pitch,std_yaw,tp_x
0,22_0001_pos.csv,-14.026536,-7.020101,-16.171808,-0.012766,0.0005,4.267703,0.000478,0.00252,6.504501e-07,7.134776e-07,1.5361e-08,6e-06,241.247035
1,22_0011_pos.csv,-13.371359,-5.791685,-16.171618,-0.011617,0.000485,4.263764,0.330609,0.209626,0.1255613,0.003697365,0.002940876,0.000752,244.871252
2,22_0021_pos.csv,-14.54789,-9.542539,-16.172542,-0.012871,0.000504,4.273202,0.219257,0.763296,0.06809988,0.001644666,0.001219647,0.002004,255.950522
3,22_0031_pos.csv,-15.055141,-9.202235,-16.171971,-0.013065,0.000517,4.272791,0.339604,0.494314,0.1648407,0.00634481,0.003801573,0.002368,247.662753
4,22_0041_pos.csv,-13.135397,-6.885676,-16.171561,-0.012622,0.00048,4.267296,0.31219,0.201261,0.1534357,0.005018621,0.003456325,0.001367,250.919454


# 4. Creating a Node to Generate Training Data

In [10]:
from sklearn.preprocessing import StandardScaler
from sklearn.model_selection import train_test_split


def generate_training_data(
        master_dataset: pd.DataFrame,
        target_column: str,
        test_size: float,
        valid_size: float,
        shuffle: bool,
    ):
    '''
    Builds standardised train/validation/test splits from the master table.

    Parameters:
        master_dataset (pd.DataFrame): master table including a
            'partition_key' column and the target column.
        target_column (str): name of the target column.
        test_size (float): fraction of rows assigned to the test set.
        valid_size (float): fraction of rows assigned to the validation
            set; a falsy value skips the validation split.
        shuffle (bool): whether to shuffle rows before splitting.

    Returns:
        (tuple): X_train, X_valid, X_test, X_scaler,
            Y_train, Y_valid, Y_test, Y_scaler.
            The splits are drawn from the *scaled* data, so the returned
            scalers can invert them. X_valid and Y_valid are None when
            valid_size is falsy.
    '''
    # Separate features from target
    X, Y = define_target_data(target_column, master_dataset)

    # Scale features and target. The splits below must use the scaled data;
    # otherwise the returned scalers do not match the returned splits.
    # NOTE(review): the scalers are fit on all rows before splitting, so
    # validation/test statistics leak into the scaling. Consider fitting on
    # the training split only.
    X_scaled, X_scaler, Y_scaled, Y_scaler = scale_regressor_data(X, Y)

    splits = split_data(
        X_scaled,
        Y_scaled,
        test_size,
        valid_size,
        shuffle,
    )

    if valid_size:
        X_train, X_valid, X_test, Y_train, Y_valid, Y_test = splits
    else:
        # split_data returns only (train, test) pairs when no validation
        # split is requested; keep this function's 8-tuple shape stable.
        X_train, X_test, Y_train, Y_test = splits
        X_valid, Y_valid = None, None

    return X_train, X_valid, X_test, X_scaler, Y_train, Y_valid, Y_test, Y_scaler


def define_target_data(
        target_column: str,
        master_dataset: pd.DataFrame
    ) -> Tuple[pd.DataFrame, pd.Series]:
    '''
    Splits the master table into a feature frame and a target series.

    Parameters:
        target_column (str): name of the target column.
        master_dataset (pd.DataFrame): master table containing the target.

    Returns:
        (Tuple[pd.DataFrame, pd.Series]): (X, Y), where X is every column
            except the target and Y is the target column. The input frame
            is not modified.
    '''
    X = master_dataset.drop(columns=target_column)
    Y = master_dataset[target_column]
    return X, Y


def scale_regressor_data(
        X: pd.DataFrame,
        Y: Union[pd.Series, pd.DataFrame],
    ) -> Tuple[pd.DataFrame, Any, pd.DataFrame, Any]:
    '''
    Standardises features and target while keeping partition keys unscaled.

    Parameters:
        X (pd.DataFrame): features, including a 'partition_key' column.
        Y (Union[pd.Series, pd.DataFrame]): target values, row-aligned with X.

    Returns:
        (tuple): X_scaled, X_scaler, Y_scaled, Y_scaler. X_scaled keeps X's
            index and has 'partition_key' re-attached as its last column.
            Y_scaled is a single-column DataFrame ('y') indexed like Y.
    '''
    # partition_key is an identifier, not a feature: keep it out of the scaler.
    partition_keys = X['partition_key']

    X_scaled, X_scaler = scale_data(X.drop('partition_key', axis=1))

    # The target is reshaped to a single-column 2-D array before scaling.
    Y_scaled, Y_scaler = scale_data(Y.values.reshape(-1, 1))
    # The array path produces a fresh RangeIndex; restore Y's index so that
    # X_scaled and Y_scaled stay aligned even for non-default indices.
    Y_scaled.index = Y.index

    X_scaled['partition_key'] = partition_keys

    return X_scaled, X_scaler, Y_scaled, Y_scaler


def scale_data(
        data: Union[pd.DataFrame, np.ndarray],
    ) -> Tuple:
    '''
    Fits a StandardScaler on ``data`` and returns the standardised values.

    Parameters:
        data (Union[pd.DataFrame, np.ndarray]): values to scale. An ndarray
            must be 2-D with a single column (it is labelled 'y').

    Returns:
        (tuple): (scaled_data, scaler). For a DataFrame input, scaled_data
            keeps the input's columns and index. For an ndarray input, it
            is a DataFrame with one column 'y' and a default RangeIndex.

    Raises:
        TypeError: if ``data`` is neither a DataFrame nor an ndarray
            (previously this surfaced as an UnboundLocalError).
    '''
    if not isinstance(data, (pd.DataFrame, np.ndarray)):
        raise TypeError(
            'scale_data expects a pandas DataFrame or a numpy ndarray, '
            f'got {type(data).__name__}'
        )

    scaler = StandardScaler()
    scaler.fit(data)
    transformed = scaler.transform(data)

    if isinstance(data, pd.DataFrame):
        scaled_data = pd.DataFrame(
            transformed,
            columns = data.columns,
            index = data.index
        )
    else:
        scaled_data = pd.DataFrame(
            transformed,
            columns=['y']
        )

    return scaled_data, scaler


def split_data(
        X: pd.DataFrame,
        Y: pd.DataFrame,
        test_size: float,
        valid_size: float,
        shuffle: bool,
        random_state: Union[int, None] = None,
    ) -> Tuple[pd.DataFrame, ...]:
    '''
    Splits features and target into train/test sets and, optionally, a
    validation set.

    Parameters:
        X (pd.DataFrame): features.
        Y (pd.DataFrame): target, row-aligned with X.
        test_size (float): fraction of all rows assigned to the test set.
            Assumed to be a fraction in (0, 1), since it is also used to
            rescale valid_size.
        valid_size (float): fraction of all rows assigned to the validation
            set; a falsy value (0 or None) skips the validation split.
        shuffle (bool): whether to shuffle rows before splitting.
        random_state (Union[int, None], optional): seed forwarded to
            train_test_split for reproducible shuffled splits. Defaults to
            None (unseeded, as before).

    Returns:
        (tuple): (x_train, x_valid, x_test, y_train, y_valid, y_test) when
            valid_size is truthy, otherwise (x_train, x_test, y_train, y_test).
    '''
    x_train, x_test, y_train, y_test = train_test_split(
        X,
        Y,
        test_size=test_size,
        shuffle=shuffle,
        random_state=random_state,
    )

    if not valid_size:
        return x_train, x_test, y_train, y_test

    # valid_size is a fraction of the whole dataset, but this second split
    # only sees the remaining (1 - test_size) share, so rescale it.
    x_train, x_valid, y_train, y_valid = train_test_split(
        x_train,
        y_train,
        test_size=valid_size / (1 - test_size),
        shuffle=shuffle,
        random_state=random_state,
    )

    return x_train, x_valid, x_test, y_train, y_valid, y_test

In [12]:
# Split the master table into train/validation/test sets. The fitted X and Y
# scalers are returned alongside the splits (8-tuple).
X_train, X_valid, X_test, X_scaler, Y_train, Y_valid, Y_test, Y_scaler = generate_training_data(
    master_dataset = master_dataset,
    target_column = catalog.load("params:estimator.target_column"),
    test_size = catalog.load("params:regressor.test_size"),
    valid_size = catalog.load("params:regressor.valid_size"),
    shuffle = catalog.load("params:regressor.shuffle"),
)

2021-02-23 23:50:03,487 - kedro.io.data_catalog - INFO - Loading data from `params:estimator.target_column` (MemoryDataSet)...
2021-02-23 23:50:03,488 - kedro.io.data_catalog - INFO - Loading data from `params:regressor.test_size` (MemoryDataSet)...
2021-02-23 23:50:03,491 - kedro.io.data_catalog - INFO - Loading data from `params:regressor.valid_size` (MemoryDataSet)...
2021-02-23 23:50:03,492 - kedro.io.data_catalog - INFO - Loading data from `params:regressor.shuffle` (MemoryDataSet)...


  and should_run_async(code)


# 5. Next steps

1. Update the **nodes.py** file for the data integration pipeline
2. Update the **pipeline.py** file for the data integration pipeline
3. Update the **hooks.py** file
4. Update the **conf/base/catalog.yml** file
5. **Commit the code to the repo**