In [1]:
import time
import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings("ignore")

In [2]:
from datetime import timedelta
from typing import Any, Callable, List, Optional, Union

import numpy as np
import pandas as pd
from pandas.tseries.frequencies import to_offset

TimedeltaLike = Union[timedelta, float, str]

In [3]:
import ipywidgets as widgets # 交互组件
from ipywidgets import interact, fixed
from IPython.display import display

In [4]:
class Simulator:
    def __init__(
        self,
        n: int = 100,
        freq: str = "D",
        start: Any = None,
    ):
        self.n = n
        self.freq = freq
        self.start = start

        # create time
        self.time = pd.date_range(
            start=start,
            freq=freq,
            periods=n,
        )

        # create the simulated time series
        self.timeseries = np.zeros(self.n)
        
    def sigmoid(x: float):
        return 1 / (1 + np.exp(-10 * x))
    
    def _convert_period(self, period):
        
        return to_offset(period).nanos / 1e9
    
    def _add_component(
        self,
        component_gen: Callable,
        multiply: bool,
        time_scale: Optional[float] = None,
    ):
        timestamps = self.time.values.astype(np.float64) / 1e9
        if time_scale is None:
            time_scale = timestamps[-1] - timestamps[0] + np.finfo(float).eps
        timepoints = (timestamps - timestamps[0]) / time_scale
        component = component_gen(timepoints)

        if multiply:
            self.timeseries *= 1 + component
        else:
            self.timeseries += component

        return self
    
    # 趋势项
    def add_trend(
        self, magnitude: float, trend_type: str = "linear", multiply: bool = False
    ):
        def component_gen(timepoints):
            if trend_type == "sigmoid" or trend_type == "S":
                return magnitude * self.sigmoid(timepoints - 0.5)
            else:  # 'linear' trend by default
                return magnitude * timepoints

        return self._add_component(component_gen, multiply)
    
    # 误差项
    def add_noise(
        self,
        magnitude: float = 1.0,
        lam: float = 0.0, # 偏正态分布中的偏度参数
        multiply: bool = False,
    ):

        def component_gen(timepoints):
            return magnitude*lam/(1+lam**2)**0.5*abs(np.random.randn(len(timepoints)))+magnitude/(1+lam**2)**0.5*np.random.randn(len(timepoints))

        return self._add_component(component_gen, multiply)

    def add_seasonality(
        self,
        magnitude: float = 0.0,
        period: TimedeltaLike = "1D",
        multiply: bool = False,
    ):

        period = self._convert_period(period)

        def component_gen(timepoints):
            return magnitude * np.sin(2 * np.pi * timepoints)

        return self._add_component(component_gen, multiply, time_scale=period)
    
    def trend_shift_sim(
        self,
        random_seed: int = 15,
        cp_arr: Optional[List[int]] = None,
        trend_arr: Optional[List[float]] = None,
        intercept: float = 100.0,
        noise: float = 3.0,
        lam: float = 0.0, # 偏正态分布的偏度系数λ
        seasonal_period: int = 7,
        seasonal_magnitude: float = 3.0,
        anomaly_arr: Optional[List[int]] = None,
        z_score_arr: Optional[List[int]] = None,
    ):

        # initializing the lists inside the function since
        # mutable lists as defaults is bad practice that linter flags
        if cp_arr is None:
            cp_arr = [100]
        if trend_arr is None:
            trend_arr = [3.0, 30.0]
        if anomaly_arr is None:
            anomaly_arr = []
        if z_score_arr is None:
            z_score_arr = []

        # if cp_arr is not sorted, sort it
        cp_arr = sorted(cp_arr)

        # length of trend array should be one larger than cp array
        # so that there is a trend corresponding to every segment
        if len(trend_arr) - len(cp_arr) != 1:
            raise ValueError(
                f"""
                Length of trend array should be one greater than
                cp array. But we got
                cp_arr: {len(cp_arr)},
                trend_arr: {len(trend_arr)}
                """
            )

        if len(cp_arr) > 0 and cp_arr[-1] >= self.n:
            raise ValueError(f"Last cp {cp_arr[-1]} is greater than length {self.n}")

        cp_arr.append(self.n)
        cp_arr.insert(0, 0)

        y_val = np.full(self.n, intercept, dtype=float)

        for i in range(len(cp_arr) - 1):
            cp_begin = cp_arr[i]
            cp_end = cp_arr[i + 1]

            y_val[cp_begin:cp_end] = y_val[cp_begin:cp_end] + trend_arr[i] * np.arange(
                cp_begin, cp_end
            )

            if i > 0:
                delta_val = y_val[cp_begin] - y_val[cp_begin - 1]
                y_val[cp_begin:cp_end] -= delta_val

        # add seasonality
        y_val += seasonal_magnitude * np.sin(
            (np.pi / seasonal_period) * np.arange(self.n)
        )

        # add noise and anomalies
        noise_arr = noise*lam/(1+lam**2)**0.5*abs(np.random.randn(self.n))+noise/(1+lam**2)**0.5*np.random.randn(self.n)
        if len(anomaly_arr) != len(z_score_arr):
            raise ValueError(
                f"""
                Length of anomaly array should be equal to z_score array. But we got
                anomaly_arr: {len(anomaly_arr)},
                z_score_arr: {len(z_score_arr)}
                """
            )
        for arr_idx, y_idx in enumerate(anomaly_arr):
            if y_idx < 0 or y_idx >= self.n:
                raise ValueError(f"Anomaly point {y_idx} is out of range")
            # pyre-fixme[16]: `Sequence` has no attribute `__setitem__`.
            noise_arr[y_idx] = z_score_arr[arr_idx] * noise

        y_val += noise_arr
        
        self.timeseries = y_val

