# Extract Long-Term & Seasonal Components from Time Series

This script separates the long-term and seasonal components of a time series using Singular Spectrum Analysis (SSA). If run after interpolation via ALPS (recommended), the input is a headerless .csv file with three columns: decimal year, interpolated data, and date in YYYYMMDD format.

Source: Elsner, J. B. and Tsonis, A. A.: Singular spectrum analysis: a new tool in time series analysis, Springer Science & Business Media, 2013.

In [1]:
# dependencies

import os
import numpy as np
import pandas as pd
from pyts.decomposition import SingularSpectrumAnalysis

In [None]:
# Source directories: where input files are read from and where results are written.

input_directory = "/Users/.../interpolated/"
output_directory = "/Users/.../SSA/"

# Create the output directory up front; an already existing directory is accepted.
os.makedirs(output_directory, exist_ok=True)

# Sorted names of every entry in the input directory that carries the expected suffix.
files = sorted(
    entry
    for entry in os.listdir(input_directory)
    if entry.endswith("slope_interpolated.csv")
)

In [None]:
# helper functions

def my_fft(signal, delta_t, n):
    """Return the first half of the FFT bins of ``signal``.

    Parameters
    ----------
    signal : array-like
        Samples to transform.
    delta_t : float
        Sample spacing passed to ``np.fft.fftfreq``.
    n : int
        Number of bins used to build the frequency axis and to pick the
        cut-off ``n // 2``.

    Returns
    -------
    tuple
        ``(frequencies, coefficients)``, each truncated to its first
        ``n // 2`` entries. The coefficients are the raw complex FFT values.
    """
    half = n // 2
    spectrum = np.fft.fft(signal)
    frequency_axis = np.fft.fftfreq(n, d=delta_t)
    return frequency_axis[:half], spectrum[:half]

def classify_by_frequency(X_ssa, freq_threshold=0.05, max_seasonal_components=10, amplitude_ratio_threshold=0.1):
    """Sort SSA components into long-term and seasonal groups by dominant frequency.

    Each row of ``X_ssa`` is treated as one component sampled every 1/365 of a
    time unit, so with daily samples the frequencies are in cycles per
    365-day year. The dominant frequency is the bin with the largest FFT
    magnitude (the zero-frequency bin included).

    Parameters
    ----------
    X_ssa : numpy.ndarray
        2-D array with one component per row.
    freq_threshold : float
        A component whose dominant frequency is below this value (in absolute
        terms) is classified as long-term.
    max_seasonal_components : int
        Upper bound on the number of detected seasonal components kept; the
        lowest-index ones win. It is not applied to the fallback selection.
    amplitude_ratio_threshold : float
        Minimum peak-to-peak range, relative to that of component 0, required
        for a component to count as seasonal.

    Returns
    -------
    tuple of (list of int, list of int)
        ``(longterm_indices, seasonal_indices)`` as row indices into ``X_ssa``.
        Components matching neither rule appear in neither list.
    """
    n_components = X_ssa.shape[0]
    longterm_indices = []
    seasonal_indices = []

    # Peak-to-peak range of every component; component 0 is assumed to be the
    # long-term one with the largest range and serves as the reference.
    amplitudes = X_ssa.max(axis=1) - X_ssa.min(axis=1)
    reference_amplitude = amplitudes[0]

    for i, component in enumerate(X_ssa):
        xf, yf = my_fft(component, 1 / 365, len(component))
        # A constant scale factor on the magnitudes cannot move the argmax,
        # so the raw magnitudes are used directly.
        max_freq = xf[np.argmax(np.abs(yf))]

        if np.abs(max_freq) < freq_threshold:
            # Classify as long-term if frequency is very low
            longterm_indices.append(i)
            continue

        near_annual = np.abs(max_freq - 1) < 0.2
        near_semiannual = np.abs(max_freq - 2) < 0.2
        # Multiplying instead of dividing gives the same decision as
        # amplitude / reference > threshold, but does not produce inf/NaN
        # (and a RuntimeWarning) when the reference component is flat.
        significant = amplitudes[i] > amplitude_ratio_threshold * reference_amplitude
        if (near_annual or near_semiannual) and significant:
            # Classify as seasonal if frequency is close to 1 or 2 and relative amplitude is significant
            seasonal_indices.append(i)

    # Limit seasonal indices to the top `max_seasonal_components` based on their order
    seasonal_indices = seasonal_indices[:max_seasonal_components]

    # Ensure some seasonal indices are defined if none exist: fall back to the
    # components that follow the last long-term one, up to index 10.
    if not seasonal_indices:
        max_longterm_index = max(longterm_indices) if longterm_indices else -1
        # Clamp to the number of available components so the fallback never
        # returns an index that is out of range for X_ssa.
        fallback_stop = min(11, n_components)
        seasonal_indices = list(range(max_longterm_index + 1, fallback_stop))

    return longterm_indices, seasonal_indices

