<a href="https://colab.research.google.com/github/mahdi-alalawi/AndroidTutorialForBeginners/blob/master/Untitled1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Download the TA-Lib 0.4.0 source tarball and unpack it into ./ta-lib.
!wget http://prdownloads.sourceforge.net/ta-lib/ta-lib-0.4.0-src.tar.gz
!tar xzf ta-lib-0.4.0-src.tar.gz
# Build from inside the extracted source directory and install under /usr.
%cd ta-lib
!./configure --prefix=/usr
!make
!make install
# Return to the previous working directory for the following cells.
%cd ..

In [None]:
# Install the `ta-lib` package from pip (imported below as `talib`).
# NOTE(review): presumably this wrapper needs the C library built in the previous cell — confirm.
!pip install ta-lib

In [None]:
import pandas as pd
import numpy as np
import talib
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from xgboost import XGBRegressor
from keras.models import Sequential
from keras.layers import Dense, Dropout, BatchNormalization
from keras.optimizers import Adam
import seaborn as sns
import requests
from bs4 import BeautifulSoup
from statsmodels.tsa.seasonal import seasonal_decompose
from sklearn.ensemble import RandomForestRegressor
from sklearn.svm import SVR
from sklearn.metrics import mean_squared_error
from textblob import TextBlob
from google.colab import drive

# Mount Google Drive at /content/drive so files stored there can be read by path.
drive.mount('/content/drive')

# Location of the gold price CSV on the mounted drive; this is the path handed
# to load_data() in the main section.
file_path = '/content/drive/MyDrive/gold_usd_prices.csv'  # Adjust if your path is different

# Load Data
def load_data(filename):
    """Read a CSV into a DataFrame indexed by its parsed 'date' column.

    Args:
        filename: Path of the CSV file to read.

    Returns:
        The loaded DataFrame, or None when reading fails. Every failure is
        reported with a printed message instead of being raised.
    """
    try:
        frame = pd.read_csv(filename, index_col='date', parse_dates=True)
    except FileNotFoundError:
        print(f"Error: File not found at {filename}")
    except pd.errors.EmptyDataError:
        print(f"Error: File at {filename} is empty")
    except pd.errors.ParserError:
        print(f"Error: Could not parse file at {filename}. Check file format.")
    except Exception as e:
        # Catch-all so a bad file never aborts the script (e.g. no 'date' column).
        print(f"An unexpected error occurred: {e}")
    else:
        print("Data loaded successfully.")
        return frame
    # Reached only after one of the handlers above has reported a failure.
    return None

# Clean Data
def clean_data(data):
    """Drop every row that contains a missing value, in place.

    Args:
        data: DataFrame to clean, or None.

    Returns:
        The same DataFrame object after cleaning, or None if None was given.
    """
    if data is None:
        return None

    # In-place on purpose: the caller's frame is modified, not copied.
    data.dropna(inplace=True)
    print("Data cleaned successfully.")
    return data

# Fetch News
def fetch_news(url="https://www.investing.com/news/commodities-news/gold", timeout=10):
    """Fetch news headline texts from a web page.

    Args:
        url: Page to download. Defaults to the Investing.com gold news page.
        timeout: Seconds to wait for the server before giving up. Without a
            timeout, `requests.get` can block indefinitely on a stalled
            connection.

    Returns:
        A list with the stripped text of every `div.news-item-header` element
        found on the page. An empty list is returned when the request fails
        (the error is printed) or when no such element exists.
    """
    try:
        response = requests.get(url, timeout=timeout)
        response.raise_for_status()  # Raise HTTPError for bad responses (4xx or 5xx)
    except requests.exceptions.RequestException as e:
        print(f"Error fetching news: {e}")
        return []

    soup = BeautifulSoup(response.content, 'html.parser')
    news_titles = [header.text.strip() for header in soup.select('div.news-item-header')]
    print("News fetched successfully.")
    return news_titles