#         ts = pd.Series(index=self.time, data=self.timeseries)

        return self
    
    
    def stl_sim(self):
        ts = pd.Series(index=self.time, data=self.timeseries)
        return ts

In [6]:
# 分割线
main_line = widgets.Output()
main_line.append_stdout(' '*60+'主程序'+' '*60)
trend_line = widgets.Output()
trend_line.append_stdout(' '*60+'趋势项'+' '*60)
period_line = widgets.Output()
period_line.append_stdout(' '*60+'周期项'+' '*60)
noise_line = widgets.Output()
noise_line.append_stdout(' '*60+'误差项'+' '*60)
feature_line = widgets.Output()
feature_line.append_stdout(' '*59+'时序特征'+' '*59)
log_line = widgets.Output()
log_line.append_stdout('打印：')

In [7]:
# 异常
def anomaly(start_point, length, magnitude=1, anomaly_type='突增'):
    if length == 0:
        return [], []
    anomaly_arr = list(range(start_point, start_point+length))
    z_score_arr = list(range(1, 1 + length * abs(magnitude), abs(magnitude)))
    if anomaly_type == '突增':
        z_score_arr = list(map(lambda x: int(x), z_score_arr))
    elif anomaly_type == '突降':
        z_score_arr = list(map(lambda x: int((-1)*x), z_score_arr))
    return anomaly_arr, z_score_arr

anomaly_arr, z_score_arr = [], []
def add_anomaly(a):
    global anomaly_arr, z_score_arr
    new_anomaly_arr, new_z_score_arr = anomaly(np.random.randint(0, n - anomaly_length), anomaly_length, 
                                               magnitude=anomaly_magnitude, anomaly_type=anomaly_type)
    anomaly_arr += new_anomaly_arr
    z_score_arr += new_z_score_arr

def print_anomaly(a):
    global anomaly_arr, z_score_arr
    if anomaly_arr == []:
        print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time())))}：没有异常！')
    else:
        print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time())))}：异常位置为{anomaly_arr}，对应的异常程度为{z_score_arr}')
    
def del_anomaly(a):
    global anomaly_arr, z_score_arr
    if anomaly_arr == []:
        return
    last_del = None
    for anomaly in reversed(anomaly_arr):
        if last_del == None or last_del - anomaly == 1:
            last_del = anomaly_arr.pop()
            z_score_arr.pop()

# 变点
cp_arr, Trend_arr = [], []
def add_trend(a):
    global cp_arr, Trend_arr
    if cp_arr == []:
        cp_arr.append(np.random.randint(0, n))
        Trend_arr.append(trendly_magnitude)
    else:
        cp_arr.append(np.random.randint(cp_arr[-1], n))
        Trend_arr.append(trendly_magnitude)

def del_trend(a):
    cp_arr.pop()
    Trend_arr.pop()

def print_trend(a):
    if cp_arr == []:
        print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time())))}：没有变点！')
    else:
        print(f'{time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time())))}：变点位置为{cp_arr}，对应的变点斜率为{Trend_arr}')

