<a href="https://colab.research.google.com/github/superjoe96/LSTM/blob/main/tcn_odl.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from datetime import datetime, timedelta
import tensorflow as tf
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Input, Dense, Conv1D, Dropout, LayerNormalization, Activation
from tensorflow.keras.layers import Add, Lambda
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.callbacks import EarlyStopping, ModelCheckpoint, ReduceLROnPlateau
from sklearn.preprocessing import StandardScaler, MinMaxScaler, LabelEncoder
from sklearn.metrics import mean_squared_error, mean_absolute_error, r2_score
import warnings
warnings.filterwarnings('ignore')

# Seed NumPy's and TensorFlow's global random generators with the same fixed
# value so that repeated runs of this notebook start from the same random state.
np.random.seed(42)
tf.random.set_seed(42)

In [3]:
# Load the raw transaction log. Later cells read the 'Date', 'Amount',
# 'Transaction Type', 'Category' and 'Account Name' columns from this frame.
df = pd.read_csv('personal_transactions_10000.csv')

In [4]:
# Parse the timestamp column and put the transactions in chronological order
df['Date'] = pd.to_datetime(df['Date'])
df = df.sort_values('Date')

# Calendar features derived from each transaction date
for feature_name, dt_attr in (('Year', 'year'), ('Month', 'month'),
                              ('Day', 'day'), ('DayOfWeek', 'dayofweek')):
    df[feature_name] = getattr(df['Date'].dt, dt_attr)
df['WeekOfYear'] = df['Date'].dt.isocalendar().week

# Direction flag: +1 where the type is exactly 'credit', -1 for anything else
# NOTE(review): this flag is not used by the weekly aggregation below, which
# sums the raw 'Amount' column - confirm that unsigned totals are intended.
is_credit = df['Transaction Type'] == 'credit'
df['TransactionValue'] = np.where(is_credit, 1, -1)

# Integer-encode the categorical columns, keeping each fitted encoder by column name
categorical_cols = ['Category', 'Account Name']
label_encoders = {col: LabelEncoder() for col in categorical_cols}
for col, le in label_encoders.items():
    df[f'{col}_Encoded'] = le.fit_transform(df[col])

# Weekly series for forecasting: total amount and number of transactions per week
weekly_data = (
    df.groupby(pd.Grouper(key='Date', freq='W'))
      .agg(Amount=('Amount', 'sum'),
           TransactionCount=('Transaction Type', 'count'))
)

# Re-index on an explicit weekly calendar; any week absent from the index is filled with 0
date_range = pd.date_range(start=weekly_data.index.min(), end=weekly_data.index.max(), freq='W')
weekly_data = weekly_data.reindex(date_range, fill_value=0)

In [16]:
# Lagged copies of the weekly total: Amount_Lag_k holds the value from k weeks earlier
for lag in range(1, 9):
    weekly_data[f'Amount_Lag_{lag}'] = weekly_data['Amount'].shift(lag)

# Rolling statistics over the four weeks *preceding* the current one.
# The series is shifted by one week before the window is applied so that the
# current week's 'Amount' (the forecasting target) never enters its own
# features; an unshifted window would let the target be recovered exactly
# from the rolling mean and lags 1-3.
past_amount = weekly_data['Amount'].shift(1)
weekly_data['Amount_RollingMean_4W'] = past_amount.rolling(window=4).mean()
weekly_data['Amount_RollingStd_4W'] = past_amount.rolling(window=4).std()

# Calendar position of each week (month 1-12, quarter 1-4)
weekly_data['Month'] = weekly_data.index.month
weekly_data['Quarter'] = weekly_data.index.quarter

# Cyclical (sine/cosine) encoding of month and quarter
weekly_data['Month_sin'] = np.sin(2 * np.pi * weekly_data['Month'] / 12)
weekly_data['Month_cos'] = np.cos(2 * np.pi * weekly_data['Month'] / 12)
weekly_data['Quarter_sin'] = np.sin(2 * np.pi * weekly_data['Quarter'] / 4)
weekly_data['Quarter_cos'] = np.cos(2 * np.pi * weekly_data['Quarter'] / 4)

# Remove the leading rows whose lag / rolling features are NaN
weekly_data = weekly_data.dropna()


In [17]:
# Model inputs: eight lags, two rolling statistics and four cyclical calendar terms
features = [
    *(f'Amount_Lag_{lag}' for lag in range(1, 9)),
    'Amount_RollingMean_4W', 'Amount_RollingStd_4W',
    'Month_sin', 'Month_cos', 'Quarter_sin', 'Quarter_cos',
]

# Column the model is trained to predict
target = 'Amount'

