In [6]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import math
from scipy.interpolate import CubicSpline
from itertools import product
from sklearn.metrics import r2_score
import csv
from vmd import VMD

In [7]:
# adjusted R-squared
def r2_score_adjusted(y, y_pred, featurecount):
    n = len(y)
    return 1 - (1-r2_score(y, y_pred)) * (n-1)/(n-featurecount-1)

# AIC
def calculate_aic(y, y_pred, k):
    sse = np.sum((y - y_pred) ** 2)
    n = len(y)
    aic = n * np.log(sse / n) + 2 * k
    return aic

# BIC
def calculate_bic(y, y_pred, k):
    sse = np.sum((y - y_pred) ** 2)
    n = len(y)
    bic = n * np.log(sse / n) + k * np.log(n)
    return bic

# импорт временного ряда

In [8]:
# linux path
path1 = './'
path2 = '/'

# windows path
# path1 = '.\\'
# path2 = '\\'

In [9]:
seasonsTS1 = ("s2", "s6", "s13")
seasonsTS2 = ("s11", "s12")
trendsTS = ("t6", "t6", "t6", "t6", "t6")
noisesTS = ("n1", "n1", "n1", "n1", "n1")

cases = ("t6_n1_s2", "t6_n1_s6", "t6_n1_s13", "t6_n1_s2_s11", "t6_n1_s2_s12")

In [None]:
Trend = list()
for trend_i in trendsTS:
    t = pd.read_csv(f"{path1}Components{path2}{trend_i}.csv", decimal=',')
    trend = pd.DataFrame(t, dtype=float)
    Trend.append(trend['Value'])
Component_of_TS = Trend[::]
Component_name = "Trend"
range_of_imfs = range(5)

Noise = list()
for noise_i in noisesTS:
    n = pd.read_csv(f"{path1}Components{path2}{noise_i}.csv", decimal=',')
    noise = pd.DataFrame(n, dtype=float)
    Noise.append(noise['Value'])
Component_of_TS = Noise[::]
Component_name = "Noise"
range_of_imfs = range(10, 7, -1)

Season = list()
for season_i in seasonsTS1:
    s = pd.read_csv(f"{path1}Components{path2}{season_i}.csv", decimal=',')
    season = pd.DataFrame(s, dtype=float)
    Season.append(season['Value'])

s2 = pd.read_csv(f"{path1}Components{path2}s2.csv", decimal=',')
season2 = pd.DataFrame(s2, dtype=float)
for season_i in seasonsTS2:
    s = pd.read_csv(f"{path1}Components{path2}{season_i}.csv", decimal=',')
    season = pd.DataFrame(s, dtype=float)
    Season.append(season2['Value']+season['Value'])
Component_of_TS = Season[::]
Component_name = "Season"
range_of_imfs = range(2,7)


In [13]:
index = list()
TS = list()

data = pd.read_csv(f"{path1}Cases{path2}{cases[0]}.csv", decimal=',')
df = pd.DataFrame(data, dtype=float)
index = df.index

for case_i in cases:
    data = pd.read_csv(f"{path1}Cases{path2}{case_i}.csv", decimal=',')
    df = pd.DataFrame(data, dtype=float)
    TS.append(df.values)


# тестовый анализ

In [None]:
alpha = (1000, 2000)
init = 1
DC = (False, True)
K = (5, 7)
tau = 0            #BY DEFAULT  
tol = 1e-6         #BY DEFAULT

u, u_hat, omega = VMD(TS[0], alpha[0], tau, K[0], DC[1], init, tol)

featurecount = 6

In [None]:
best_metriks = {"Time Series": f"TS0", "R2": -np.inf}
best_r2_params = {"Time Series": f"TS0", "Metric": "R2", "alpha": 0, "tau": tau, "K": 3, "DC": False, "init": 0, "tol": tol, "IMFs" : "1"}


In [None]:
TS[0]

In [None]:
np.shape(Trend)

In [None]:
np.shape(TS)

In [None]:
r2_list = list()

for params_set in list(product(alpha, K, DC)):
    u, u_hat, omega = VMD(TS[0], params_set[0], tau, params_set[1], params_set[2], init, tol)
    # best_params = [{"Time Series": f"t{ts_i+1}" ,"alpha": params_set[0], "tau": tau, "K": params_set[1], "DC": params_set[2], "init": params_set[3], "tol": tol}]
    
    k_i = params_set[1]
    r2 = list()
    sum_of_imfs = 0
    for i_imfs in range(k_i):
        sum_of_imfs += u[i_imfs].T

        r2.append(r2_score(Trend[0], sum_of_imfs))


    max_r2 = np.max(r2)
    argmax_r2 = np.argmax(r2)

    r2_list.append([max_r2, argmax_r2])

r2_list = np.transpose(r2_list)
max_r2 = np.max(r2_list[0])
argmax_r2 = np.argmax(r2_list[0])
i_imfs = int(r2_list[1][argmax_r2])
if (max_r2 > best_metriks["R2"]):
    best_metriks["R2"] = max_r2

    best_r2_params["alpha"] = params_set[0]
    best_r2_params["K"] = params_set[1]
    best_r2_params["DC"] = params_set[2]
    best_r2_params["IMFs"] = f'1 - {i_imfs+1}'


