In [1]:
import sched
import time
import datetime as dt
from prometheus_api_client.utils import parse_datetime
import pandas as pd
from detector import get_slice_from_ts, is_anomaly
import numpy as np
import pickle

%load_ext autoreload
%autoreload 2

## Scheduling Exploration

One option is to schedule the detector inside the process, so that the process runs "forever" and executes a fresh detection every minute. Here are some experiments with the `sched` module:

In [2]:
# Scheduler driven by wall-clock epoch seconds; it waits between events with time.sleep.
s = sched.scheduler(timefunc=time.time, delayfunc=time.sleep)


def print_time(a='default'):
    """Print the current epoch time followed by the label ``a``.

    Used as the scheduled action so that each firing shows when it actually ran.
    """
    fired_at = time.time()
    print("From print_time", fired_at, a)

In [3]:
# Convert a naive datetime to epoch seconds (the same time base as time.time, which the
# scheduler uses). time.mktime interprets the struct_time as local time.
time.mktime(dt.datetime(2021,11,26,12,40,5).timetuple())

1637923205.0

In [4]:
# Check that parse_datetime('now') gives an object whose timetuple() is a struct_time,
# i.e. something time.mktime can turn into epoch seconds.
parse_datetime('now').timetuple()

time.struct_time(tm_year=2022, tm_mon=2, tm_mday=25, tm_hour=6, tm_min=43, tm_sec=1, tm_wday=4, tm_yday=56, tm_isdst=-1)

In [5]:
# Queue five print_time events three seconds apart and block in s.run() until all fired.
# timetuple() has no sub-second field, so the first absolute time is the current whole
# second and the first event is already due when the scheduler starts.
start = time.mktime(parse_datetime('now').timetuple())
for i in range(5):
    s.enterabs(time=start, priority=1, action=print_time)
    start += 3
s.run()


From print_time 1645764181.5203056 default
From print_time 1645764184.0048535 default
From print_time 1645764187.0039542 default
From print_time 1645764190.010169 default
From print_time 1645764193.007207 default


We can see that this is possible. However, running the process forever does not seem to be a good pattern. A better one is to use an external scheduler such as crontab and run the process every minute. This way each process runs independently for a short time, which gives more stability.

# Test Data Preparation

I created a dedicated directory to store the test data. The directory should contain fixed data that is not changed by other processes (other than testing), as the data in the forecasts directory may be. For that purpose, I copied prometheus_tsdb_head_chunks.pkl from "forecasts" to "test_data":

In [6]:
# Load the pickled metric list and take the values frame of its first entry.
# NOTE: pickle.load can execute arbitrary code; only use it on files from a trusted source
# (here: the fixture copied into test_data/).
with open('test_data/prometheus_tsdb_head_chunks.pkl', 'rb') as pickle_file:
    metric_obj = pickle.load(pickle_file)

df = metric_obj[0].metric_values

In [7]:
# Preview the first rows of the forecast frame (columns: ds, yhat, yhat_lower, yhat_upper).
df.head()

Unnamed: 0,ds,yhat,yhat_lower,yhat_upper
1681,2021-12-10 14:00:30,2786.236983,1963.141442,3607.608634
1682,2021-12-10 14:01:00,2805.900209,2006.692185,3543.498044
1683,2021-12-10 14:01:30,2825.414385,1974.111814,3615.091469
1684,2021-12-10 14:02:00,2844.695356,2078.776026,3637.382563
1685,2021-12-10 14:02:30,2863.661498,2037.208944,3721.174402


In [8]:
# Preview the last rows to see where the forecast data ends.
df.tail()

Unnamed: 0,ds,yhat,yhat_lower,yhat_upper
1916,2021-12-10 15:58:00,2385.217989,-2377.240693,6663.882663
1917,2021-12-10 15:58:30,2404.317748,-2612.066149,7008.245907
1918,2021-12-10 15:59:00,2423.706093,-2591.518273,6906.597615
1919,2021-12-10 15:59:30,2443.295276,-2658.969402,6978.996968
1920,2021-12-10 16:00:00,2462.997028,-2603.900921,6748.20835