# Analyze Sentiment
def analyze_sentiment(news_titles):
    """Score each headline with TextBlob's sentiment polarity.

    Args:
        news_titles: Iterable of headline strings.

    Returns:
        A list holding one polarity score per title, in input order.
    """
    polarity_scores = [TextBlob(headline).sentiment.polarity for headline in news_titles]
    print("Sentiment analyzed successfully.")
    return polarity_scores

# Calculate Order Blocks
def calculate_order_blocks(data):
    """Find local peaks of the closing price ("order blocks").

    A row is reported when its Close is strictly higher than both the previous
    and the next Close.

    Args:
        data: DataFrame with a 'Close' column, or None.

    Returns:
        A list of (index label, close value) tuples, one per peak, in
        chronological order. Empty for None or fewer than three rows.
    """
    if data is None:
        return []

    # Work on a positional array: integer keys on a date-indexed Series are
    # ambiguous (label vs. position) and deprecated in pandas.
    close = data['Close'].to_numpy()
    index = data.index

    # Start at 2 so that i-2 is a real predecessor; starting at 1 would read
    # position -1 (the last row) and compare the first bar with the final one.
    return [
        (index[i - 1], close[i - 1])
        for i in range(2, len(close))
        if close[i - 2] < close[i - 1] > close[i]
    ]

# Calculate Break of Structure (BOS)
def calculate_bos(data):
    """Find "break of structure" points: the first up-close after a local low.

    Row i is reported when Close[i] > Close[i-1] and Close[i-1] < Close[i-2].

    Args:
        data: DataFrame with a 'Close' column, or None.

    Returns:
        A list of index labels in chronological order. Empty for None or
        fewer than three rows.
    """
    if data is None:
        return []

    # Positional array avoids the deprecated integer-key lookup on a
    # label-indexed Series.
    close = data['Close'].to_numpy()
    index = data.index

    # Start at 2: with i == 1 the i-2 lookup would wrap to the last row.
    return [
        index[i]
        for i in range(2, len(close))
        if close[i - 2] > close[i - 1] < close[i]
    ]

# Calculate Change of Character (CHOCH)
def calculate_choch(data):
    """Find "change of character" points: rows where the close reverses direction.

    Row i is reported when the move from i-1 to i has the opposite strict
    direction of the move from i-2 to i-1 (down-then-up or up-then-down).

    Args:
        data: DataFrame with a 'Close' column, or None.

    Returns:
        A list of index labels in chronological order. Empty for None or
        fewer than three rows.
    """
    if data is None:
        return []

    # Positional array avoids the deprecated integer-key lookup on a
    # label-indexed Series.
    close = data['Close'].to_numpy()
    index = data.index

    reversals = []
    # Start at 2: with i == 1 the i-2 lookup would wrap to the last row.
    for i in range(2, len(close)):
        turned_up = close[i - 2] > close[i - 1] < close[i]
        turned_down = close[i - 2] < close[i - 1] > close[i]
        if turned_up or turned_down:
            reversals.append(index[i])
    return reversals

# Calculate Supply and Demand Zones
def calculate_supply_demand(data):
    """Collect supply and demand zones from closes that break the prior bar's range.

    For each row after the first:
      * Close above the previous High adds (row label, previous High) to the
        supply zones;
      * otherwise, Close below the previous Low adds (row label, previous Low)
        to the demand zones.

    Args:
        data: DataFrame with 'Close', 'High' and 'Low' columns, or None.

    Returns:
        A `(supply_zones, demand_zones)` tuple of lists; both are empty for None.
    """
    supply_zones = []
    demand_zones = []
    if data is None:
        return supply_zones, demand_zones

    # Positional arrays: integer keys on a label-indexed Series are deprecated
    # in pandas and would be read as labels on an integer index.
    close = data['Close'].to_numpy()
    high = data['High'].to_numpy()
    low = data['Low'].to_numpy()
    index = data.index

    for i in range(1, len(close)):
        if close[i] > high[i - 1]:
            supply_zones.append((index[i], high[i - 1]))
        elif close[i] < low[i - 1]:
            demand_zones.append((index[i], low[i - 1]))
    return supply_zones, demand_zones

