# Classify peaks as cumulative or not

**Classify the measurements after the NaN and zero periods as cumulative or not based on peak detection**

- **#1**: For each NaN/zero interval, check whether the value that follows is greater than peak_factor times the average value of the next M-1 values (peak_factor and M are parameters)

- **#2**: For each meter, check whether the values after the NaN/zero intervals are higher than the average peak length

- **#3**: For each NaN/zero interval, check whether the value that follows is among the detected peaks

- **#4**: For each meter, apply 2-medoids to the M samples that follow the NaN/zero intervals to classify cumulative measurements

In [None]:
import altair as alt
import numpy as np
import pandas as pd
from pathlib import Path
import itertools
import datetime
import tqdm
import random
from scipy.signal import find_peaks, find_peaks_cwt
from sklearn.cluster import KMeans
from sklearn_extra.cluster import KMedoids
import warnings
idx = pd.IndexSlice
alt.data_transformers.disable_max_rows()

In [None]:
PRE_PATH = Path('/cw/dtaiproj/ml/2020-FLAIR-VITO/profile-clustering/preprocessed/combined')
info_path = PRE_PATH/'info.csv'
data_path = PRE_PATH/'data.csv'
assert info_path.exists() and data_path.exists(), 'These paths should exist'

## Read the data

In [None]:
info_df = pd.read_csv(info_path, index_col = [0,1])
info_df.head()

In [None]:
data_df = pd.read_csv(data_path, index_col = [0,1])
data_df.columns = pd.to_datetime(data_df.columns)
data_df.columns.name = 'timestamp'
data_df.head()

## Handle all data sources and years seperately
Of course connection problems need to be in the same year and within the same measurement project, so for now lets use the EandisVREG data of 2016

In [None]:
DATA_SOURCE = 'EandisVREG'
YEAR = 2016
# get the right subset based on the info df
info16_df = info_df.loc[idx[:, 2016],:]
info16_df = info16_df[info16_df.data_source == 'EandisVREG']
info16_df

In [None]:
# read the corresponding data profiles 
data16_df = data_df.loc[info16_df.index, :]
data16_df

## Select a random subset of data (some of the meters) for efficiency:

In [None]:
random.seed(333)

#meterIDs = data16_df.index.get_level_values(0).unique().values
#meterIDs_selected = random.sample(meterIDs.tolist(), int(len(meterIDs)*0.01))

#info16_df = info16_df.query(f'meterID in {meterIDs_selected}')
#data16_df = data16_df.query(f'meterID in {meterIDs_selected}')

## Look at the amount of NaNs and Zeros

In [None]:
# nb of zeros for each profile
nb_of_na = (data16_df.isna()).sum(axis = 1)
nb_of_zeros = (data16_df == 0).sum(axis = 1)

## Look at profiles with potential problems

In [None]:
data16_df= data16_df.loc[(nb_of_na>0)| (nb_of_zeros>0), :]
data16_df

## Construct the intervals
So in the rest of this code we simply construct the intervals as a dataset and add different attributes/features and investigate whether they could be useful or not

In [None]:
# code to find intervals with only zeros
def value_interval(meterID, year, a, value):
    """
        Makes a dataframe containing the start and end of each interval (only the longest intervals) that only contains value
    """
    # Create an array that is 1 where a is 0, and pad each end with an extra 0.
    if np.isnan(value):
        iszero = np.concatenate(([0], np.isnan(a).view(np.int8), [0]))
    else: 
        iszero = np.concatenate(([0], np.equal(a, value).view(np.int8), [0]))
    absdiff = np.abs(np.diff(iszero))
    # Runs start and end where absdiff is 1.
    ranges = np.where(absdiff == 1)[0].reshape(-1, 2)
    df = pd.DataFrame(ranges, columns = ['start', 'end'])
    df['meterID'] = meterID
    df['year'] = year
    df['interval_value'] = value
    return df.set_index(['meterID', 'year'])

