In [None]:
import os

import numpy as np
import pandas as pd

#from lib import datasets_path
#from .pd_dataset import PandasDataset
from ..utils.utils import disjoint_months, infer_mask, compute_mean, geographical_distance, thresholded_gaussian_kernel


class AirQuality(PandasDataset):
    SEED = 3210

    def __init__(self, impute_nans=False, small=False, freq='60T', masked_sensors=None):
        self.random = np.random.default_rng(self.SEED)
        self.test_months = [3, 6, 9, 12]
        self.infer_eval_from = 'next'
        self.eval_mask = None
        df, dist, mask = self.load(impute_nans=impute_nans, small=small, masked_sensors=masked_sensors)
        self.dist = dist
        if masked_sensors is None:
            self.masked_sensors = list()
        else:
            self.masked_sensors = list(masked_sensors)
        super().__init__(dataframe=df, u=None, mask=mask, name='air', freq=freq, aggr='nearest')

    def load_raw(self, small=False):
        if small:
            path = os.path.join(datasets_path['air'], 'small36.h5')
            eval_mask = pd.DataFrame(pd.read_hdf(path, 'eval_mask'))
        else:
            path = os.path.join(datasets_path['air'], 'full437.h5')
            eval_mask = None
        df = pd.DataFrame(pd.read_hdf(path, 'pm25'))
        stations = pd.DataFrame(pd.read_hdf(path, 'stations'))
        return df, stations, eval_mask

    def load(self, impute_nans=True, small=False, masked_sensors=None):
        # load readings and stations metadata
        df, stations, eval_mask = self.load_raw(small)
        # compute the masks
        mask = (~np.isnan(df.values)).astype('uint8')  # 1 if value is not nan else 0
        if eval_mask is None:
            eval_mask = infer_mask(df, infer_from=self.infer_eval_from)

        eval_mask = eval_mask.values.astype('uint8')
        if masked_sensors is not None:
            eval_mask[:, masked_sensors] = np.where(mask[:, masked_sensors], 1, 0)
        self.eval_mask = eval_mask  # 1 if value is ground-truth for imputation else 0
        # eventually replace nans with weekly mean by hour
        if impute_nans:
            df = df.fillna(compute_mean(df))
        # compute distances from latitude and longitude degrees
        st_coord = stations.loc[:, ['latitude', 'longitude']]
        dist = geographical_distance(st_coord, to_rad=True).values
        return df, dist, mask

    def splitter(self, dataset, val_len=1., in_sample=False, window=0):
        nontest_idxs, test_idxs = disjoint_months(dataset, months=self.test_months, synch_mode='horizon')
        if in_sample:
            train_idxs = np.arange(len(dataset))
            val_months = [(m - 1) % 12 for m in self.test_months]
            _, val_idxs = disjoint_months(dataset, months=val_months, synch_mode='horizon')
        else:
            # take equal number of samples before each month of testing
            val_len = (int(val_len * len(nontest_idxs)) if val_len < 1 else val_len) // len(self.test_months)
            # get indices of first day of each testing month
            delta_idxs = np.diff(test_idxs)
            end_month_idxs = test_idxs[1:][np.flatnonzero(delta_idxs > delta_idxs.min())]
            if len(end_month_idxs) < len(self.test_months):
                end_month_idxs = np.insert(end_month_idxs, 0, test_idxs[0])
            # expand month indices
            month_val_idxs = [np.arange(v_idx - val_len, v_idx) - window for v_idx in end_month_idxs]
            val_idxs = np.concatenate(month_val_idxs) % len(dataset)
            # remove overlapping indices from training set
            ovl_idxs, _ = dataset.overlapping_indices(nontest_idxs, val_idxs, synch_mode='horizon', as_mask=True)
            train_idxs = nontest_idxs[~ovl_idxs]
        return [train_idxs, val_idxs, test_idxs]

    def get_similarity(self, thr=0.1, include_self=False, force_symmetric=False, sparse=False, **kwargs):
        theta = np.std(self.dist[:36, :36])  # use same theta for both air and air36
        adj = thresholded_gaussian_kernel(self.dist, theta=theta, threshold=thr)
        if not include_self:
            adj[np.diag_indices_from(adj)] = 0.
        if force_symmetric:
            adj = np.maximum.reduce([adj, adj.T])
        if sparse:
            import scipy.sparse as sps
            adj = sps.coo_matrix(adj)
        return adj

    @property
    def mask(self):
        return self._mask

    @property
    def training_mask(self):
        return self._mask if self.eval_mask is None else (self._mask & (1 - self.eval_mask))

    def test_interval_mask(self, dtype=bool, squeeze=True):
        m = np.in1d(self.df.index.month, self.test_months).astype(dtype)
        if squeeze:
            return m
        return m[:, None]

