In [1]:
import sys
import os
from pathlib import Path
# Get the current working directory (where the notebook/script is running)
current_dir = Path(os.getcwd())
# Navigate to the 'main' folder (adjust if needed)
main_dir = str(current_dir.parent)  # If notebook is inside 'main'
# OR if notebook is outside 'main':
# main_dir = str(current_dir / "main")  # Assumes 'main' is a subfolder
sys.path.append(main_dir)
# import DB
from Data.db_cloud import Database
db = Database()

Connected to sqlite cloud database


In [5]:
db.close()

Closed connection to database


In [None]:

import pandas as pd

from sklearn.model_selection import train_test_split

# import the split lib to split the data into train and test
from sklearn.model_selection import train_test_split

from pip_pattern_miner import Pattern_Miner


# Stocks 

In [3]:
companies = {
    1: "GOLD (XAUUSD)",
    2: "BTC (BTCUSD)",
    3: "APPL (AAPL)",
    4: "Amazon (AMZN)",
    5: "NVIDIA (NVDA)",
}

time_frames = {
    1: 15,  # minutes
    2: 60,  # 1 hour
}

# Fetch Stock Data And Store In DB

In [None]:

def process_stock_data(stock_id, symbol, time_frame):
    try:
        # Extract the ticker symbol from the format "Name (TICKER)"
        ticker = symbol.split('(')[-1].replace(')', '').strip()
        
        # Construct file path
        file_path = f"../Data/Stocks/{ticker}{time_frame}.csv"
        
        # Read and process the data
        df_original = pd.read_csv(file_path)
        
        # create the datetime index
        df_original['Date'] = pd.to_datetime(df_original['Date'] + ' ' + df_original['Time'])
        df_original['Date'] = df_original['Date'].astype('datetime64[s]')
        df_original = df_original.set_index('Date')
        df_original = df_original.drop(columns=['Time'])
        
        df_original = df_original.dropna()
        df_original = df_original.loc['2019-01-01':]
        
        # Store the data
        db.store_stock_data(df_original, stock_id, ticker, time_frame)
        
        return True
        
    except Exception as e:
        print(f"Error processing {symbol} (ID: {stock_id}): {str(e)}")
        return False

# Process all companies
for stock_id, symbol in companies.items():
    time_frame_15 = time_frames[1]
    time_frame_60 = time_frames[2]
    process_stock_data(stock_id, symbol, time_frame_15)
    process_stock_data(stock_id, symbol, time_frame_60)

print("All stocks processed.")

KeyboardInterrupt: 

# Pattern Mining

In [None]:

def perform_pattern_mining_for_all_stocks(n_pips=5, lookback=24, hold_period=6, returns_hold_period=12, time_frame=60):
  
    for stock_id, symbol in companies.items():
        try:
            print(f"\nProcessing {symbol} (ID: {stock_id}) with timeframe {time_frame}...")
            
            # Fetch stock data from database
            df = db.get_stock_data(stock_id, time_frame)
            
            if df.empty:
                print(f"No data found for {symbol} (ID: {stock_id})")
                continue
                
            # Prepare the close prices array
            arr = df['ClosePrice'].to_numpy()
            
            # Split data into train and test (optional)
            train, test = train_test_split(arr, test_size=0.2, shuffle=False)
            print(f"Data split - Train: {train.shape}, Test: {test.shape}")
            
            # Create and train the pip miner
            pip_miner = Pattern_Miner(n_pips, lookback, hold_period, returns_hold_period)
            pip_miner.train(train)
            
            # Store the patterns and clusters
            db.pip_pattern_miner = pip_miner
            db.store_pattern_data(stock_id, pip_miner)
            db.store_cluster_data(stock_id , pip_miner)
            db.bind_pattern_cluster(stock_id , pip_miner)
            db.update_all_cluster_probability_score(stock_id,pip_miner)
                
            print(f"Successfully processed patterns for {symbol} (ID: {stock_id}) with timeframe {time_frame}.")
            
        except Exception as e:
            print(f"Error processing {symbol} (ID: {stock_id}): {str(e)}")
    
    db.close()
    print("\nPattern mining completed for all stocks.")

# Usage:
perform_pattern_mining_for_all_stocks(5, 24, 6, 12)


Processing GOLD (XAUUSD) (ID: 1) with timeframe 60...
Data split - Train: (29693,), Test: (7424,)
Successfully processed patterns for GOLD (XAUUSD) (ID: 1) with timeframe 60.

Processing BTC (BTCUSD) (ID: 2) with timeframe 60...
Data split - Train: (38004,), Test: (9502,)
Successfully processed patterns for BTC (BTCUSD) (ID: 2) with timeframe 60.

Processing APPL (AAPL) (ID: 3) with timeframe 60...
Data split - Train: (8804,), Test: (2202,)
Successfully processed patterns for APPL (AAPL) (ID: 3) with timeframe 60.

Processing Amazon (AMZN) (ID: 4) with timeframe 60...
Data split - Train: (8804,), Test: (2202,)
Successfully processed patterns for Amazon (AMZN) (ID: 4) with timeframe 60.

Processing NVIDIA (NVDA) (ID: 5) with timeframe 60...
Data split - Train: (8804,), Test: (2202,)
Successfully processed patterns for NVIDIA (NVDA) (ID: 5) with timeframe 60.
Closed connection to database: ../Data/data.db

Pattern mining completed for all stocks.


# Get Train & Test Dates

In [5]:
# determine the test and train data dates 
def get_test_train_dates(stock_id, time_frame):
    try:
        # Fetch stock data from database
        df = db.get_stock_data(stock_id, time_frame)
        
        if df.empty:
            print(f"No data found for stock ID: {stock_id}")
            return None, None
        
        # Get the first and last dates of the dataset
        start_date = df.index[0]
        end_date = df.index[-1]
        
        # Split the data into train and test sets
        train_start_date = start_date
        train_end_date = start_date + pd.DateOffset(days=int((end_date - start_date).days * 0.8))
        test_start_date = train_end_date + pd.DateOffset(days=1)
        test_end_date = end_date
        
        return (train_start_date, train_end_date), (test_start_date, test_end_date)
        
    except Exception as e:
        print(f"Error fetching dates for stock ID {stock_id}: {str(e)}")
        return None, None
    
train , test = get_test_train_dates(1, 60)
print(f"Train Dates: {train}")
print(f"Test Dates: {test}")

Train Dates: (Timestamp('2019-01-02 01:00:00'), Timestamp('2024-01-08 01:00:00'))
Test Dates: (Timestamp('2024-01-09 01:00:00'), Timestamp('2025-04-10 23:00:00'))