# Calculate Support and Resistance Lines
def calculate_support_resistance(data, window=20):
    """Compute rolling support and resistance levels.

    Args:
        data: DataFrame with 'Low' and 'High' columns, or None.
        window: Number of rows in the rolling window. Defaults to 20, the
            value that was previously hard-coded.

    Returns:
        A `(support, resistance)` tuple of Series: the rolling minimum of
        'Low' and the rolling maximum of 'High'. The first `window - 1`
        entries are NaN. `(None, None)` is returned when data is None.
    """
    if data is None:
        return None, None

    support = data['Low'].rolling(window=window).min()
    resistance = data['High'].rolling(window=window).max()
    return support, resistance

# Calculate Fibonacci Retracement Levels
def fibonacci_retracement(high, low):
    """Return Fibonacci retracement levels between `low` and `high`.

    Args:
        high: Upper bound of the price range.
        low: Lower bound of the price range.

    Returns:
        A dict mapping 'fib_retracement_<pct>' keys to price levels, ordered
        from 0.0 (`low`) to 100.0 (`high`).
    """
    span = high - low
    intermediate = (
        ('fib_retracement_23.6', 0.236),
        ('fib_retracement_38.2', 0.382),
        ('fib_retracement_50.0', 0.5),
        ('fib_retracement_61.8', 0.618),
    )

    # The end points are the inputs themselves, not low + span * ratio.
    levels = {'fib_retracement_0.0': low}
    for key, ratio in intermediate:
        levels[key] = low + span * ratio
    levels['fib_retracement_100.0'] = high
    return levels

# Calculate Relative Strength Index (RSI)
def calculate_rsi(data, period=14):
    """Add an 'RSI' column computed by `talib.RSI` over the 'Close' column.

    Args:
        data: DataFrame with a 'Close' column, or None.
        period: Value passed to talib as `timeperiod`.

    Returns:
        The same DataFrame with the new column, or None if None was given.
    """
    if data is None:
        return None

    rsi_values = talib.RSI(data['Close'], timeperiod=period)
    data['RSI'] = rsi_values
    return data

# Calculate Moving Average
def calculate_moving_average(data, period=20):
    """Add an 'MA' column: the rolling mean of 'Close' over `period` rows.

    Args:
        data: DataFrame with a 'Close' column, or None.
        period: Rolling window length in rows.

    Returns:
        The same DataFrame with the new column, or None if None was given.
    """
    if data is None:
        return None

    closes = data['Close']
    data['MA'] = closes.rolling(window=period).mean()
    return data


# Calculate Momentum
def calculate_momentum(data, period=10):
    """Add a 'Momentum' column: Close minus the Close `period` rows earlier.

    Args:
        data: DataFrame with a 'Close' column, or None.
        period: Row offset used for the difference.

    Returns:
        The same DataFrame with the new column, or None if None was given.
    """
    if data is None:
        return None

    closes = data['Close']
    data['Momentum'] = closes.diff(periods=period)
    return data

# Analyze Volume
def analyze_volume(data, period=20):
    """Add a 'Volume_MA' column: the rolling mean of 'Volume'.

    Args:
        data: DataFrame with a 'Volume' column, or None.
        period: Rolling window length in rows. Defaults to 20, the value that
            was previously hard-coded, and mirrors the `period` parameter of
            `calculate_moving_average`.

    Returns:
        The same DataFrame with the new column, or None if None was given.
    """
    if data is None:
        return None

    data['Volume_MA'] = data['Volume'].rolling(window=period).mean()
    return data


