# Setup

In [1]:
import pandas as pd
import matplotlib.pyplot as plt
import os
import glob
import json
import shutil
import requests
import datetime
import time
from io import StringIO
import sys
from sklearn.preprocessing import MinMaxScaler
from tqdm import tqdm
import numpy as np
from numpy.lib.stride_tricks import sliding_window_view
from concurrent.futures import ThreadPoolExecutor, as_completed
from darts.datasets import AirPassengersDataset, ETTh1Dataset, ETTh2Dataset, ETTm1Dataset, ETTm2Dataset, ElectricityDataset, EnergyDataset, ExchangeRateDataset, ILINetDataset, TaylorDataset, TrafficDataset, USGasolineDataset, UberTLCDataset, WeatherDataset
print("success")

success


In [2]:
np.random.seed(42)

In [3]:
if os.getcwd()[-9:] == "notebooks":
    os.chdir(os.path.dirname(os.getcwd()))
os.getcwd()

'/home/koos/Documents/timeseries_transfer_learning'

In [4]:
def drop_nan_at_ends(df):
    first_idx = df.first_valid_index()
    last_idx = df.last_valid_index()
    return df.loc[first_idx:last_idx]


# Stocks

In [None]:
input_directory = 'data/raw_download/stocks'
output_directory = 'data/processed/stocks'
test_directory = "data/processed/test"

if not os.path.exists(output_directory):
    os.makedirs(output_directory)
    print("created output_directory")
    
if not os.path.exists(test_directory):
    os.makedirs(test_directory)
    print("created test_directory")

In [None]:
def has_movement(df):
    return bool((df.max() - df.min()).sum())

NAN_THRESHOLD_STOCKS = 0.005
def test_nan_acceptable(df):
    return ((df.isna().sum().sum()/4)/len(df)) <= NAN_THRESHOLD_STOCKS
    
def test_high_low_logical(df):
    return all(((df["High"].round(4) >= df["Open"].round(4)).all(),
                (df["High"].round(4) >= df["Low"].round(4)).all(),
                (df["High"].round(4) >= df["Close"].round(4)).all(),
                (df["Low"].round(4) <= df["Open"].round(4)).all(),
                (df["Low"].round(4) <= df["Close"].round(4)).all()))

def test_iliquid(df):
    return ((df == df.shift(1)) &
            (df == df.shift(2)) &
            (df == df.shift(3)) &
            (df == df.shift(4)) &
            (df == df.shift(5)) &
            (df == df.shift(6)) &
            (df == df.shift(7)) &
            (df == df.shift(8)) &
            (df == df.shift(9)) &
            (df == df.shift(10)) &
            (df == df.shift(11)) &
            (df == df.shift(12)) &
            (df == df.shift(13)) &
            (df == df.shift(14)) &
            (df == df.shift(15)) &
            (df == df.shift(16)) &
            (df == df.shift(17)) &
            (df == df.shift(18)) &
            (df == df.shift(19)) &
            (df == df.shift(20)) &
            (df == df.shift(21)) &
            (df == df.shift(22)) &
            (df == df.shift(23)) &
            (df == df.shift(24)) &
            (df == df.shift(25)) &
            (df == df.shift(26)) &
            (df == df.shift(27)) &
            (df == df.shift(28)) &
            (df == df.shift(29)) &
            (df == df.shift(30)) &
            (df == df.shift(31)) &
            (df == df.shift(32)) &
            (df == df.shift(33)) &
            (df == df.shift(34)) &
            (df == df.shift(35))
            ).any().any()

def test_df(df):
    """
    return True if df quality is acceptable
    """
    
    if len(df) < 200:# dataframe is too short
        return False, "length"
    
    if (df == 0).any().any():# dataframe contains a zero, price cant be zero
        return False, "zero"
    
    if (df.pct_change()[1:] >= 2.5).any().any():# movement is unrealistic, drops or increases too much
        return False, "unrealistic_movement"
    
    if not test_high_low_logical(df):# high is lower than other or low is higher than other
        return False, "high_low"
    
    if test_iliquid(df):
        return False, "illiquid"
    
    
    return True, None

In [None]:
csv_files = glob.glob(os.path.join(input_directory, '*.csv'))
np.random.shuffle(csv_files)
len(csv_files)

In [None]:
errors = {"length": 0,
          "zero": 0,
          "no_movement": 0,
          "nan": 0,
          "high_low": 0,
          "unrealistic_movement": 0,
          "illiquid": 0
         }

