In [6]:
import yfinance as yf
from datetime import datetime
import warnings

# Suprimir avisos específicos
warnings.filterwarnings("ignore", category=FutureWarning, module="yfinance")


In [7]:
def get_yahoo_data_history(symbol : str, period : str, interval: str, start = '1900-01-01', end = datetime.now(), prepost : bool = True):
        '''
        Data collection from yahoo

        Parameters:
        period : str
            Valid periods: 1d,5d,1mo,3mo,6mo,1y,2y,5y,10y,ytd,max Either Use period parameter or use start and end
        interval : str
            Valid intervals: 1m,2m,5m,15m,30m,60m,90m,1h,1d,5d,1wk,1mo,3mo Intraday data cannot extend last 60 days
        start: str
            Download start date string (YYYY-MM-DD) or _datetime, inclusive. Default is 1900-01-01 E.g. for start="2020-01-01", the first data point will be on "2020-01-01"
        end: str
            Download end date string (YYYY-MM-DD) or _datetime, exclusive. Default is now E.g. for end="2023-01-01", the last data point will be on "2022-12-31"
        prepost : bool
            Include Pre and Post market data in results? Default is False
        debug: bool
            If passed as False, will suppress error message printing to console.
        '''
        yahoo_data_history = yf.Ticker(symbol).history(period=period, interval=interval, start=start, end=end, prepost=prepost)
        return yahoo_data_history

history = get_yahoo_data_history(symbol="MSFT",period='1y', interval='1d', end = datetime.now(), prepost=True)

In [8]:
history

Unnamed: 0_level_0,Open,High,Low,Close,Volume,Dividends,Stock Splits
Date,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1
1986-03-13 00:00:00-05:00,0.054485,0.062498,0.054485,0.059827,1031788800,0.0,0.0
1986-03-14 00:00:00-05:00,0.059827,0.063032,0.059827,0.061963,308160000,0.0,0.0
1986-03-17 00:00:00-05:00,0.061963,0.063566,0.061963,0.063032,133171200,0.0,0.0
1986-03-18 00:00:00-05:00,0.063032,0.063566,0.060895,0.061429,67766400,0.0,0.0
1986-03-19 00:00:00-05:00,0.061429,0.061963,0.059827,0.060361,47894400,0.0,0.0
...,...,...,...,...,...,...,...
2025-02-03 00:00:00-05:00,411.600006,415.410004,408.660004,410.920013,25679100,0.0,0.0
2025-02-04 00:00:00-05:00,412.690002,413.920013,409.739990,412.369995,20532100,0.0,0.0
2025-02-05 00:00:00-05:00,412.350006,413.829987,410.399994,413.290009,16316700,0.0,0.0
2025-02-06 00:00:00-05:00,414.000000,418.200012,414.000000,415.820007,16309800,0.0,0.0


In [2]:
import talib
import pandas as pd

