# Data-side Pipelining: Fusion [Demo]

In this document, we demo the *Fusion* module in our on-the-fly data pipelinings.

## Preparations

In [1]:
import os
import sys
sys.path.append(os.path.abspath('../..'))
# from tabluence.data.api.smartwatch.utilities.preparations import *
import pandas
import plotly_express as px
import numpy
import json
import pickle
import gzip
import matplotlib.pyplot as plt
from copy import deepcopy
from functools import reduce
from tqdm import tqdm
from datetime import datetime, timezone
from typing import Dict, List, Tuple, Union, Any, Iterator
import torch.utils.data.dataloader
from tabluence.data.api.smartwatch.utilities.timestamp import get_utc_date_from_utc_timestamp
from tabluence.data.api.smartwatch.data_manager.module import SmartwatchDataManager
from tabluence.deep_learning.data.dataset.smartwatch_study.single_slice import get_dataloaders, SmartwatchStudySingleSliceDataset, single_slice_collate_fn
from tabluence.deep_learning.data.pipeline.fusion.single_slice import SliceToSliceFusion

## Preparing data

In [2]:
dataloaders = get_dataloaders(
    batch_size=50,
    root_dir='../../resources/warrior_wellness/Analysis/local_repo/',
    data_manager_cache_filepath='../../resources/smartwatch_study/dataset_cache/window_1hour_stride_1second-datamanger001.pkl.gz',
    subject_splits={
        "train": [f'SWS_{i:02d}' for i in range(0,10)],
        "test": [f'SWS_{i:02d}' for i in range(10,15)]},
    dataset_config={
        'slice_lengths': [3600],
        'slice_time_step': 1,
        'label_milestone_per_window': 1.0,
        'metadata_cache_filepath': '../../resources/smartwatch_study/dataset_cache/window_1hour_stride_1second.pkl.gz',
        'no_cache': False,
        'parallel_threads': 10,
        'specific_stress_quantization_bins': [0.0, 0.5, 10.0],
        'overall_stress_quantization_bins': [0.0, 0.5, 10.0]
    },
    sampler_configs=dict(
       train=dict(
           negative_sample_count=1000,
            positive_sample_count=500,
            target_variable='overall_quantized_stress_value',
           split_name="train"
       ),
       test=dict(
               negative_sample_count=200,
            positive_sample_count=100,
            target_variable='overall_quantized_stress_value',
           split_name="test"
       )
        )
)

2022-07-27 10:29:50,695 - tabluence.deep_learning.data.dataset.smartwatch_study.single_slice.interface - INFO - initializing data manager...
2022-07-27 10:29:52,570 - tabluence.deep_learning.data.dataset.smartwatch_study.single_slice.interface - INFO - preparing the dataset...
2022-07-27 10:29:56,976 - tabluence.deep_learning.data.dataset.smartwatch_study.single_slice.dataset - INFO - 
	~> processing the metadata for building quantization based label layout
100%|██████████████████████████████| 1036509/1036509 [01:06<00:00, 15592.19it/s]
2022-07-27 10:31:03,454 - tabluence.deep_learning.data.dataset.smartwatch_study.single_slice.dataset - INFO - 
		~> label layout for general_quantized_stress_value stress category: [0.0, 0.5]
2022-07-27 10:31:03,455 - tabluence.deep_learning.data.dataset.smartwatch_study.single_slice.dataset - INFO - 
		~> label layout for induced_quantized_stress_value stress category: [0.0, 0.5]
2022-07-27 10:31:03,455 - tabluence.deep_learning.data.dataset.smartwatch

Let's read a single batch and analyze it:

In [3]:
for batch in dataloaders['train']:
    break

Here we see the different keys associated with different physiological signal types.

In [4]:
batch['slice'][0].keys()

dict_keys(['daily', 'respiration', 'stress', 'pulseOx'])

Now let's confirm that the splits have taken place correctly (please note that the provided set of subjects is the `super-set` and a `constraint`, therefore it is possible that some of them don't appear in the data on the account of the subject id not existing in the dataset.

In [5]:
train_subject_ids = set()
for batch in tqdm(dataloaders['train']):
    train_subject_ids = train_subject_ids.union(set(
        [e['subject_id'] for e in batch['meta']]))

100%|███████████████████████████████████████████| 30/30 [00:08<00:00,  3.37it/s]


In [6]:
train_subject_ids

{'SWS_01',
 'SWS_02',
 'SWS_03',
 'SWS_04',
 'SWS_05',
 'SWS_06',
 'SWS_07',
 'SWS_08',
 'SWS_09'}

