In [1]:
import pandas as pd
import pandas_ta as ta
import numpy as np
import plotly.graph_objects as go
import matplotlib.pyplot as plt
import requests
import defs

In [2]:
# Instruments to scan and the candle granularities requested for each of them.
fx_pairs = [
    'AUD_CAD', 'AUD_USD', 'NZD_USD', 'EUR_USD', 'GBP_JPY', 'GBP_USD', 'USD_CAD',
    'USD_JPY', 'XAU_USD', 'WTICO_USD', 'NAS100_USD', 'SPX500_USD', 'AU200_AUD', 'BTC_USD',
]
timeframes = ['H1', 'H4', 'D']

# A single HTTP session shared by every request made in this notebook.
session = requests.Session()

# Candle frames keyed by (instrument, timeframe).
fx_dataframes = {}

# Summary-table layout: 'Asset', then MOMO_*, PATI_* and OE_* for each timeframe.
trend_df_columns = ['Asset'] + [
    f'{signal}_{timeframe}'
    for signal in ('MOMO', 'PATI', 'OE')
    for timeframe in timeframes
]

trend_data_list = []  # one summary dict per instrument
total_bytes = 0       # running total of downloaded response sizes

# Seconds to wait on a single candle request before giving up; without a
# timeout a stalled connection would block the whole scan indefinitely.
REQUEST_TIMEOUT_SECONDS = 30


def _flag_timeframe_error(trend_row, timeframe):
    """Set the MOMO, PATI and OE entries of `timeframe` in `trend_row` to 'Error'."""
    for signal in ('MOMO', 'PATI', 'OE'):
        trend_row[f'{signal}_{timeframe}'] = 'Error'


