In [1]:
import torch
from omegaconf import DictConfig
from pytorch_lightning import Trainer
from pytorch_lightning.callbacks import EarlyStopping, ModelCheckpoint
from pytorch_lightning.loggers import TensorBoardLogger, WandbLogger

import numpy as np
import pandas as pd

from tsl import logger
from tsl.data import ImputationDataset, SpatioTemporalDataModule
from tsl.data.preprocessing import StandardScaler
from tsl.datasets import AirQuality, MetrLA, PemsBay
from tsl.engines import Imputer
from tsl.experiment import Experiment
from tsl.metrics import numpy as numpy_metrics
from tsl.metrics import torch as torch_metrics
from tsl.nn.models import (BiRNNImputerModel, GRINModel, RNNImputerModel,
                           SPINHierarchicalModel, SPINModel)
from tsl.ops.imputation import add_missing_values
from tsl.transforms import MaskInput
from tsl.utils.casting import torch_to_numpy


def get_model_class(model_str):
    if model_str == 'rnni':
        model = RNNImputerModel
    elif model_str == 'birnni':
        model = BiRNNImputerModel
    elif model_str == 'grin':
        model = GRINModel
    elif model_str == 'spin':
        model = SPINModel
    elif model_str == 'spin-h':
        model = SPINHierarchicalModel
    else:
        raise NotImplementedError(f'Model "{model_str}" not available.')
    return model


def get_dataset(dataset_name: str, p_fault=0., p_noise=0.):
    if dataset_name.startswith('air'):
        return AirQuality(impute_nans=True, small=True)
    if dataset_name.endswith('_point'):
        p_fault, p_noise = 0., 0.25
        dataset_name = dataset_name[:-6]
    if dataset_name.endswith('_block'):
        p_fault, p_noise = 0.0015, 0.05
        dataset_name = dataset_name[:-6]
    if dataset_name == 'la':
        return add_missing_values(MetrLA(),
                                  p_fault=p_fault,
                                  p_noise=p_noise,
                                  min_seq=12,
                                  max_seq=12 * 4,
                                  seed=9101112)
    if dataset_name == 'bay':
        return add_missing_values(PemsBay(),
                                  p_fault=p_fault,
                                  p_noise=p_noise,
                                  min_seq=12,
                                  max_seq=12 * 4,
                                  seed=56789)
    raise ValueError(f"Dataset {dataset_name} not available in this setting.")

In [2]:
rand = np.random.random
randint = np.random.randint
shape = (32, 12, 6, 1)

mask = np.zeros(shape).astype(bool)
road_shape = mask.shape[1]
rand_mask = rand(road_shape) < 0.5
road_mask = np.zeros(shape).astype(bool)
road_mask[:, rand_mask] = True
mask = mask | road_mask

In [4]:
mask