In [None]:
best_metriks

In [None]:
best_r2_params

In [None]:
best_metriks

In [None]:
best_r2_params

In [None]:
plt.figure(figsize=(15,12))
plt.subplot(2,1,1)
plt.plot(index[0], TS[0])
plt.title('Time Series')
plt.xlabel('X')

In [None]:
plt.figure(figsize=(15,12))
plt.subplot(2,1,1)
plt.plot(index[0], Trend[0])
plt.title('Trend')
plt.xlabel('X')

In [None]:
featurecount

In [None]:
r2_score(Trend[0]["Value"], u[0].T)

In [None]:
plt.figure(figsize=(15,12))
plt.subplot(2,1,1)
plt.plot(index[0], Season)
plt.title('Season')
plt.xlabel('X')

In [None]:
# plt.figure(figsize=(15,12))
# plt.subplot(2,1,1)
# plt.plot(index[0], TS[0])

fig, axs = plt.subplots(figsize=(12,10), nrows=K)

# axs[0].set(xlabel = 'date', ylabel='C')
# plt.subplot(2,2,1)
for i in range(K):
    axs[i].plot(index[0], u[i].T)
    axs[i].set_title(f'IMF {i+1}')

plt.title('IMF')
plt.xlabel('X')
plt.ylabel('Y')
# plt.legend(['Оригинальный сигнал', 'Нижняя огибающая', 'Верхняя огибающая', 'Средняя огибающих'])

plt.tight_layout()

In [None]:
# plt.figure(figsize=(15,12))
# plt.subplot(2,1,1)
# plt.plot(index[0], TS[0])

fig, axs = plt.subplots(figsize=(12,10), nrows=K-1)

# axs[0].set(xlabel = 'date', ylabel='C')
# plt.subplot(2,2,1)
for i in range((K-1)):
    axs[i].plot(index[0], u[i].T + u[i+1].T)
    axs[i].set_title(f'IMF {i+1} + IMF {i+2}')

# plt.title('IMF')
plt.xlabel('X')
plt.ylabel('Y')
# plt.legend(['Оригинальный сигнал', 'Нижняя огибающая', 'Верхняя огибающая', 'Средняя огибающих'])

plt.tight_layout()

# декомпозиция временного ряда

In [None]:
alpha = (0, 500, 1000, 5000, 10000, 20000)
init = (0, 1, 2)
DC = (False, True)
K = 10

tau = 0            #BY DEFAULT  
tol = 1e-6         #BY DEFAULT

featurecount = 6

In [None]:
best_metriks = np.zeros(len(TS), dtype=dict)

best_r2_params = np.zeros(len(TS), dtype=dict)
best_r2_adj_params = np.zeros(len(TS), dtype=dict)
best_aic_params = np.zeros(len(TS), dtype=dict)
best_bic_params = np.zeros(len(TS), dtype=dict)

for i in range(len(TS)):
    best_metriks[i] = {"Time Series": f"TS{i+1}", "R2": -np.inf, "R2_Adj": -np.inf, "AIC" : np.inf, "BIC": np.inf}
    
    best_r2_params[i] = ({"Time Series": f"TS{i+1}", "Metric": "R2", "alpha": 0, "tau": tau, "K": 3, "DC": False, "init": 0, "IMFs" : "1"}) 
    best_r2_adj_params[i] = ({"Time Series": f"TS{i+1}", "Metric": "R2_Adj", "alpha": 0, "tau": tau, "K": 3, "DC": False, "init": 0, "IMFs" : "1"}) 
    best_aic_params[i] = ({"Time Series": f"TS{i+1}", "Metric": "AIC", "alpha": 0, "tau": tau, "K": 3, "DC": False, "init": 0, "IMFs" : "1"}) 
    best_bic_params[i] = ({"Time Series": f"TS{i+1}", "Metric": "BIC", "alpha": 0, "tau": tau, "K": 3, "DC": False, "init": 0, "IMFs" : "1"}) 



search of alpha