class CandlesPatterns:
    """
    Detect all candlestick patterns using TA-Lib.
    """

    def __init__(self):
        self.result_candles_history_df = pd.DataFrame(columns=['Pattern', 'Signal', 'Relevance', 'Stoploss'])
        self.result_candles_df = pd.DataFrame(columns=['Pattern', 'Signal', 'Relevance', 'Stoploss'])
        
        
    def detect_pattern(self, data: pd.DataFrame, pattern_function, pattern_name: str):
        """
        General method to detect a specific candlestick pattern.
        """
        data_pass = data

        detection = pattern_function(data_pass['Open'], data_pass['High'], data_pass['Low'], data_pass['Close'])

        non_zero_detection = detection[detection != 0]
        if not non_zero_detection.empty:
            for date, signal in non_zero_detection.items():
                # Cálculo do Stoploss baseado no padrão
                if pattern_name in [
                    "doji", "dragonfly_doji", "gravestone_doji", "engulfing",
                    "morning_star", "evening_star", "marubozu", "harami",
                    "harami_cross", "kicking", "kicking_by_length", "tasuki_gap",
                    "gap_side_by_side_white", "counter_attack", "piercing",
                    "dark_cloud_cover", "tri_star"
                ]:
                    stoploss = round(data_pass.loc[date, 'Low'],5) if signal > 0 else round(data_pass.loc[date, 'High'],5)
                elif pattern_name in [
                    "morning_doji_star", "hammer", "inverted_hammer",
                    "thrusting", "matching_low", "three_white_soldiers",
                    "three_outside", "three_stars_in_south"
                ]:
                    stoploss = round(data_pass.loc[date, 'Low'],5)
                elif pattern_name in [
                    "evening_doji_star", "hanging_man", "shooting_star",
                    "on_neck", "in_neck", "three_black_crows",
                    "three_inside", "advance_block", "stalled_pattern"
                ]:
                    stoploss = round(data_pass.loc[date, 'High'],5)
                else:
                    stoploss = None

                new_entry = pd.DataFrame({
                    'Pattern': [pattern_name],
                    'Signal': [signal],
                    'Relevance': ['Flat'],
                    'Stoploss': [stoploss],
                }, index=[date])

                self.result_candles_history_df = pd.concat([self.result_candles_history_df, new_entry], ignore_index=False)

                self.result_candles_df = self.result_candles_df[self.result_candles_df['Pattern'] != pattern_name]
                self.result_candles_df = pd.concat([self.result_candles_df, new_entry], ignore_index=False)

        return detection

    def doji(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLDOJI, "doji")

    def dragonfly_doji(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLDRAGONFLYDOJI, "dragonfly_doji")

    def gravestone_doji(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLGRAVESTONEDOJI, "gravestone_doji")

    def engulfing(self, data: pd.DataFrame):
        return self.detect_pattern(data, talib.CDLENGULFING, "engulfing")

    def morning_star(self, data: pd.DataFrame):
        return self.detect_pattern(data, talib.CDLMORNINGSTAR, "morning_star")

    def evening_star(self, data: pd.DataFrame):
        return self.detect_pattern(data, talib.CDLEVENINGSTAR, "evening_star")

    def morning_doji_star(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLMORNINGDOJISTAR, "morning_doji_star")

    def evening_doji_star(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLEVENINGDOJISTAR, "evening_doji_star")

    def hammer(self, data: pd.DataFrame):
        return self.detect_pattern(data, talib.CDLHAMMER, "hammer")

    def inverted_hammer(self, data: pd.DataFrame):
        return self.detect_pattern(data, talib.CDLINVERTEDHAMMER, "inverted_hammer")

    def hanging_man(self, data: pd.DataFrame):
        return self.detect_pattern(data, talib.CDLHANGINGMAN, "hanging_man")

    def shooting_star(self, data: pd.DataFrame):
        return self.detect_pattern(data, talib.CDLSHOOTINGSTAR, "shooting_star")

    def marubozu(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLMARUBOZU, "marubozu")

    def harami(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLHARAMI, "harami")

    def harami_cross(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLHARAMICROSS, "harami_cross")

    def spinning_top(self, data: pd.DataFrame):
        '''
        NEED FIBONACCI
        '''
        return self.detect_pattern(data, talib.CDLSPINNINGTOP, "spinning_top")

    def kicking(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLKICKING, "kicking")

    def kicking_by_length(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLKICKINGBYLENGTH, "kicking_by_length")

    def tasuki_gap(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLTASUKIGAP, "tasuki_gap")

    def gap_side_by_side_white(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLGAPSIDESIDEWHITE, "gap_side_by_side_white")

    def counter_attack(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLCOUNTERATTACK, "counter_attack")

    def piercing(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLPIERCING, "piercing")

    def dark_cloud_cover(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLDARKCLOUDCOVER, "dark_cloud_cover")

    def tri_star(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLTRISTAR, "tri_star")

    def on_neck(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLONNECK, "on_neck")

    def in_neck(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLINNECK, "in_neck")

    def thrusting(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLTHRUSTING, "thrusting")

    def matching_low(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLMATCHINGLOW, "matching_low")

    def three_black_crows(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDL3BLACKCROWS, "three_black_crows")

    def three_white_soldiers(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDL3WHITESOLDIERS, "three_white_soldiers")

    def three_inside(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDL3INSIDE, "three_inside")

    def three_outside(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDL3OUTSIDE, "three_outside")

    def three_stars_in_south(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDL3STARSINSOUTH, "three_stars_in_south")

    def advance_block(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLADVANCEBLOCK, "advance_block")

    def stalled_pattern(self, data: pd.DataFrame):
        # takeprofit = None
        return self.detect_pattern(data, talib.CDLSTALLEDPATTERN, "stalled_pattern")

    # def abandoned_baby(self, data: pd.DataFrame):
    #     return self.detect_pattern(data, talib.CDLABANDONEDBABY, "abandoned_baby")

    # def unique_3_river(self, data: pd.DataFrame):
    #     return self.detect_pattern(data, talib.CDLUNIQUE3RIVER, "unique_3_river")

    # def belt_hold(self, data: pd.DataFrame):
    #     return self.detect_pattern(data, talib.CDLBELTHOLD, "belt_hold")

    # def separating_lines(self, data: pd.DataFrame):
    #     return self.detect_pattern(data, talib.CDLSEPARATINGLINES, "Separating Lines")

    # def upside_gap_two_crows(self, data: pd.DataFrame):
    #     return self.detect_pattern(data, talib.CDLUPSIDEGAP2CROWS, "Upside Gap Two Crows")