# For every instrument/timeframe pair: download candles, compute the three
# signals (MOMO, OE, PATI) and record the latest value of each in trend_data.
for instrument in fx_pairs:
    trend_data = {'Asset': instrument}
    for timeframe in timeframes:
        url = f'{defs.OANDA_URL}/instruments/{instrument}/candles'
        params = {'count': 60, 'granularity': timeframe, 'price': "M"}
        try:
            response = session.get(url, params=params, headers=defs.SECURE_HEADER,
                                   timeout=REQUEST_TIMEOUT_SECONDS)
        except requests.RequestException as exc:
            # Transport-level failure (timeout, DNS, connection reset, ...):
            # report it and carry on with the remaining pairs.
            print(f"Failed to fetch data for {instrument} with granularity {timeframe}: {exc}")
            _flag_timeframe_error(trend_data, timeframe)
            continue

        if response.status_code != 200:
            print(f"Failed to fetch data for {instrument} with granularity {timeframe}: {response.status_code}")
            # Flag all three signal columns, not just PATI, so the summary
            # table never mixes 'Error' with silent NaN for one failed request.
            _flag_timeframe_error(trend_data, timeframe)
            continue

        size_in_bytes = len(response.content)
        total_bytes += size_in_bytes
        data = response.json()
        # Only candles flagged 'complete' are kept; prices come from the 'mid' quote.
        my_data = [
            {
                'time': candle['time'],
                'open': float(candle['mid']['o']),
                'high': float(candle['mid']['h']),
                'low': float(candle['mid']['l']),
                'close': float(candle['mid']['c']),
                'volume': candle['volume'],
            }
            for candle in data['candles'] if candle['complete']
        ]
        if not my_data:
            # An empty frame has no 'time' column and no first row, so the
            # steps below would raise; treat it like a failed download.
            print(f"No complete candles returned for {instrument} with granularity {timeframe}")
            _flag_timeframe_error(trend_data, timeframe)
            continue

        precision = 3 if 'JPY' in instrument else 5
        df = pd.DataFrame(my_data)
        df['time'] = pd.to_datetime(df['time'])
        df.set_index('time', inplace=True)

        # MOMO: 'Uptrend' when the 20-period SMA is above the 50-period SMA,
        # otherwise 'Downtrend' (rows where either SMA is NaN compare False
        # and therefore also read 'Downtrend').
        df['SMA_FAST'] = ta.sma(df['close'], length=20).round(precision)
        df['SMA_SLOW'] = ta.sma(df['close'], length=50).round(precision)
        df['MOMO'] = np.where(df['SMA_FAST'] > df['SMA_SLOW'], 'Uptrend', 'Downtrend')

        # OE: from row 7 onwards, look at the current row plus the previous 7.
        # Any RSI(7) above 80 in that window -> 'Downtrend'; otherwise any
        # RSI(7) below 20 -> 'Uptrend'; otherwise the row stays 'Neutral'.
        df['RSI'] = ta.rsi(df['close'], length=7)
        df['OE'] = 'Neutral'
        for i in range(7, len(df)):
            current_slice = df.iloc[i-7:i+1]
            if (current_slice['RSI'] > 80).any():
                df.at[df.index[i], 'OE'] = 'Downtrend'
            elif (current_slice['RSI'] < 20).any():
                df.at[df.index[i], 'OE'] = 'Uptrend'

        # The same frame object keeps being extended below, so the stored
        # entry also ends up with the PATI columns.
        fx_dataframes[(instrument, timeframe)] = df

        # ---- PATI: candle-by-candle trend tracing ----
        df['Candle'] = np.select([df['close'] > df['open'], df['close'] < df['open']], ['Bull', 'Bear'], default='Doji')
        # Every row (including row 0, which the loop below skips) starts as 'Neutral'.
        df['Trend'] = 'Neutral'

        higher_high = None
        lower_low = None

        bullish_pullback_count = 0
        bearish_pullback_count = 0
        bullish_pullback = False
        bearish_pullback = False

        swing_high = None
        swing_low = None

        uptrend_lowest_pullback = None
        downtrend_highest_pullback = None

        previous_high = df['high'].iloc[0]
        previous_low = df['low'].iloc[0]

        current_trend = 'Neutral'

        df['Price_Status'] = None
        price_status = None

        # Walk the candles from the second row on; `i` is the row's timestamp label.
        for i, row in df.iloc[1:].iterrows():
            current_high = row['high']
            current_low = row['low']
            candle_type = row['Candle']
            current_close = row['close']

            # Seed the reference levels from the first processed candle.
            if current_trend == "Neutral":
                if swing_low is None and swing_high is None:
                    swing_high = current_high
                    swing_low = current_low
                    higher_high = current_high
                    lower_low = current_low

            # Trend rotation: a close beyond the swing level flips (or confirms)
            # the trend and resets all pullback tracking.
            # NOTE(review): the upside test compares the *close* with higher_high,
            # the downside test compares the *low* with lower_low - confirm the
            # asymmetry is intended.
            if current_close > swing_high and current_close > higher_high:
                current_trend = 'Uptrend'
                bullish_pullback_count = 0
                bullish_pullback = False
                bearish_pullback_count = 0
                bearish_pullback = False
                if uptrend_lowest_pullback is not None:
                    swing_low = uptrend_lowest_pullback
            elif current_close < swing_low and current_low < lower_low:
                current_trend = 'Downtrend'
                bearish_pullback_count = 0
                bearish_pullback = False
                bullish_pullback_count = 0
                bullish_pullback = False
                if downtrend_highest_pullback is not None:
                    swing_high = downtrend_highest_pullback

            df.at[i, 'Trend'] = current_trend

            # Status reflects the pullback flags as they stood before this
            # candle's own pullback bookkeeping below.
            if bullish_pullback or bearish_pullback:
                price_status = 'Consolidation'
            else:
                price_status = 'Extension'

            df.at[i, 'Price_Status'] = price_status

            if current_trend == 'Uptrend':
                # NOTE(review): this requires current_low > swing_high, whereas
                # the downtrend mirror uses current_low < swing_low - confirm
                # that current_high was not meant here.
                if current_high > previous_high and current_low > swing_high and not bullish_pullback:
                    if higher_high is not None:
                        higher_high = max(higher_high, current_high)
                    else:
                        higher_high = current_high
                if current_close < higher_high and candle_type == 'Bear':
                    bullish_pullback_count += 1
                # Two bearish closes below the high mark a pullback and lock
                # the high in as the new swing high.
                if bullish_pullback_count >= 2:
                    bullish_pullback = True
                    swing_high = higher_high

                if bullish_pullback and current_close > swing_low:
                    if current_low < previous_low:
                        uptrend_lowest_pullback = current_low

            elif current_trend == 'Downtrend':
                if current_low < previous_low and current_low < swing_low and not bearish_pullback:
                    if lower_low is not None:
                        lower_low = min(lower_low, current_low)
                    else:
                        lower_low = current_low
                if current_close > lower_low and candle_type == 'Bull':
                    bearish_pullback_count += 1
                # Two bullish closes above the low mark a pullback and lock
                # the low in as the new swing low.
                if bearish_pullback_count >= 2:
                    bearish_pullback = True
                    swing_low = lower_low

                if bearish_pullback and current_close < swing_high:
                    if current_high > previous_high:
                        downtrend_highest_pullback = current_high
                        # NOTE(review): the uptrend branch has no matching
                        # `lower_low = current_low` - confirm this is intended.
                        higher_high = current_high

            df.at[i, 'Swing_high'] = swing_high
            df.at[i, 'Swing_low'] = swing_low
            previous_low = current_low
            previous_high = current_high

        # The summary row keeps only the most recent value of each signal.
        final_trend = df['Trend'].iloc[-1]
        column_name = f'PATI_{timeframe}'
        trend_data[column_name] = final_trend
        trend_data[f'MOMO_{timeframe}'] = df['MOMO'].iloc[-1]
        trend_data[f'OE_{timeframe}'] = df['OE'].iloc[-1]

    trend_data_list.append(trend_data)