# Calculate Wolf Wave
def calculate_wolf_wave(data):
    """Find starting rows of a down-down-up-down-up sequence of closes.

    Row i is reported when, relative to each preceding close, the closes at
    i and i+1 are lower, i+2 is higher, i+3 is lower and i+4 is higher.

    Args:
        data: DataFrame with a 'Close' column, or None.

    Returns:
        A list of index labels for the matching rows. Empty for None or
        fewer than six rows.
    """
    if data is None:
        return []

    # Positional array: integer keys on a label-indexed Series are deprecated
    # in pandas and would be read as labels on an integer index.
    close = data['Close'].to_numpy()
    index = data.index

    wave_points = []
    # The pattern reads rows i-1 .. i+4, hence the 1 .. len-5 range.
    for i in range(1, len(close) - 4):
        if (close[i] < close[i - 1]
                and close[i + 1] < close[i]
                and close[i + 2] > close[i + 1]
                and close[i + 3] < close[i + 2]
                and close[i + 4] > close[i + 3]):
            wave_points.append(index[i])
    return wave_points

# Create Additional Features
def create_features(data):
    """Add return and volatility features derived from 'Close'.

    New columns:
        Price_Change: percentage change of Close from the previous row.
        Log_Returns: natural log of Close divided by the previous Close.
        Volatility: 20-row rolling standard deviation of Log_Returns.

    Args:
        data: DataFrame with a 'Close' column, or None.

    Returns:
        The same DataFrame with the new columns, or None if None was given.
    """
    if data is None:
        return None

    close = data['Close']
    data['Price_Change'] = close.pct_change()
    data['Log_Returns'] = np.log(close / close.shift(1))

    log_returns = data['Log_Returns']
    data['Volatility'] = log_returns.rolling(window=20).std()
    return data


# Calculate Price Channel
def calculate_price_channel(data, window=20):
    """Add rolling price-channel columns.

    New columns:
        Channel_Upper: rolling maximum of 'High'.
        Channel_Lower: rolling minimum of 'Low'.
        Channel_Mid: average of the two bounds.

    Args:
        data: DataFrame with 'High' and 'Low' columns, or None.
        window: Rolling window length in rows.

    Returns:
        The same DataFrame with the new columns, or None if None was given.
    """
    if data is None:
        return None

    upper = data['High'].rolling(window=window).max()
    lower = data['Low'].rolling(window=window).min()

    data['Channel_Upper'] = upper
    data['Channel_Lower'] = lower
    data['Channel_Mid'] = (data['Channel_Upper'] + data['Channel_Lower']) / 2
    return data

# Advanced Risk Management
def advanced_risk_management(data):
    """Add ATR-based stop-loss and take-profit columns.

    New columns:
        ATR: `talib.ATR` over High/Low/Close with timeperiod 14.
        Stop_Loss: Close minus 1.5 times ATR.
        Take_Profit: Close plus 3 times ATR.

    Args:
        data: DataFrame with 'High', 'Low' and 'Close' columns, or None.

    Returns:
        The same DataFrame with the new columns, or None if None was given.
    """
    if data is None:
        return None

    data['ATR'] = talib.ATR(data['High'], data['Low'], data['Close'], timeperiod=14)

    stop_distance = data['ATR'] * 1.5
    profit_distance = data['ATR'] * 3
    data['Stop_Loss'] = data['Close'] - stop_distance
    data['Take_Profit'] = data['Close'] + profit_distance
    return data