In [3]:
import yfinance as yf
symbol = 'SPY'
start, end = '2015-01-01', '2024-07-05'

data = yf.Ticker(symbol).history(period='1y', interval='1d', start=start, end=end)

cm = CandlesPatterns()

# Loop para detectar padrões
for candle_function in dir(cm):
    if (not candle_function.startswith("__") and 
        callable(getattr(cm, candle_function)) and 
        candle_function != "detect_pattern"):
        pattern_function = getattr(cm, candle_function)
        try:
            candle_result = pattern_function(data)
        except Exception as e:
            print(f"Error detecting pattern {candle_function}: {e}")

# Verificar padrões detectados e valores de stoploss
print("\nAll Detected Patterns with Stoploss:")
cm.result_candles_df


  self.result_candles_history_df = pd.concat([self.result_candles_history_df, new_entry], ignore_index=False)
  self.result_candles_history_df = pd.concat([self.result_candles_history_df, new_entry], ignore_index=False)
  self.result_candles_df = pd.concat([self.result_candles_df, new_entry], ignore_index=False)
  self.result_candles_df = pd.concat([self.result_candles_df, new_entry], ignore_index=False)



All Detected Patterns with Stoploss:


Unnamed: 0,Pattern,Signal,Relevance,Stoploss
2024-06-18 00:00:00-04:00,advance_block,-100,Flat,543.35483
2023-08-18 00:00:00-04:00,counter_attack,100,Flat,424.303
2022-07-22 00:00:00-04:00,dark_cloud_cover,-100,Flat,385.83253
2024-07-01 00:00:00-04:00,doji,100,Flat,539.0468
2024-07-01 00:00:00-04:00,dragonfly_doji,100,Flat,539.0468
2024-07-02 00:00:00-04:00,engulfing,100,Flat,540.16957
2023-07-20 00:00:00-04:00,evening_doji_star,-100,Flat,445.94879
2024-02-26 00:00:00-05:00,evening_star,-100,Flat,502.30675
2023-11-24 00:00:00-05:00,gap_side_by_side_white,100,Flat,447.15796
2024-03-28 00:00:00-04:00,gravestone_doji,100,Flat,517.76279


In [4]:
import talib
import pandas as pd