# Chronological 70 / 15 / remainder split (no shuffling, so time order is kept)
n_rows = len(weekly_data)
train_size = int(n_rows * 0.7)
val_size = int(n_rows * 0.15)
val_end = train_size + val_size

train_data = weekly_data.iloc[:train_size]
val_data = weekly_data.iloc[train_size:val_end]
test_data = weekly_data.iloc[val_end:]

In [18]:
# Standardize inputs and target; both scalers are fitted on the training split only
scaler_X, scaler_y = StandardScaler(), StandardScaler()

X_train = scaler_X.fit_transform(train_data[features])
y_train = scaler_y.fit_transform(train_data[[target]])

X_val, y_val = scaler_X.transform(val_data[features]), scaler_y.transform(val_data[[target]])
X_test, y_test = scaler_X.transform(test_data[features]), scaler_y.transform(test_data[[target]])

# TCN input layout is [samples, timesteps, features]; here every sample is a
# single timestep carrying all features, so a length-1 axis is inserted at position 1
X_train_reshaped = np.expand_dims(X_train, axis=1)
X_val_reshaped = np.expand_dims(X_val, axis=1)
X_test_reshaped = np.expand_dims(X_test, axis=1)

In [21]:
def residual_block(x, dilation_rate, nb_filters, kernel_size, padding, dropout_rate=0.1):
    """
    Builds one residual block of the TCN on top of tensor `x`.

    Two dilated Conv1D layers are applied in sequence, each followed by layer
    normalization, a ReLU activation and dropout. The block input is then
    added to the result; if its channel count differs from `nb_filters` it is
    first projected with a 1x1 convolution so the shapes match.

    Args:
        x: input Keras tensor; its last axis is read as the channel count.
        dilation_rate: dilation rate used by both convolutions.
        nb_filters: number of filters (output channels) of the block.
        kernel_size: kernel size of both dilated convolutions.
        padding: padding mode passed to both dilated convolutions
            (e.g. 'causal'). Previously this argument was ignored and
            'causal' was always used.
        dropout_rate: dropout rate applied after each activation.

    Returns:
        The output tensor of the residual addition.
    """
    shortcut = x

    # Two identical conv -> norm -> relu -> dropout stages
    for _ in range(2):
        x = Conv1D(filters=nb_filters,
                   kernel_size=kernel_size,
                   dilation_rate=dilation_rate,
                   padding=padding,
                   activation='linear')(x)
        x = LayerNormalization()(x)
        x = Activation('relu')(x)
        x = Dropout(dropout_rate)(x)

    # If the number of channels changes, use a 1x1 conv so the skip path can be added
    if shortcut.shape[-1] != nb_filters:
        shortcut = Conv1D(filters=nb_filters, kernel_size=1, padding='same')(shortcut)

    # Add the residual connection
    return Add()([shortcut, x])

def create_tcn_model(input_shape, output_units=1, *, nb_filters=128, kernel_size=3,
                     nb_stacks=2, dilations=(1, 2, 4, 8, 16, 32), dropout_rate=0.1,
                     learning_rate=0.0005):
    """
    Creates and compiles a TCN model for time series prediction.

    The network is `nb_stacks` repetitions of one residual block per entry in
    `dilations`, followed by the features of the last time step, a 64-unit
    ReLU dense layer and a linear output layer. The defaults reproduce the
    previously hard-coded configuration, so existing calls behave as before.

    Args:
        input_shape: shape of one sample, (timesteps, features).
        output_units: number of units in the final dense layer.
        nb_filters: filters per residual block.
        kernel_size: kernel size of the dilated convolutions.
        nb_stacks: how many times the full dilation sequence is repeated.
        dilations: dilation rates, one residual block each, per stack.
        dropout_rate: dropout rate inside each residual block.
        learning_rate: learning rate of the Adam optimizer.

    Returns:
        A compiled Keras `Model` using MSE loss and an MAE metric.
    """
    # Input layer
    inputs = Input(shape=input_shape)

    # TCN architecture: every stack runs through the whole dilation sequence
    x = inputs
    for _ in range(nb_stacks):
        for dilation_rate in dilations:
            x = residual_block(x, dilation_rate, nb_filters, kernel_size, 'causal', dropout_rate)

    # Output head: keep only the last time step, then two dense layers
    x = Lambda(lambda z: z[:, -1, :])(x)
    x = Dense(64, activation='relu')(x)
    outputs = Dense(output_units)(x)

    # Create model
    model = Model(inputs, outputs)

    model.compile(
        optimizer=Adam(learning_rate=learning_rate),
        loss='mse',
        metrics=['mae']
    )

    return model