Given that our dataloader is a single-slice one, each batch-element is a single slice from a single subject (please note that for details such as subject_id, etc. you can refer to `batch['meta']` as follows:

In [14]:
batch['meta'][0]

{'subject_id': 'SWS_07',
 'utc_timestamp_window': (1621965634.0, 1621969234.0),
 'overall_stress_value': 0.0,
 'general_stress_value': 0.0,
 'interpersonal_stress_value': 0.0,
 'utc_timestamp_for_stress_query': 1621969234.0,
 'induced_stress_value': 0.0,
 'general_quantized_stress_value': 0.0,
 'induced_quantized_stress_value': 0.0,
 'interpersonal_quantized_stress_value': 0.0,
 'overall_quantized_stress_value': 0.0}

In [15]:
batch['slice'][0]['stress'].head()

Unnamed: 0,utc_timestamp,stress_level_tsvalue,durationInSeconds,calendarDate,summaryId,user_id,startTimeInSeconds,startTimeOffsetInSeconds,utc_date,body_battery_tsvalue
9673,1621965780,82,86400,2021-05-25,x3a74ee1-60aca070-15180,SWS_07,1621926000,-25200,2021-05-25 18:03:00+00:00,
9674,1621965960,85,86400,2021-05-25,x3a74ee1-60aca070-15180,SWS_07,1621926000,-25200,2021-05-25 18:06:00+00:00,
9675,1621966140,82,86400,2021-05-25,x3a74ee1-60aca070-15180,SWS_07,1621926000,-25200,2021-05-25 18:09:00+00:00,
9676,1621966320,80,86400,2021-05-25,x3a74ee1-60aca070-15180,SWS_07,1621926000,-25200,2021-05-25 18:12:00+00:00,
9677,1621966500,76,86400,2021-05-25,x3a74ee1-60aca070-15180,SWS_07,1621926000,-25200,2021-05-25 18:15:00+00:00,


To demonstrate what `SliceToSliceFusion` does, we can first look at the instantiation described below.

Here is what it means:
* `timestamp_column`: indicating that in each one of the tables, what column-element should we use for the measurements of time.
* `sources`: the dictionary of mappings from one to another, in this case, we want to end up with one `source` in the *output* slice called `all_timeseries`.
    * Each element would indicate a source in the input slice and the list of columns which we want to include from it.
* The `nan_fill_method` describes the methodologies used for dealing with nans (assuming they will be sorted by the `timestamp_column`, which is done automatically by the package). Please note that they will be done in the order they are provided.

In [16]:
fusion = SliceToSliceFusion(
    config={
                'timestamp_column': 'utc_timestamp',
                'sources': {
                    'all_timeseries': {
                        'daily': ['heart_rate_tsvalue'],
                        'pulseOx': ['spo2_tsvalue'],
                        'respiration': ['epoch_to_breath_tsvalue'],
                        'stress': ['stress_level_tsvalue']
                    }
                },
                'nan_fill_method': ['ffill', 'bfill', 'fill_constant_0']
            }
)

Now let's apply it on the batch

In [17]:
fused_batch = fusion(batch)

Voila!

In [18]:
fused_batch['slice'][0].keys()

dict_keys(['all_timeseries'])

In [19]:
fused_batch['slice'][0]['all_timeseries'].head()

Unnamed: 0,utc_timestamp,heart_rate_tsvalue,spo2_tsvalue,epoch_to_breath_tsvalue,stress_level_tsvalue
0,1621965645,89,0.0,0.0,82.0
1,1621965660,89,0.0,0.0,82.0
2,1621965675,92,0.0,0.0,82.0
3,1621965690,92,0.0,0.0,82.0
4,1621965705,92,0.0,0.0,82.0


Let's see it in action:

In [20]:
from tabluence.deep_learning.pipeline.model import EarlyFusedSingleRNNSliceModel
from tabluence.deep_learning.data.tensorizer import CustomTensorizer

In [21]:
tensorizer = CustomTensorizer(
    config=dict(
        timestamp_column='utc_timestamp',
        value_config=dict(
            all_timeseries=dict(
                bring=[
                    'heart_rate_tsvalue',
                    'spo2_tsvalue',
                    'epoch_to_breath_tsvalue',
                    'stress_level_tsvalue'
                ]
            ),
        )),
    device=torch.device('cpu')
)

In [22]:
t_batch = tensorizer(fused_batch)

In [24]:
t_batch['slice'][4]['all_timeseries'].shape

torch.Size([240, 4])

In [25]:
[torch.isnan(e['all_timeseries']).any() for e in t_batch['slice']]

[tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False),
 tensor(False)]

In [26]:
model = EarlyFusedSingleRNNSliceModel(
    tensorizer=tensorizer,
    config=dict(
        single_source='all_timeseries',
        main_rnn=dict(
            rnn_model="LSTM",
            rnn_args=dict(
                input_size=4,
                hidden_size=32,
                bidirectional=True,
                batch_first=True,
                bias=False,
                dropout=0.2
            ),
            project_args=dict(
                input_dim=64,
                projection_dim=32
            ),  # will be projected to this dimension if not None.
        ),
        task=dict(
            target_in_meta='overall_quantized_stress_value',
            type='classification',
            loss_class='CrossEntropyLoss',
            loss_args=dict(),
            label_layout=[0, 0.5, 10]
        )
    ))

  "num_layers={}".format(dropout, num_layers))


In [27]:
model(fused_batch, mode='train')

{'model_outputs': {'latent_representations': tensor([[-0.3007,  0.5111,  0.2876,  ...,  0.6876,  0.0716, -0.1069],
          [-0.8048,  0.1494,  1.0275,  ...,  0.5339,  0.1797, -0.3993],
          [-0.9884,  0.3648,  1.2366,  ...,  0.3502,  0.2921, -0.7258],
          ...,
          [-0.6736,  0.4052,  1.2037,  ...,  0.1370,  0.3265, -0.6584],
          [-1.1156,  0.4234,  0.8974,  ...,  0.6583, -0.0040, -0.2825],
          [-0.5121,  0.1681,  0.3052,  ...,  1.0869, -0.2166,  0.3751]],
         grad_fn=<AddmmBackward0>),
  'logits': tensor([[-3.0806e-01,  9.4836e-02, -4.4606e-01],
          [ 5.5205e-01,  3.0528e-01,  3.0849e-01],
          [ 1.1540e+00,  8.1849e-01,  5.2714e-01],
          [ 1.6163e-01, -5.3412e-02, -2.6436e-01],
          [ 5.4339e-01,  4.0233e-01,  2.8843e-01],
          [ 6.0455e-01,  1.9705e-01,  4.0955e-01],
          [-3.1678e-01, -6.2187e-02, -2.3715e-01],
          [ 4.2760e-01, -4.5173e-01, -9.3514e-01],
          [ 6.7722e-01, -1.2872e-01, -3.8607e-01],
    