class TrendMetrics():
    """
    A class that encapsulates technical analysis metrics using TA-Lib.
    """

    def __init__(self):
        self.result_df = pd.DataFrame(columns=['function', 'signal'])
        self.crossover_info = pd.DataFrame(columns=['function', 'signal', 'period_low', 'period_mid', 'period_high', 'ema_low', 'ema_mid', 'ema_high'])
        self.sma_bands_info = pd.DataFrame(columns=['function', 'signal', 'period', 'std', 'lower_band', 'middle_band', 'upper_band'])
        self.rsi_info = pd.DataFrame(columns=['function', 'signal', 'period', 'upper_level', 'lower_level'])

    def get_crossover(self, data: pd.DataFrame, l1: int, l2: int, l3: int):
        """
        This function measures the crossover of 3 EMAs using TA-Lib.
        
        Parameters:
        - data: DataFrame containing the price data with a 'close' column.
        - l1, l2, l3: Periods for the 3 EMAs.
        
        Returns:
        - Updates self.crossover_signal with 'Buy', 'Sell', or 'Flat'.
        """
        period_low, period_mid, period_high = l1, l2, l3
        
        # Compute EMAs
        ema1 = talib.EMA(data['Close'], timeperiod=l1)
        ema2 = talib.EMA(data['Close'], timeperiod=l2)
        ema3 = talib.EMA(data['Close'], timeperiod=l3)

        # Current values
        ema_low, ema_mid, ema_high = ema1.iloc[-1], ema2.iloc[-1], ema3.iloc[-1]

        # Determine signal
        if ema_low > ema_mid > ema_high:
            crossover_signal = 'Buy'
        elif ema_low < ema_mid < ema_high:
            crossover_signal = 'Sell'
        else:
            crossover_signal = 'Flat'

        # Save result
        self.result_df = pd.concat([self.result_df, pd.DataFrame({
            'function': ['Crossover'],
            'signal': [crossover_signal]
        })], ignore_index=True)

        # Save result
        self.crossover_info = pd.concat([self.crossover_info, pd.DataFrame({
            'function': ['Crossover'],
            'signal': [crossover_signal],
            'period_low': [period_low],
            'period_mid': [period_mid],
            'period_high': [period_high],
            'ema1_now': [ema_low],
            'ema2_now': [ema_mid],
            'ema3_now': [ema_high],
        })], ignore_index=True)

    def get_sma_bands(self, data: pd.DataFrame, length: int=15, std_dev: int = 1):
        """
        This function calculates Bollinger Bands and detects signals based on them.
        
        Parameters:
        - data: DataFrame containing the price data with a 'close' column.
        - length: SMA period.
        - std_dev: Number of standard deviations for the bands.
        
        Returns:
        - Updates self.bbands_signal with 'Buy', 'Sell', or 'Flat'.
        """

        period, std = length, std_dev

        # Compute Bollinger Bands
        upper_band, middle_band, lower_band = talib.BBANDS(
            data['Close'], timeperiod=length, nbdevup=std_dev, nbdevdn=std_dev, matype=0
        )

        # Current values
        last_close = data['Close'].iloc[-1]
        lower_band, middle_band, upper_band = lower_band.iloc[-1], middle_band.iloc[-1], upper_band.iloc[-1]

        # Determine signal
        if last_close <= lower_band:
            bbands_signal = 'Buy'
        elif last_close >= upper_band:
            bbands_signal = 'Sell'
        else:
            bbands_signal = 'Flat'

        # Save result
        self.result_df = pd.concat([self.result_df, pd.DataFrame({
            'function': ['Bollinger_Bands'],
            'signal': [bbands_signal]
        })], ignore_index=True)
        
        self.sma_bands_info = pd.concat([self.sma_bands_info, pd.DataFrame({
            'function': ['Bollinger_Bands'],
            'signal': [bbands_signal],
            'period': [period],
            'std': [std],
            'lower_band': [lower_band],
            'middle_band': [middle_band],
            'upper_band': [upper_band]
        })], ignore_index=True)

    def get_rsi(self, data: pd.DataFrame, length: int = 25, overbought: int = 70, oversold: int = 30):
        """
        This function calculates the RSI and generates a signal based on overbought/oversold levels.
        
        Parameters:
        - data: DataFrame containing the price data with a 'close' column.
        - length: RSI period.
        - overbought: RSI overbought threshold.
        - oversold: RSI oversold threshold.
        
        Returns:
        - Updates self.rsi_signal with 'Buy', 'Sell', or 'Flat'.
        """

        period, upper_level, lower_level = length, overbought, oversold 

        # Compute RSI
        rsi = talib.RSI(data['Close'], timeperiod=length)
        rsi_now = rsi.iloc[-1]

        # Determine signal
        if rsi_now >= overbought:
            rsi_signal = 'Sell'
        elif rsi_now <= oversold:
            rsi_signal = 'Buy'
        else:
            rsi_signal = 'Flat'

        # Save result
        self.result_df = pd.concat([self.result_df, pd.DataFrame({
            'function': ['RSI'],
            'signal': [rsi_signal]
        })], ignore_index=True)

        self.rsi_info = pd.concat([self.rsi_info, pd.DataFrame({
            'function': ['RSI'],
            'signal': [rsi_signal],
            'period': [period],
            'upper_level': [upper_level],
            'lower_level': [lower_level]
        })], ignore_index=True)