# Assemble the per-instrument rows into a table indexed by asset name.
trend_df = pd.DataFrame(trend_data_list, columns=trend_df_columns).set_index('Asset')

# Convert the byte tally to KB and to MB (rounded to two decimals).
total_kb = total_bytes / 1024
total_mb = round(total_kb / 1024, 2)

# Each 'Uptrend' cell counts +1 and each 'Downtrend' cell counts -1,
# so Score is the net of the two tallies.
trend_df['Uptrend_Count'] = trend_df.eq('Uptrend').sum(axis=1)
trend_df['Downtrend_Count'] = -trend_df.eq('Downtrend').sum(axis=1)
trend_df = trend_df.sort_index()
trend_df['Score'] = trend_df['Uptrend_Count'] + trend_df['Downtrend_Count']

print(f'The total KB downloaded was {total_kb} ({total_mb})MB')



  df.at[df.index[i], 'OE'] = 'Uptrend'
  df.at[df.index[i], 'OE'] = 'Downtrend'


The total KB downloaded was 24.1201171875 (0.02)MB


  df.at[df.index[i], 'OE'] = 'Uptrend'


In [5]:
# Show the dtype of every column in the trend summary table.
trend_df.dtypes

MOMO_H1             object
MOMO_H4             object
MOMO_D              object
PATI_H1             object
PATI_H4             object
PATI_D              object
OE_H1               object
OE_H4              float64
OE_D               float64
Uptrend_Count        int64
Downtrend_Count      int64
Score                int64
dtype: object

In [None]:
# Preview the last rows of the frame stored under the ('AUD_USD', 'H1') key.
fx_dataframes['AUD_USD', 'H1'].tail()

In [None]:
# Weight applied to a signal depending on the timeframe it comes from.
weights = {'D': 3, 'H4': 2, 'H1': 1}

# For each direction, add up the timeframe weights of every MOMO_* and PATI_*
# column that currently reads that direction.
for direction in ('Uptrend', 'Downtrend'):
    trend_df[f'{direction}_Score'] = sum(
        (trend_df[f'{family}_{tf}'] == direction) * weight
        for family in ('MOMO', 'PATI')
        for tf, weight in weights.items()
    )

# Net score: positive when uptrend weight dominates, negative when downtrend does.
trend_df['Net_Trend_Score'] = trend_df['Uptrend_Score'] - trend_df['Downtrend_Score']


## Plotting

In [None]:
import seaborn as sns
import matplotlib.pyplot as plt


In [None]:
def trend_heat_map(trend_df, image_path='./images/latest_trend_heatmap.png'):
    """Save a heatmap of the MOMO_* and PATI_* trend signals as an image.

    Parameters
    ----------
    trend_df : pandas.DataFrame
        Summary table holding the columns MOMO_H1, MOMO_H4, MOMO_D, PATI_H1,
        PATI_H4 and PATI_D with the labels 'Uptrend', 'Downtrend' or 'Neutral'.
    image_path : str or path-like, optional
        Destination file. Defaults to './images/latest_trend_heatmap.png'.
        Missing parent directories are created.

    Returns
    -------
    None
        The figure is written to `image_path` and then closed.

    Raises
    ------
    KeyError
        If one of the six signal columns is missing from `trend_df`.
    """
    from pathlib import Path

    signal_columns = ['MOMO_H1', 'MOMO_H4', 'MOMO_D', 'PATI_H1', 'PATI_H4', 'PATI_D']
    signal_values = {'Uptrend': 1, 'Downtrend': -1, 'Neutral': 0}

    # Map only the plotted columns. Any other label (e.g. 'Error') or a missing
    # value becomes NaN, which seaborn leaves blank instead of raising while
    # formatting the annotations.
    trend_signals = (
        trend_df[signal_columns]
        .apply(lambda column: column.map(signal_values))
        .astype(float)
    )

    # savefig does not create directories, so make sure the target exists.
    Path(image_path).parent.mkdir(parents=True, exist_ok=True)

    fig, ax = plt.subplots(figsize=(10, 8))
    # '.0f' prints the same 1 / 0 / -1 annotations as 'd' but accepts the
    # float dtype required to represent NaN cells.
    sns.heatmap(trend_signals, cmap='RdYlGn', annot=True, fmt=".0f", ax=ax)
    ax.set_title('Trend Strength Heatmap')

    fig.savefig(image_path)
    plt.close(fig)