def ts_init(b):
    global sim
    sim = Simulator(n=n, start=start, freq=freq)
    
    period = periods
    period_dic = {'s':1, 'min':60, 'h':3600, 'd':86400, 'w':604800}
    if period == '':
        period = 1
    else:
        period = period_dic[''.join(x for x in period if x.isalpha())]*float(''.join(x for x in period if x.isdigit() or x == '.'))
        period = int(n/((sim.time[-1] - sim.time[0]).value/1e9/period)/2)
    
    trend_arr = [magnitude_trend] + Trend_arr
    sim.trend_shift_sim(cp_arr=cp_arr, trend_arr=trend_arr, intercept=t, 
                        seasonal_period=period, seasonal_magnitude=magnitude_seasonality, 
                        noise=magnitude_noise, lam = lam, 
                        anomaly_arr = anomaly_arr, z_score_arr = z_score_arr)

    plt.figure(figsize=(16,8))
    plt.plot(sim.stl_sim())
    plt.legend(["Value"], fontsize=15, loc='upper right')
    plt.title("The Simulation of Time Series", fontsize=20)
    plt.xlabel("Time", fontsize=12)
    plt.show()

In [8]:
def params(N, Freq, Start, 
           T, Magnitude_trend, 
           Period, Magnitude_seasonality, 
           Magnitude_noise, Lam, 
           Anomaly_length, Anomaly_magnitude, Anomaly_type, 
           Trendly_magnitude):
    
    global n, freq, start # 初始参数
    global t, magnitude_trend # 趋势项参数
    global periods, magnitude_seasonality # 周期项参数
    global magnitude_noise, lam # 误差项参数
    global anomaly_length, anomaly_magnitude, anomaly_type # 异常点参数
    global trendly_magnitude # 变点参数
    
    n, freq, start = N, Freq, Start
    t, magnitude_trend = T, Magnitude_trend
    periods, magnitude_seasonality = Period, Magnitude_seasonality
    magnitude_noise, lam = Magnitude_noise, Lam
    anomaly_length, anomaly_magnitude, anomaly_type = Anomaly_length, Anomaly_magnitude, Anomaly_type
    trendly_magnitude = Trendly_magnitude

plot_button = widgets.Button(
    description='运行',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='plot',
    icon='' # (FontAwesome names without the `fa-` prefix)
)
plot_button.on_click(ts_init)

In [9]:
# 基础参数
n = 300
N = widgets.IntSlider(
    value=300,
    min=20,
    max=2000,
    step=1,
    description='样本个数:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='d'
)

freq = widgets.Text(
    value='min',
    placeholder='例：5min',
    description='颗粒度：',
    disabled=False,
    continuous_update=False
)

start = widgets.Text(
    value=time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(int(time.time()))),
    placeholder='例：2022-06-06 00:00:00',
    description='起始时间：',
    disabled=False,
    continuous_update=False
)

# 趋势项参数
t = widgets.FloatSlider(
    value=0,
    min=-1e3,
    max=1e3,
    step=0.05,
    description='常数项:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.1f',
)

magnitude_trend = widgets.FloatSlider(
    value=0,
    min=-5,
    max=5,
    step=0.01,
    description='趋势斜率:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.1f',
)

# 周期项参数
magnitude_seasonality = widgets.FloatSlider(
    value=0,
    min=-100,
    max=100,
    step=0.1,
    description='振幅:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.1f',
)

period = widgets.Text(
    value='',
    placeholder='例：5min',
    description='周期:',
    disabled=False,
    continuous_update=False
)

# 误差项参数
magnitude_noise = widgets.FloatSlider(
    value=1,
    min=0,
    max=10,
    step=0.1,
    description='误差波动:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.1f',
)

lam = widgets.FloatSlider(
    value=0,
    min=-10,
    max=10,
    step=0.1,
    description='偏度:',
    disabled=False, 
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.1f',
)

# 异常参数
anomaly_length = widgets.IntSlider(
    value=5,
    min=1,
    max=n,
    step=1,
    description='异常跨度:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='d'
)
    
anomaly_magnitude = widgets.IntSlider(
    value=1,
    min=1,
    max=15,
    step=1,
    description='异常程度:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='d'
)

anomaly_type = widgets.Dropdown(
    options=['突增', '突降'],
    value='突增',
    description='异常类型:',
    disabled=False,
)

add_anomaly_button = widgets.Button(
    description='添加异常',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='add_anomaly',
    icon='' # (FontAwesome names without the `fa-` prefix)
)
add_anomaly_button.on_click(add_anomaly)