In [22]:
import talib
import itertools
import numpy as np
import pandas as pd
from tqdm import tqdm
from joblib import Parallel, delayed
from datetime import datetime, timedelta

class ParamsOptimization():
    """
    A class for optimizing technical indicators' parameters and evaluating strategy performance.
    """

    def __init__(self):
        
        self.crossover_params = pd.DataFrame(columns=['Ticker', 'EMA1', 'EMA2', 'EMA3', 'Sharpe', 'MaxDrawdown', 'Expectancy'])
        self.bbands_params = pd.DataFrame(columns=['Ticker', 'Period', 'Std', 'Sharpe', 'MaxDrawdown', 'Expectancy'])

    def optimize(self, asset_type: str, symbol: str, period: str, interval: str):
        """
        Run optimization for all strategies.
        """
        data = self.fetch_data(asset_type, symbol, period, interval)
        self.crossover_results = self.optimize_crossover(data, symbol)
        self.bbands_results = self.optimize_bbands(data, symbol)


    def fetch_data(self,asset_type: str, symbol : str, period : str, interval : str):
        """
        Simulate fetching market data for the given ticker. According to same periodicity and timeframe of subject bot setting
        """

        if asset_type == 'stock':
            from backend.datasources.yahoodata import DataHistory
            dh = DataHistory()
            data = dh.get_yahoo_data_history(symbol, period, interval, start=datetime.now(), end=datetime.now() - timedelta(days=365))

        elif asset_type == 'cambial':
            pass
            # Metatrader
        elif asset_type == 'crypto':
            pass
            # crypto
        return data

    def optimize_crossover(self, data : pd.DataFrame, symbol : str):
        """
        Optimize EMA crossover strategy.
        """
        ema1_periods = range(10, 21)
        ema2_periods = range(25, 61)
        ema3_periods = range(100, 200)

        combinations = list(itertools.product(ema1_periods, ema2_periods, ema3_periods))

        results = Parallel(n_jobs=-1)(delayed(self.simulate_crossover)(
            data, symbol, l1, l2, l3) for l1, l2, l3 in tqdm(combinations, desc="Optimizing EMA Crossover"))

        results_df = pd.DataFrame(results)

        return results_df

    def simulate_crossover(self, data : pd.DataFrame, symbol : str, l1 : int, l2 : int, l3 : int):
        """
        Simulate crossover strategy and calculate metrics.
        """
        # Calculate EMAs
        data['ema1'] = talib.EMA(data['Close'], timeperiod=l1)
        data['ema2'] = talib.EMA(data['Close'], timeperiod=l2)
        data['ema3'] = talib.EMA(data['Close'], timeperiod=l3)

        # Generate signals
        data['signal'] = np.where((data['ema1'] > data['ema2']) & (data['ema2'] > data['ema3']), 1,
                                  np.where((data['ema1'] < data['ema2']) & (data['ema2'] < data['ema3']), -1, 0))
        data['returns'] = data['Close'].pct_change() * data['signal'].shift(1)

        # Calculate metrics
        sharpe = self.calculate_sharpe(data['returns'])
        max_drawdown = self.calculate_max_drawdown(data['returns'])
        expectancy = self.calculate_expectancy(data['returns'])

        return {
            'Ticker': symbol,
            'EMA1': l1,
            'EMA2': l2,
            'EMA3': l3,
            'Sharpe': sharpe,
            'MaxDrawdown': max_drawdown,
            'Expectancy': expectancy
        }

    def optimize_bbands(self, data : pd.DataFrame, symbol : str):
        """
        Optimize Bollinger Bands strategy.
        """
        sma_periods = range(10, 21)
        std_devs = range(1, 3)

        combinations = list(itertools.product(sma_periods, std_devs))

        results = Parallel(n_jobs=-1)(delayed(self.simulate_bbands)(
            data, symbol, period, std) for period, std in tqdm(combinations, desc="Optimizing Bollinger Bands"))

        results_df = pd.DataFrame(results)
       
        return results_df

    def simulate_bbands(self, data, symbol, period, std):
        """
        Simulate Bollinger Bands strategy and calculate metrics.
        """
        # Calculate Bollinger Bands
        upperband, middleband, lowerband = talib.BBANDS(
            data['Close'], timeperiod=period, nbdevup=std, nbdevdn=std, matype=0
        )

        # Generate signals
        data['signal'] = np.where(data['Close'] < lowerband, 1,
                                  np.where(data['Close'] > upperband, -1, 0))
        data['returns'] = data['Close'].pct_change() * data['signal'].shift(1)

        # Calculate metrics
        sharpe = self.calculate_sharpe(data['returns'])
        max_drawdown = self.calculate_max_drawdown(data['returns'])
        expectancy = self.calculate_expectancy(data['returns'])

        return {
            'Ticker': symbol,
            'Period': period,
            'Std': std,
            'Sharpe': sharpe,
            'MaxDrawdown': max_drawdown,
            'Expectancy': expectancy
        }

    @staticmethod
    def calculate_sharpe(returns, risk_free_rate=0.025):
        """
        Calculate Sharpe Ratio.
        """
        mean_return = returns.mean()
        std_dev = returns.std()
        if std_dev == 0:
            return 0
        return (mean_return - risk_free_rate) / std_dev

    @staticmethod
    def calculate_max_drawdown(returns):
        """
        Calculate Max Drawdown.
        """
        cumulative = (1 + returns).cumprod()
        running_max = cumulative.cummax()
        drawdown = running_max - cumulative
        return drawdown.max()

    @staticmethod
    def calculate_expectancy(returns):
        """
        Calculate Expectancy.
        """
        wins = returns[returns > 0]
        losses = returns[returns < 0]
        win_rate = len(wins) / len(returns) if len(returns) > 0 else 0
        loss_rate = 1 - win_rate
        avg_win = wins.mean() if len(wins) > 0 else 0
        avg_loss = losses.mean() if len(losses) > 0 else 0
        return (win_rate * avg_win) - (loss_rate * avg_loss)