In [None]:
import os

import numpy as np
import pandas as pd

import torch

In [None]:
from sklearn.metrics.pairwise import haversine_distances


In [None]:
datasets_path = {
    'air': 'datasets/air_quality',
    'la': 'datasets/metr_la',
    'bay': 'datasets/pems_bay',
    'synthetic': 'datasets/synthetic',
    
}

In [None]:
class PandasDataset:
    def __init__(self, dataframe: pd.DataFrame, u: pd.DataFrame = None, name='pd-dataset', mask=None, freq=None,
                 aggr='sum', **kwargs):
        """
        Initialize a tsl dataset from a pandas dataframe.


        :param dataframe: dataframe containing the data, shape: n_steps, n_nodes
        :param u: dataframe with exog variables
        :param name: optional name of the dataset
        :param mask: mask for valid data (1:valid, 0:not valid)
        :param freq: force a frequency (possibly by resampling)
        :param aggr: aggregation method after resampling
        """
        super().__init__()
        self.name = name

        # set dataset dataframe
        self.df = dataframe

        # set optional exog_variable dataframe
        # make sure to consider only the overlapping part of the two dataframes
        # assumption u.index \in df.index
        idx = sorted(self.df.index)
        self.start = idx[0]
        self.end = idx[-1]

        if u is not None:
            self.u = u[self.start:self.end]
        else:
            self.u = None

        if mask is not None:
            mask = np.asarray(mask).astype('uint8')
        self._mask = mask

        if freq is not None:
            self.resample_(freq=freq, aggr=aggr)
        else:
            self.freq = self.df.index.inferred_freq
            # make sure that all the dataframes are aligned
            self.resample_(self.freq, aggr=aggr)

        assert 'T' in self.freq
        self.samples_per_day = int(60 / int(self.freq[:-1]) * 24)

    def __repr__(self):
        return "{}(nodes={}, length={})".format(self.__class__.__name__, self.n_nodes, self.length)

    @property
    def has_mask(self):
        return self._mask is not None

    @property
    def has_u(self):
        return self.u is not None

    def resample_(self, freq, aggr):
        resampler = self.df.resample(freq)
        idx = self.df.index
        if aggr == 'sum':
            self.df = resampler.sum()
        elif aggr == 'mean':
            self.df = resampler.mean()
        elif aggr == 'nearest':
            self.df = resampler.nearest()
        else:
            raise ValueError(f'{aggr} if not a valid aggregation method.')

        if self.has_mask:
            resampler = pd.DataFrame(self._mask, index=idx).resample(freq)
            self._mask = resampler.min().to_numpy()

        if self.has_u:
            resampler = self.u.resample(freq)
            self.u = resampler.nearest()
        self.freq = freq

    def dataframe(self) -> pd.DataFrame:
        return self.df.copy()

    @property
    def length(self):
        return self.df.values.shape[0]

    @property
    def n_nodes(self):
        return self.df.values.shape[1]

    @property
    def mask(self):
        if self._mask is None:
            return np.ones_like(self.df.values).astype('uint8')
        return self._mask

    def numpy(self, return_idx=False):
        if return_idx:
            return self.numpy(), self.df.index
        return self.df.values

    def pytorch(self):
        data = self.numpy()
        return torch.FloatTensor(data)

    def __len__(self):
        return self.length

    @staticmethod
    def build():
        raise NotImplementedError

    def load_raw(self):
        raise NotImplementedError

    def load(self):
        raise NotImplementedError