In [None]:
# Write the heatmap image for the current summary table to disk.
trend_heat_map(trend_df)

In [None]:
# Encode the trend labels as numbers: Uptrend -> 1, Downtrend -> -1, Neutral -> 0.
label_to_number = {'Uptrend': 1, 'Downtrend': -1, 'Neutral': 0}
trend_numeric = trend_df.replace(label_to_number)

# Keep only the MOMO_* and PATI_* signal columns for the plot.
signal_columns = [
    f'{family}_{tf}'
    for family in ('MOMO', 'PATI')
    for tf in ('H1', 'H4', 'D')
]
trend_signals = trend_numeric[signal_columns]

fig, ax = plt.subplots(figsize=(10, 8))
sns.heatmap(trend_signals, cmap='RdYlGn', annot=True, fmt="d", ax=ax)
ax.set_title('Trend Strength Heatmap')
plt.show()


In [None]:
# Bar chart of the 'Uptrend_Count' and 'Downtrend_Count' columns already
# present in trend_df, one pair of bars per asset.
count_columns = ['Uptrend_Count', 'Downtrend_Count']
count_ax = trend_df[count_columns].plot.bar(color=['green', 'red'], figsize=(14, 6))
count_ax.set_title('Trend Signal Strength per Instrument')
count_ax.set_ylabel('Count')
count_ax.set_xlabel('Asset')
plt.xticks(rotation=45)
count_ax.legend(title='Trend')
count_ax.figure.tight_layout()
plt.show()

In [None]:
import matplotlib.pyplot as plt
import numpy as np

# Instrument drawn on the radar chart. It is defined once so the selected row
# and the chart title cannot disagree; it must be a row label of trend_numeric.
radar_instrument = 'AUD_CAD'
radar_columns = ['MOMO_H1', 'MOMO_H4', 'MOMO_D', 'PATI_H1', 'PATI_H4', 'PATI_D']

instrument_trends = trend_numeric.loc[radar_instrument, radar_columns]

labels = np.array(radar_columns)
# Cast to float so matplotlib receives a numeric array rather than an
# object-dtype one; a non-numeric cell (e.g. 'Error') fails here with a clear error.
stats = instrument_trends.values.astype(float)

# One evenly spaced angle per signal around the circle.
angles = np.linspace(0, 2*np.pi, len(labels), endpoint=False).tolist()

# Repeat the first point at the end so the outline closes on itself.
stats = np.concatenate((stats, [stats[0]]))
angles += angles[:1]

fig, ax = plt.subplots(figsize=(6, 6), subplot_kw=dict(polar=True))
ax.fill(angles, stats, color='red', alpha=0.25)
ax.plot(angles, stats, color='red', linewidth=2)
ax.set_yticklabels([])
ax.set_xticks(angles[:-1])
ax.set_xticklabels(labels)

plt.title(f'Trend Strength Radar Chart for {radar_instrument}', size=20, color='red', y=1.1)
plt.show()



In [None]:
# OE label derived from df['RSI'], computed for the whole column at once.
# From row 7 onwards each row looks at itself plus the previous 7 rows:
#   any RSI above 80 in that window  -> 'Downtrend'
#   otherwise any RSI below 20       -> 'Uptrend'
#   otherwise (and for rows 0-6)     -> 'Neutral'
# The column is built as strings from the start; seeding it with np.nan made it
# float64 and every later string assignment raised a pandas dtype warning.
oe_lookback = 8  # rows per window: the current row plus the previous 7

# min_periods=1 lets windows that contain NaN RSI values still be evaluated
# on whatever valid values they hold.
rsi_window = df['RSI'].rolling(oe_lookback, min_periods=1)
has_full_window = np.arange(len(df)) >= oe_lookback - 1

# np.select uses the first matching condition, so the > 80 test takes
# precedence over the < 20 test.
df['OE'] = np.select(
    [
        has_full_window & (rsi_window.max() > 80).to_numpy(),
        has_full_window & (rsi_window.min() < 20).to_numpy(),
    ],
    ['Downtrend', 'Uptrend'],
    default='Neutral',
)

In [None]:
# Summary-table column order: 'Asset', then MOMO_*, PATI_* and OE_* per timeframe.
trend_df_columns = ['Asset'] + [
    f'{signal}_{timeframe}'
    for signal in ('MOMO', 'PATI', 'OE')
    for timeframe in timeframes
]


In [None]:
# List the column labels currently present in the trend summary table.
trend_df.columns