In [None]:
# для всех лчм
for ts_i in range(len(TS)):

    j=0
    r2_list = np.zeros((len(alpha), 2), dtype=float)
    r2_adj_list = np.zeros((len(alpha), 2), dtype=float)
    aic_list = np.zeros((len(alpha), 2), dtype=float)
    bic_list = np.zeros((len(alpha), 2), dtype=float)
    # для всех наборов параметров
    for alpha_i in alpha:
        u, u_hat, omega = VMD(TS[ts_i], alpha_i, tau, K, DC, init, tol)
        
        r2 = list()
        r2_adj = list()
        aic = list()
        bic = list()
        sum_of_imfs = 0
        for i_imfs in range_of_imfs:
            sum_of_imfs += u[i_imfs].T

            r2.append(r2_score(Component_of_TS, sum_of_imfs))
            r2_adj.append(r2_score_adjusted(Component_of_TS, sum_of_imfs, featurecount))
            aic.append(calculate_aic(Component_of_TS, sum_of_imfs, featurecount))
            bic.append(calculate_bic(Component_of_TS, sum_of_imfs, featurecount))

        r2_list[j] = [np.max(r2), np.argmax(r2)]
        r2_adj_list[j] = [np.max(r2_adj), np.argmax(r2_adj)]
        aic_list[j] = [np.min(aic), np.argmin(aic)]
        bic_list[j] = [np.min(bic), np.argmin(bic)]

        j+=1

    r2_list = np.transpose(r2_list)
    max_r2 = np.max(r2_list[0])
    argmax_r2 = np.argmax(r2_list[0])
    i_imfs = int(r2_list[1][argmax_r2])
    if (max_r2 > best_metriks[ts_i]["R2"]):
        best_metriks[ts_i]["R2"] = max_r2
        best_r2_params[ts_i]["alpha"] = alpha[argmax_r2]

    r2_adj_list = np.transpose(r2_adj_list)
    max_r2_adj = np.max(r2_adj_list[0])
    argmax_r2_adj = np.argmax(r2_adj_list[0])
    i_imfs = int(r2_adj_list[1][argmax_r2_adj])
    if (max_r2_adj > best_metriks[ts_i]["R2_Adj"]):
        best_metriks[ts_i]["R2_Adj"] = max_r2_adj
        best_r2_adj_params[ts_i]["alpha"] = alpha[argmax_r2_adj]

    aic_list = np.transpose(aic_list)
    min_aic = np.min(aic_list[0])
    argmin_aic = np.argmin(aic_list[0])
    i_imfs = int(aic_list[1][argmin_aic])
    if (min_aic < best_metriks[ts_i]["AIC"]):
        best_metriks[ts_i]["AIC"] = min_aic
        best_aic_params[ts_i]["alpha"] = alpha[argmin_aic]

    bic_list = np.transpose(bic_list)
    min_bic = np.min(bic_list[0])
    argmin_bic = np.argmin(bic_list[0])
    i_imfs = int(bic_list[1][argmin_bic])
    if (min_bic < best_metriks[ts_i]["BIC"]):
        best_metriks[ts_i]["BIC"] = min_bic
        best_bic_params[ts_i]["alpha"] = alpha[argmin_bic]


  return np.dot(freqs[T//2:T],(abs(u_hat_plus[n+1, T//2:T, k])**2))/np.sum(abs(u_hat_plus[n+1,T//2:T,k])**2)
  return (f_hat_plus - sum_uk - lambda_hat[n,:]/2)/(1.+Alpha[k]*(freqs - omega_plus[n,k])**2)
  return np.dot(freqs[T//2:T],(abs(u_hat_plus[n+1, T//2:T, k])**2))/np.sum(abs(u_hat_plus[n+1,T//2:T,k])**2)
  return (f_hat_plus - sum_uk - lambda_hat[n,:]/2)/(1.+Alpha[k]*(freqs - omega_plus[n,k])**2)
  return np.dot(freqs[T//2:T],(abs(u_hat_plus[n+1, T//2:T, k])**2))/np.sum(abs(u_hat_plus[n+1,T//2:T,k])**2)
  return (f_hat_plus - sum_uk - lambda_hat[n,:]/2)/(1.+Alpha[k]*(freqs - omega_plus[n,k])**2)
  return np.dot(freqs[T//2:T],(abs(u_hat_plus[n+1, T//2:T, k])**2))/np.sum(abs(u_hat_plus[n+1,T//2:T,k])**2)
  return (f_hat_plus - sum_uk - lambda_hat[n,:]/2)/(1.+Alpha[k]*(freqs - omega_plus[n,k])**2)
  return np.dot(freqs[T//2:T],(abs(u_hat_plus[n+1, T//2:T, k])**2))/np.sum(abs(u_hat_plus[n+1,T//2:T,k])**2)
  return (f_hat_plus - sum_uk - lambda_hat[n,:]/2)/(1.+Alpha[k]*(freqs - omega_

# метрики точности и параметры с выводом

In [None]:
filename = f"{path1}Output{path2}Best_Metriks-{Component_name}.csv"

fields = best_metriks[0].keys()
fields = list(fields)

with open(filename, mode='w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=fields)
    writer.writeheader()  # Write header row
    for ts_i in range(len(TS)):
        writer.writerows([best_metriks[ts_i]])  # Write data rows

In [None]:
filename = f"{path1}Output{path2}Parameters-{Component_name}.csv"

fields = best_r2_params[0].keys()
fields = list(fields)

with open(filename, mode='w', newline='') as file:
    writer = csv.DictWriter(file, fieldnames=fields)
    writer.writeheader()  # Write header row
    for ts_i in range(len(TS)):
        writer.writerows([best_r2_params[ts_i]])  # Write data rows
        writer.writerows([best_r2_adj_params[ts_i]])  # Write data rows
        writer.writerows([best_aic_params[ts_i]])  # Write data rows
        writer.writerows([best_bic_params[ts_i]])  # Write data rows