In [None]:
def geographical_distance(x=None, to_rad=True):
    """
    Compute the as-the-crow-flies distance between every pair of samples in `x`. The first dimension of each point is
    assumed to be the latitude, the second is the longitude. The inputs is assumed to be in degrees. If it is not the
    case, `to_rad` must be set to False. The dimension of the data must be 2.

    Parameters
    ----------
    x : pd.DataFrame or np.ndarray
        array_like structure of shape (n_samples_2, 2).
    to_rad : bool
        whether to convert inputs to radians (provided that they are in degrees).

    Returns
    -------
    distances :
        The distance between the points in kilometers.
    """
    _AVG_EARTH_RADIUS_KM = 6371.0088

    # Extract values of X if it is a DataFrame, else assume it is 2-dim array of lat-lon pairs
    latlon_pairs = x.values if isinstance(x, pd.DataFrame) else x

    # If the input values are in degrees, convert them in radians
    if to_rad:
        latlon_pairs = np.vectorize(np.radians)(latlon_pairs)

    distances = haversine_distances(latlon_pairs) * _AVG_EARTH_RADIUS_KM

    # Cast response
    if isinstance(x, pd.DataFrame):
        res = pd.DataFrame(distances, x.index, x.index)
    else:
        res = distances

    return res

In [None]:
def compute_mean(x, index=None):
    """Compute the mean values for each datetime. The mean is first computed hourly over the week of the year.
    Further NaN values are computed using hourly mean over the same month through the years. If other NaN are present,
    they are removed using the mean of the sole hours. Hoping reasonably that there is at least a non-NaN entry of the
    same hour of the NaN datetime in all the dataset."""
    if isinstance(x, np.ndarray) and index is not None:
        shape = x.shape
        x = x.reshape((shape[0], -1))
        df_mean = pd.DataFrame(x, index=index)
    else:
        df_mean = x.copy()
    cond0 = [df_mean.index.year, df_mean.index.isocalendar().week, df_mean.index.hour]
    cond1 = [df_mean.index.year, df_mean.index.month, df_mean.index.hour]
    conditions = [cond0, cond1, cond1[1:], cond1[2:]]
    while df_mean.isna().values.sum() and len(conditions):
        nan_mean = df_mean.groupby(conditions[0]).transform(np.nanmean)
        df_mean = df_mean.fillna(nan_mean)
        conditions = conditions[1:]
    if df_mean.isna().values.sum():
        df_mean = df_mean.fillna(method='ffill')
        df_mean = df_mean.fillna(method='bfill')
    if isinstance(x, np.ndarray):
        df_mean = df_mean.values.reshape(shape)
    return df_mean

In [None]:
def load_raw(small=False):
    if small:
        path = os.path.join(datasets_path['air'], 'small36.h5')
        eval_mask = pd.DataFrame(pd.read_hdf(path, 'eval_mask'))
    else:
        path = os.path.join(datasets_path['air'], 'full437.h5')
        eval_mask = None
    df = pd.DataFrame(pd.read_hdf(path, 'pm25'))
    stations = pd.DataFrame(pd.read_hdf(path, 'stations'))
    return df, stations, eval_mask

In [None]:
def load(impute_nans=True, small=False, masked_sensors=None):
    # load readings and stations metadata
    df, stations, eval_mask = load_raw(small)
    # compute the masks
    mask = (~np.isnan(df.values)).astype('uint8')  # 1 if value is not nan else 0
    if eval_mask is None:
        eval_mask = infer_mask(df, infer_from='next')
    eval_mask = eval_mask.values.astype('uint8')
    if masked_sensors is not None:
        eval_mask[:, masked_sensors] = np.where(mask[:, masked_sensors], 1, 0)
    #self.eval_mask = eval_mask  # 1 if value is ground-truth for imputation else 0
        # eventually replace nans with weekly mean by hour
    if impute_nans:
        df = df.fillna(compute_mean(df))
        # compute distances from latitude and longitude degrees
    st_coord = stations.loc[:, ['latitude', 'longitude']]
    dist = geographical_distance(st_coord, to_rad=True).values
    return df, dist, mask