def extract_components(pd_series, X_ssa, indices):
    """Add up the selected SSA components into a single series.

    Parameters
    ----------
    pd_series : pandas.Series
        Only its index is used; it becomes the index of the result.
    X_ssa : numpy.ndarray
        2-D array with one component per row.
    indices : list of int
        Rows of ``X_ssa`` to sum.

    Returns
    -------
    pandas.Series
        Element-wise sum of the chosen rows, indexed like ``pd_series``.
    """
    selected_rows = X_ssa[indices, :]
    summed = selected_rows.sum(axis=0)
    return pd.Series(summed, index=pd_series.index)

In [None]:
# Settings shared by every glacier; defined once because they do not change per file.
start_date = "2000-01-01"
end_date = "2020-05-01"
window_size = 365  # One year of daily samples
ssa = SingularSpectrumAnalysis(window_size=window_size)

# Iterate over each file, one per glacierid
for file_name in files:
    # Extract glacier ID from file name (text before the first underscore)
    glacierid = file_name.split("_")[0]

    # Define input and output file paths
    input_file_path = os.path.join(input_directory, file_name)
    output_file_path = os.path.join(output_directory, f"{glacierid}_variable_SSA.csv")

    # Announce the glacier before loading so a failing file can be identified.
    print('Working on glacier: ' + glacierid)

    # Load data; ndmin=2 keeps a single-row file two-dimensional so the
    # column indexing below still works.
    data = np.loadtxt(input_file_path, delimiter=',', ndmin=2)

    # loadtxt yields the YYYYMMDD column as floats (e.g. 20000101.0). Convert
    # to integer strings and parse strictly: a malformed date now raises
    # instead of being passed through unparsed and misread later on.
    date_strings = data[:, 2].astype(np.int64).astype(str)
    datetime_index = pd.DatetimeIndex(pd.to_datetime(date_strings, format='%Y%m%d'))

    pd_series = pd.Series(data[:, 1], index=datetime_index).sort_index()
    # Keep only the first value recorded for any repeated date.
    pd_series = pd_series[~pd_series.index.duplicated()]

    # Filter data to only include chosen date range
    filtered_pd_series = pd_series.loc[start_date:end_date]
    if filtered_pd_series.empty:
        print(f'Skipping glacier {glacierid}: no data between {start_date} and {end_date}')
        continue

    # Resample and interpolate to daily frequency for missing dates (leap days etc)
    daily_pd_series = filtered_pd_series.resample('1D').interpolate(method='linear')
    if len(daily_pd_series) < window_size:
        # The SSA window cannot be longer than the series itself.
        print(
            f'Skipping glacier {glacierid}: {len(daily_pd_series)} daily samples '
            f'is fewer than the SSA window of {window_size}'
        )
        continue

    # Perform SSA on a single-sample batch of shape (1, n_timestamps).
    # NOTE(review): the code below assumes the result is 2-D with one
    # component per row — confirm against the installed pyts version.
    X_ssa = ssa.fit_transform(daily_pd_series.to_numpy()[np.newaxis, :])

    # Classify components
    longterm_indices, seasonal_indices = classify_by_frequency(X_ssa)

    # Extract long-term and seasonal components
    longterm = extract_components(daily_pd_series, X_ssa, longterm_indices)
    seasonal = extract_components(daily_pd_series, X_ssa, seasonal_indices)

    # Save data (limited to the filtered date range), one column per quantity.
    # NOTE(review): the first column holds Julian dates although the header
    # calls it "DecimalDate"; kept as-is so existing consumers of the output
    # are not affected — confirm which of the two is intended.
    data_save = np.column_stack((
        daily_pd_series.index.to_julian_date().to_numpy(),  # Julian date of each day
        daily_pd_series.to_numpy(),  # Interpolated values (filtered range)
        daily_pd_series.index.strftime('%Y%m%d').astype(int).to_numpy(),  # Integer Date
        longterm.to_numpy(),  # Long-term Component (filtered range)
        seasonal.to_numpy(),  # Seasonal Component (filtered range)
    ))

    # Save to the output file
    np.savetxt(output_file_path, data_save, delimiter=",", header="DecimalDate,Value,IntegerDate,LongTerm,Seasonal", comments="")