In [10]:
import MetaTrader5 as mt5
import pandas as pd
from datetime import datetime

# Connect to MetaTrader 5
if not mt5.initialize():
    print("MT5 Initialization failed:", mt5.last_error())
    quit()

# Set symbol
symbol = "XAUUSD"

# Ensure symbol is available
if not mt5.symbol_select(symbol, True):
    print(f"Failed to select {symbol}")
    mt5.shutdown()
    quit()

# Get last 1000 1-minute candles
rates = mt5.copy_rates_from_pos(symbol, mt5.TIMEFRAME_M1, 0, 1000)

# Convert to DataFrame
df = pd.DataFrame(rates)
df['time'] = pd.to_datetime(df['time'], unit='s')

# Show last few rows
print(df.tail())

# Save for training
df.to_csv("xauusd_data.csv", index=False)

# Disconnect
mt5.shutdown()


                   time     open     high      low    close  tick_volume  \
995 2025-06-30 13:18:00  3281.16  3282.02  3281.04  3281.42          164   
996 2025-06-30 13:19:00  3281.41  3282.23  3281.28  3282.19          104   
997 2025-06-30 13:20:00  3282.18  3282.46  3280.17  3280.80          161   
998 2025-06-30 13:21:00  3280.80  3281.51  3279.84  3280.22          173   
999 2025-06-30 13:22:00  3280.21  3280.31  3280.11  3280.16           34   

     spread  real_volume  
995      12            0  
996      12            0  
997      12            0  
998      12            0  
999      12            0  


True

In [16]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
import xgboost as xgb
import joblib
import os

# Load Data
df = pd.read_csv("xauusd_data.csv")
df['time'] = pd.to_datetime(df['time'])

# Use only 'close' prices for now
data = df['close'].values.reshape(-1, 1)

# Normalize data
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(data)

# Prepare LSTM sequences
def create_lstm_data(data, window=60):
    X, y = [], []
    for i in range(window, len(data)):
        X.append(data[i-window:i, 0])
        y.append(data[i, 0])
    return np.array(X), np.array(y)

X, y = create_lstm_data(scaled_data)

# Split
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# Reshape for LSTM: (samples, time_steps, features)
X_train_lstm = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))
X_test_lstm = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))

# Build LSTM model
lstm_model = Sequential()
lstm_model.add(LSTM(50, return_sequences=True, input_shape=(X_train.shape[1], 1)))
lstm_model.add(LSTM(50))
lstm_model.add(Dense(1))
lstm_model.compile(optimizer='adam', loss='mse')
lstm_model.fit(X_train_lstm, y_train, epochs=70, batch_size=32)

# Save LSTM model
os.makedirs("models", exist_ok=True)
lstm_model.save("models/lstm_model.h5")

# Use LSTM predictions as feature for XGBoost
lstm_preds = lstm_model.predict(X_train_lstm)
X_train_xgb = np.hstack((X_train, lstm_preds))

lstm_preds_test = lstm_model.predict(X_test_lstm)
X_test_xgb = np.hstack((X_test, lstm_preds_test))

# Train XGBoost
xgb_model = xgb.XGBRegressor(n_estimators=100)
xgb_model.fit(X_train_xgb, y_train)

# Save XGBoost model
xgb_model.save_model("models/xgboost_model.json")
joblib.dump(scaler, "models/scaler.pkl")

print("✅ Models trained and saved!")


Epoch 1/70


  super().__init__(**kwargs)