# Build Strategy
def build_strategy(data):
    """Build features, fit three regressors and predict the next close.

    The frame is modified in place: rows with missing values are dropped and
    the indicator, sentiment and 'target' columns are added. The main section
    relies on this (it reads `data['target']` afterwards).

    Steps:
        1. Clean the data and compute the technical indicators/features.
        2. Print the detected order blocks, BOS, CHOCH, zones and wave points.
        3. Add the mean news-headline sentiment as a constant column.
        4. Use the next row's Close as regression target.
        5. Split chronologically (first 80% train, last 20% test) and fit
           XGBoost, random forest and SVR models.

    Args:
        data: Price DataFrame (needs 'Close', 'High', 'Low', 'Volume'), or None.

    Returns:
        `(signals_xgb, signals_rf, signals_svr)`: each model's predictions for
        the test rows, i.e. the chronologically last 20% of the prepared
        frame. `(None, None, None)` when there is no data or too few rows
        survive the indicator warm-up.
    """
    if data is None:
        print("No data to build strategy.")
        return None, None, None

    data = clean_data(data)

    if data is None:
        return None, None, None

    # Calculate technical indicators and features
    order_blocks = calculate_order_blocks(data)
    bos = calculate_bos(data)
    choch = calculate_choch(data)
    supply_zones, demand_zones = calculate_supply_demand(data)
    support, resistance = calculate_support_resistance(data)
    if support is not None and resistance is not None:
        data['Support'] = support
        data['Resistance'] = resistance

    # Each Fibonacci level becomes a constant column over the whole frame.
    fib_levels = fibonacci_retracement(data['High'].max(), data['Low'].min())
    for key, value in fib_levels.items():
        data[key] = value
    data = calculate_rsi(data)
    data = calculate_moving_average(data)
    data = calculate_momentum(data)
    data = analyze_volume(data)
    wave_points = calculate_wolf_wave(data)
    data = create_features(data)
    data = calculate_price_channel(data)
    data = advanced_risk_management(data)

    # Print results for technical analysis
    print("Order Blocks:", order_blocks)
    print("BOS:", bos)
    print("CHOCH:", choch)
    print("Supply Zones:", supply_zones)
    print("Demand Zones:", demand_zones)
    print("Wolf Wave Points:", wave_points)

    # Fetch and analyze sentiment from news
    news_titles = fetch_news()
    sentiments = analyze_sentiment(news_titles)
    if sentiments:
        data['Sentiment'] = np.mean(sentiments)
    else:
        data['Sentiment'] = 0  # Assign a default value if no sentiment found

    # Prepare data for machine learning
    data['target'] = data['Close'].shift(-1)  # next row's close is the target
    # Drops the last row (no target) and the indicator warm-up rows (NaN).
    data.dropna(inplace=True)

    if len(data) < 2:
        # train_test_split would raise on an empty or one-row frame.
        print("Not enough rows left after indicator warm-up to train models.")
        return None, None, None

    features = data.drop(['target'], axis=1)
    target = data['target']

    # Chronological split: a shuffled split would train on rows that come
    # after the test rows, and the predictions would no longer line up with
    # the last len(signals) dates that the plotting/trading code assumes.
    X_train, X_test, y_train, y_test = train_test_split(
        features, target, test_size=0.2, shuffle=False
    )

    # Train Machine Learning models
    model_xgb = XGBRegressor()
    model_xgb.fit(X_train, y_train)

    model_rf = RandomForestRegressor()
    model_rf.fit(X_train, y_train)

    model_svr = SVR()
    model_svr.fit(X_train, y_train)

    # Generate trading signals from models
    signals_xgb = model_xgb.predict(X_test)
    signals_rf = model_rf.predict(X_test)
    signals_svr = model_svr.predict(X_test)

    print("Strategy built successfully.")
    return signals_xgb, signals_rf, signals_svr

# Analyze Results
def analyze_results(data, signals_xgb, signals_rf, signals_svr):
    """Plot the close price together with each model's predictions.

    Each non-None prediction array is drawn against the last `len(array)`
    index entries of `data`. The price channel is added when both channel
    columns exist. Nothing happens when `data` is None.

    Args:
        data: DataFrame with a 'Close' column, or None.
        signals_xgb: XGBoost predictions, or None.
        signals_rf: Random forest predictions, or None.
        signals_svr: SVR predictions, or None.
    """
    if data is None:
        return

    plt.figure(figsize=(14, 7))
    plt.plot(data.index, data['Close'], label='Close Price', color='blue')

    model_series = (
        (signals_xgb, 'XGBoost Signals', 'orange'),
        (signals_rf, 'Random Forest Signals', 'green'),
        (signals_svr, 'SVM Signals', 'red'),
    )
    for predictions, legend_label, line_color in model_series:
        if predictions is None:
            continue
        tail_dates = data.index[-len(predictions):]
        plt.plot(tail_dates, predictions, label=legend_label, color=line_color)

    has_channel = 'Channel_Upper' in data.columns and 'Channel_Lower' in data.columns
    if has_channel:
        plt.plot(data.index, data['Channel_Upper'], label='Upper Channel', linestyle='--', color='purple')
        plt.plot(data.index, data['Channel_Lower'], label='Lower Channel', linestyle='--', color='brown')

    plt.title('Gold Price and Model Predictions')
    plt.xlabel('Date')
    plt.ylabel('Price / Signals')
    plt.legend()
    plt.show()
    print("Results analyzed and plotted successfully.")