for i in tqdm(range(len(csv_files))):
    file = csv_files[i]
    ticker = file[25:-4]
    try:
        df = pd.read_csv(file)[["Open", "High", "Low", "Close"]]
    except pd.errors.ParserError:
        print("parseerror for ticker", ticker)
        continue
    
    if not test_nan_acceptable(df):
        errors["nan"] += 1
        continue
    
    df = df.ffill()
    
    test, error = test_df(df)
    if not test:
        errors[error] += 1
        continue

    for col in ["Close"]:
        df_ = df[[col]]


        df_.to_parquet(f"data/processed/stocks/{ticker}-{col}.parquet", engine='pyarrow')

print("Processing complete.", errors)
print("n_files:", len(os.listdir("data/processed/stocks")), len(os.listdir("data/processed/stocks")) == 35873+6330)
print("errors:", sum(errors.values()))

In [None]:

# Get a list of files in the stocks folder
files_in_stocks = os.listdir(output_directory)

# Choose a random sample of 20 files from the stocks folder
random_files = np.random.choice(files_in_stocks, int(0.15*len(files_in_stocks)), replace=False)

# Move the selected files to the test folder
for file_name in random_files:
    file_path = os.path.join(output_directory, file_name)
    destination_path = os.path.join(test_directory, file_name)
    shutil.move(file_path, destination_path)
    #print(file_path, destination_path)
print(f"Moved files")


# Weather

In [None]:
def get_anualized_std_weather(df, periods_in_year=252):
    daily_std = np.std(np.array(df))
    annualized_std = daily_std * np.sqrt(periods_in_year)
    return annualized_std

def add_missing_dates(df):
    #df['ELEMENT'] = pd.to_datetime(df['DATE'])
    #df.set_index('ELEMENT', inplace=True)
    df.sort_index()

    date_range = pd.date_range(start=df.index.min(), end=df.index.max())
    df = df.reindex(date_range)
    return df

In [None]:
# Metadata specs #

metadata_col_specs = [
    (0,  12),
    (12, 21),
    (21, 31),
    (31, 38),
    (38, 41),
    (41, 72),
    (72, 76),
    (76, 80),
    (80, 86)
]

metadata_names = [
    "ID",
    "LATITUDE",
    "LONGITUDE",
    "ELEVATION",
    "STATE",
    "NAME",
    "GSN FLAG",
    "HCN/CRN FLAG",
    "WMO ID"]

metadata_dtype = {
    "ID": str,
    "STATE": str,
    "NAME": str,
    "GSN FLAG": str,
    "HCN/CRN FLAG": str,
    "WMO ID": str
    }


# Data specs #

data_header_names = [
    "ID",
    "YEAR",
    "MONTH",
    "ELEMENT"]

data_header_col_specs = [
    (0,  11),
    (11, 15),
    (15, 17),
    (17, 21)]

data_header_dtypes = {
    "ID": str,
    "YEAR": int,
    "MONTH": int,
    "ELEMENT": str}

data_col_names = [[
    "VALUE" + str(i + 1),
    "MFLAG" + str(i + 1),
    "QFLAG" + str(i + 1),
    "SFLAG" + str(i + 1)]
    for i in range(31)]
# Join sub-lists
data_col_names = sum(data_col_names, [])

data_replacement_col_names = [[
    ("VALUE", i + 1),
    ("MFLAG", i + 1),
    ("QFLAG", i + 1),
    ("SFLAG", i + 1)]
    for i in range(31)]
# Join sub-lists
data_replacement_col_names = sum(data_replacement_col_names, [])
data_replacement_col_names = pd.MultiIndex.from_tuples(
    data_replacement_col_names,
    names=['VAR_TYPE', 'DAY'])

data_col_specs = [[
    (21 + i * 8, 26 + i * 8),
    (26 + i * 8, 27 + i * 8),
    (27 + i * 8, 28 + i * 8),
    (28 + i * 8, 29 + i * 8)]
    for i in range(31)]
data_col_specs = sum(data_col_specs, [])

data_col_dtypes = [{
    "VALUE" + str(i + 1): int,
    "MFLAG" + str(i + 1): str,
    "QFLAG" + str(i + 1): str,
    "SFLAG" + str(i + 1): str}
    for i in range(31)]
data_header_dtypes.update({k: v for d in data_col_dtypes for k, v in d.items()})


# Reading functions #