[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 31ms/step - loss: 0.1682
Epoch 2/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 30ms/step - loss: 0.0060
Epoch 3/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 31ms/step - loss: 0.0031
Epoch 4/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 30ms/step - loss: 0.0029
Epoch 5/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 31ms/step - loss: 0.0023
Epoch 6/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 31ms/step - loss: 0.0022
Epoch 7/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 32ms/step - loss: 0.0019
Epoch 8/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 32ms/step - loss: 0.0022
Epoch 9/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 34ms/step - loss: 0.0020
Epoch 10/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 31ms/step - loss: 0.0019
Epoch 11/7



[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 22ms/step
[1m6/6[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m0s[0m 15ms/step
✅ Models trained and saved!


In [18]:
import pandas as pd
import numpy as np
from sklearn.preprocessing import MinMaxScaler
from sklearn.model_selection import train_test_split
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import LSTM, Dense
import xgboost as xgb
import joblib
import os

# Load dataset
df = pd.read_csv("xauusd_data.csv")
df['time'] = pd.to_datetime(df['time'])

# Focus on 'close' prices only
data = df['close'].values.reshape(-1, 1)

# Normalize close prices
scaler = MinMaxScaler()
scaled_data = scaler.fit_transform(data)

# Create LSTM-compatible sequences
def create_lstm_data(data, window=60):
    X, y = [], []
    for i in range(window, len(data)):
        X.append(data[i-window:i, 0])
        y.append(data[i, 0])
    return np.array(X), np.array(y)

X, y = create_lstm_data(scaled_data)

# Split into train/test
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, shuffle=False)

# Reshape for LSTM input: (samples, time_steps, features)
X_train_lstm = X_train.reshape((X_train.shape[0], X_train.shape[1], 1))
X_test_lstm = X_test.reshape((X_test.shape[0], X_test.shape[1], 1))

# Define LSTM model
lstm_model = Sequential([
    LSTM(50, return_sequences=True, input_shape=(X_train.shape[1], 1)),
    LSTM(50),
    Dense(1)
])
lstm_model.compile(optimizer='adam', loss='mse')

# Train LSTM
lstm_model.fit(X_train_lstm, y_train, epochs=70, batch_size=32)

# Save models directory
os.makedirs("models", exist_ok=True)

# ✅ Save LSTM in native format
lstm_model.save("models/lstm_model.keras")

# LSTM predictions to use in XGBoost
lstm_preds_train = lstm_model.predict(X_train_lstm)
X_train_xgb = np.hstack((X_train, lstm_preds_train))

lstm_preds_test = lstm_model.predict(X_test_lstm)
X_test_xgb = np.hstack((X_test, lstm_preds_test))

# Train XGBoost
xgb_model = xgb.XGBRegressor(n_estimators=100)
xgb_model.fit(X_train_xgb, y_train)

# Save XGBoost model and scaler
xgb_model.save_model("models/xgboost_model.json")
joblib.dump(scaler, "models/scaler.pkl")

print("✅ Models trained and saved successfully!")


Epoch 1/70


  super().__init__(**kwargs)


[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m3s[0m 33ms/step - loss: 0.1399
Epoch 2/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 30ms/step - loss: 0.0056
Epoch 3/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 31ms/step - loss: 0.0037
Epoch 4/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 30ms/step - loss: 0.0026
Epoch 5/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 31ms/step - loss: 0.0026
Epoch 6/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 34ms/step - loss: 0.0023
Epoch 7/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 30ms/step - loss: 0.0023
Epoch 8/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 32ms/step - loss: 0.0022
Epoch 9/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 31ms/step - loss: 0.0025
Epoch 10/70
[1m24/24[0m [32m━━━━━━━━━━━━━━━━━━━━[0m[37m[0m [1m1s[0m 31ms/step - loss: 0.0022
Epoch 11/7

In [17]:
import numpy as np
import joblib
import xgboost as xgb
from tensorflow.keras.models import load_model

# Load models and scaler
lstm_model = load_model("models/lstm_model.h5")
xgb_model = xgb.XGBRegressor()
xgb_model.load_model("models/xgboost_model.json")
scaler = joblib.load("models/scaler.pkl")

def generate_signal(predicted_price, current_price, threshold=0.002):
    diff = (predicted_price - current_price) / current_price
    if diff > threshold:
        return "BUY"
    elif diff < -threshold:
        return "SELL"
    else:
        return "HOLD"

def forecast_next(close_prices: list):
    data = np.array(close_prices[-60:]).reshape(-1, 1)
    scaled = scaler.transform(data)

    # LSTM expects shape (samples, time_steps, features)
    X = scaled.reshape(1, 60, 1)
    lstm_pred = lstm_model.predict(X)

    # Add lstm prediction as feature for XGBoost
    X_xgb = np.hstack((scaled.reshape(1, 60), lstm_pred))
    xgb_pred = xgb_model.predict(X_xgb)

    predicted_scaled = xgb_pred[0]
    predicted_price = scaler.inverse_transform([[predicted_scaled]])[0][0]

    current_price = close_prices[-1]
    signal = generate_signal(predicted_price, current_price)

    return {
        "predicted_price": round(predicted_price, 3),
        "current_price": round(current_price, 3),
        "signal": signal
    }


TypeError: Could not locate function 'mse'. Make sure custom classes are decorated with `@keras.saving.register_keras_serializable()`. Full object config: {'module': 'keras.metrics', 'class_name': 'function', 'config': 'mse', 'registered_name': 'mse'}

In [19]:
import numpy as np
import joblib
import xgboost as xgb
from tensorflow.keras.models import load_model

# Load LSTM model (native format)
lstm_model = load_model("models/lstm_model.keras")  # ⬅️ No 'mse' error here

# Load XGBoost model and scaler
xgb_model = xgb.XGBRegressor()
xgb_model.load_model("models/xgboost_model.json")
scaler = joblib.load("models/scaler.pkl")

# Signal generator logic
def generate_signal(predicted_price, current_price, threshold=0.002):
    diff = (predicted_price - current_price) / current_price
    if diff > threshold:
        return "BUY"
    elif diff < -threshold:
        return "SELL"
    else:
        return "HOLD"

# Forecast next price and give signal
def forecast_next(close_prices: list):
    if len(close_prices) < 60:
        raise ValueError("Input must have at least 60 closing prices.")

    # Scale the latest 60 prices
    recent_data = np.array(close_prices[-60:]).reshape(-1, 1)
    scaled = scaler.transform(recent_data)

    # Predict with LSTM
    X_lstm = scaled.reshape(1, 60, 1)
    lstm_pred = lstm_model.predict(X_lstm)

    # Use LSTM output as extra feature for XGBoost
    X_xgb = np.hstack((scaled.reshape(1, 60), lstm_pred))
    xgb_pred = xgb_model.predict(X_xgb)

    # Inverse transform to original price
    predicted_scaled = xgb_pred[0]
    predicted_price = scaler.inverse_transform([[predicted_scaled]])[0][0]

    current_price = close_prices[-1]
    signal = generate_signal(predicted_price, current_price)

    return {
        "predicted_price": round(predicted_price, 3),
        "current_price": round(current_price, 3),
        "signal": signal
    }


In [21]:
from fastapi import FastAPI
from pydantic import BaseModel
from app.forecast_logic import forecast_next

app = FastAPI()

class PriceInput(BaseModel):
    close_prices: list

@app.get("/")
def root():
    return {"status": "XAUUSD AI API is running ✅"}

@app.post("/predict")
def predict(input: PriceInput):
    result = forecast_next(input.close_prices)
    return result


In [22]:
from fastapi import FastAPI, Request
from fastapi.templating import Jinja2Templates
from fastapi.responses import HTMLResponse
from app.forecast_logic import forecast_next
from pydantic import BaseModel

app = FastAPI()
templates = Jinja2Templates(directory="app/templates")

class PriceInput(BaseModel):
    close_prices: list

@app.get("/", response_class=HTMLResponse)
def read_root(request: Request):
    return templates.TemplateResponse("index.html", {"request": request})

@app.post("/predict")
def predict(input: PriceInput):
    return forecast_next(input.close_prices)