# Manage Risk
def risk_management(data, signals):
    """Print the maximum drawdown of the 'Returns' column.

    The drawdown is measured on the compounded equity curve (starting at 1.0)
    as the largest relative drop from a running peak, so the result is a
    value <= 0 (e.g. -0.5 means a 50% drop). The previous formula compared
    the global minimum and maximum of the summed returns regardless of their
    order, which is not a drawdown.

    Args:
        data: DataFrame with a 'Returns' column of simple per-row returns, or
            None. Nothing is printed when the frame or the column is missing.
        signals: Unused; kept for interface compatibility.
    """
    if data is None or 'Returns' not in data.columns:
        return

    returns = data['Returns'].dropna()
    if returns.empty:
        max_drawdown = 0.0
    else:
        equity = (1 + returns).cumprod()
        # Floor the peak at the starting equity (1.0) so a loss on the very
        # first row counts as a drawdown too.
        running_peak = equity.cummax().clip(lower=1.0)
        max_drawdown = float((equity / running_peak - 1).min())

    print(f'Max Drawdown: {max_drawdown}')
    print("Risk managed successfully.")

# Execute Trades
def execute_trades(data, signals):
    """Print a buy/sell line for each non-zero signal.

    Signal i is matched with row `len(data) - len(signals) + i`, i.e. the
    signals are assumed to cover the last rows of `data`. A positive signal
    prints a "Buy" line, a negative one a "Sell" line, zero prints nothing.
    When both 'Stop_Loss' and 'Take_Profit' columns exist, their values for
    the same row are printed as well.

    NOTE(review): the main section passes regression outputs (predicted next
    close) as signals, so every positive prediction is reported as a Buy —
    confirm this is intended.

    Args:
        data: DataFrame with a 'Close' column, or None.
        signals: Sequence of numeric signals, or None.

    Raises:
        ValueError: If there are more signals than rows in `data`.
    """
    if data is None or signals is None:
        return

    offset = len(data) - len(signals)
    if offset < 0:
        raise ValueError("signals must not be longer than data")

    # Positional access via .iloc: integer keys on a label-indexed Series are
    # deprecated in pandas.
    close = data['Close']
    has_levels = 'Stop_Loss' in data.columns and 'Take_Profit' in data.columns

    for i, signal in enumerate(signals):
        if signal > 0:
            action = "Buy"
        elif signal < 0:
            action = "Sell"
        else:
            continue

        row = offset + i
        print(f"{action} at {data.index[row]} at price {close.iloc[row]}")
        if has_levels:
            print(f"Stop loss at {data['Stop_Loss'].iloc[row]} and take profit at {data['Take_Profit'].iloc[row]}")
    print("Trades executed successfully.")

# Periodic Improvement (placeholder)
def periodic_improvement(data):
    """Stub for periodic re-training; currently only prints a status line.

    Args:
        data: Unused for now.
    """
    # TODO: add logic for re-training the model and updating the strategy.
    print("Periodic improvement check completed.")

# Save Results to CSV
def save_results_to_csv(data, filename='results.csv'):
    """Write the DataFrame to a CSV file; do nothing when `data` is None.

    Args:
        data: DataFrame to save, or None.
        filename: Destination path of the CSV file.
    """
    if data is None:
        return

    data.to_csv(filename)
    print("Results saved to CSV successfully.")

# Analyze All Models
def analyze_all_models(data, signals_xgb, signals_rf, signals_svr):
    """Print the sum of each model's prediction array as its "total return".

    Models whose array is None are skipped.

    Args:
        data: Unused.
        signals_xgb: XGBoost predictions, or None.
        signals_rf: Random forest predictions, or None.
        signals_svr: SVR predictions, or None.
    """
    print("Analyzing performance of all models:")
    labelled_signals = (
        ('XGBoost', signals_xgb),
        ('Random Forest', signals_rf),
        ('SVM', signals_svr),
    )
    for model_name, predictions in labelled_signals:
        if predictions is not None:
            print(f'Total Return {model_name}: {predictions.sum()}')

