In [1]:
!pip install tf2onnx


Collecting tf2onnx
  Downloading tf2onnx-1.16.1-py3-none-any.whl.metadata (1.3 kB)
Downloading tf2onnx-1.16.1-py3-none-any.whl (455 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m455.8/455.8 kB[0m [31m13.9 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: tf2onnx
Successfully installed tf2onnx-1.16.1


In [2]:
import pandas as py
import numpy as np
import os
import tensorflow as tf
import onnx 
import tf2onnx
from sklearn.model_selection import train_test_split
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense, concatenate
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from sklearn.ensemble import RandomForestClassifier 
from sklearn.metrics import accuracy_score
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from sklearn.model_selection import train_test_split

In [3]:
class TradeDataLoader:
    def __init__(self, file_path="/kaggle/input/real-real-trading-dataset/Trading data.xlsx"):
        self.file_path = file_path
        self.data = None

    def load_data(self):
        self.data = py.read_excel(self.file_path)
        self.data.drop(columns=['BOS', 'LOWER TF', 'Date'], inplace=True, errors='ignore')
        print(f"Data loaded from {self.file_path}")
        print("Original Tabular Data Shape:", self.data.shape)
        print(self.data.head(5))

    def preprocess_data(self):
        if self.data is not None:
            print("Step 1 - Original Data Shape:", self.data.shape)
            print(self.data.head())  # Print first few rows
    
        
            print("Step 2 - Unique TP/SL values before cleaning:", self.data['TP/SL'].unique())
            self.data['TP/SL'] = self.data['TP/SL'].str.replace('n', '-', regex=False)
            self.data['TP/SL'] = self.data['TP/SL'].str.split('-').str[0].str.strip()
            self.data['Price at 800 EMA'] = self.data['Price at 800 EMA'].str.split('-').str[0].str.strip()
            print("Step 2 - Unique TP/SL values after cleaning:", self.data['TP/SL'].unique())
            print("Step 2 - Unique Price at 800 EMA values after cleaning:", self.data['Price at 800 EMA'].unique())
    
            
            self.data = py.get_dummies(self.data, columns=['Timeframe', 'Trade Type', 'TP/SL', 'Price at 800 EMA', 'Hit 50EMA'])
    
            print("Step 3 - Shape After Encoding:", self.data.shape)
            print("Step 3 - Columns After Encoding:", self.data.columns.tolist())
    
            
            bool_columns = self.data.select_dtypes(include=[bool]).columns.tolist()
            self.data[bool_columns] = self.data[bool_columns].astype(int)
    
            numeric_columns = self.data.select_dtypes(include=[np.number]).columns.tolist()
            print(f"Step 4 - Selected Numeric Columns: {numeric_columns}")
    
            if not numeric_columns:
                print("ERROR: No numeric features found! Possible issue with encoding or missing numeric columns.")
            else:
                self.data[numeric_columns] = self.data[numeric_columns].astype(np.float32)
    
            print("Step 5 - Final Processed Data Shape:", self.data.shape)
            print("Step 5 - Processed Columns:", self.data.columns.tolist())
            print(self.data.head())  # Print processed data to verify
        else:
            raise Exception("Data not loaded")
    def get_data(self):
        return self.data


In [4]:
tabular_loader = TradeDataLoader()
tabular_loader.load_data()
tabular_loader.preprocess_data()
tabular_data = tabular_loader.get_data()
print(tabular_loader.get_data().head())  

Data loaded from /kaggle/input/real-real-trading-dataset/Trading data.xlsx
Original Tabular Data Shape: (100, 7)
  Pair No Timeframe Number of Cs Price at 800 EMA Trade Type Hit 50EMA  \
0     EA1        D1          5-6              YES       SELL       YES   
1     EA2        D1          6-7               NO       SELL        NO   
2     EA3        D1          6-7               NO       SELL        NO   
3     EA4        D1          4-5        NO-AT 200        BUY       YES   
4     EA5        D1            3               NO       SELL        NO   

                                       TP/SL  
0          TP n SL could have been too close  
1            SL- another leg top tho the LHS  
2                                         SL  
3  TP-trend was bearsh for a very short time  
4                                         SL  
Step 1 - Original Data Shape: (100, 7)
  Pair No Timeframe Number of Cs Price at 800 EMA Trade Type Hit 50EMA  \
0     EA1        D1          5-6              Y

In [5]:
class ImageDataLoader:
    def __init__(self, image_directory):
        if not os.path.exists(image_directory):
            raise FileNotFoundError(f"Image directory {image_directory} not found!")

        self.image_directory = image_directory
        self.image_data = {}

    def load_images(self):
        for file_name in os.listdir(self.image_directory):
            if file_name.lower().endswith(".png") or file_name.lower().endswith(".jpg"):
                pair_number = file_name.split('.')[0]  # Extract "GU1" from "GU1.png"
                image_path = os.path.join(self.image_directory, file_name)

                image = load_img(image_path, target_size=(256, 256), color_mode="grayscale")
                self.image_data[pair_number] = img_to_array(image)  # Save image data

        print(f"Loaded {len(self.image_data)} images from {self.image_directory}")
        print("Loaded image filenames:", list(image_loader.image_data.keys()))


    def get_image_data(self):
        return self.image_data


In [6]:
# Load image data
image_loader = ImageDataLoader("/kaggle/input/real-real-trading-dataset/Image data")
image_loader.load_images()
image_data = image_loader.get_image_data()


Loaded 100 images from /kaggle/input/real-real-trading-dataset/Image data
Loaded image filenames: ['GA4', 'AU5', 'AF5', 'GA1', 'GU9', 'EC7', 'UC3', 'EN2', 'G11', 'EA4', 'G10', 'AF4', 'UD4', 'AF2', 'AU9', 'GU1', 'GA10', 'UD1', 'EG5', 'AU7', 'AU6', 'EA8', 'GU6', 'NU2', 'UC1', 'EC3', 'UD3', 'EN1', 'EA5', 'EG4', 'AF6', 'NC6', 'EN3', 'EG7', 'EG3', 'GA6', 'NC8', 'GU4', 'GN1', 'NU3', 'GU5', 'EA3', 'EC6', 'AU10', 'EG1', 'NU1', 'EC2', 'AF7', 'EU8', 'EA2', 'NC7', 'AF3', 'EU6', 'GU2', 'AU2', 'NC1', 'AU11', 'GA3', 'GA7', 'NC4', 'EC10', 'EA6', 'EU5', 'NC3', 'AU13', 'EC5', 'GA9', 'GA2', 'EU4', 'AU8', 'GU8', 'EG6', 'NC2', 'EG2', 'EC9', 'EU2', 'EU1', 'GU7', 'AU4', 'GU3', 'AU1', 'NC5', 'AU3', 'AU12', 'GA5', 'NU4', 'EA7', 'AF1', 'UC2', 'AC1', 'EU3', 'EU7', 'AF8', 'EC4', 'EC8', 'GA8', 'EA1', 'EC11', 'UD2', 'EC1']


In [7]:
class TradeModelTrainer:
    def __init__(self, tabular_data, image_data):
        self.data = tabular_data
        self.image_data = image_data
        self.model = None

    def check_rows_before_splitting(self):
        print("Total Rows Before Split:", len(self.data))

    def preprocess_data(self):
        images = list(self.image_data.values())
        self.image_data = np.array(images).reshape(-1, 256, 256, 1)
    
        print("DEBUG: All Columns Before Selecting Numerics:", self.data.columns.tolist())
    
        numeric_columns = self.data.select_dtypes(include=['number']).columns.tolist()
        print("DEBUG: Numeric Columns Selected:", numeric_columns)
    
        if not numeric_columns:
            raise ValueError("ERROR: No numeric features found! Possible issue with encoding.")
    
        self.tabular_data_features = self.data[numeric_columns].astype(np.float32)
        print("DEBUG: Tabular Data Features Shape Before Split:", self.tabular_data_features.shape)
    
        self.buy_sell_labels = self.data['Trade Type_BUY'].astype(np.float32).values
        self.tp_sl_labels = self.data[[col for col in self.data.columns if col.startswith('TP/SL_')]].astype(np.float32).values
    
       
        print("DEBUG: Buy/Sell Labels Shape:", self.buy_sell_labels.shape)
        print("DEBUG: TP/SL Labels Shape:", self.tp_sl_labels.shape)
    
        self.train_features, self.test_features, self.train_images, self.test_images, \
        self.train_buy_sell, self.test_buy_sell, self.train_tp_sl, self.test_tp_sl = train_test_split(
            self.tabular_data_features, self.image_data, self.buy_sell_labels, self.tp_sl_labels,
            test_size=0.2, random_state=42
        )
    
        print("DEBUG: Train Features Shape:", self.train_features.shape)
        print("DEBUG: Test Features Shape:", self.test_features.shape)
        print("Data preprocessed and split into train and test sets")

    def build_model(self):
        image_input = Input(shape=(256, 256, 1), name='image_input')
        x = Conv2D(32, (3, 3), activation='relu')(image_input)
        x = MaxPooling2D((2, 2))(x)
        x = Flatten()(x)  

        tabular_input = Input(shape=(len(self.train_features.columns),), name='tabular_input')
        y = Dense(64, activation='relu')(tabular_input)

        combined = concatenate([x, y])
        z = Dense(64, activation='relu')(combined)

        buy_sell_output = Dense(1, activation='sigmoid', name='buy_sell_output')(z)
        tp_sl_output = Dense(len(self.tp_sl_labels[0]), activation='softmax', name='tp_sl_output')(z)

        self.model = Model(inputs=[image_input, tabular_input], outputs=[buy_sell_output, tp_sl_output])
        self.model.compile(
            optimizer=Adam(),
            loss={'buy_sell_output': 'binary_crossentropy', 'tp_sl_output': 'categorical_crossentropy'},
            metrics={'buy_sell_output': 'accuracy', 'tp_sl_output': 'accuracy'}
        )

        print("Model built successfully")
        print("Tabular Data Shape:", self.train_features.shape)

    def train_model(self):
        self.model.fit(
            [self.train_images, self.train_features],
            {'buy_sell_output': self.train_buy_sell, 'tp_sl_output': self.train_tp_sl},
            epochs=10,
            batch_size=16,
        )

        print("Model trained")

    def test_model(self):
        results = self.model.evaluate(
            [self.test_images, self.test_features],
            {'buy_sell_output': self.test_buy_sell, 'tp_sl_output': self.test_tp_sl}
        )
    
        print(f"Test Results: {results}")  


In [8]:
trainer = TradeModelTrainer(tabular_data, image_data)
trainer.preprocess_data()
trainer.build_model()
trainer.train_model()
trainer.test_model()
trainer.check_rows_before_splitting()

DEBUG: All Columns Before Selecting Numerics: ['Pair No', 'Number of Cs', 'Timeframe_D1', 'Timeframe_H1', 'Timeframe_H4', 'Timeframe_W1', 'Trade Type_BUY', 'Trade Type_SELL', 'TP/SL_G', 'TP/SL_GU', 'TP/SL_SL', 'TP/SL_TP', 'TP/SL_TRAILING SL', 'Price at 800 EMA_NO', 'Price at 800 EMA_YES', 'Hit 50EMA_NO', 'Hit 50EMA_YES']
DEBUG: Numeric Columns Selected: ['Timeframe_D1', 'Timeframe_H1', 'Timeframe_H4', 'Timeframe_W1', 'Trade Type_BUY', 'Trade Type_SELL', 'TP/SL_G', 'TP/SL_GU', 'TP/SL_SL', 'TP/SL_TP', 'TP/SL_TRAILING SL', 'Price at 800 EMA_NO', 'Price at 800 EMA_YES', 'Hit 50EMA_NO', 'Hit 50EMA_YES']
DEBUG: Tabular Data Features Shape Before Split: (100, 15)
DEBUG: Buy/Sell Labels Shape: (100,)
DEBUG: TP/SL Labels Shape: (100, 5)
DEBUG: Train Features Shape: (80, 15)
DEBUG: Test Features Shape: (20, 15)
Data preprocessed and split into train and test sets
Model built successfully
Tabular Data Shape: (80, 15)
Epoch 1/10
[1m5/5[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m6s[0m 662ms/s

In [9]:
trainer.model.save("/kaggle/working/tradin_model.h5")

In [10]:
print(os.listdir("/kaggle/working/"))

['.virtual_documents', 'tradin_model.h5']


In [11]:
model = tf.keras.models.load_model("/kaggle/working/tradin_model.h5")

fixed_input_shapes = [
    tf.TensorSpec(shape=(None, 256, 256, 1), dtype=tf.float32, name="image_input"),
    tf.TensorSpec(shape=(None, 15), dtype=tf.float32, name="tabular_input")  # Fix input shape for tabular data
]

onnx_model, _ = tf2onnx.convert.from_keras(model, input_signature=fixed_input_shapes, opset=13)


onnx.save_model(onnx_model, "/kaggle/working/tradin_model.onnx")

print(" Model converted and saved as tradin_model.onnx")



 Model converted and saved as tradin_model.onnx


In [12]:
pip install onnxruntime

Collecting onnxruntime
  Downloading onnxruntime-1.22.1-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (4.6 kB)
Collecting coloredlogs (from onnxruntime)
  Downloading coloredlogs-15.0.1-py2.py3-none-any.whl.metadata (12 kB)
Collecting humanfriendly>=9.1 (from coloredlogs->onnxruntime)
  Downloading humanfriendly-10.0-py2.py3-none-any.whl.metadata (9.2 kB)
Downloading onnxruntime-1.22.1-cp310-cp310-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (16.5 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m16.5/16.5 MB[0m [31m96.1 MB/s[0m eta [36m0:00:00[0m:00:01[0m0:01[0m
[?25hDownloading coloredlogs-15.0.1-py2.py3-none-any.whl (46 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m46.0/46.0 kB[0m [31m2.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading humanfriendly-10.0-py2.py3-none-any.whl (86 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m86.8/86.8 kB[0m [31m4.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstall

In [13]:
pip install yfinance

Note: you may need to restart the kernel to use updated packages.


In [14]:
import threading
import onnxruntime as ort
import time
import os
import yfinance as yf
import pandas as pd
import cv2

In [19]:
import threading
import onnxruntime as ort
import time
import os
import yfinance as yf
import pandas as pd
import numpy as np
import cv2

class BotImplementation:
    def __init__(self):
        print("AI Trading Bot Initialized")
        self.session = ort.InferenceSession("/kaggle/working/tradin_model.onnx")
        print("AI Model Loaded Successfully")
        print("Checking model inputs:")
        for input_tensor in self.session.get_inputs():
            print(f"Input Name: {input_tensor.name}, Shape: {input_tensor.shape}, Type: {input_tensor.type}")

    def fetch_historical_data(self, symbol, timeframe="1d", period="1y"):
        try:
            print(f"Fetching historical data for {symbol} ({timeframe}, {period})...")
            
            # Try different variations of BTC symbol
            symbols_to_try = [symbol, "BTC-USD", "BTCUSD", "BTC"]
            df = None
            
            for sym in symbols_to_try:
                try:
                    df = yf.download(sym, period=period, interval=timeframe, progress=False)
                    if not df.empty:
                        print(f"Successfully downloaded data using symbol: {sym}")
                        break
                except:
                    continue
            
            # If Yahoo Finance fails, try Ticker method
            if df is None or df.empty:
                print("Trying Ticker method...")
                try:
                    ticker = yf.Ticker("BTC-USD")
                    df = ticker.history(period=period, interval=timeframe)
                    if not df.empty:
                        print("Ticker method successful!")
                except:
                    pass
                    
            # If still no data, create sample data for testing
            if df is None or df.empty:
                print("Yahoo Finance unavailable. Creating sample BTC data for testing...")
                df = self._create_sample_btc_data()
                if df is not None:
                    print("✅ Using sample data for testing")
                
            if df is None or df.empty:
                print(f"No data available for {symbol}.")
                return None
                
            df.reset_index(inplace=True)
            # Handle both "Date" and "Datetime" column names
            if "Date" in df.columns:
                df.rename(columns={"Date": "time"}, inplace=True)
            elif "Datetime" in df.columns:
                df.rename(columns={"Datetime": "time"}, inplace=True)
                
            df.rename(columns={"Open": "open", "High": "high",
                               "Low": "low", "Close": "close", "Volume": "volume"}, inplace=True)
            print(f"Historical data ready: {df.shape[0]} rows")
            return df
        except Exception as e:
            print(f"Error fetching historical data for {symbol}: {e}")
            return None
    
    def _create_sample_btc_data(self):
        """Create sample BTC data when Yahoo Finance is unavailable"""
        try:
            dates = pd.date_range(end=pd.Timestamp.now(), periods=365, freq='D')
            np.random.seed(42)  # For reproducible results
            
            # Generate realistic BTC price movement
            base_price = 45000
            price_changes = np.random.normal(0, 0.025, 365)  # 2.5% daily volatility
            prices = [base_price]
            
            for change in price_changes[1:]:
                new_price = prices[-1] * (1 + change)
                prices.append(max(new_price, 1000))  # Minimum price floor
            
            # Create OHLCV data
            sample_data = pd.DataFrame({
                'time': dates,
                'open': prices,
                'high': [p * (1 + abs(np.random.normal(0, 0.015))) for p in prices],
                'low': [p * (1 - abs(np.random.normal(0, 0.015))) for p in prices],
                'close': prices,
                'volume': np.random.randint(500000, 5000000, 365)
            })
            
            # Ensure high >= max(open,close) and low <= min(open,close)
            sample_data['high'] = sample_data[['open', 'close', 'high']].max(axis=1)
            sample_data['low'] = sample_data[['open', 'close', 'low']].min(axis=1)
            
            return sample_data
            
        except Exception as e:
            print(f"Sample data creation failed: {e}")
            return None

    def get_dynamic_candlestick_pattern(self, price_data):
        # Use last 7 candles if available, else 3..7
        min_candles, max_candles = 3, 7
        available = len(price_data)
        if available < min_candles:
            return None
        N = min(max_candles, available)
        
        # Extract last N rows [open, high, low, close] => (N,4)
        patch = price_data[["open","high","low","close"]].values[-N:].astype(np.float32)
        
        # Normalize each column to [0,1] range
        patch_min = patch.min(axis=0, keepdims=True)
        patch_max = patch.max(axis=0, keepdims=True)
        # Avoid division by zero
        patch_range = patch_max - patch_min
        patch_range[patch_range == 0] = 1.0
        patch_normalized = (patch - patch_min) / patch_range
        
        # Convert to grayscale by averaging the 4 OHLC values
        # This gives us (N,) array
        grayscale_values = patch_normalized.mean(axis=1)
        
        # Create a simple 2D pattern by tiling the values
        # We'll create a (N, N) matrix first, then resize to 256x256
        pattern_2d = np.tile(grayscale_values.reshape(-1, 1), (1, N))
        
        # If we have fewer than 7 candles, pad to make it square
        if N < 7:
            pad_size = 7 - N
            pattern_2d = np.pad(pattern_2d, ((0, pad_size), (0, pad_size)), mode='edge')
            
        # Now resize to 256x256
        pattern_resized = cv2.resize(pattern_2d, (256, 256), interpolation=cv2.INTER_LINEAR)
        
        # Ensure it's in the right format and add batch and channel dimensions
        image_4d = pattern_resized.astype(np.float32).reshape(1, 256, 256, 1)
        
        return image_4d, N

    def detect_trade_pattern(self, symbol, timeframe):
        price_data = self.fetch_historical_data(symbol, timeframe)
        if price_data is None:
            return None, 0, 0, None, None

        result = self.get_dynamic_candlestick_pattern(price_data)
        if result is None:
            return None, 0, 0, None, None

        image_input, pattern_length = result

        # DO NOT resize again here - image_input is already (1, 256, 256, 1)
        
        # Build tabular vector of size 15
        features = [
            "open","high","low","close","volume",
            "50EMA","200EMA","800EMA",
            "RSI","MACD","MACD_Signal",
            "Bollinger_Upper","Bollinger_Lower",
            "ATR","ADX"
        ]

        # Ensure all 15 exist (fill with 0.0 if missing)
        for f in features:
            if f not in price_data.columns:
                price_data[f] = 0.0

        tabular = price_data.iloc[-1][features].values.astype(np.float32).reshape(1, 15)

        # Run ONNX inference with BOTH inputs
        output_name = self.session.get_outputs()[0].name
        pred = self.session.run(
            [output_name],
            {"image_input": image_input, "tabular_input": tabular}
        )

        # Handle different prediction output shapes
        if isinstance(pred[0], np.ndarray):
            prob = float(pred[0].flatten()[0])
        else:
            prob = float(pred[0])
        trade_type = "BUY" if prob > 0.5 else "SELL"
        entry_price = float(price_data["close"].iloc[-1])
        trade_time = price_data["time"].iloc[-1]

        return trade_type, prob, pattern_length, entry_price, trade_time

    def analyze_historical_patterns(self, symbol="BTC-USD", timeframe="1d", period="1y"):
        print(f"Running AI pattern detection on {symbol} ({timeframe}) for {period} of historical data...\n")
        trade_type, prob, N, entry, ts = self.detect_trade_pattern(symbol, timeframe)
        if trade_type:
            print(f"TRADE ALERT: {symbol} ({timeframe})")
            print(f"    • Trade Type: {trade_type}")
            print(f"    • Probability: {prob:.4f}")
            print(f"    • Entry Price: {entry}")
            print(f"    • Time: {ts}")
            print(f"    • Candles Used: {N}")
        else:
            print(f"No valid trade pattern detected for {symbol} ({timeframe})")

# Expected usage:
bot = BotImplementation()
bot.analyze_historical_patterns("BTC-USD", "1d", "1y")

✅ AI Trading Bot Initialized
✅ AI Model Loaded Successfully
📌 Checking model inputs:
🔹 Input Name: image_input, Shape: ['unk__21', 256, 256, 1], Type: tensor(float)
🔹 Input Name: tabular_input, Shape: ['unk__22', 15], Type: tensor(float)
🧠 Running AI pattern detection on BTC-USD (1d) for 1y of historical data...

🔍 Fetching historical data for BTC-USD (1d, 1y)...
Trying Ticker method...
⚠️ Yahoo Finance unavailable. Creating sample BTC data for testing...
✅ Using sample data for testing
✅ Historical data ready: 365 rows
📈 TRADE ALERT: BTC-USD (1d)
    • Trade Type: BUY
    • Probability: 1.0000
    • Entry Price: 43949.3173326369
    • Time: 2025-08-10 06:47:53.635469
    • Candles Used: 7


In [None]:
pip install yfinance --upgrade