# Adicionar Fontes Crypt e Cambial
# Adicionar resultados

In [4]:
df = data

tm = TrendMetrics()
tm.get_sma_bands(data=df, length=15, std_dev=1)
tm.get_crossover(data=df, l1=25, l2=50, l3=200)
tm.get_rsi(data=df, length=25, overbought=70, oversold=30)

tm.result_df

  self.sma_bands_info = pd.concat([self.sma_bands_info, pd.DataFrame({


Unnamed: 0,function,signal
0,Bollinger_Bands,Sell
1,Crossover,Buy
2,RSI,Flat


In [23]:
# Instanciar a classe
optimizer = ParamsOptimization()

# Testar otimização de EMA crossover
crossover_results = optimizer.optimize_crossover(data, symbol=symbol)
print("Resultados da Otimização EMA Crossover:")
print(crossover_results)

# Testar otimização de Bollinger Bands
bbands_results = optimizer.optimize_bbands(data, symbol=symbol)
print("\nResultados da Otimização Bollinger Bands:")
print(bbands_results)

Optimizing EMA Crossover: 100%|██████████| 39600/39600 [00:11<00:00, 3312.91it/s]


Resultados da Otimização EMA Crossover:
      Ticker  EMA1  EMA2  EMA3    Sharpe  MaxDrawdown  Expectancy
0        SPY    10    25   100 -2.613950     0.303667    0.006992
1        SPY    10    25   101 -2.617432     0.300424    0.006995
2        SPY    10    25   102 -2.630023     0.290087    0.006981
3        SPY    10    25   103 -2.633408     0.292406    0.006980
4        SPY    10    25   104 -2.634544     0.300940    0.006974
...      ...   ...   ...   ...       ...          ...         ...
39595    SPY    20    60   195 -2.871977     0.474086    0.006832
39596    SPY    20    60   196 -2.871949     0.477427    0.006837
39597    SPY    20    60   197 -2.872561     0.474437    0.006838
39598    SPY    20    60   198 -2.875468     0.465207    0.006835
39599    SPY    20    60   199 -2.876874     0.472461    0.006831

[39600 rows x 7 columns]


Optimizing Bollinger Bands: 100%|██████████| 22/22 [00:00<00:00, 10994.24it/s]


Resultados da Otimização Bollinger Bands:
   Ticker  Period  Std    Sharpe  MaxDrawdown  Expectancy
0     SPY      10    1 -2.962345     0.420373    0.006978
1     SPY      10    2 -6.799412     0.157983    0.008256
2     SPY      11    1 -2.860866     0.533861    0.007054
3     SPY      11    2 -6.456669     0.159455    0.007891
4     SPY      12    1 -2.809079     0.583004    0.007035
5     SPY      12    2 -6.246475     0.123052    0.008021
6     SPY      13    1 -2.814964     0.554000    0.007008
7     SPY      13    2 -5.855324     0.119517    0.007864
8     SPY      14    1 -2.840845     0.575541    0.006970
9     SPY      14    2 -5.917115     0.113973    0.007216
10    SPY      15    1 -2.854063     0.562334    0.006926
11    SPY      15    2 -5.924271     0.115390    0.007134
12    SPY      16    1 -2.840299     0.513837    0.006951
13    SPY      16    2 -5.921385     0.123770    0.007015
14    SPY      17    1 -2.838378     0.523457    0.006951
15    SPY      17    2 -5.839




### Yahoo Tables scrap

In [None]:
import re
import requests
import pandas as pd
from bs4 import BeautifulSoup

def get_yahoo_most_active(self, table_class: str = None) -> pd.DataFrame:
    """
    Extrat data from yahoo most active

    Parameters:
        classe_tabela (str): Classe CSS da tabela para extração (opcional).

    Returns:
        pd.DataFrame: DataFrame with table content.
    """
    try:
        response = requests.get("https://finance.yahoo.com/markets/stocks/most-active/")
        response.raise_for_status()

        soup = BeautifulSoup(response.text, 'html.parser')

        if table_class:
            tabela = soup.find('table', {'class': table_class})
        else:
            tabela = soup.find('table')

        headers = [th.text.strip() for th in tabela.find_all('th')]

        rows = []
        for row in tabela.find_all('tr')[1:]:
            cols = [td.text.strip() for td in row.find_all('td')]
            if cols: 
                rows.append(cols)

        df = pd.DataFrame(rows, columns=headers if headers else None)
        df["Price"] = df["Price"].str.extract(r"^([\d\.]+)")
        df["Change"] = df["Change"].str.replace("+", "", regex=False).astype(float)
        df["Change %"] = df["Change %"].str.replace("%", "", regex=False).astype(float)
        df["P/E Ratio (TTM)"] = df["P/E Ratio (TTM)"].str.replace("-", "0", regex=False).astype(float)
        df["52 Wk Change %"] = df["52 Wk Change %"].str.replace("%", "", regex=False).astype(float)
        return df

    except requests.exceptions.RequestException as e:
        print(f"Error accessing URL: {e}")
    except Exception as e:
        print(f"Error processing data: {e}")

    return pd.DataFrame()

df = get_yahoo_most_active(self)
print(df.head())  # Exibe as primeiras linhas


  Symbol                        Name     Price  Change  Change %    Volume  \
0   INTC           Intel Corporation     24.13    1.65      7.34  242.162M   
1   NVDA          NVIDIA Corporation    135.29    4.15      3.16  195.623M   
2   LCID           Lucid Group, Inc.    3.2600    0.39     13.59  143.291M   
3   BBAI   BigBear.ai Holdings, Inc.      9.78    0.04      0.41  124.756M   
4   SMCI  Super Micro Computer, Inc.     42.28    2.60      6.55  109.701M   

  Avg Vol (3M) Market Cap  P/E Ratio (TTM)  52 Wk Change % 52 Wk Range  
0      74.418M   104.483B             0.00          -48.97              
1     245.113M     3.313T            53.47           80.49              
2      86.335M     9.818B             0.00          -20.94              
3      55.417M      2.46B             0.00          332.89              
4      72.996M    24.758B            21.03          -60.48              


In [29]:
df.info()


<class 'pandas.core.frame.DataFrame'>
RangeIndex: 0 entries
Empty DataFrame


In [1]:
import requests
from bs4 import BeautifulSoup

# URL da página das ações mais ativas
url = 'https://finance.yahoo.com/markets/stocks/most-active/'

# Cabeçalhos para a requisição HTTP
headers = {
    'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/58.0.3029.110 Safari/537.3'
}

# Fazer a requisição HTTP para obter o conteúdo da página
response = requests.get(url, headers=headers)

# Verificar se a requisição foi bem-sucedida
if response.status_code == 200:
    # Analisar o conteúdo da página com o BeautifulSoup
    soup = BeautifulSoup(response.content, 'html.parser')

    # Encontrar todas as linhas da tabela de ações mais ativas
    rows = soup.find_all('tr', attrs={'class': 'simpTblRow'})

    # Iterar sobre as linhas e extrair as informações desejadas
    for row in rows:
        symbol = row.find('td', attrs={'aria-label': 'Symbol'}).text
        name = row.find('td', attrs={'aria-label': 'Name'}).text
        price = row.find('td', attrs={'aria-label': 'Last Price'}).text
        change = row.find('td', attrs={'aria-label': 'Change'}).text
        percent_change = row.find('td', attrs={'aria-label': '% Change'}).text
        volume = row.find('td', attrs={'aria-label': 'Volume'}).text

        # Exibir as informações extraídas
        print(f'Símbolo: {symbol}')
        print(f'Nome: {name}')
        print(f'Preço: {price}')
        print(f'Variação: {change}')
        print(f'Variação (%): {percent_change}')
        print(f'Volume: {volume}')
        print('-----------------------------')
else:
    print(f'Falha ao acessar a página. Status code: {response.status_code}')


## Data Collection Binance


In [5]:
import requests

url = "https://api.binance.com/api/v3/ticker/price"
params = {"symbol": "BTCUSDT"}

response = requests.get(url, params=params)

if response.status_code == 200:
    symbol = response.json()["symbol"]
    price = response.json()["price"]
else:
    error = response.status_code

In [53]:
from decimal import Decimal
from datetime import datetime


def get_crypto_symbol_24h(symbol : str):

    url = "https://api.binance.com/api/v3/ticker/24hr"
    params = {"symbol": symbol}

    response = requests.get(url, params=params)

    if response.status_code == 200:
        priceChangePercent = Decimal(response.json()["priceChangePercent"])
        weightedAvgPrice = Decimal(response.json()["weightedAvgPrice"])
        prevClosePrice = Decimal(response.json()["prevClosePrice"])
        priceChange = Decimal(response.json()["priceChange"])
        lastPrice = Decimal(response.json()["lastPrice"])
        lastQty = Decimal(response.json()["lastQty"])
        bidPrice = Decimal(response.json()["bidPrice"])
        bidQty = Decimal(response.json()["bidQty"])
        askPrice = Decimal(response.json()["askPrice"])
        askQty = Decimal(response.json()["askQty"])
        openPrice = Decimal(response.json()["openPrice"])
        highPrice = Decimal(response.json()["highPrice"])
        lowPrice = Decimal(response.json()["lowPrice"])
        volume = Decimal(response.json()["volume"])
        quoteVolume = Decimal(response.json()["quoteVolume"])
        openTime = datetime.fromtimestamp(response.json()["openTime"] / 1000).strftime("%d-%m-%Y %H:%M:%S")
        closeTime = datetime.fromtimestamp(response.json()["closeTime"] / 1000).strftime("%d-%m-%Y %H:%M:%S")
        firstId = response.json()["firstId"]
        lastId = response.json()["lastId"]
        count = response.json()["count"]
    else:
        print(f"Erro: {response.status_code} - {response.text}")

In [6]:
url = "https://api.binance.com/api/v3/openOrders"
params = {"symbol": "BTCUSDT"}

response = requests.get(url, params=params)
response.json()


{'code': -1102,
 'msg': "Mandatory parameter 'signature' was not sent, was empty/null, or malformed."}