# Risk Analysis
def risk_analysis(data):
    """Print the maximum drawdown of the 'Returns' column.

    The drawdown is measured on the compounded equity curve (starting at 1.0)
    as the largest relative drop from a running peak, so the result is a
    value <= 0 (e.g. -0.5 means a 50% drop). The previous formula compared
    the global minimum and maximum of the summed returns regardless of their
    order, which is not a drawdown.

    Args:
        data: DataFrame with a 'Returns' column of simple per-row returns, or
            None. Nothing is printed when the frame or the column is missing.
    """
    if data is None or 'Returns' not in data.columns:
        return

    returns = data['Returns'].dropna()
    if returns.empty:
        max_drawdown = 0.0
    else:
        equity = (1 + returns).cumprod()
        # Floor the peak at the starting equity (1.0) so a loss on the very
        # first row counts as a drawdown too.
        running_peak = equity.cummax().clip(lower=1.0)
        max_drawdown = float((equity / running_peak - 1).min())

    print(f'Max Drawdown: {max_drawdown}')

# Correlation Analysis
def correlation_analysis(data):
    """Plot the correlation matrix of the frame's columns as a heatmap.

    Args:
        data: DataFrame to analyze; nothing is plotted when it is None.
    """
    if data is None:
        return

    corr_matrix = data.corr()
    plt.figure(figsize=(12, 8))
    sns.heatmap(corr_matrix, annot=True, cmap='coolwarm')
    plt.title('Correlation Matrix')
    plt.show()

# Time Series Analysis
def time_series_analysis(data):
    """Plot an additive seasonal decomposition of 'Close' with period 30.

    Args:
        data: DataFrame with a 'Close' column; nothing is plotted when None.
    """
    if data is None:
        return

    decomposition = seasonal_decompose(data['Close'], model='additive', period=30)
    decomposition.plot()
    plt.show()


# Train Neural Network
def train_neural_network(X_train, y_train):
    """Build and fit a small feed-forward regression network.

    Architecture: Dense(64, relu) -> Dropout(0.2) -> BatchNormalization ->
    Dense(32, relu) -> Dropout(0.2) -> Dense(1). Trained with Adam
    (learning rate 0.001) on mean squared error for 100 epochs, batch size 32.

    Args:
        X_train: 2-D feature matrix; its second dimension sets the input size.
        y_train: Training targets.

    Returns:
        The fitted Keras model.
    """
    feature_count = X_train.shape[1]
    layer_stack = [
        Dense(64, activation='relu', input_dim=feature_count),
        Dropout(0.2),
        BatchNormalization(),
        Dense(32, activation='relu'),
        Dropout(0.2),
        Dense(1),  # Output layer
    ]

    network = Sequential()
    for layer in layer_stack:
        network.add(layer)

    network.compile(optimizer=Adam(learning_rate=0.001), loss='mean_squared_error')
    network.fit(X_train, y_train, epochs=100, batch_size=32, verbose=0)
    print("Neural network trained successfully.")
    return network

# Generate Signals with the Neural Network
def generate_signals(model, data):
    """Return `model.predict(data)`, or None when either argument is None.

    Args:
        model: Object exposing a `predict` method, or None.
        data: Input passed to `predict`, or None.
    """
    if model is None or data is None:
        return None
    return model.predict(data)

# Evaluate Neural Network Performance
def evaluate_performance(signals, data):
    """Print the sum of the 'Returns' column as the total return.

    Args:
        signals: Unused.
        data: DataFrame with a 'Returns' column, or None. Nothing is printed
            when the frame or the column is missing.
    """
    if data is None or 'Returns' not in data.columns:
        return

    total_return = data['Returns'].sum()
    print(f'Total Return: {total_return}')
    print("Neural network performance evaluated.")