def read_station_metadata(filename="ghcnd-stations.txt"):
    """Reads in station metadata

    :filename: ghcnd station metadata file.
    :returns: station metadata as a pandas Dataframe

    """
    df = pd.read_fwf(filename, metadata_col_specs, names=metadata_names,
                     index_col='ID', dtype=metadata_dtype)

    return df


def read_ghcn_data_file(filename=None,
                        variables=None, include_flags=False,
                        dropna='all'):
    """Reads in all data from a GHCN .dly data file

    :param filename: path to file
    :param variables: list of variables to include in output dataframe
        e.g. ['TMAX', 'TMIN', 'PRCP']
    :param include_flags: Whether to include data quality flags in the final output
    :returns: Pandas dataframe
    """

    df = pd.read_fwf(
        filename,
        colspecs=data_header_col_specs + data_col_specs,
        names=data_header_names + data_col_names,
        index_col=data_header_names,
        dtype=data_header_dtypes
        )


    df.columns = data_replacement_col_names

    df = df.stack(level='DAY').unstack(level='ELEMENT')

    if dropna:
        df.replace(-9999.0, np.nan, inplace=True)
        df.dropna(how=dropna, inplace=True)

    # replace the entire index with the date.
    # This loses the station ID index column!
    # This will usuall fail if dropna=False, since months with <31 days
    # still have day=31 columns
    df.index = pd.to_datetime(
        df.index.get_level_values('YEAR') * 10000 +
        df.index.get_level_values('MONTH') * 100 +
        df.index.get_level_values('DAY'),
        format='%Y%m%d')

    return df["VALUE"]


In [None]:
with open('data/noaa_stations.json', 'r') as file:
    stations = json.load(file)

In [None]:
input_directory = "data/raw_download/weather"
output_directory = 'data/processed/weather'

if not os.path.exists(output_directory):
    os.makedirs(output_directory)
    print("created output directory")

In [None]:
NAN_THRESHOLD = 0.01
WRONG_THRESHOLD = 0.03
MIN_LENGTH = 200 # can be different for weather and stocks



for file in tqdm(os.listdir(input_directory)):
    station = file[:-4]
    df = read_ghcn_data_file(f"data/raw_download/weather/{file}")
    df = add_missing_dates(df)
    best_len = 0
    saved_df = False
    
    
    for col in ("TMIN", "TMAX", "TAVG"):
        if not col in df.columns:
            # column not in data for this station
            continue

        df_ = df[[col]]
        df_ = drop_nan_at_ends(df_)

        if len(df_) < MIN_LENGTH:
            continue
        
        
        nan_percentage = float(df_.isna().sum().sum()/len(df_))
        if nan_percentage > NAN_THRESHOLD:
            continue
        df_ = df_.interpolate()
        
        value_range = (df_.max()[0] - df_.min()[0])
        rolling_average = df_[col].rolling(window=10).mean()
        condition = abs(df_[col] - rolling_average) >= 0.2 * value_range
        df_.loc[condition, col] = float("nan") 
        
        
        wrong_data_percentage = float(df_.isna().sum().sum()/len(df_))
        
        if wrong_data_percentage > WRONG_THRESHOLD:
            continue
        elif not df[col].max() and not df[col].min():
            # all entries are zero, e.g. snow in africa
            continue
        df_ = df_.interpolate()
        

        
        if len(df_) > best_len:
            best_len = len(df_)
            col_save = col
            df_save = df_
            saved_df = True
    

    if saved_df:
        output_file = f"data/processed/weather/{station}-{col_save}.parquet"
        df_save.to_parquet(output_file, engine='pyarrow')


# All Darts Datasets

In [5]:
output_directory = 'data/processed/darts'

if not os.path.exists(output_directory):
    os.makedirs(output_directory)
    print("created output directory")

In [6]:
def moving_average(a, n):
    if isinstance(a, pd.DataFrame):
        a = np.array(a)
    
    ret = np.cumsum(a, dtype=float)
    ret[n:] = ret[n:] - ret[:-n]
    return pd.DataFrame(ret[n - 1:] / n)

def save_timeseries(timeseries, name, moving_average_window=None):
    if isinstance(timeseries, pd.DataFrame):
        df = timeseries
    elif isinstance(timeseries, pd.Series):
        df = timeseries.to_frame()
    else:
        df = timeseries.pd_dataframe()
    df = pd.DataFrame(np.array(df))

    nan_percent = float(df.isna().sum().sum()/len(df))
    if nan_percent > 0.01:
        print(f"didnt save {name} because of nan")
        return
    df = df.interpolate()
    df = drop_nan_at_ends(df)
    if moving_average_window is not None:
        df = moving_average(df, moving_average_window)
    
    df.to_parquet(output_directory + f"/{name}.parquet")