# Build and compile a TCN on the single-timestep layout.
# Dropping the leading sample axis leaves the per-sample shape (timesteps, features).
input_shape = X_train_reshaped.shape[1:]
tcn_model = create_tcn_model(input_shape)

In [None]:
# 5. Train the TCN Model
# Training callbacks, all driven by the validation loss
early_stopping = EarlyStopping(monitor='val_loss', patience=20, restore_best_weights=True)
lr_on_plateau = ReduceLROnPlateau(monitor='val_loss', factor=0.2, patience=10, min_lr=0.0001)
best_checkpoint = ModelCheckpoint('tcn_cashflow_model.h5', save_best_only=True, monitor='val_loss')
callbacks = [early_stopping, lr_on_plateau, best_checkpoint]

# Switch from one timestep holding all features to real sequences shaped
# [samples, sequence_length, features_per_step], which may help the TCN learn patterns
sequence_length = 8  # Using 8 weeks of historical data

def create_sequences(X, y, seq_length):
    """Convert a feature matrix into overlapping sequences for TCN training.

    Window `k` contains rows `k .. k + seq_length - 1` of `X` and is paired
    with the target of its last row, `y[k + seq_length - 1]`.

    Args:
        X: array-like of per-step features, indexed by time along axis 0.
        y: array-like of targets aligned row-by-row with `X`.
        seq_length: number of consecutive rows per window (must be >= 1).

    Returns:
        Tuple `(X_seq, y_seq)` of NumPy arrays. `X_seq` has shape
        `(n_windows, seq_length, *X.shape[1:])` and `y_seq` has shape
        `(n_windows, *y.shape[1:])`, where
        `n_windows = max(len(X) - seq_length + 1, 0)`. When `X` is shorter
        than `seq_length` the arrays are empty but keep those trailing
        dimensions, instead of collapsing to shape `(0,)`.

    Raises:
        ValueError: if `seq_length` is smaller than 1.
    """
    if seq_length < 1:
        raise ValueError("seq_length must be a positive integer")

    X = np.asarray(X)
    y = np.asarray(y)

    # Exclusive end index of every window that fits completely inside X
    window_ends = range(seq_length, len(X) + 1)
    if len(window_ends) == 0:
        # Not enough rows for a single window: return correctly shaped empties
        return (np.empty((0, seq_length) + X.shape[1:], dtype=X.dtype),
                np.empty((0,) + y.shape[1:], dtype=y.dtype))

    X_seq = np.array([X[end - seq_length:end] for end in window_ends])
    y_seq = np.array([y[end - 1] for end in window_ends])
    return X_seq, y_seq

# Work on copies of the scaled 2-D feature matrices (before any reshaping)
X_train_original, X_val_original, X_test_original = (
    split.copy() for split in (X_train, X_val, X_test)
)

# Turn every split into overlapping windows of `sequence_length` weeks
X_train_seq, y_train_seq = create_sequences(X_train_original, y_train, sequence_length)
X_val_seq, y_val_seq = create_sequences(X_val_original, y_val, sequence_length)
X_test_seq, y_test_seq = create_sequences(X_test_original, y_test, sequence_length)

# Rebuild the model for the windowed input: per-sample shape is (sequence_length, features)
input_shape = X_train_seq.shape[1:]
tcn_model = create_tcn_model(input_shape)

class_weight = None

# Train the model
fit_options = dict(
    epochs=100,  # Increased epochs to allow more time to learn
    batch_size=32,  # Reduced batch size for better gradient estimates
    validation_data=(X_val_seq, y_val_seq),
    callbacks=callbacks,
    verbose=1,
    class_weight=class_weight,
    shuffle=True,  # Enable shuffling to prevent order-based biases
)
history = tcn_model.fit(X_train_seq, y_train_seq, **fit_options)

In [None]:
# Plot training history: one panel per tracked quantity, train vs. validation
plt.figure(figsize=(12, 5))

history_panels = (('loss', 'Loss'), ('mae', 'MAE'))
for panel_index, (metric_key, metric_label) in enumerate(history_panels, start=1):
    plt.subplot(1, 2, panel_index)
    plt.plot(history.history[metric_key], label=f'Train {metric_label}')
    plt.plot(history.history[f'val_{metric_key}'], label=f'Validation {metric_label}')
    plt.title(f'Model {metric_label}')
    plt.xlabel('Epochs')
    plt.ylabel(metric_label)
    plt.legend()
    plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()

In [None]:
# 6. Evaluate the Enhanced TCN Model
# Predict on the windowed test set (outputs are in standardized units)
y_pred = tcn_model.predict(X_test_seq)