# Compare Models
def compare_models(signals_xgb, nn_signals, data):
    """Plot the cumulative sums of XGBoost and neural-network predictions.

    Each series is drawn against the last `len(series)` index entries of
    `data`. Nothing is plotted when any argument is None.

    Args:
        signals_xgb: XGBoost predictions, or None.
        nn_signals: Neural-network predictions, or None.
        data: DataFrame supplying the x-axis index, or None.
    """
    if data is None or signals_xgb is None or nn_signals is None:
        return

    xgb_dates = data.index[-len(signals_xgb):]
    nn_dates = data.index[-len(nn_signals):]

    plt.figure(figsize=(14, 7))
    plt.plot(xgb_dates, np.cumsum(signals_xgb), label='XGBoost Cumulative Returns', color='blue')
    plt.plot(nn_dates, np.cumsum(nn_signals), label='Neural Network Cumulative Returns', color='orange')
    plt.title('Model Comparison: XGBoost vs Neural Network')
    plt.xlabel('Date')
    plt.ylabel('Cumulative Returns')
    plt.axhline(0, color='red', linestyle='--')
    plt.legend()
    plt.show()

# Historical Analysis
def historical_analysis(data):
    """Plot a 50-bin histogram of the percentage changes of 'Close'.

    Args:
        data: DataFrame with a 'Close' column; nothing is plotted when None.
    """
    if data is None:
        return

    pct_returns = data['Close'].pct_change().dropna()

    plt.figure(figsize=(14, 7))
    plt.hist(pct_returns, bins=50, alpha=0.7, color='blue', edgecolor='black')
    plt.title('Historical Returns Distribution')
    plt.xlabel('Returns')
    plt.ylabel('Frequency')
    plt.show()
    print("Historical analysis completed.")

# Main Execution
# Main Execution
if __name__ == "__main__":
    # Load and prepare data
    data = load_data(file_path)

    if data is not None:
        data['Returns'] = data['Close'].pct_change()

    # Build the trading strategy (also adds the feature and 'target' columns
    # to `data` in place)
    signals_xgb, signals_rf, signals_svr = build_strategy(data)

    # Analyze the results and plot graphs
    analyze_results(data, signals_xgb, signals_rf, signals_svr)

    # Manage risk
    if signals_xgb is not None:
        risk_management(data, signals_xgb)

    # Execute trades based on XGBoost model signals
    if signals_xgb is not None:
        execute_trades(data, signals_xgb)

    # Perform periodic improvements (placeholder)
    periodic_improvement(data)

    # Save the results in CSV format
    save_results_to_csv(data)

    # Analyze all models
    analyze_all_models(data, signals_xgb, signals_rf, signals_svr)

    # Perform risk analysis
    risk_analysis(data)

    # Perform Correlation Analysis
    correlation_analysis(data)

    # Perform Time Series Analysis
    time_series_analysis(data)

    # Split data for the neural network. Both names are pre-set to None so
    # the calls below stay valid when no data could be loaded (previously
    # X_test was undefined in that case and raised NameError).
    nn_model = None
    X_test = None
    if data is not None and 'target' in data.columns and len(data) > 1:
        # Chronological split: the comparison plot draws predictions against
        # the last len(signals) dates, which only holds for an unshuffled tail.
        X_train, X_test, y_train, y_test = train_test_split(
            data.drop(['target'], axis=1), data['target'], test_size=0.2, shuffle=False
        )
        # Train the neural network model
        nn_model = train_neural_network(X_train, y_train)

    # Generate signals using the neural network
    nn_signals = generate_signals(nn_model, X_test)

    # Evaluate performance of the neural network
    if nn_signals is not None:
        evaluate_performance(nn_signals, data)

    # Compare performance between XGBoost and neural network models
    if nn_signals is not None:
        compare_models(signals_xgb, nn_signals, data)

    # Perform periodic improvements
    periodic_improvement(data)

    # Historical Data Analysis
    historical_analysis(data)

    print("Strategy completed.")