def zero_nan_intervals_df(data_df): 
    dfs = []
    for (meterID, year), row in data_df.iterrows(): 
        nan_df = value_interval( meterID, year,row, np.NaN)
        zero_df = value_interval( meterID, year, row, 0)
        dfs.append(nan_df)
        dfs.append(zero_df)
    full_df = pd.concat(dfs, axis = 0)
#     full_df['start_time'] = data14_df.columns[full_df['start']]
#     full_df['end_time'] = data14_df.columns[full_df['end']-1]
    return full_df

profile_intervals = zero_nan_intervals_df(data16_df)
profile_intervals['interval_length'] = profile_intervals.end - profile_intervals.start
# start time and end time are exclusive! (this plots well with altair that why we do it this way)
profile_intervals['start_time'] = data16_df.columns[profile_intervals['start']] - pd.Timedelta('15min')
profile_intervals['end_time'] = data16_df.columns[profile_intervals['end']-1] # doing it this way because the timestamp we need might not exist in the columns
profile_intervals['end_time'] += pd.Timedelta('15min')
profile_intervals = profile_intervals.set_index(['start', 'end'], append = True)
profile_intervals

*notes:*  
- start is inclusive, end is exclusive so the interval is $[start, end[$  
- start_time and end_time are both exclusive $]start\_time, end\_time[$  
This works better for plotting in altair
     

## Remove the missing hour on march 27 due to change from winter to summer time 

In [None]:
profile_intervals = profile_intervals[~((profile_intervals.start_time == '2016-03-27 02:00:00') & (profile_intervals.end_time == '2016-03-27 03:00:00'))]

## Add M values before and M values after each interval

In [None]:
# profile_intervals['value_after_interval'] =
def values_after_end(row, M):
    meterID, year, start, end = row.name
    # if end is to large
    
    # slow:
    #val = []
    #for m in range(M):
    #    try:
    #        val.append(data16_df.at[(meterID,year), data16_df.columns[end+m]])
    #    except: 
    #        val.append('end')
    
    # fast:
    #time_inds = pd.date_range(start=data16_df.columns[end-1] + datetime.timedelta(minutes=15), freq='15min', periods=M)
    #val = data16_df.loc[(meterID,year)].reindex(time_inds, axis=1, fill_value='end').values.tolist()
    
    # faster:
    val = data16_df.loc[(meterID,year)].iloc[end:end+M].values.tolist() # may return a shorter array if the index exceeds the length
    if len(val) < M:
        val = val + ['end']*(M - len(val))
    
    return val

def values_before_start(row, M):
    meterID, year, start, end = row.name

    # fast:
    #time_inds_prev = pd.date_range(end=data16_df.columns[start] - datetime.timedelta(minutes=15), freq='15min', periods=M)
    #val = data16_df.loc[(meterID,year)].reindex(time_inds_prev, axis=1, fill_value='start').values.tolist()
    
    # faster:
    val = data16_df.loc[(meterID,year)].iloc[max(start-M, 0):start].values.tolist() # may return a shorter array if the index exceeds starts before 0
    if len(val) < M:
        val = ['start']*(M - len(val)) + val
    
    return val

M = 8

if f'value{M}_before_start' not in profile_intervals.columns:
    print('Adding values before start...')
    before_values_df = profile_intervals.apply(lambda o: values_before_start(o, M), axis = 1, result_type = 'expand').rename(columns = lambda o: f'value{o+1}_before_start')
    profile_intervals = pd.concat([profile_intervals, before_values_df], axis = 1)

if f'value{M}_after_end' not in profile_intervals.columns:
    print('Adding values after end...')
    after_values_df = profile_intervals.apply(lambda o: values_after_end(o, M), axis = 1, result_type = 'expand').rename(columns = lambda o: f'value{o+1}_after_end')
    profile_intervals = pd.concat([profile_intervals, after_values_df], axis = 1)

profile_intervals

## Add connection capacity 

In [None]:
if 'connection_power' not in profile_intervals.columns:
    connection_power = info16_df[['connection_power']]
    profile_intervals = profile_intervals.join(connection_power)
profile_intervals

## Check peaks due to connection_power

In [None]:
if 'is_connection_power_peak' not in profile_intervals.columns: 
    # a value is a connection power peak if the first or the second value after the interval is higher than the peak
    profile_intervals['is_connection_power_peak'] = (profile_intervals['value1_after_end'].astype(object).replace({'end': np.NaN}) > profile_intervals['connection_power'].astype('float'))|(profile_intervals['value2_after_end'].astype(object).replace({'end': np.NaN}) > profile_intervals['connection_power'].astype('float'))
profile_intervals

In [None]:
profile_intervals.is_connection_power_peak.value_counts().to_frame('count')

So clearly this rule only helps to detect very few peaks

# Detect peaks in the whole data

Select one of different methods&parameters to find peaks:

In [None]:
#peaks16_df = data16_df.apply(lambda o: find_peaks(o.values, 
#                                                  height=0, 
#                                                  width=[1,1], 
#                                                  plateau_size=[1,1], 
#                                                  distance=4
#                                                 )[0], axis=1) \
#    .to_frame().rename(columns={0:'peak_locations'})

# finds only sharp peaks, may not find peaks after NaNs!
#peaks16_df = data16_df.join(info16_df) \
#    .apply(lambda o: find_peaks(o.values[0:data16_df.shape[1]], 
#                                #height=0, 
#                                #threshold=float(o['connection_power'])*0.01,
#                                prominence=float(o['connection_power'])*0.05,
#                                width=[1,4], 
#                                #distance=4
#                               )[0], axis=1) \
#    .to_frame().rename(columns={0:'peak_locations'})

# finds only sharp peaks, NaNs are filled with the preceding value:
peaks16_df = data16_df.fillna(method='ffill', axis=1).join(info16_df) \
    .apply(lambda o: find_peaks(o.values[0:data16_df.shape[1]], 
                                #height=0, 
                                #threshold=float(o['connection_power'])*0.01,
                                prominence=float(o['connection_power'])*0.05,
                                width=[1,4], 
                                #distance=4
                               )[0], axis=1) \
    .to_frame().rename(columns={0:'peak_locations'})

# slow!:
#peaks16_df = data16_df.join(info16_df) \
#    .apply(lambda o: find_peaks_cwt(o.values[0:data16_df.shape[1]], 
#                                    widths=np.arange(1,40),
#                                   ), axis=1) \
#    .to_frame().rename(columns={0:'peak_locations'})

peaks16_df['nb_of_peaks'] = peaks16_df.apply(lambda o: len(o['peak_locations']), axis=1)
peaks16_df = peaks16_df.join(data16_df.join(peaks16_df).apply(
    lambda o: np.array(o[o['peak_locations']]), axis=1).to_frame().rename(columns={0:'peak_values'}))
peaks16_df['average_peak_value'] = peaks16_df.apply(lambda o: o['peak_values'].mean() if len(o['peak_values']) > 0 else np.nan, axis=1)

peaks16_df

In [None]:
all_peaks = peaks16_df['peak_values'].values
all_peaks = np.concatenate(all_peaks)
print('Number of total peaks:', sum(peaks16_df['nb_of_peaks']))

In [None]:
alt.Chart(pd.DataFrame({'peak_values':all_peaks}), title='histogram').mark_bar().encode(
    alt.X('peak_values:Q', bin=alt.Bin(step=0.1), axis=alt.Axis(title='peak values')),
    alt.Y('count()', scale=alt.Scale(type='log'), axis=alt.Axis(title='number of peaks'))
).properties(width = 750).interactive()

In [None]:
#profile_intervals.filter(regex='value[0-9]+_before_start|value[0-9]+_after_end', axis=1).iloc[2:3]
#profile_intervals.filter(regex='value[0-9]+_after_end', axis=1).iloc[2].values

# Peaks cannot be found like this!:
#value_after_end_columns = np.where([bool(re.compile('value[0-9]+_after_end').match(i)) for i in profile_intervals.columns.values])[0]
#is_first_sample_peak = lambda o: 0 in find_peaks(o[value_after_end_columns].replace({'end':np.nan}).values)[0]
#profile_intervals['interval_after_end_starts_with_peak'] = profile_intervals.apply(is_first_sample_peak, axis=1)

# Methods to detect whether the values are cumulative or not:

## **#1**: For each NaN/zero interval, check whether the value that follows is greater than peak_factor times the average value of the next M-1 values:

In [None]:
import re

peak_factor = 5

value_after_end_columns = np.where([bool(re.compile('value[0-9]+_after_end').match(i)) for i in profile_intervals.columns.values])[0]
def is_cumulative_based_on_factor(row):
    val1 = row['value1_after_end']
    if val1 == 'end' or not np.isfinite(val1):
        return False
    else:
        #return val1 > peak_factor * row.filter(regex='value[0-9]+_after_end').replace({'end':np.nan})[1:].mean() #slow
        return val1 > peak_factor * row[value_after_end_columns].replace({'end':np.nan})[1:].mean() #fast

profile_intervals['is_cumulative_based_on_factor'] = profile_intervals.apply(is_cumulative_based_on_factor, axis=1)

print(f'Out of {profile_intervals.shape[0]} measurements, {profile_intervals["is_cumulative_based_on_factor"].sum()} ({profile_intervals["is_cumulative_based_on_factor"].mean()*10}%) are considered as peaks.')

## **#2**: For each meter, check whether the values after the NaN/zero intervals are higher than the average peak length:

In [None]:
profile_intervals['is_cumulative_based_on_average_peak_value'] = profile_intervals.join(peaks16_df).apply(lambda o: o['value1_after_end'] > o['average_peak_value'] if not o['value1_after_end'] == 'end' else False, axis=1)

## **#3**: For each NaN/zero interval, check whether the value that follows is among the detected peaks:

In [None]:
is_first_sample_global_peak = lambda o: (o['end'] in peaks16_df.loc[o['meterID'], o['year']]['peak_locations'])
profile_intervals['is_first_sample_global_peak'] = profile_intervals.reset_index().apply(is_first_sample_global_peak, axis=1).values
profile_intervals

## **#4**: For each meter, apply 2-medoids to the M samples that follow the NaN/zero intervals to classify cumulative measurements:

In [None]:
for name, group in tqdm.tqdm(profile_intervals.groupby(['meterID', 'year'])):
    X = group.iloc[:, value_after_end_columns].copy()
    X.replace('end', np.nan, inplace=True) # treat 'end's as missing values
    Xn = X.dropna() # clustering methods don't like missing values so omit these rows
    with warnings.catch_warnings():
        warnings.simplefilter("ignore")
        clu = KMedoids(n_clusters=2, random_state=222).fit(Xn.values)
    X.loc[Xn.index, 'is_cumulative_based_on_clustering'] = clu.labels_
    cumulative_cluster = np.argmax(list(map(lambda o: o[0], clu.cluster_centers_))) # cluster with the largest first measurement
    X['is_cumulative_based_on_clustering'] = X['is_cumulative_based_on_clustering'] \
        .map({cumulative_cluster:True, int(not cumulative_cluster):False}) # change cluster numbers to True and False
    X['is_cumulative_based_on_clustering'].fillna(False, inplace=True) # classify the ones with missing values as 'real'
    profile_intervals.loc[X.index, 'is_cumulative_based_on_clustering'] = X['is_cumulative_based_on_clustering']
    

# Plot the intervals and the measurements after them:

In [None]:
def explode_dataframe(df, lst_cols, fill_value=''):
    # make sure `lst_cols` is a list
    if lst_cols and not isinstance(lst_cols, list):
        lst_cols = [lst_cols]
    # all columns except `lst_cols`
    idx_cols = df.columns.difference(lst_cols)

    # calculate lengths of lists
    lens = df[lst_cols[0]].str.len()

    if (lens > 0).all():
        # ALL lists in cells aren't empty
        return pd.DataFrame({
            col:np.repeat(df[col].values, df[lst_cols[0]].str.len())
            for col in idx_cols
        }).assign(**{col:np.concatenate(df[col].values) for col in lst_cols}) \
          .loc[:, df.columns]
    else:
        # at least one list in cells is empty
        return pd.DataFrame({
            col:np.repeat(df[col].values, df[lst_cols[0]].str.len())
            for col in idx_cols
        }).assign(**{col:np.concatenate(df[col].values) for col in lst_cols}) \
          .append(df.loc[lens==0, idx_cols]).fillna(fill_value) \
          .loc[:, df.columns]

peaks16_df_exploded = explode_dataframe(peaks16_df.reset_index(), ['peak_locations', 'peak_values'], np.nan).set_index(['meterID', 'year'])

peaks16_df_exploded['peak_time'] = peaks16_df_exploded.apply(
    lambda o: data16_df.columns[int(o['peak_locations'])] if not np.isnan(o['peak_locations']) else np.nan, axis=1)

peaks16_df_exploded.rename(columns={'peak_locations':'peak_location', 'peak_values':'peak_value', 'nb_of_peaks':'nb_of_peaks_for_meter', 'average_peak_value':'average_peak_value_for_meter'}, inplace=True)

peaks16_df_exploded

In [None]:
def plot_profile_with_intervals_and_peaks(meterID, year, period_type_column = None, data = None, peak_data = None, peak_type_column = None, daterange = None):
    # plots the profile, using the period data in data and peak data in peak_data
    # the color can be determined using the period_type_column
    if data is None : 
        data = profile_intervals
    if peak_data is None:
        peak_data = peaks16_df_exploded.loc[(meterID, year)].copy()
    if daterange is not None: 
        start_time =  f'2016-{daterange[0]}-1 00:00:00'
        end_time = f'2016-{daterange[1]}-1 00:00:00'
        profile_df = data16_df.loc[(meterID, year),start_time:end_time]
        periods_for_profile =data.loc[(meterID,year), :]
        periods_for_profile = periods_for_profile[(periods_for_profile['end_time'] > start_time ) & (periods_for_profile['start_time'] < end_time)]
    else: 
        profile_df = data16_df.loc[(meterID, year),:]
        periods_for_profile =data.loc[(meterID,year), :]
        
#     print(periods_for_profile[['start_time', 'end_time']])
#     print(zero_periods_for_profile[['start_time', 'end_time', 'is_disconnection_period']])
    line = alt.Chart(profile_df.to_frame('value').reset_index()).mark_line().encode(
        x = alt.X('timestamp:T'), 
        y = alt.Y('value:Q')
    )
    
    if peak_type_column is None:
        peak_color_encoding = alt.ColorValue('red')
    else:
        peak_color_encoding = alt.Color(f'{peak_type_column}:N')
    
    peak_points = alt.Chart(peak_data.reset_index()).mark_point(filled=True, size=100).encode(
        x=alt.X('peak_time:T'),
        y=alt.Y('peak_value:Q'),
        color=peak_color_encoding
    )
    
    if period_type_column is None: 
        color_encoding = alt.ColorValue('blue') 
    else: 
        color_encoding = alt.Color(f'{period_type_column}:N')
    plot_df =periods_for_profile.reset_index(drop=True)
    rect = alt.Chart(plot_df).mark_rect(opacity = 0.6).encode(
        x = 'start_time:T',
        x2 = 'end_time:T', 
        color = color_encoding
    ) + alt.Chart(plot_df).mark_circle(opacity = 0.6).encode(
        x = 'start_time:T',
        y = alt.YValue(profile_df.max()),
#         x2 = 'end_time:T', 
        color = color_encoding
    )
    
    chart = rect + line + peak_points
    
    if 'connection_power' in periods_for_profile.columns: 
        connection_power = float(periods_for_profile.connection_power.iat[0])

        connection_power_line = alt.Chart(periods_for_profile.reset_index()).mark_rule(color = 'black', opacity = 0.8).encode(
            y =  'mean(connection_power):Q'
        )
        chart += connection_power_line
    return chart.properties(width = 750, title = f"{meterID} in {year}").interactive()

In [None]:
meter_no = 4
meter = peaks16_df_exploded.index.get_level_values(0).unique()[meter_no]
#plot_profile_with_intervals_and_peaks(meter, 2016, period_type_column='interval_value')
plot_profile_with_intervals_and_peaks(meter, 2016, period_type_column='is_connection_power_peak').display()
plot_profile_with_intervals_and_peaks(meter, 2016, period_type_column='is_cumulative_based_on_factor').display()
plot_profile_with_intervals_and_peaks(meter, 2016, period_type_column='is_cumulative_based_on_average_peak_value').display()
plot_profile_with_intervals_and_peaks(meter, 2016, period_type_column='is_first_sample_global_peak').display()
plot_profile_with_intervals_and_peaks(meter, 2016, period_type_column='is_cumulative_based_on_clustering').display()

In [None]:
def plot_measurements_after_intervals(meterID, year, profile_intervals, cumulative_classification_column='is_cumulative_based_on_clustering'):
    # Plot the measurements after the NaN/zero intervals, colored according to cumulative_classification_column.
    # The number of samples is determined by the number of columns named as 'valuei_after_end' for i=1,2,...,M where M is provided above.
    value_after_end_columns_ = np.where([bool(re.compile('value[0-9]+_after_end').match(i)) 
                                         for i in profile_intervals.reset_index().columns.values])[0]
    other_columns_ = [profile_intervals.reset_index().columns[i] 
                      for i in range(len(profile_intervals.reset_index().columns)) if i not in value_after_end_columns_]
    plot_profile_intervals = profile_intervals.reset_index().set_index(other_columns_) \
                                              .rename_axis('time_sample', axis=1).stack().to_frame('value').reset_index()
    plot_profile_intervals['time_sample'] = plot_profile_intervals['time_sample'].map(lambda o: int(re.search(r'\d+', o).group())) # convert string values to numbers
    #plot_profile_intervals['start_end'] = list(zip(plot_profile_intervals['start'], plot_profile_intervals['end'])) # altair doesn't like tuples
    plot_profile_intervals['start_end'] = plot_profile_intervals['start'].astype(str) + '-' + plot_profile_intervals['end'].astype(str)
    return alt.Chart(plot_profile_intervals.query(f'meterID == "{meterID}" & year == {year}')) \
              .mark_line().encode(x='time_sample:Q', y='value:Q', color=f'{cumulative_classification_column}:N', detail='start_end') \
              .properties(width = 500, title = f"{meterID} in {year}").interactive()

In [None]:
meter_no = 4
meter = peaks16_df_exploded.index.get_level_values(0).unique()[meter_no]
plot_measurements_after_intervals(meter, 2016, profile_intervals, 'is_connection_power_peak').display()
plot_measurements_after_intervals(meter, 2016, profile_intervals, 'is_cumulative_based_on_factor').display()
plot_measurements_after_intervals(meter, 2016, profile_intervals, 'is_cumulative_based_on_average_peak_value').display()
plot_measurements_after_intervals(meter, 2016, profile_intervals, 'is_first_sample_global_peak').display()
plot_measurements_after_intervals(meter, 2016, profile_intervals, 'is_cumulative_based_on_clustering').display()