# Undo the target scaling so that errors are expressed in the original units
y_pred_original = scaler_y.inverse_transform(y_pred)
y_test_original = scaler_y.inverse_transform(y_test_seq)

# Regression metrics on the original scale
mse = mean_squared_error(y_test_original, y_pred_original)
rmse = np.sqrt(mse)
mae = mean_absolute_error(y_test_original, y_pred_original)
r2 = r2_score(y_test_original, y_pred_original)

print("Model Evaluation Metrics:")
for metric_name, metric_value in (("MSE", mse), ("RMSE", rmse), ("MAE", mae), ("R²", r2)):
    print(f"{metric_name}: {metric_value:.2f}")

In [None]:
# Percentage improvement over a naive baseline that predicts each week's
# value as the value of the previous week
y_naive = y_test_seq[:-1]  # Previous values as prediction
y_actual = y_test_seq[1:]  # Actual values to compare against

naive_mae = mean_absolute_error(scaler_y.inverse_transform(y_actual),
                                scaler_y.inverse_transform(y_naive))
# A perfectly predictable series gives naive_mae == 0; report NaN instead of
# failing on the division
improvement = ((naive_mae - mae) / naive_mae * 100) if naive_mae else float('nan')

print(f"Improvement over naive baseline: {improvement:.2f}%")

# Dates of the test targets: each window is labelled with its last week, so
# the first sequence_length - 1 test dates have no prediction
test_dates = test_data.index[sequence_length-1:]
if len(test_dates) > len(y_test_original):
    test_dates = test_dates[:len(y_test_original)]
elif len(test_dates) < len(y_test_original):
    # This shouldn't happen, but just in case
    y_test_original = y_test_original[:len(test_dates)]
    y_pred_original = y_pred_original[:len(test_dates)]

# Plot actual vs. predicted values
plt.figure(figsize=(14, 7))
plt.plot(test_dates, y_test_original, label='Actual', marker='o', alpha=0.7, markersize=4)
plt.plot(test_dates, y_pred_original, label='Predicted', marker='x', alpha=0.7, markersize=4)

# Shaded band of +/- one test-set MAE around the predictions. It is drawn
# before the legend is created so that its label is included in the legend.
plt.fill_between(test_dates,
                 y_pred_original.flatten() - mae,
                 y_pred_original.flatten() + mae,
                 alpha=0.2, color='blue',
                 label='MAE Confidence')

plt.title('Enhanced TCN Model: Actual vs. Predicted Weekly Cash Flow')
plt.xlabel('Date')
plt.ylabel('Amount')
plt.legend()
plt.grid(True, alpha=0.3)
plt.xticks(rotation=45)

plt.tight_layout()
plt.show()

In [None]:
from scipy import stats

# Residuals on the original scale (positive means the model under-predicted)
errors = y_test_original - y_pred_original

plt.figure(figsize=(14, 10))

# Top-left: histogram of the residuals with a reference line at zero error
plt.subplot(2, 2, 1)
plt.hist(errors, bins=20, alpha=0.7, color='blue', edgecolor='black')
plt.axvline(x=0, color='red', linestyle='--', alpha=0.7)
plt.title('Error Distribution')
plt.xlabel('Prediction Error')
plt.ylabel('Frequency')
plt.grid(True, alpha=0.3)

# Top-right: predicted against actual, with the diagonal of perfect predictions
plt.subplot(2, 2, 2)
actual_low, actual_high = y_test_original.min(), y_test_original.max()
plt.scatter(y_test_original, y_pred_original, alpha=0.5, color='blue')
plt.plot([actual_low, actual_high], [actual_low, actual_high], 'r--', alpha=0.7)
plt.title('Actual vs. Predicted')
plt.xlabel('Actual Values')
plt.ylabel('Predicted Values')
plt.grid(True, alpha=0.3)

# Bottom-left: residuals in chronological order
plt.subplot(2, 2, 3)
plt.plot(test_dates, errors, color='green', marker='o', linestyle='-', alpha=0.6, markersize=3)
plt.axhline(y=0, color='red', linestyle='--', alpha=0.7)
plt.title('Error Over Time')
plt.xlabel('Date')
plt.ylabel('Error (Actual - Predicted)')
plt.grid(True, alpha=0.3)
plt.xticks(rotation=45)

# Bottom-right: probability (Q-Q) plot of the flattened residuals
plt.subplot(2, 2, 4)
stats.probplot(errors.flatten(), plot=plt)
plt.title('Q-Q Plot of Errors')
plt.grid(True, alpha=0.3)

plt.tight_layout()
plt.show()