# Chronos Model: 5-Minute and 15-Minute Data Predictions

This notebook uses the Chronos Bolt model to generate predictions for stock datasets with 5-minute and 15-minute intervals.

In [None]:
import os
import numpy as np
import pandas as pd
import torch
import matplotlib.pyplot as plt
from chronos import ChronosBoltPipeline
from sklearn.metrics import mean_absolute_error, mean_squared_error

# Explicit CPU device handle.
# NOTE(review): `device` is not referenced by any other visible cell (the pipeline
# is loaded with device_map="auto") — confirm whether it should be passed to the
# pipeline or removed.
device = torch.device("cpu")

  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Load the pretrained Chronos-Bolt (base) checkpoint once; later cells reuse `pipeline`.
# Weights are requested in bfloat16 and device placement is left to device_map="auto".
pipeline = ChronosBoltPipeline.from_pretrained(
    "amazon/chronos-bolt-base", device_map="auto", torch_dtype=torch.bfloat16
)

In [None]:
# Number of future steps predicted per backtest window (same value as the TimesFM setup).
forecast_horizon = 128

# Context window size is read from the loaded model's configuration instead of being hard-coded.
# NOTE(review): `n_positions` is taken as the context size here — confirm it is the
# attribute Chronos-Bolt actually uses for its context window.
context_length = pipeline.model.config.n_positions
print(f"Context window: {context_length} time steps")

Context window: 512 time steps


In [None]:
# reusable plotting
def plot_forecast(dates, actual_values, forecast_values, title, save_path=None):
    """Plot actual vs. predicted values for one forecast window.

    Args:
        dates: X-axis values, one per point in ``actual_values`` and
            ``forecast_values``.
        actual_values: Observed series, drawn as a solid green line.
        forecast_values: Predicted series, drawn as a dashed red line.
        title: Figure title.
        save_path: Optional file path; when truthy the figure is also written
            there at 300 dpi.

    Returns:
        None. The figure is shown and then closed.
    """
    # Explicit figure/axes (instead of the implicit pyplot state) so that this
    # exact figure can be closed afterwards; the function is called once per
    # backtest window and open figures would otherwise accumulate.
    fig, ax = plt.subplots(figsize=(18, 6))
    try:
        # Plot actual values as a continuous line
        ax.plot(dates, actual_values, color='green', label='Actual', marker='o', markersize=2)

        # Plot forecast values with markers
        ax.plot(dates, forecast_values, color='red', linestyle='--', marker='o', markersize=2, label='Predicted')

        ax.set_title(title, fontsize=14)
        ax.set_xlabel('Date', fontsize=10)
        ax.set_ylabel('Price ($)', fontsize=10)
        ax.tick_params(axis='x', labelrotation=45)
        ax.legend(frameon=True, shadow=True)
        ax.grid(True, linestyle='--', alpha=0.6)
        fig.tight_layout()

        # Save the plot if a path is provided
        if save_path:
            fig.savefig(save_path, dpi=300)

        plt.show()
    finally:
        # Release the figure even if drawing or saving failed.
        plt.close(fig)

In [None]:
# Function to process a single dataset
def process_dataset(file_path, time_interval):
    """Backtest the Chronos pipeline on one ticker CSV and save per-window results.

    Relies on the notebook-level names ``pipeline``, ``context_length``,
    ``forecast_horizon`` and ``plot_forecast``.

    Args:
        file_path: Path to a CSV with at least ``Datetime`` and ``Close``
            columns. The ticker symbol is the file name up to the first
            underscore.
        time_interval: Label used in log messages, plot titles and the results
            directory name (e.g. ``'5M'``).

    Returns:
        None. For every backtest window a PNG plot and a CSV with the columns
        ``date``, ``actual`` and ``forecast`` are written to
        ``../results/chronos_<time_interval>_<ticker>/`` relative to the
        current working directory.

    Raises:
        KeyError: If the CSV lacks the ``Datetime`` or ``Close`` column.
    """
    # Extract ticker symbol from file name
    ticker = os.path.basename(file_path).split('_')[0]
    print(f'Processing {ticker} dataset with {time_interval} interval...')

    # Load the dataset and fail early with a clear message on a wrong schema
    df = pd.read_csv(file_path)
    missing_columns = [col for col in ('Datetime', 'Close') if col not in df.columns]
    if missing_columns:
        raise KeyError(f'{file_path} is missing required column(s): {missing_columns}')

    timestamps = pd.to_datetime(df['Datetime'])

    # Convert close prices to the tensor format expected by Chronos
    values_tensor = torch.tensor(df['Close'].values, dtype=torch.float32)

    # Last index at which a full context + forecast window still fits
    max_start = len(values_tensor) - context_length - forecast_horizon

    if max_start < 0:
        print(f'Warning: {ticker} dataset too small for forecasting with current window sizes')
        return

    # Starting points of the backtesting windows; stepping by the horizon makes
    # the forecast segments non-overlapping
    backtest_starts = list(range(0, max_start + 1, forecast_horizon))

    # create results directory for this ticker if it doesn't exist
    ticker_results_dir = os.path.join(os.getcwd(), "..", "results", f'chronos_{time_interval}_{ticker}')
    os.makedirs(ticker_results_dir, exist_ok=True)

    # Loop through each backtesting window
    for idx, start_idx in enumerate(backtest_starts):
        print(f'Processing window {idx+1}/{len(backtest_starts)}...')

        # Extract context window for this backtesting iteration
        context_end = start_idx + context_length
        context_window = values_tensor[start_idx:context_end]

        # Best effort per window: a failure is reported and the remaining
        # windows are still processed.
        try:
            # Only the second return value is used as the point forecast;
            # the quantile tensor is not needed here.
            _, mean_forecast = pipeline.predict_quantiles(
                context=context_window,
                prediction_length=forecast_horizon,
                quantile_levels=[0.1, 0.5, 0.9],
            )

            # Flatten to 1-D (reshape keeps one dimension even for a horizon of
            # 1, unlike squeeze) and cast to float32 so the NumPy conversion
            # does not depend on the dtype the pipeline returns.
            forecast_values = mean_forecast.reshape(-1).float().cpu().numpy()

            # Get actual values for this forecast window
            actual_start = context_end
            actual_end = actual_start + forecast_horizon
            actual_values = values_tensor[actual_start:actual_end].cpu().numpy()

            # Get the corresponding dates
            forecast_dates = timestamps.iloc[actual_start:actual_end]

            # Calculate metrics
            mae = mean_absolute_error(actual_values, forecast_values)
            mse = mean_squared_error(actual_values, forecast_values)
            rmse = np.sqrt(mse)

            # Plot the forecast
            title = f'{ticker} ({time_interval}) - Window {idx+1} Forecast (MAE: {mae:.4f}, RMSE: {rmse:.4f})'
            plot_path = os.path.join(ticker_results_dir, f'{ticker}_window_{idx+1}.png')
            plot_forecast(forecast_dates, actual_values, forecast_values, title, plot_path)

            # Save the results to CSV
            results_df = pd.DataFrame({
                'date': forecast_dates,
                'actual': actual_values,
                'forecast': forecast_values
            })
            csv_path = os.path.join(ticker_results_dir, f'{ticker}_window_{idx+1}.csv')
            results_df.to_csv(csv_path, index=False)

            print(f'  MAE: {mae:.4f}, RMSE: {rmse:.4f}')

        except Exception as e:
            print(f'Error processing window {idx+1}: {str(e)}')

    print(f'Finished processing {ticker} dataset.')

## Process 5-Minute Data

In [None]:
# Define 5M data directory
data_dir_5m = os.path.join(os.getcwd(), "..", "data", "5M")

# Get list of all CSV files in the 5M directory (sorted for a reproducible order).
# A missing directory is treated as "no files" so the cell reports the problem
# below instead of raising FileNotFoundError.
if os.path.isdir(data_dir_5m):
    csv_files_5m = sorted(
        os.path.join(data_dir_5m, f) for f in os.listdir(data_dir_5m) if f.endswith('.csv')
    )
else:
    csv_files_5m = []

if not csv_files_5m:
    print(f"No CSV files found in {data_dir_5m}")
else:
    print(f"Found {len(csv_files_5m)} CSV files in 5M directory:")
    for file in csv_files_5m:
        print(f"- {os.path.basename(file)}")

FileNotFoundError: [WinError 3] The system cannot find the path specified: 'c:\\Users\\micha\\code\\finance\\diploma\\notebooks\\data\\5M'

## Process 15-Minute Data

In [None]:
# Define 15M data directory
data_dir_15m = os.path.join(os.getcwd(), "..", "data", "15M")

# Get list of all CSV files in the 15M directory (sorted for a reproducible order).
# A missing directory is treated as "no files" so the cell reports the problem
# below instead of raising FileNotFoundError.
if os.path.isdir(data_dir_15m):
    csv_files_15m = sorted(
        os.path.join(data_dir_15m, f) for f in os.listdir(data_dir_15m) if f.endswith('.csv')
    )
else:
    csv_files_15m = []

if not csv_files_15m:
    print(f"No CSV files found in {data_dir_15m}")
else:
    print(f"Found {len(csv_files_15m)} CSV files in 15M directory:")
    for file in csv_files_15m:
        print(f"- {os.path.basename(file)}")

Found 11 CSV files in 15M directory:
- INTC_15M.csv
- IONQ_15M.csv
- MSTR_15M.csv
- MU_15M.csv
- NVDA_15M.csv
- QBTS_15M.csv
- RGTI_15M.csv
- SMCI_15M.csv
- SRPT_15M.csv
- TSLA_15M.csv
- VKTX_15M.csv


## Process All Tickers (Except NVDA) for Both Intervals

In [None]:
# Process all tickers except NVDA for both intervals
def process_all_tickers(interval_dir, interval_name, exclude=('NVDA',)):
    """Run ``process_dataset`` on every CSV in a directory, skipping excluded tickers.

    Args:
        interval_dir: Directory containing ``<TICKER>_<interval>.csv`` files.
        interval_name: Interval label forwarded to ``process_dataset``
            (e.g. ``'5M'``).
        exclude: Ticker symbol, or iterable of ticker symbols, to skip.
            Defaults to ``('NVDA',)``.

    Returns:
        None. A missing ``interval_dir`` is reported and skipped rather than
        raising, so one absent interval does not stop the others from running.
    """
    if not os.path.isdir(interval_dir):
        print(f'Skipping {interval_name}: directory not found: {interval_dir}')
        return

    # Accept a single ticker string as well as an iterable of tickers.
    if isinstance(exclude, str):
        exclude = (exclude,)
    excluded_tickers = set(exclude)

    # Sorted so the processing order is the same on every run.
    for file_name in sorted(os.listdir(interval_dir)):
        if not file_name.endswith('.csv'):
            continue

        # Match on the ticker parsed from the file name (same rule as
        # process_dataset: text before the first underscore), not on the full
        # path, so directory names or longer tickers containing an excluded
        # symbol do not cause accidental skips.
        ticker = os.path.splitext(file_name)[0].split('_')[0]
        if ticker in excluded_tickers:
            continue

        print(f'\nProcessing {file_name}...')
        process_dataset(os.path.join(interval_dir, file_name), interval_name)

# Run the backtest over every ticker file: the 5-minute directory first,
# then the 15-minute directory.
print('Processing 5-minute data...')
process_all_tickers(interval_dir=data_dir_5m, interval_name='5M')

print('\nProcessing 15-minute data...')
process_all_tickers(interval_dir=data_dir_15m, interval_name='15M')

Processing 5-minute data...


NameError: name 'data_dir_5m' is not defined

In [None]:
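# Example: compare NVDA across different time intervals.
# Note: this only works after predictions have been run for all intervals.
# `compare_ticker_across_intervals` is not defined in any of the cells above,
# so define or import it before uncommenting the call below.
# compare_ticker_across_intervals('NVDA')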