array([[[[False],
         [False],
         [False],
         [False],
         [False],
         [False]],

        [[False],
         [False],
         [False],
         [False],
         [False],
         [False]],

        [[ True],
         [ True],
         [ True],
         [ True],
         [ True],
         [ True]],

        ...,

        [[False],
         [False],
         [False],
         [False],
         [False],
         [False]],

        [[False],
         [False],
         [False],
         [False],
         [False],
         [False]],

        [[False],
         [False],
         [False],
         [False],
         [False],
         [False]]],


       [[[False],
         [False],
         [False],
         [False],
         [False],
         [False]],

        [[False],
         [False],
         [False],
         [False],
         [False],
         [False]],

        [[ True],
         [ True],
         [ True],
         [ True],
         [ True],
         [ Tru

In [3]:
for i in range(12):
    print(np.sum(mask[:, i, :, :]))

0
0
192
192
192
0
0
0
0
0
0
0


In [2]:
dataset = get_dataset('air')

# encode time of the day and use it as exogenous variable
covariates = {'u': dataset.datetime_encoded('day').values}

# get adjacency matrix
adj = dataset.get_connectivity('distance')

In [3]:
dataset.mask.shape

(8759, 36)

In [4]:
np.sum(dataset.mask)/315324

0.8675299057477388

In [19]:
dataset.df

nodes,1001,1002,1003,1004,1005,1006,1007,1008,1009,1010,...,1027,1028,1029,1030,1031,1032,1033,1034,1035,1036
channels,0,0,0,0,0,0,0,0,0,0,...,0,0,0,0,0,0,0,0,0,0
datetime,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2,Unnamed: 12_level_2,Unnamed: 13_level_2,Unnamed: 14_level_2,Unnamed: 15_level_2,Unnamed: 16_level_2,Unnamed: 17_level_2,Unnamed: 18_level_2,Unnamed: 19_level_2,Unnamed: 20_level_2,Unnamed: 21_level_2
2014-05-01 01:00:00,138.0,89.0,105.0,98.0,109.0,87.0,88.0,91.0,87.0,87.0,...,101.0,84.0,117.0,21.333334,97.0,87.0,74.0,94.0,112.0,109.0
2014-05-01 02:00:00,124.0,85.0,121.0,107.0,101.0,99.0,105.0,102.0,103.0,94.0,...,100.0,77.0,109.0,78.000000,97.0,84.0,84.0,101.0,123.0,114.0
2014-05-01 03:00:00,127.0,88.0,130.0,115.0,102.0,109.0,114.0,108.0,112.0,109.0,...,103.0,90.0,105.0,77.000000,103.0,83.0,100.0,112.0,143.0,126.0
2014-05-01 04:00:00,129.0,100.0,137.0,123.0,108.0,118.0,118.0,109.0,117.0,111.0,...,110.0,94.0,105.0,90.000000,107.0,88.0,103.0,120.0,138.0,130.0
2014-05-01 05:00:00,119.0,109.0,144.0,129.0,115.0,124.0,130.0,116.0,124.0,114.0,...,105.0,80.0,104.0,83.000000,111.0,85.0,108.0,125.0,145.0,137.0
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2015-04-30 19:00:00,86.0,72.0,70.0,72.0,73.0,63.0,65.0,69.0,74.0,66.0,...,130.0,136.0,79.0,69.000000,74.0,125.0,94.0,67.0,64.0,79.0
2015-04-30 20:00:00,87.0,73.0,73.0,76.0,83.0,63.0,61.0,62.0,62.0,68.0,...,133.0,124.0,64.0,67.000000,75.0,124.0,128.0,103.0,69.0,77.0
2015-04-30 21:00:00,80.0,74.0,80.0,70.0,84.0,69.0,72.0,75.0,77.0,73.0,...,117.0,123.0,72.0,70.000000,82.0,125.0,139.0,166.0,78.0,94.0
2015-04-30 22:00:00,98.0,87.5,87.0,95.5,90.5,76.0,79.0,83.0,82.0,93.5,...,178.0,99.5,94.0,74.000000,99.0,82.5,84.0,103.0,86.0,91.5


In [11]:
dataset.distance

array([[   0.        ,   10.01553231,   19.62147943, ..., 1903.71716932,
        1796.30609352, 1795.96589939],
       [  10.01553231,    0.        ,   10.11719073, ..., 1895.15226463,
        1787.72195802, 1787.39244677],
       [  19.62147943,   10.11719073,    0.        , ..., 1885.08052065,
        1777.64679953, 1777.31922415],
       ...,
       [1903.71716932, 1895.15226463, 1885.08052065, ...,    0.        ,
         107.63326768,  107.79738345],
       [1796.30609352, 1787.72195802, 1777.64679953, ...,  107.63326768,
           0.        ,    3.6701503 ],
       [1795.96589939, 1787.39244677, 1777.31922415, ...,  107.79738345,
           3.6701503 ,    0.        ]])

In [7]:
np.expand_dims(covariates['u'], axis=1).shape

(8759, 1, 2)

In [6]:
covariates['u'].shape

(8759, 2)

In [58]:
beijing_dataset = pd.read_csv('../../../AirData/AQI/Stations/merged_full.csv')
beijing_dataset['datetime'] = pd.to_datetime(beijing_dataset[['year', 'month', 'day', 'hour']])

In [67]:
stations = beijing_dataset.drop_duplicates(subset=["station"])[["station", "locationLatitude", "locationLongitude"]]


In [69]:
st_coord = stations.loc[:, ['locationLatitude', 'locationLongitude']]
from tsl.ops.similarities import geographical_distance
dist = geographical_distance(st_coord, to_rad=True).values

In [59]:
df_pivot = beijing_dataset.pivot(index="datetime", columns="station", values=["PM2.5", "TEMP"])
df_pivot.columns.names = ["channels", "nodes"]
df_pivot.columns = df_pivot.columns.swaplevel(0, 1) 
df_pivot.sort_index(axis=1, level=0, inplace=True)
df_pivot = df_pivot.rename(columns={'PM2.5': 0, 'TEMP': 1})

In [60]:
df_pivot

nodes,Aotizhongxin,Aotizhongxin,Changping,Changping,Dingling,Dingling,Dongsi,Dongsi,Guanyuan,Guanyuan,...,Nongzhanguan,Nongzhanguan,Shunyi,Shunyi,Tiantan,Tiantan,Wanliu,Wanliu,Wanshouxigong,Wanshouxigong
channels,0,1,0,1,0,1,0,1,0,1,...,0,1,0,1,0,1,0,1,0,1
datetime,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2,Unnamed: 12_level_2,Unnamed: 13_level_2,Unnamed: 14_level_2,Unnamed: 15_level_2,Unnamed: 16_level_2,Unnamed: 17_level_2,Unnamed: 18_level_2,Unnamed: 19_level_2,Unnamed: 20_level_2,Unnamed: 21_level_2
2013-03-01 00:00:00,4.0,-0.7,3.0,-2.3,4.0,-2.3,9.0,-0.5,4.0,-0.7,...,5.0,-0.5,3.0,-0.9,6.0,-0.5,8.0,-0.7,9.0,0.3
2013-03-01 01:00:00,8.0,-1.1,3.0,-2.5,7.0,-2.5,4.0,-0.7,4.0,-1.1,...,8.0,-0.7,12.0,-1.1,6.0,-0.7,9.0,-1.1,11.0,-0.1
2013-03-01 02:00:00,7.0,-1.1,3.0,-3.0,5.0,-3.0,7.0,-1.2,3.0,-1.1,...,3.0,-1.2,14.0,-1.7,6.0,-1.2,3.0,-1.1,8.0,-0.6
2013-03-01 03:00:00,6.0,-1.4,3.0,-3.6,6.0,-3.6,3.0,-1.4,3.0,-1.4,...,5.0,-1.4,12.0,-2.1,6.0,-1.4,11.0,-1.4,8.0,-0.7
2013-03-01 04:00:00,3.0,-2.0,3.0,-3.5,5.0,-3.5,3.0,-1.9,3.0,-2.0,...,5.0,-1.9,12.0,-2.4,5.0,-1.9,3.0,-2.0,8.0,-0.9
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
2017-02-28 19:00:00,12.0,12.5,28.0,11.7,11.0,11.7,16.0,12.5,13.0,12.5,...,14.0,12.5,27.0,10.3,20.0,12.5,11.0,12.6,11.0,12.5
2017-02-28 20:00:00,13.0,11.6,12.0,10.9,13.0,10.9,18.0,11.6,20.0,11.6,...,18.0,11.6,47.0,9.8,11.0,11.6,15.0,9.4,13.0,11.6
2017-02-28 21:00:00,16.0,10.8,7.0,9.5,9.0,9.5,23.0,10.8,16.0,10.8,...,15.0,10.8,18.0,9.1,18.0,10.8,13.0,8.7,14.0,10.8
2017-02-28 22:00:00,21.0,10.5,11.0,7.8,10.0,7.8,23.0,10.5,11.0,10.5,...,11.0,10.5,18.0,7.1,15.0,10.5,12.0,7.8,12.0,10.5


In [6]:
import os
from typing import List, Optional, Sequence

import numpy as np
import pandas as pd

from tsl.data.datamodule.splitters import Splitter, disjoint_months
from tsl.data.synch_mode import HORIZON
from tsl.datasets.prototypes import DatetimeDataset
from tsl.datasets.prototypes.mixin import MissingValuesMixin
from tsl.utils import download_url, extract_zip


def infer_mask(df, infer_from='next'):
    """Infer evaluation mask from DataFrame. In the evaluation mask a value is 1
    if it is present in the DataFrame and absent in the :obj:`infer_from` month.

    Args:
        df (pd.Dataframe): The DataFrame.
        infer_from (str): Denotes from which month the evaluation value must be
            inferred. Can be either :obj:`previous` or :obj:`next`.

    Returns:
        pd.DataFrame: The evaluation mask for the DataFrame.
    """
    mask = (~df.isna()).astype('uint8')
    eval_mask = pd.DataFrame(index=mask.index, columns=mask.columns,
                             data=0).astype('uint8')
    if infer_from == 'previous':
        offset = -1
    elif infer_from == 'next':
        offset = 1
    else:
        raise ValueError('`infer_from` can only be one of {}'.format(
            ['previous', 'next']))
    months = sorted(set(zip(mask.index.year, mask.index.month)))
    length = len(months)
    for i in range(length):
        j = (i + offset) % length
        year_i, month_i = months[i]
        year_j, month_j = months[j]
        cond_j = (mask.index.year == year_j) & (mask.index.month == month_j)
        mask_j = mask[cond_j]
        offset_i = 12 * (year_i - year_j) + (month_i - month_j)
        mask_i = mask_j.shift(1, pd.DateOffset(months=offset_i))
        mask_i = mask_i[~mask_i.index.duplicated(keep='first')]
        mask_i = mask_i[np.in1d(mask_i.index, mask.index)]
        i_idx = mask_i.index
        eval_mask.loc[i_idx] = ~mask_i.loc[i_idx] & mask.loc[i_idx]
    return eval_mask


class AirQualitySplitter(Splitter):

    def __init__(self,
                 val_len: int = None,
                 test_months: Sequence = (3, 6, 9, 12)):
        super(AirQualitySplitter, self).__init__()
        self._val_len = val_len
        self.test_months = test_months

    def fit(self, dataset):
        nontest_idxs, test_idxs = disjoint_months(dataset,
                                                  months=self.test_months,
                                                  synch_mode=HORIZON)
        # take equal number of samples before each month of testing
        val_len = self._val_len
        if val_len < 1:
            val_len = int(val_len * len(nontest_idxs))
        val_len = val_len // len(self.test_months)
        # get indices of first day of each testing month
        delta = np.diff(test_idxs)
        delta_idxs = np.flatnonzero(delta > delta.min())
        end_month_idxs = test_idxs[1:][delta_idxs]
        if len(end_month_idxs) < len(self.test_months):
            end_month_idxs = np.insert(end_month_idxs, 0, test_idxs[0])
        # expand month indices
        month_val_idxs = [
            np.arange(v_idx - val_len, v_idx) - dataset.window
            for v_idx in end_month_idxs
        ]
        val_idxs = np.concatenate(month_val_idxs) % len(dataset)
        # remove overlapping indices from training set
        ovl_idxs, _ = dataset.overlapping_indices(nontest_idxs,
                                                  val_idxs,
                                                  synch_mode=HORIZON,
                                                  as_mask=True)
        train_idxs = nontest_idxs[~ovl_idxs]
        self.set_indices(train_idxs, val_idxs, test_idxs)


class AirQualitySmaller(DatetimeDataset, MissingValuesMixin):
    similarity_options = {'distance'}

    def __init__(self,
                 root: str = None,
                 impute_nans: bool = True,
                 test_months: Sequence = (3, 6, 9, 12),
                 infer_eval_from: str = 'next',
                 features: list = ['PM2.5'],
                 freq: Optional[str] = None,
                 masked_sensors: Optional[Sequence] = None):
        self.root = root
        self.test_months = test_months
        self.infer_eval_from = infer_eval_from  # [next, previous]
        self.features = features
        if masked_sensors is None:
            self.masked_sensors = []
        else:
            self.masked_sensors = list(masked_sensors)
        df, mask, eval_mask, dist = self.load(impute_nans=impute_nans)
        super().__init__(target=df,
                         mask=mask,
                         freq=freq,
                         similarity_score='distance',
                         temporal_aggregation='mean',
                         spatial_aggregation='mean',
                         default_splitting_method='air_quality',
                         name='AQI12')
        
        self.add_covariate('dist', dist, pattern='n n')
        self.set_eval_mask(eval_mask)

        self.df = df
        self.mask = mask
        self.eval_masks = eval_mask
        self.distance = dist

    @property
    def raw_file_names(self) -> List[str]:
        return ['merged_full.csv']

    @property
    def required_file_names(self) -> List[str]:
        return self.raw_file_names + ['aqi_dist.npy']

    def build(self):
        # compute distances from latitude and longitude degrees
        path = os.path.join(self.root_dir, 'merged_full.csv')
        stations = pd.DataFrame(pd.read_csv(path))
        stations = stations.drop_duplicates(subset=["station"])[["station", "locationLatitude", "locationLongitude"]]
        st_coord = stations.loc[:, ['locationLatitude', 'locationLongitude']]
        from tsl.ops.similarities import geographical_distance
        dist = geographical_distance(st_coord, to_rad=True).values
        np.save(os.path.join(self.root_dir, 'aqi_dist.npy'), dist)

    def load_raw(self):
        self.maybe_build()
        dist = np.load(os.path.join(self.root_dir, 'aqi_dist.npy'))
        path = os.path.join(self.root_dir, 'merged_full.csv')
        eval_mask = None
        df = pd.read_csv(path)
        df['datetime'] = pd.to_datetime(df[['year', 'month', 'day', 'hour']])
        df_pivot = df.pivot(index="datetime", columns="station", values=self.features)
        df_pivot.columns.names = ["channels", "nodes"]
        df_pivot.columns = df_pivot.columns.swaplevel(0, 1) 
        df_pivot.sort_index(axis=1, level=0, inplace=True)
        df_pivot = df_pivot.rename(columns={feat: ind for ind, feat in enumerate(self.features)})
        return pd.DataFrame(df_pivot), dist, eval_mask

    def load(self, impute_nans=True):
        # load readings and stations metadata
        df, dist, eval_mask = self.load_raw()
        # compute the masks:
        mask = (~np.isnan(df.values)).astype('uint8')  # 1 if value is valid
        if eval_mask is None:
            eval_mask = infer_mask(df, infer_from=self.infer_eval_from)
        # 1 if value is ground-truth for imputation
        eval_mask = eval_mask.values.astype('uint8')
        if len(self.masked_sensors):
            eval_mask[:, self.masked_sensors] = mask[:, self.masked_sensors]
        # eventually replace nans with weekly mean by hour
        if impute_nans:
            from tsl.ops.framearray import temporal_mean
            df = df.fillna(temporal_mean(df))
        return df, mask, eval_mask, dist

    def get_splitter(self, method: Optional[str] = None, **kwargs):
        if method == 'air_quality':
            val_len = kwargs.get('val_len')
            return AirQualitySplitter(test_months=self.test_months,
                                      val_len=val_len)

    def compute_similarity(self, method: str, **kwargs):
        if method == "distance":
            from tsl.ops.similarities import gaussian_kernel

            # use same theta for both air and air36
            theta = np.std(self.dist)
            return gaussian_kernel(self.dist, theta=theta)


In [75]:
df = AirQualitySmaller('../../../AirData/AQI/Stations', impute_nans=True)

In [83]:
df.eval_mask.shape, dataset.eval_mask.shape

((35064, 12, 1), (8759, 36, 1))

In [21]:
niwa_df = pd.read_csv('../../../AirData/Niwa/allNIWA_clarity.csv')

In [160]:
import itertools

COLS = ['pm10ConcNumIndividual.value', 'pm1ConcNumIndividual.value',
        'pm2_5ConcNumIndividual.value', 'relHumidInternalIndividual.value']
AUCKLAND = {
    'df' :      pd.DataFrame({
                'locationLatitude': [-36.844079, -36.844113, -36.711932, -36.898491, -36.906652, -36.876728],
                'locationLongitude': [174.762123, 174.761371, 174.740808, 174.591428, 174.633079, 174.703081]}), 
    'timezone': 'Pacific/Auckland'}

def AirQualityCreate(path, features=None, t_range=None):
    fin_cols = [COLS[i] for i in features]
    features = {feat:'mean' for feat in fin_cols}

    lat_long_vals = AUCKLAND["df"]
    time_zone = AUCKLAND['timezone']

    df = pd.read_csv(path)
    df['datetime'] = pd.to_datetime(df['time'], utc=True)
    df['locationLatitude'] = df['locationLatitude'].round(6)
    df['locationLongitude'] = df['locationLongitude'].round(6)
    cols_to_keep = ['datetime', 'locationLatitude', 'locationLongitude'] + list(features.keys())

    # Clean dataset
    if features:
        df = df[cols_to_keep]
    if t_range:
        df = df[(df['datetime'] > pd.to_datetime(t_range[0],unit="ns", utc=True)) 
                & (df['datetime'] < pd.to_datetime(t_range[1],unit="ns", utc=True))]
    if not lat_long_vals.empty:
        df = df.merge(lat_long_vals, on=['locationLatitude', 'locationLongitude'])

    fin_df = df.groupby([pd.Grouper(key='datetime', freq='h'), 'locationLatitude', 'locationLongitude']).agg(features).reset_index()

    unique_stations = fin_df[['locationLatitude', 'locationLongitude']].drop_duplicates().dropna().reset_index(drop=True)
    unique_stations['station_id'] = range(1, len(unique_stations) + 1)  
    
    fin_df = fin_df.merge(unique_stations, on=['locationLatitude', 'locationLongitude'], how='left')

    # Shape daset
    unique_datetimes = fin_df["datetime"].unique()

    datetime_range = pd.date_range(start=np.min(unique_datetimes), end=np.max(unique_datetimes), freq='h')
    unique_stations = fin_df["station_id"].unique()

    all_combinations = pd.DataFrame(
        list(itertools.product(datetime_range, unique_stations)),
        columns=["datetime", "station_id"]
    )

    df_complete = all_combinations.merge(fin_df, on=["datetime", "station_id"], how="left")
    df_complete[['locationLatitude', 'locationLongitude']] = \
        df_complete.groupby('station_id')[['locationLatitude', 'locationLongitude']].transform(lambda x: x.ffill().bfill())

    return df_complete

niwa_df = AirQualityCreate('../../../AirData/Niwa/allNIWA_clarity.csv', [1, 2], ['2022-04-01', '2022-12-01'])

In [163]:
niwa_df

Unnamed: 0,datetime,station_id,locationLatitude,locationLongitude,pm1ConcNumIndividual.value,pm2_5ConcNumIndividual.value
0,2022-04-01 00:00:00+00:00,1,-36.906652,174.633079,3.036667,3.100000
1,2022-04-01 00:00:00+00:00,2,-36.844113,174.761371,2.616667,2.673333
2,2022-04-01 00:00:00+00:00,3,-36.844079,174.762123,3.363333,3.410000
3,2022-04-01 00:00:00+00:00,4,-36.711932,174.740808,6.857143,7.001429
4,2022-04-01 00:00:00+00:00,5,-36.898491,174.591428,,
...,...,...,...,...,...,...
35131,2022-11-30 23:00:00+00:00,2,-36.844113,174.761371,,
35132,2022-11-30 23:00:00+00:00,3,-36.844079,174.762123,,
35133,2022-11-30 23:00:00+00:00,4,-36.711932,174.740808,,
35134,2022-11-30 23:00:00+00:00,5,-36.898491,174.591428,3.550000,3.803333


In [164]:
df_pivot = niwa_df.pivot(index="datetime", columns="station_id", values=[COLS[i] for i in [1, 2]])
df_pivot.columns.names = ["channels", "nodes"]
df_pivot.columns = df_pivot.columns.swaplevel(0, 1) 
df_pivot.sort_index(axis=1, level=0, inplace=True)
df_pivot = df_pivot.rename(columns={feat: ind for ind, feat in enumerate([COLS[i] for i in [1, 2]])})

In [165]:
df_pivot

nodes,1,1,2,2,3,3,4,4,5,5,6,6
channels,0,1,0,1,0,1,0,1,0,1,0,1
datetime,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2,Unnamed: 7_level_2,Unnamed: 8_level_2,Unnamed: 9_level_2,Unnamed: 10_level_2,Unnamed: 11_level_2,Unnamed: 12_level_2
2022-04-01 00:00:00+00:00,3.036667,3.100000,2.616667,2.673333,3.363333,3.410000,6.857143,7.001429,,,,
2022-04-01 01:00:00+00:00,3.117500,3.182500,2.635000,2.705000,3.072500,3.127500,6.986667,7.156667,,,,
2022-04-01 02:00:00+00:00,2.953333,3.036667,2.350000,2.400000,3.066667,3.140000,5.152500,5.253750,,,,
2022-04-01 03:00:00+00:00,3.480000,3.560000,2.320000,2.370000,2.977500,3.030000,4.665000,4.761667,,,,
2022-04-01 04:00:00+00:00,3.676667,3.796667,2.707500,2.780000,3.203333,3.263333,4.172500,4.273750,,,,
...,...,...,...,...,...,...,...,...,...,...,...,...
2022-11-30 19:00:00+00:00,4.142500,4.507500,,,,,,,4.296667,4.593333,30.396667,31.510000
2022-11-30 20:00:00+00:00,3.516667,3.830000,,,,,,,4.100000,4.425000,29.887500,31.017500
2022-11-30 21:00:00+00:00,3.435000,3.740000,,,,,,,4.170000,4.486667,30.740000,31.803333
2022-11-30 22:00:00+00:00,3.306667,3.623333,,,,,,,3.930000,4.220000,28.602500,29.595000


In [2]:
import itertools

COLS = ['pm10ConcNumIndividual.value', 'pm1ConcNumIndividual.value',
        'pm2_5ConcNumIndividual.value', 'relHumidInternalIndividual.value']
AUCKLAND = {
    'df' :      pd.DataFrame({
                'locationLatitude': [-36.844079, -36.844113, -36.711932, -36.898491, -36.906652, -36.876728],
                'locationLongitude': [174.762123, 174.761371, 174.740808, 174.591428, 174.633079, 174.703081]}), 
    'timezone': 'Pacific/Auckland'}

def AirQualityCreate(path, features=None, t_range=None):
    for feat in features:
        assert feat in COLS
    
    features = {feat:'mean' for feat in features}

    lat_long_vals = AUCKLAND["df"]
    time_zone = AUCKLAND['timezone']

    df = pd.read_csv(path)
    df['datetime'] = pd.to_datetime(df['time'], utc=True)
    df['locationLatitude'] = df['locationLatitude'].round(6)
    df['locationLongitude'] = df['locationLongitude'].round(6)
    cols_to_keep = ['datetime', 'locationLatitude', 'locationLongitude'] + list(features.keys())

    # Clean dataset
    if features:
        df = df[cols_to_keep]
    if t_range:
        df = df[(df['datetime'] > pd.to_datetime(t_range[0],unit="ns", utc=True)) 
                & (df['datetime'] < pd.to_datetime(t_range[1],unit="ns", utc=True))]
    if not lat_long_vals.empty:
        df = df.merge(lat_long_vals, on=['locationLatitude', 'locationLongitude'])

    fin_df = df.groupby([pd.Grouper(key='datetime', freq='h'), 'locationLatitude', 'locationLongitude']).agg(features).reset_index()

    unique_stations = fin_df[['locationLatitude', 'locationLongitude']].drop_duplicates().dropna().reset_index(drop=True)
    unique_stations['station'] = range(1, len(unique_stations) + 1)  
    
    fin_df = fin_df.merge(unique_stations, on=['locationLatitude', 'locationLongitude'], how='left')

    # Shape daset
    unique_datetimes = fin_df["datetime"].unique()

    datetime_range = pd.date_range(start=np.min(unique_datetimes), end=np.max(unique_datetimes), freq='h')
    unique_stations = fin_df["station"].unique()

    all_combinations = pd.DataFrame(
        list(itertools.product(datetime_range, unique_stations)),
        columns=["datetime", "station"]
    )

    df_complete = all_combinations.merge(fin_df, on=["datetime", "station"], how="left")
    df_complete[['locationLatitude', 'locationLongitude']] = \
        df_complete.groupby('station')[['locationLatitude', 'locationLongitude']].transform(lambda x: x.ffill().bfill())

    return df_complete

niwa_df = AirQualityCreate('../../../AirData/Niwa/allNIWA_clarity.csv', ['pm2_5ConcNumIndividual.value', 'relHumidInternalIndividual.value'], ['2022-04-01', '2022-12-01'])

In [4]:
niwa_df

Unnamed: 0,datetime,station,locationLatitude,locationLongitude,pm2_5ConcNumIndividual.value,relHumidInternalIndividual.value
0,2022-04-01 00:00:00+00:00,1,-36.906652,174.633079,3.100000,57.160000
1,2022-04-01 00:00:00+00:00,2,-36.844113,174.761371,2.673333,51.863333
2,2022-04-01 00:00:00+00:00,3,-36.844079,174.762123,3.410000,55.910000
3,2022-04-01 00:00:00+00:00,4,-36.711932,174.740808,7.001429,65.714286
4,2022-04-01 00:00:00+00:00,5,-36.898491,174.591428,,
...,...,...,...,...,...,...
35131,2022-11-30 23:00:00+00:00,2,-36.844113,174.761371,,
35132,2022-11-30 23:00:00+00:00,3,-36.844079,174.762123,,
35133,2022-11-30 23:00:00+00:00,4,-36.711932,174.740808,,
35134,2022-11-30 23:00:00+00:00,5,-36.898491,174.591428,3.803333,62.193333


In [38]:
class AirQualityAuckland(DatetimeDataset, MissingValuesMixin):
    similarity_options = {'distance'}

    def __init__(self,
                 root: str = None,
                 impute_nans: bool = True,
                 test_months: Sequence = (9, 10, 11),
                 infer_eval_from: str = 'next',
                 features: list = ['pm2_5ConcNumIndividual.value'],
                 t_range: Optional[list] = None,
                 freq: Optional[str] = None,
                 masked_sensors: Optional[Sequence] = None):
        self.root = root
        self.test_months = test_months
        self.infer_eval_from = infer_eval_from  # [next, previous]
        self.features = features
        self.t_range = t_range

        if masked_sensors is None:
            self.masked_sensors = []
        else:
            self.masked_sensors = list(masked_sensors)
        
        df, mask, eval_mask, dist = self.load(impute_nans=impute_nans)
        super().__init__(target=df,
                         mask=mask,
                         freq=freq,
                         similarity_score='distance',
                         temporal_aggregation='mean',
                         spatial_aggregation='mean',
                         default_splitting_method='air_quality',
                         name='AQI12')
        
        self.add_covariate('dist', dist, pattern='n n')
        self.set_eval_mask(eval_mask)

        self.df = df
        # self.mask = mask
        # self.eval_masks = eval_mask
        # self.distance = dist

    @property
    def raw_file_names(self) -> List[str]:
        return ['allNIWA_clarity.csv']

    @property
    def required_file_names(self) -> List[str]:
        return self.raw_file_names + ['auck_aqi_dist.npy']

    def build(self):
        # compute distances from latitude and longitude degrees
        path = os.path.join(self.root_dir, 'allNIWA_clarity.csv')
        stations = AirQualityCreate(path, self.features, self.t_range)
        stations = stations.drop_duplicates(subset=["station"])[["station", "locationLatitude", "locationLongitude"]]
        st_coord = stations.loc[:, ['locationLatitude', 'locationLongitude']]
        from tsl.ops.similarities import geographical_distance
        dist = geographical_distance(st_coord, to_rad=True).values
        np.save(os.path.join(self.root_dir, 'auck_aqi_dist.npy'), dist)

    def load_raw(self):
        self.maybe_build()
        dist = np.load(os.path.join(self.root_dir, 'auck_aqi_dist.npy'))
        path = os.path.join(self.root_dir, 'allNIWA_clarity.csv')
        eval_mask = None
        df = AirQualityCreate(path, self.features, self.t_range)

        df_pivot = df.pivot(index="datetime", columns="station", values=self.features)
        df_pivot.columns.names = ["channels", "nodes"]
        df_pivot.columns = df_pivot.columns.swaplevel(0, 1) 
        df_pivot.sort_index(axis=1, level=0, inplace=True)
        df_pivot = df_pivot.rename(columns={feat: ind for ind, feat in enumerate(self.features)})
        
        return pd.DataFrame(df_pivot), dist, eval_mask

    def load(self, impute_nans=True):
        # load readings and stations metadata
        df, dist, eval_mask = self.load_raw()
        # compute the masks:
        mask = (~np.isnan(df.values)).astype('uint8')  # 1 if value is valid
        if eval_mask is None:
            eval_mask = np.random.randint(2, size=mask.shape)
            eval_mask *= mask
        if len(self.masked_sensors):
            mask[:, self.masked_sensors] = 0
            eval_mask[:, self.masked_sensors] = 0
        # eventually replace nans with weekly mean by hour
        if impute_nans:
            from tsl.ops.framearray import temporal_mean
            df = df.fillna(temporal_mean(df))
        return df, mask, eval_mask, dist

    def get_splitter(self, method: Optional[str] = None, **kwargs):
        if method == 'air_quality':
            val_len = kwargs.get('val_len')
            return AirQualitySplitter(test_months=self.test_months,
                                      val_len=val_len)

    def compute_similarity(self, method: str, **kwargs):
        if method == "distance":
            from tsl.ops.similarities import gaussian_kernel

            # use same theta for both air and air36
            theta = np.std(self.dist)
            return gaussian_kernel(self.dist, theta=theta)

In [39]:
dataset = AirQualityAuckland('../../../AirData/Niwa/', t_range=['2022-04-01', '2022-12-01'], masked_sensors=[2,3])

In [40]:
dataset.eval_mask[:, [2,3]], dataset.mask[:, [2,3]]

(array([[[False],
         [False]],
 
        [[False],
         [False]],
 
        [[False],
         [False]],
 
        ...,
 
        [[False],
         [False]],
 
        [[False],
         [False]],
 
        [[False],
         [False]]]),
 array([[[False],
         [False]],
 
        [[False],
         [False]],
 
        [[False],
         [False]],
 
        ...,
 
        [[False],
         [False]],
 
        [[False],
         [False]],
 
        [[False],
         [False]]]))

In [33]:
dataset.mask

array([[[ True],
        [ True],
        [ True],
        [ True],
        [False],
        [False]],

       [[ True],
        [ True],
        [ True],
        [ True],
        [False],
        [False]],

       [[ True],
        [ True],
        [ True],
        [ True],
        [False],
        [False]],

       ...,

       [[ True],
        [False],
        [False],
        [False],
        [ True],
        [ True]],

       [[ True],
        [False],
        [False],
        [False],
        [ True],
        [ True]],

       [[ True],
        [False],
        [False],
        [False],
        [ True],
        [ True]]])

In [272]:
covariates = {'u': dataset.datetime_encoded('day').values}

# get adjacency matrix
adj = dataset.get_connectivity(method = 'distance', threshold = 1e-4, include_self= False)
# u = np.expand_dims(covariates['u'], axis=1)
# covariates['u'] = np.repeat(u, max(adj[0]), axis=1)
# print(covariates['u'].shape)


In [273]:
torch_dataset = ImputationDataset(target=dataset.dataframe(),
                                    mask=dataset.training_mask,
                                    eval_mask=dataset.eval_mask,
                                    covariates=covariates,
                                    transform=MaskInput(),
                                    connectivity=adj)

scalers = {'target': StandardScaler(axis=(0, 1))}

dm = SpatioTemporalDataModule(
    dataset=torch_dataset,
    scalers=scalers,
    splitter=dataset.get_splitter(val_len= 0.1, test_len= 0.2),
    batch_size=34,
    workers=1)

In [274]:
dm.setup(stage='fit')

In [275]:
dm.testset.indices

array([3672, 3673, 3674, ..., 5842, 5843, 5844])

In [276]:
np.sum(dataset.eval_masks)

9608

In [277]:
dataset.dataframe()

nodes,1,2,3,4,5,6
channels,0,0,0,0,0,0
datetime,Unnamed: 1_level_2,Unnamed: 2_level_2,Unnamed: 3_level_2,Unnamed: 4_level_2,Unnamed: 5_level_2,Unnamed: 6_level_2
2022-04-01 00:00:00+00:00,3.100000,2.673333,3.410000,7.001429,2.084038,26.935846
2022-04-01 01:00:00+00:00,3.182500,2.705000,3.127500,7.156667,1.952051,26.430929
2022-04-01 02:00:00+00:00,3.036667,2.400000,3.140000,5.253750,1.909407,26.247177
2022-04-01 03:00:00+00:00,3.560000,2.370000,3.030000,4.761667,2.000657,25.495037
2022-04-01 04:00:00+00:00,3.796667,2.780000,3.263333,4.273750,2.001064,22.854179
...,...,...,...,...,...,...
2022-11-30 19:00:00+00:00,4.507500,3.220572,6.007094,3.583112,4.593333,31.510000
2022-11-30 20:00:00+00:00,3.830000,3.538578,8.621893,4.527214,4.425000,31.017500
2022-11-30 21:00:00+00:00,3.740000,4.023368,7.992508,2.938905,4.486667,31.803333
2022-11-30 22:00:00+00:00,3.623333,3.346984,7.031816,2.417434,4.220000,29.594999


In [280]:
sums = 0
for ind in dm.trainset.indices:
    sums += np.sum(dataset.eval_mask[ind, :])
sums

6075

In [219]:
dm.torch_dataset.eval_mask

tensor([[[False],
         [False],
         [False],
         [ True],
         [False],
         [False]],

        [[False],
         [False],
         [False],
         [ True],
         [False],
         [False]],

        [[False],
         [False],
         [False],
         [ True],
         [False],
         [False]],

        ...,

        [[False],
         [False],
         [False],
         [False],
         [False],
         [ True]],

        [[False],
         [False],
         [False],
         [False],
         [False],
         [ True]],

        [[False],
         [False],
         [False],
         [False],
         [False],
         [ True]]])