In [None]:
df, stations, eval_mask = load_raw(small=True)

In [None]:
df.info()

In [None]:
eval_mask

In [None]:
(eval_mask == 0).astype(int).sum()

In [None]:
df, dist, mask = load(impute_nans=False, small=True, masked_sensors=None)

In [None]:
dist

In [None]:
def thresholded_gaussian_kernel(x, theta=None, threshold=None, threshold_on_input=False):
    if theta is None:
        theta = np.std(x)
    weights = np.exp(-np.square(x / theta))
    if threshold is not None:
        mask = x > threshold if threshold_on_input else weights < threshold
        weights[mask] = 0.
    return weights

In [None]:
    def get_similarity(dist, thr=0.1, include_self=False, force_symmetric=False, sparse=False, **kwargs):
        theta = np.std(dist[:36, :36])  # use same theta for both air and air36
        adj = thresholded_gaussian_kernel(dist, theta=theta, threshold=thr)
        if not include_self:
            adj[np.diag_indices_from(adj)] = 0.
        if force_symmetric:
            adj = np.maximum.reduce([adj, adj.T])
        if sparse:
            import scipy.sparse as sps
            adj = sps.coo_matrix(adj)
        return adj

In [None]:
adj=get_similarity(dist)

In [None]:
adj.shape

In [None]:
class AirQuality(PandasDataset):
    SEED = 3210

    def __init__(self, impute_nans=False, small=False, freq='60T', masked_sensors=None):
        self.random = np.random.default_rng(self.SEED)
        self.test_months = [3, 6, 9, 12]
        self.infer_eval_from = 'next'
        self.eval_mask = None
        df, dist, mask = self.load(impute_nans=impute_nans, small=small, masked_sensors=masked_sensors)
        self.dist = dist
        if masked_sensors is None:
            self.masked_sensors = list()
        else:
            self.masked_sensors = list(masked_sensors)
        super().__init__(dataframe=df, u=None, mask=mask, name='air', freq=freq, aggr='nearest')

    def load_raw(self, small=False):
        if small:
            path = os.path.join(datasets_path['air'], 'small36.h5')
            eval_mask = pd.DataFrame(pd.read_hdf(path, 'eval_mask'))
        else:
            path = os.path.join(datasets_path['air'], 'full437.h5')
            eval_mask = None
        df = pd.DataFrame(pd.read_hdf(path, 'pm25'))
        stations = pd.DataFrame(pd.read_hdf(path, 'stations'))
        return df, stations, eval_mask

    def load(self, impute_nans=True, small=False, masked_sensors=None):
        # load readings and stations metadata
        df, stations, eval_mask = self.load_raw(small)
        # compute the masks
        mask = (~np.isnan(df.values)).astype('uint8')  # 1 if value is not nan else 0
        if eval_mask is None:
            eval_mask = infer_mask(df, infer_from=self.infer_eval_from)

        eval_mask = eval_mask.values.astype('uint8')
        if masked_sensors is not None:
            eval_mask[:, masked_sensors] = np.where(mask[:, masked_sensors], 1, 0)
        self.eval_mask = eval_mask  # 1 if value is ground-truth for imputation else 0
        # eventually replace nans with weekly mean by hour
        if impute_nans:
            df = df.fillna(compute_mean(df))
        # compute distances from latitude and longitude degrees
        st_coord = stations.loc[:, ['latitude', 'longitude']]
        dist = geographical_distance(st_coord, to_rad=True).values
        return df, dist, mask

    def splitter(self, dataset, val_len=1., in_sample=False, window=0):
        nontest_idxs, test_idxs = disjoint_months(dataset, months=self.test_months, synch_mode='horizon')
        if in_sample:
            train_idxs = np.arange(len(dataset))
            val_months = [(m - 1) % 12 for m in self.test_months]
            _, val_idxs = disjoint_months(dataset, months=val_months, synch_mode='horizon')
        else:
            # take equal number of samples before each month of testing
            val_len = (int(val_len * len(nontest_idxs)) if val_len < 1 else val_len) // len(self.test_months)
            # get indices of first day of each testing month
            delta_idxs = np.diff(test_idxs)
            end_month_idxs = test_idxs[1:][np.flatnonzero(delta_idxs > delta_idxs.min())]
            if len(end_month_idxs) < len(self.test_months):
                end_month_idxs = np.insert(end_month_idxs, 0, test_idxs[0])
            # expand month indices
            month_val_idxs = [np.arange(v_idx - val_len, v_idx) - window for v_idx in end_month_idxs]
            val_idxs = np.concatenate(month_val_idxs) % len(dataset)
            # remove overlapping indices from training set
            ovl_idxs, _ = dataset.overlapping_indices(nontest_idxs, val_idxs, synch_mode='horizon', as_mask=True)
            train_idxs = nontest_idxs[~ovl_idxs]
        return [train_idxs, val_idxs, test_idxs]

    def get_similarity(self, thr=0.1, include_self=False, force_symmetric=False, sparse=False, **kwargs):
        theta = np.std(self.dist[:36, :36])  # use same theta for both air and air36
        adj = thresholded_gaussian_kernel(self.dist, theta=theta, threshold=thr)
        if not include_self:
            adj[np.diag_indices_from(adj)] = 0.
        if force_symmetric:
            adj = np.maximum.reduce([adj, adj.T])
        if sparse:
            import scipy.sparse as sps
            adj = sps.coo_matrix(adj)
        return adj

    @property
    def mask(self):
        return self._mask

    @property
    def training_mask(self):
        return self._mask if self.eval_mask is None else (self._mask & (1 - self.eval_mask))

    def test_interval_mask(self, dtype=bool, squeeze=True):
        m = np.in1d(self.df.index.month, self.test_months).astype(dtype)
        if squeeze:
            return m
        return m[:, None]