## 1st Use Case: Identical Timestamps

Create the df for the first use case: slice by integer multiples of the period time (so that we have identical timestamps):

In [9]:
# Keep the 30-second forecast rows from 15:00:00 through 15:01:30, both ends inclusive.
window_mask = df['ds'].between("2021-12-10 15:00:00", "2021-12-10 15:01:30")
df_out_identical = df[window_mask]
df_out_identical

Unnamed: 0,ds,yhat,yhat_lower,yhat_upper
1800,2021-12-10 15:00:00,2684.659331,638.053123,4503.40359
1801,2021-12-10 15:00:30,2632.651066,711.869864,4454.511461
1802,2021-12-10 15:01:00,2581.149742,813.374634,4492.459369
1803,2021-12-10 15:01:30,2530.321788,617.114597,4364.441102


In [10]:
# Upsample from the 30-second grid to a 5-second grid. With bfill, every new slot takes
# the values of the next original sample; 'ds' is restored as a regular column afterwards.
df_out_identical = (
    df_out_identical
    .set_index('ds')
    .resample('5s')
    .bfill()
    .reset_index()
)
df_out_identical

Unnamed: 0,ds,yhat,yhat_lower,yhat_upper
0,2021-12-10 15:00:00,2684.659331,638.053123,4503.40359
1,2021-12-10 15:00:05,2632.651066,711.869864,4454.511461
2,2021-12-10 15:00:10,2632.651066,711.869864,4454.511461
3,2021-12-10 15:00:15,2632.651066,711.869864,4454.511461
4,2021-12-10 15:00:20,2632.651066,711.869864,4454.511461
5,2021-12-10 15:00:25,2632.651066,711.869864,4454.511461
6,2021-12-10 15:00:30,2632.651066,711.869864,4454.511461
7,2021-12-10 15:00:35,2581.149742,813.374634,4492.459369
8,2021-12-10 15:00:40,2581.149742,813.374634,4492.459369
9,2021-12-10 15:00:45,2581.149742,813.374634,4492.459369


In [11]:
# df_out_identical.to_csv('test_data/df_out_identical.csv', index=False)

The start and end times are extracted in the unit test as follows:

In [12]:
# First timestamp of the expected output: the slice start time.
df_out_identical.ds.iloc[0]

Timestamp('2021-12-10 15:00:00')

In [13]:
# Last timestamp of the expected output: the slice end time.
df_out_identical.ds.iloc[-1]

Timestamp('2021-12-10 15:01:30')

## 2nd Use Case: Non-Identical Timestamps

Second use case: the start and end times are not integer multiples of the period time:

In [14]:
# Take rows 2..15 of the 5-second frame by position, so the slice starts at 15:00:10,
# which is off the original 30-second grid, and renumber the index from 0.
off_grid_rows = df_out_identical.iloc[2:16]
df_out_not_identical = off_grid_rows.reset_index(drop=True)
df_out_not_identical

Unnamed: 0,ds,yhat,yhat_lower,yhat_upper
0,2021-12-10 15:00:10,2632.651066,711.869864,4454.511461
1,2021-12-10 15:00:15,2632.651066,711.869864,4454.511461
2,2021-12-10 15:00:20,2632.651066,711.869864,4454.511461
3,2021-12-10 15:00:25,2632.651066,711.869864,4454.511461
4,2021-12-10 15:00:30,2632.651066,711.869864,4454.511461
5,2021-12-10 15:00:35,2581.149742,813.374634,4492.459369
6,2021-12-10 15:00:40,2581.149742,813.374634,4492.459369
7,2021-12-10 15:00:45,2581.149742,813.374634,4492.459369
8,2021-12-10 15:00:50,2581.149742,813.374634,4492.459369
9,2021-12-10 15:00:55,2581.149742,813.374634,4492.459369


In [15]:
# df_out_not_identical.to_csv('test_data/df_out_not_identical.csv', index=False)

## 3rd Use-Case: relative timestamps

This is the main use case, where the end time is now() and the start time is relative to it (e.g. 10 minutes earlier). Since we use a historical data frame we cannot use now(), but we can simulate it by using complete timestamps with milliseconds:

In [16]:
# Simulate a "now"-style query window with sub-second precision: move the start up to
# (but not including) 5000 ms before the first sample and the end up to 5000 ms after
# the last one. Both offsets stay below the 5-second sampling step.
# A seeded generator keeps the printed timestamps reproducible across re-runs.
rng = np.random.default_rng(42)
start_time = df_out_identical.ds.iloc[0] - pd.Timedelta(5000 * rng.random(), 'ms')
end_time = df_out_identical.ds.iloc[-1] + pd.Timedelta(5000 * rng.random(), 'ms')
print(f"{start_time}, {end_time}")

2021-12-10 14:59:56.387975872, 2021-12-10 15:01:32.869881381


Using the above simulated timestamps, we expect to get the same dataframe as in the first use case (identical timestamps):

In [17]:
# Resample the full frame to the 5-second grid, then label-slice it with the simulated
# off-grid timestamps; only index values lying between the two bounds are returned.
(
    df
    .set_index('ds')
    .resample('5s')
    .bfill()
    .loc[start_time:end_time]
)

Unnamed: 0_level_0,yhat,yhat_lower,yhat_upper
ds,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1
2021-12-10 15:00:00,2684.659331,638.053123,4503.40359
2021-12-10 15:00:05,2632.651066,711.869864,4454.511461
2021-12-10 15:00:10,2632.651066,711.869864,4454.511461
2021-12-10 15:00:15,2632.651066,711.869864,4454.511461
2021-12-10 15:00:20,2632.651066,711.869864,4454.511461
2021-12-10 15:00:25,2632.651066,711.869864,4454.511461
2021-12-10 15:00:30,2632.651066,711.869864,4454.511461
2021-12-10 15:00:35,2581.149742,813.374634,4492.459369
2021-12-10 15:00:40,2581.149742,813.374634,4492.459369
2021-12-10 15:00:45,2581.149742,813.374634,4492.459369


Notice pandas' built-in support for slicing with timestamps that are not identical to the ones in the df index. In this case pandas behaves naturally and takes the nearest indexes inside the range (the nearest higher one at the start of the slice and the nearest lower one at the end).

## edge-case 1: same start and end time, identical timestamp

In [18]:
# Single-row slice at 15:00:30, a timestamp that also exists on the original 30-second grid.
df_out_identical.iloc[6:7]

Unnamed: 0,ds,yhat,yhat_lower,yhat_upper
6,2021-12-10 15:00:30,2632.651066,711.869864,4454.511461


In [19]:
# df_out_identical.iloc[6:7].to_csv('test_data/df_out_one_row_identical.csv', index=False)

## edge-case 2: same start and end time, non-identical timestamp

In [20]:
# Single-row slice at 15:00:35, a timestamp that exists only on the resampled 5-second grid.
df_out_identical.iloc[7:8]

Unnamed: 0,ds,yhat,yhat_lower,yhat_upper
7,2021-12-10 15:00:35,2581.149742,813.374634,4492.459369


In [21]:
# df_out_identical.iloc[7:8].to_csv('test_data/df_out_one_row_non_identical.csv', index=False)

In [22]:
# Read the saved one-row fixture back and inspect the index pandas assigns to it.
pd.read_csv('test_data/df_out_one_row_non_identical.csv').index

RangeIndex(start=0, stop=1, step=1)

## edge-case 3: same start and end time, ms-precision timestamps

In this case we expect to get an empty df:

In [23]:
# An empty positional slice (start == stop) keeps the columns but contains no rows.
df_empty = df_out_identical.iloc[8:8]
df_empty

Unnamed: 0,ds,yhat,yhat_lower,yhat_upper


In [24]:
# df_empty.to_csv('test_data/df_out_empty.csv', index=False)

In [25]:
# Check the dtypes of the empty fixture when read back: with no rows, every column,
# including 'ds' despite parse_dates, comes back as object. A comparison against this
# frame has to account for that.
pd.read_csv('test_data/df_out_empty.csv', parse_dates=['ds']).dtypes

