In [1]:
from pyspark.sql import SparkSession, SQLContext
import pyspark.sql.functions as func

import datetime

from backtesting import Backtest, Strategy
from helpers.signals import get_supertrend

import pandas as pd
import numpy as np



In [2]:
spark = (
    SparkSession
    .Builder()
    .appName('trading_signals')
    .getOrCreate()
    )
sqlContext = SQLContext(spark)

In [53]:
(
    spark.read.parquet('./datasets/*')
    .filter(func.col('Timestamp') < datetime.date.today())
    .groupBy('Symbol')
    .agg(func.min('Timestamp'))
).toPandas().to_clipboard()

In [65]:
pdf = (
    spark.read.parquet('./datasets/*')
    .filter(func.col('Timestamp') < datetime.date.today())
    #.filter(func.col('timestamp') >= datetime.date(2017, 1, 1))
    .filter(func.col('Symbol').like('%EUR'))
    .toPandas()
    .sort_values(['Symbol','Timestamp'])
)

In [69]:
st_df = pdf.groupby('Symbol').apply(get_supertrend, period = 10, multiplier = 3)

 # Plotly express

In [72]:
import plotly.io as pio
import plotly.express as px

pio.renderers.default = 'notebook_connected'


def plot_supertrend(df):
    _df = df.copy()

    mask = _df['In_uptrend'] == 1
    _df.loc[mask, 'green_st'] = _df.loc[mask, 'Supertrend']
    _df.loc[~mask, 'red_st'] = _df.loc[~mask, 'Supertrend']
    
    fig = px.line(_df, x ='Timestamp', y = ['Close', 'red_st', 'green_st'], facet_col = 'Symbol', facet_col_wrap=4, height = 800)
    
    fig.update_yaxes(matches=None)

    return fig
    
plot_supertrend(st_df)

# Back Test

In [82]:
def get_backtesting_df(df_in, symbol):
    
    bt_df = df_in[df_in['Symbol'] == symbol].copy()
    bt_df = bt_df.dropna(subset = ['Supertrend_Signal'])
    bt_df = bt_df.sort_values('Timestamp')
    bt_df['Timestamp'] = pd.to_datetime(bt_df['Timestamp'])
    bt_df = bt_df.set_index('Timestamp')

    return bt_df

def get_signal(pdf):

    signal = pdf['Supertrend_Signal']    
    
    return signal

class Supertrend(Strategy):

    def init(self):
        
        super().init()
        self.signal = self.I(get_signal, self.data, name = 'supertrend')
    
    def next(self):
        if self.signal == 1:
            self.position.close()
            self.buy()
        if self.signal == -1:
            self.position.close()

In [116]:
returns = {}
for s in pdf['Symbol'].unique():
    bt = Backtest(get_backtesting_df(st_df , s), Supertrend, commission=.002, cash=100_000)
    res = bt.run()
    ret = res['Return [%]']
    returns[s] = ret
    print(f'{s}: {ret}')

ADAEUR: 9.500718070481526
AVAXEUR: 199.96275554019923
BCHEUR: -27.17428656359864
BNBEUR: 2296.05146642177
BTCEUR: 562.7060189296874
BTTEUR: -28.05163622334428
CHZEUR: -10.650282114511779
DOGEEUR: -48.067542881448915
DOTEUR: -7.593863227922455
EGLDEUR: 294.01122785398866
ENJEUR: 9.687731809595338
EOSEUR: -45.89107476352597
ETCEUR: -12.83408087996676
ETHEUR: 1156.9798807401123
GRTEUR: -32.34966636430871
HOTEUR: -5.1581367053909055
ICPEUR: -47.38066501016998
LAZIOEUR: -37.86203217170907
LINKEUR: -15.436168781494096
LTCEUR: -29.393914264862065
LUNAEUR: 557.2149403599491
MATICEUR: 41.770640348061455
PORTOEUR: 0.0
RUNEEUR: 38.88627279771041
SHIBEUR: 283.83630765840076
SOLEUR: 522.2817420149919
SXPEUR: -47.976980866455065
THETAEUR: -8.697396276403408
TRXEUR: 15.379093793265508
UNIEUR: -24.67929937596894
VETEUR: 46.79045568567747
WINEUR: 44.99605980946141
WRXEUR: -36.14694690084658
XLMEUR: -23.79260779101642
XRPEUR: 373.4251136205133
YFIEUR: -26.77276653125003


In [129]:
# best strategies
ret_df = (
    pd.DataFrame.from_dict(returns, orient = 'index', columns = ['return'])
    .sort_values('return', ascending = False)
    .reset_index()
    .rename(columns={'index' : 'Symbol'})
)

In [136]:
# market makers
def get_market_makers(df_in):

    df = df_in.copy()

    df['daily_traded_value'] = df['Volume'] * df['Close']
    mask = df['Timestamp'].dt.year == 2022
    df = df[mask]
    df = df.groupby(['Symbol'], as_index = False)['daily_traded_value'].mean()
    
    return df


In [140]:
portfolio_candidates = (
    get_market_makers(pdf)
    .merge(ret_df, on = 'Symbol')
    .sort_values('daily_traded_value', ascending=False)
)
portfolio_candidates

Unnamed: 0,Symbol,daily_traded_value,return
4,BTCEUR,73999730.0,562.706019
13,ETHEUR,67440740.0,1156.979881
20,LUNAEUR,40564360.0,557.21494
24,SHIBEUR,7848618.0,283.836308
0,ADAEUR,7696245.0,9.500718
3,BNBEUR,6138845.0,2296.051466
25,SOLEUR,5081554.0,522.281742
26,SXPEUR,4007118.0,-47.976981
34,XRPEUR,3198530.0,373.425114
8,DOTEUR,2678995.0,-7.593863


In [142]:
portfolio_candidates.to_clipboard(index=False)