del_anomaly_button = widgets.Button(
    description='删除上一个异常',
    disabled=False,
    button_style='',
    tooltip='del_anomaly',
    icon=''
)
del_anomaly_button.on_click(del_anomaly)

print_anomaly_button = widgets.Button(
    description='打印异常',
    disabled=False,
    button_style='',
    tooltip='print_anomaly',
    icon=''
)
print_anomaly_button.on_click(print_anomaly)

# 变点参数
trendly_magnitude = widgets.FloatSlider(
    value=0,
    min=-10,
    max=10,
    step=0.05,
    description='变点斜率:',
    disabled=False,
    continuous_update=False,
    orientation='horizontal',
    readout=True,
    readout_format='.2f',
)

add_cp_button = widgets.Button(
    description='添加变点',
    disabled=False,
    button_style='', # 'success', 'info', 'warning', 'danger' or ''
    tooltip='add_anomaly',
    icon='' # (FontAwesome names without the `fa-` prefix)
)
add_cp_button.on_click(add_trend)

del_cp_button = widgets.Button(
    description='删除上一个变点',
    disabled=False,
    button_style='',
    tooltip='del_anomaly',
    icon=''
)
del_cp_button.on_click(del_trend)

print_cp_button = widgets.Button(
    description='打印变点',
    disabled=False,
    button_style='',
    tooltip='print_anomaly',
    icon=''
)
print_cp_button.on_click(print_trend)

init_params = widgets.HBox([N, freq, start])
trend_params = widgets.HBox([t, magnitude_trend])
period_params = widgets.HBox([period, magnitude_seasonality])
noise_params = widgets.HBox([magnitude_noise, lam])
anomaly_params = widgets.HBox([anomaly_type, anomaly_length, anomaly_magnitude])
anomaly_button_params = widgets.HBox([add_anomaly_button, del_anomaly_button, print_anomaly_button])
cp_params = widgets.HBox([trendly_magnitude])
cp_button_params = widgets.HBox([add_cp_button, del_cp_button, print_cp_button])

out = widgets.interactive_output(params, {'N': N, 'Freq': freq, 'Start': start, 
                                          'T': t, 'Magnitude_trend': magnitude_trend, 
                                          'Period': period, 'Magnitude_seasonality': magnitude_seasonality, 
                                          'Magnitude_noise': magnitude_noise, 'Lam': lam, 
                                          'Anomaly_length': anomaly_length, 'Anomaly_magnitude': anomaly_magnitude, 'Anomaly_type': anomaly_type, 
                                          'Trendly_magnitude': trendly_magnitude})

display(main_line, init_params, 
        trend_line, trend_params, 
        period_line, period_params, 
        noise_line, noise_params, 
        feature_line, 
        anomaly_params, anomaly_button_params, 
        cp_params, cp_button_params, 
        plot_button)

Output(outputs=({'output_type': 'stream', 'name': 'stdout', 'text': '                                         …

HBox(children=(IntSlider(value=300, continuous_update=False, description='样本个数:', max=2000, min=20), Text(valu…

Output(outputs=({'output_type': 'stream', 'name': 'stdout', 'text': '                                         …

HBox(children=(FloatSlider(value=0.0, continuous_update=False, description='常数项:', max=1000.0, min=-1000.0, re…

Output(outputs=({'output_type': 'stream', 'name': 'stdout', 'text': '                                         …

HBox(children=(Text(value='', continuous_update=False, description='周期:', placeholder='例：5min'), FloatSlider(v…

Output(outputs=({'output_type': 'stream', 'name': 'stdout', 'text': '                                         …

HBox(children=(FloatSlider(value=1.0, continuous_update=False, description='误差波动:', max=10.0, readout_format='…

Output(outputs=({'output_type': 'stream', 'name': 'stdout', 'text': '                                         …

HBox(children=(Dropdown(description='异常类型:', options=('突增', '突降'), value='突增'), IntSlider(value=5, continuous_…

HBox(children=(Button(description='添加异常', style=ButtonStyle(), tooltip='add_anomaly'), Button(description='删除上…

HBox(children=(FloatSlider(value=0.0, continuous_update=False, description='变点斜率:', max=10.0, min=-10.0, step=…

HBox(children=(Button(description='添加变点', style=ButtonStyle(), tooltip='add_anomaly'), Button(description='删除上…

Button(description='运行', style=ButtonStyle(), tooltip='plot')