ds            object
yhat          object
yhat_lower    object
yhat_upper    object
dtype: object

## edge-case 4: start timestamp in range, end timestamp out of range

In [26]:
# Keep every forecast row from 15:58:30 up to the end of the available data.
tail_mask = df['ds'] >= "2021-12-10 15:58:30"
df_out_tail = df.loc[tail_mask]
df_out_tail

Unnamed: 0,ds,yhat,yhat_lower,yhat_upper
1917,2021-12-10 15:58:30,2404.317748,-2612.066149,7008.245907
1918,2021-12-10 15:59:00,2423.706093,-2591.518273,6906.597615
1919,2021-12-10 15:59:30,2443.295276,-2658.969402,6978.996968
1920,2021-12-10 16:00:00,2462.997028,-2603.900921,6748.20835


In [27]:
# Upsample the tail rows to the 5-second grid. With bfill, each new slot takes the values
# of the next original sample; 'ds' is restored as a regular column afterwards.
df_out_tail = (
    df_out_tail
    .set_index('ds')
    .resample('5s')
    .bfill()
    .reset_index()
)
df_out_tail

Unnamed: 0,ds,yhat,yhat_lower,yhat_upper
0,2021-12-10 15:58:30,2404.317748,-2612.066149,7008.245907
1,2021-12-10 15:58:35,2423.706093,-2591.518273,6906.597615
2,2021-12-10 15:58:40,2423.706093,-2591.518273,6906.597615
3,2021-12-10 15:58:45,2423.706093,-2591.518273,6906.597615
4,2021-12-10 15:58:50,2423.706093,-2591.518273,6906.597615
5,2021-12-10 15:58:55,2423.706093,-2591.518273,6906.597615
6,2021-12-10 15:59:00,2423.706093,-2591.518273,6906.597615
7,2021-12-10 15:59:05,2443.295276,-2658.969402,6978.996968
8,2021-12-10 15:59:10,2443.295276,-2658.969402,6978.996968
9,2021-12-10 15:59:15,2443.295276,-2658.969402,6978.996968


In [28]:
# df_out_tail.to_csv('test_data/df_out_tail.csv', index=False)

## edge-case 5: start timestamp before range, end timestamp in range

In [29]:
# Keep every forecast row from the beginning of the available data through 14:02:00.
head_mask = df['ds'] <= "2021-12-10 14:02:00"
df_out_head = df.loc[head_mask]
df_out_head

Unnamed: 0,ds,yhat,yhat_lower,yhat_upper
1681,2021-12-10 14:00:30,2786.236983,1963.141442,3607.608634
1682,2021-12-10 14:01:00,2805.900209,2006.692185,3543.498044
1683,2021-12-10 14:01:30,2825.414385,1974.111814,3615.091469
1684,2021-12-10 14:02:00,2844.695356,2078.776026,3637.382563


In [30]:
# Upsample the head rows to the 5-second grid. With bfill, each new slot takes the values
# of the next original sample; 'ds' is restored as a regular column afterwards.
df_out_head = (
    df_out_head
    .set_index('ds')
    .resample('5s')
    .bfill()
    .reset_index()
)
df_out_head

Unnamed: 0,ds,yhat,yhat_lower,yhat_upper
0,2021-12-10 14:00:30,2786.236983,1963.141442,3607.608634
1,2021-12-10 14:00:35,2805.900209,2006.692185,3543.498044
2,2021-12-10 14:00:40,2805.900209,2006.692185,3543.498044
3,2021-12-10 14:00:45,2805.900209,2006.692185,3543.498044
4,2021-12-10 14:00:50,2805.900209,2006.692185,3543.498044
5,2021-12-10 14:00:55,2805.900209,2006.692185,3543.498044
6,2021-12-10 14:01:00,2805.900209,2006.692185,3543.498044
7,2021-12-10 14:01:05,2825.414385,1974.111814,3615.091469
8,2021-12-10 14:01:10,2825.414385,1974.111814,3615.091469
9,2021-12-10 14:01:15,2825.414385,1974.111814,3615.091469


In [31]:
# df_out_head.to_csv('test_data/df_out_head.csv', index=False)