In [None]:
dataset = datasets.AirQuality(impute_nans=True, small=True)

In [None]:
new_eval_mask=eval_mask.iloc[2300:4100,1:36]

In [None]:
new_eval_mask.info()

In [None]:
new_eval_mask

discharge data read in and apply mask, then export h5 dataset

In [None]:
import json
import csv
import pandas as pd
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import math

In [None]:
relationship = pd.read_csv('discharge/relationshipset2.csv',dtype=str)

In [None]:
discharge = pd.read_csv('discharge/set2.csv')

In [None]:
discharge=discharge.set_index('datetime')#将date作为标签
discharge.index=pd.DatetimeIndex(discharge.index)#将标签转为时间索引
discharge.axes#查看

In [None]:
discharge1800=discharge[-1800:]

In [None]:
discharge1800

In [None]:
mask=discharge1800.copy()

In [None]:
mask.shape[1]

In [None]:
new_eval_mask=pd.DataFrame(new_eval_mask).values

In [None]:
for i in range(mask.shape[1]):
    mask.iloc[:,i]=new_eval_mask[:,i]

In [None]:
mask

In [None]:
(mask == 0).astype(int).sum()

In [None]:
mask.shape[1]

In [None]:
dist=pd.DataFrame(np.zeros([mask.shape[1],mask.shape[1]]),columns=mask.columns,index=mask.columns)

In [None]:
dist

In [None]:
for i in range(relationship.shape[0]):
    dist.loc[relationship.iloc[i,1],relationship.iloc[i,0]]=1

In [None]:
dist

In [None]:
(dist == 1).astype(int).sum().sum()

In [None]:
store = pd.HDFStore('water.h5')

In [None]:
store.put(key='eval_mask',value=mask);
store.put(key='discharge',value=discharge1800);
store.put(key='dist',value=dist);

In [None]:
store.close()

In [None]:
def load_raw_water():
    path='water.h5'
    eval_mask = pd.DataFrame(pd.read_hdf(path, 'eval_mask'))
    df = pd.DataFrame(pd.read_hdf(path, 'discharge'))
    dist = pd.DataFrame(pd.read_hdf(path, 'dist'))
    return df, dist, eval_mask

In [None]:
df, dist, eval_mask = load_raw_water()

In [None]:
df

In [None]:
dist

In [None]:
dist_array=dist.values

In [None]:
dist_array