In [None]:
data = ETTh1Dataset().load()["OT"]
save_timeseries(data, "ETTh1Dataset", 20)
print(ETTh1Dataset)
data = ETTh2Dataset().load()["OT"]
save_timeseries(data, "ETTh2Dataset", 20)
print(ETTh2Dataset)
data = ETTm1Dataset().load()["OT"]
save_timeseries(data, "ETTm1Dataset")
print(ETTm1Dataset)
data = ETTm2Dataset().load()["OT"]
save_timeseries(data, "ETTm2Dataset")
print(ETTm2Dataset)
dataset = ElectricityDataset(multivariate=False).load()
for col in dataset:
    name = list(col.columns)[0]
    save_timeseries(col, f"ElectricityDataset_{name}", 20)
dataset = EnergyDataset().load().pd_dataframe()
for col in ["generation biomass", "generation fossil gas", "generation fossil oil", "generation other renewable", "generation waste", "total load forecast", "generation fossil hard coal", "generation hydro run-of-river and poundage", "generation other", "generation wind onshore"]:
    timeseries = dataset[col]
    save_timeseries(timeseries, f"EnergyDataset_{col}")
print(EnergyDataset)

dataset = ExchangeRateDataset(multivariate=False).load()
for i, timeseries in enumerate(dataset):
    if i == 4:
        continue
    save_timeseries(timeseries, f"ExchangeRateDataset_{i}")
print(ExchangeRateDataset)
dataset = ILINetDataset(multivariate=False).load()
for i, timeseries in enumerate(dataset):
    if i in [3, 4, 6]:
        continue
    save_timeseries(timeseries, f"ILINetDataset_{i}", 20)
print(ILINetDataset)
dataset = TrafficDataset().load().pd_dataframe()
for col in dataset.columns:
    timeseries = dataset[col]
    save_timeseries(timeseries, f"TrafficDataset_{col}", 40)
print(TrafficDataset)
dataset = USGasolineDataset().load().pd_dataframe()
save_timeseries(dataset, "USGasolineDataset", 10)
print(USGasolineDataset)

# Only Exchange Rates

In [7]:
output_directory = 'data/processed/fx'

if not os.path.exists(output_directory):
    os.makedirs(output_directory)
    print("created output directory")

created output directory


In [8]:
dataset = ExchangeRateDataset(multivariate=False).load()
for i, timeseries in enumerate(dataset):
    if i == 4:
        continue
    save_timeseries(timeseries, f"ExchangeRateDataset_{i}")

# Make Plots from Data

In [None]:
files_stock = glob.glob(os.path.join("data/processed/stocks/", '*.parquet')) + glob.glob(os.path.join("data/processed/test/", '*.parquet'))
files_weather = glob.glob(os.path.join("data/processed/weather/", '*.parquet'))
files_darts = glob.glob(os.path.join("data/processed/darts/", '*.parquet'))

In [None]:
def show(files):
    for i in range(10):
        file = np.random.choice(files)
        df = pd.read_parquet(file)[:1000]
        
        df = np.array(df)
        
        #plt.plot(df)
        #plt.plot(moving_average(np.array(df), 10))
        #plt.title(str(file).split("\\")[-1][:-8])
        #plt.title(file)
        #plt.show()

show(files_darts)


# Show Data Split by type

In [None]:
n_stock_dp, n_weather_dp, n_darts_dp = 0, 0, 0
for file in tqdm(files_stock):
    n_stock_dp += len(pd.read_parquet(file))
print(f"Stock timeseries: {len(files_stock)} stocks\nMean length: {int(n_stock_dp/len(files_stock))} workdays\nTotal datapoints: {round(n_stock_dp/1_000_000, 2)} million\n")

for file in tqdm(files_weather):
    n_weather_dp += len(pd.read_parquet(file))
print(f"Weather timeseries: {len(files_weather)} stations\nMean length: {int(n_weather_dp/len(files_weather))} days\nTotal datapoints: {round(n_weather_dp/1_000_000, 2)} million\n")

for file in tqdm(files_darts):
    n_darts_dp += len(pd.read_parquet(file))
print(f"Darts timeseries: {len(files_darts)} measurements\nMean length: {int(n_darts_dp/len(files_darts))} observations\nTotal datapoints: {round(n_darts_dp/1_000_000, 2)} million\n")
