# Spark Project - Binance Crypto Strategy 

In this project we create a strategy to buy and sell crypto

The tecnologies used are:
- Data source is Binance trough Binance API (Rest Api for Batch, and websocket for streaming)
- HDFS for Batch Storage
- Kafka for Streaming Storage
- Spark for Batch and Stream Proccesing
- MariaDB as Serving Layer

![Expanation](explanation.png)

## Batch

1) Initialize hadoop in terminal - hadoop-start.sh

### Start Spark Session

In [1]:
import findspark
findspark.init()

In [2]:
#Spark Session
import os
os.environ['PYSPARK_SUBMIT_ARGS'] =  '--packages "org.apache.spark:spark-sql-kafka-0-10_2.12:3.0.3" --jars "/usr/share/java/mariadb-java-client.jar,/opt/hive3/lib/hive-hcatalog-core-3.1.2.jar" pyspark-shell'



from pyspark.sql.session import SparkSession
from pyspark.context import SparkContext

spark = (SparkSession.builder
    .appName("Binance")
    .config("hdfs://localhost:9000/warehouse","spark.sql.warehouse.dir")
    .enableHiveSupport()
    .getOrCreate())

Ivy Default Cache set to: /home/osbdet/.ivy2/cache
The jars for the packages stored in: /home/osbdet/.ivy2/jars
:: loading settings :: url = jar:file:/opt/spark3/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1ad0ebbb-73ac-46c5-850b-eb540bc44d89;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.0.3 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.0.3 in central
	found org.apache.kafka#kafka-clients;2.4.1 in central
	found com.github.luben#zstd-jni;1.4.4-3 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.2 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.commons#commons-pool2;2.6.2 in central
:: resolution report :: resolve 1308ms :: artifacts dl 22ms
	:: modules in us

### Libraries and functions

In [3]:
#Binance
from binance.spot import Spot 

#SQL
from pyspark.sql.functions import *
from pyspark.sql import types as t
from pyspark.sql import functions as f
from pyspark.sql.types import StructType,StructField, StringType, DoubleType
from pyspark.sql.functions import lit, to_date, unix_timestamp, col,lag
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

#MLlib
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.stat import Correlation

#Python
import pandas as pd

### Extract the history from Binance and Load it in HDFS raw layer
we will use 20 crypto coins for the project and extract the last 500 klines of 1 minute for each one

In [4]:
#Coins to use
coins = ["BTCUSDT", "ETHUSDT", 'XRPUSDT', 'ADAUSDT', 'MATICUSDT', 
         'BNBUSDT', 'VETUSDT', 'LINKUSDT', 'YFIUSDT', 'UMAUSDT', 
       'WAXPUSDT', 'ICXUSDT', 'ZRXUSDT', 'SUSHIUSDT', 'BETAUSDT', 
        'REPUSDT', 'VTHOUSDT', 'XVGUSDT', 'VOXELUSDT', 'CLVUSDT']


client = Spot()


# Insert in HDFS a folder for each coin and save files there

for coin in coins:
        a = client.klines(coin, "1m", limit=500)
        df = spark.createDataFrame(a)
        df.write.mode('overwrite').csv(f"hdfs://localhost:9000/datalake/raw/binance/{coin}/")

                                                                                

### Read, Transform and Load files in HDFS standard layer

In [5]:
#wanted Schema of the dataframe

schema = StructType([
  StructField('open_time', StringType(), True),
  StructField('open', DoubleType(), True),
  StructField('high', DoubleType(), True),
  StructField('low', DoubleType(), True),
  StructField('close', DoubleType(), True),
  StructField('volume', DoubleType(), True),
  StructField('close_time', StringType(), True),
  StructField('crypto', StringType(), True)
  ])

df2 = spark.createDataFrame([], schema)

In [6]:
#Read raw files in hdfs and organize them, put correct type and header
for coin in coins:
    df_coin = spark.read.option("inferSchema" , "true").csv(f"hdfs://localhost:9000/datalake/raw/binance/{coin}/")

    df2 =  df2.union(
            df_coin.select(df_coin.schema.names[:7])\
            .withColumnRenamed(df_coin.schema.names[0],"open_time")\
            .withColumnRenamed(df_coin.schema.names[1],"open")\
            .withColumnRenamed(df_coin.schema.names[2],"high")\
            .withColumnRenamed(df_coin.schema.names[3],"low")\
            .withColumnRenamed(df_coin.schema.names[4],"close")\
            .withColumnRenamed(df_coin.schema.names[5],"volume")\
            .withColumnRenamed(df_coin.schema.names[6],"close_time")\
            .withColumn("open_time", col("open_time")/1000)\
            .withColumn("close_time", col("close_time")/1000)\
            .withColumn("open_time",f.date_format(col('open_time').cast(dataType=t.TimestampType()), "yyyy-MM-dd-hh:mm"))\
            .withColumn("close_time",f.date_format(col('close_time').cast(dataType=t.TimestampType()), "yyyy-MM-dd-hh:mm"))\
            .withColumn('crypto',lit(coin)))
    
#Save it in the std layer in HDFS
df2.write.mode('overwrite').option('header','true').csv(f"hdfs://localhost:9000/datalake/std/binance/")

                                                                                

### Spark Batch Proccesing

The idea is to generate the strategy to buy and sell. we will try to find correlations between the lags of bitcoin vs other coins, and use it to earn money

In [7]:
#Read the file from HDFS
df = spark.read.option("inferSchema" , "true").option('header','true').csv(f"hdfs://localhost:9000/datalake/std/binance/")

                                                                                

In [8]:
#create a dataframe that has as columns all the cryptos including the lags of bitcoing
w = Window().partitionBy().orderBy(col("close_time"))

df_org = (df.withColumn('crypto', regexp_replace('crypto', 'USDT', '')) #eliminate USDT
            .select('close_time','close','crypto')                  #Selecting 3 rows
            .withColumn('close_time',to_timestamp(col("close_time"),"yyyy-MM-dd-hh:mm")) #Timestamp
            .groupBy("close_time").pivot('crypto').sum("close")  #Pivot bitcoins
            .sort(col("close_time"))
            .withColumn("BTC_1", lag("BTC",1).over(w)).na.drop() #Lags of bitcoins
            .withColumn("BTC_2", lag("BTC",2).over(w)).na.drop()
            .withColumn("BTC_3", lag("BTC",3).over(w)).na.drop()
            .withColumn("BTC_4", lag("BTC",4).over(w)).na.drop()
            .withColumn("BTC_5", lag("BTC",5).over(w)).na.drop())

df_org.limit(3).toPandas()

                                                                                

Unnamed: 0,close_time,ADA,BETA,BNB,BTC,CLV,ETH,ICX,LINK,MATIC,...,WAXP,XRP,XVG,YFI,ZRX,BTC_1,BTC_2,BTC_3,BTC_4,BTC_5
0,2022-03-22 00:15:00,0.951,0.43819,407.5,42913.67,0.349,3024.41,0.788,15.76,1.522,...,0.2908,0.8492,0.01038,20738.74,0.5733,42917.85,42942.83,42938.58,42974.93,42974.0
1,2022-03-22 00:16:00,0.954,0.43754,407.2,42898.91,0.349,3023.06,0.789,15.76,1.522,...,0.2907,0.8499,0.01038,20739.91,0.5732,42913.67,42917.85,42942.83,42938.58,42974.93
2,2022-03-22 00:17:00,0.954,0.43792,407.2,42897.22,0.349,3023.35,0.789,15.78,1.523,...,0.2908,0.8502,0.01039,20738.76,0.5734,42898.91,42913.67,42917.85,42942.83,42938.58


### Spark ML correlation Matrix
Finding the most correlated cryptos with the lags of bitcoin

In [9]:
#Vectorize the data rame in order to put it inside the corr function
assembler = VectorAssembler(
  inputCols = df_org.columns[1:]
  , outputCol = "features"
)

assembled = assembler.transform(df_org)
pearson_corr = Correlation.corr(assembled, "features")
corr_list = pearson_corr.head()[0].toArray().tolist()
pearson_corr_df = spark.createDataFrame(corr_list,df_org.columns[1:])

                                                                                

In [10]:
#Creating a column with the names of the cryptos
crypto = df_org.columns[1:]
index = spark.createDataFrame([(l,) for l in crypto], ['crypto'])

pearson_corr_df = pearson_corr_df.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
index = index.withColumn("row_idx", row_number().over(Window.orderBy(monotonically_increasing_id())))
final_df = pearson_corr_df.join(index, pearson_corr_df.row_idx == index.row_idx).\
             drop("row_idx")


### Deciding on most correlated cryptos

In [11]:
# Because the generated matrix will be always small data  25 rows by 26 columns (number of coins) we will use Pandas to continue
pandas_df = final_df.toPandas()
pandas_df

Unnamed: 0,ADA,BETA,BNB,BTC,CLV,ETH,ICX,LINK,MATIC,REP,...,XRP,XVG,YFI,ZRX,BTC_1,BTC_2,BTC_3,BTC_4,BTC_5,crypto
0,1.0,0.415743,0.864081,0.596729,0.316532,0.086048,0.603857,0.052274,0.365264,0.453185,...,-0.310661,0.533595,-0.62172,0.534791,0.595978,0.596349,0.597303,0.597185,0.596119,ADA
1,0.415743,1.0,0.553445,0.603352,0.272292,0.500315,0.58751,0.313811,0.594902,0.489707,...,0.286921,0.210468,0.130167,0.65752,0.603827,0.601489,0.596058,0.591145,0.586713,BETA
2,0.864081,0.553445,1.0,0.760948,0.173926,0.417108,0.745548,0.42613,0.69339,0.587398,...,0.073704,0.617785,-0.322605,0.711539,0.753971,0.74597,0.739752,0.738385,0.736212,BNB
3,0.596729,0.603352,0.760948,1.0,0.330894,0.456905,0.860314,0.676038,0.677118,0.874783,...,0.116008,0.787796,-0.204802,0.920292,0.984527,0.968588,0.953463,0.940664,0.927085,BTC
4,0.316532,0.272292,0.173926,0.330894,1.0,0.000286,0.495775,-0.12577,0.12066,0.428936,...,-0.161601,0.139575,-0.24331,0.415293,0.330337,0.324502,0.321641,0.317587,0.317771,CLV
5,0.086048,0.500315,0.417108,0.456905,0.000286,1.0,0.378697,0.651143,0.788016,0.284049,...,0.45262,0.225176,0.508673,0.50059,0.429427,0.401742,0.376917,0.356309,0.333232,ETH
6,0.603857,0.58751,0.745548,0.860314,0.495775,0.378697,1.0,0.51739,0.698875,0.816087,...,0.2116,0.575163,-0.201729,0.871654,0.853725,0.84401,0.835185,0.826195,0.814804,ICX
7,0.052274,0.313811,0.42613,0.676038,-0.12577,0.651143,0.51739,1.0,0.693621,0.535452,...,0.476033,0.613271,0.347842,0.637796,0.658688,0.638614,0.621031,0.604759,0.587701,LINK
8,0.365264,0.594902,0.69339,0.677118,0.12066,0.788016,0.698875,0.693621,1.0,0.553364,...,0.569962,0.371194,0.263174,0.724588,0.657849,0.638831,0.619078,0.602582,0.583476,MATIC
9,0.453185,0.489707,0.587398,0.874783,0.428936,0.284049,0.816087,0.535452,0.553364,1.0,...,0.143097,0.707079,-0.212858,0.795113,0.873541,0.866483,0.858649,0.852879,0.847302,REP


In [12]:
# Melt the Matrix to find the most correlated pair against the bitcoin lags
vals =pandas_df.melt(id_vars = 'crypto',var_name='variable')
vals = vals[~vals['crypto'].str.contains('[0-9]',regex=True)]
vals = vals[vals['variable'].str.contains('[0-9]',regex=True)]
vals = vals[~vals['crypto'].str.contains('BTC',regex=True)]
vals = vals[vals['variable'].str.contains('BTC',regex=True)]

# We will focus on the top 20 cryptos with highest correlations to BTC-lags 
corr_top_20 = vals[(vals['value']<1) & (vals['value']>0.75)].sort_values('value',ascending=False).head(20)

### Look with strategy yields better resuls in the past 100 minutes

In [13]:
# Create list with unique cryptos from top 20 to loop through later
corr_pairs = list(corr_top_20["crypto"].unique())

# Defining our lags
BTC_lags = [1,2,3,4,5]

# Creating empty dfs to fill with returns for top 5 cryptos based on price correlation 
first_df = pd.DataFrame({})
second_df = pd.DataFrame({})
third_df = pd.DataFrame({})
fourth_df = pd.DataFrame({})
fifth_df = pd.DataFrame({})

In [14]:
# creating df needed for training - using limited small data range from our spark df_org
train_data = df_org.tail(100) #Because we only using last 100 rows with are going to pass to pandas
train_data = pd.DataFrame(train_data, columns = df_org.columns)
train_data = train_data.reset_index()

# this function takes crypto and lag and will fill the empty dfs with returns info
def lag_strategy_returns(crypto, lag, returns_df):
    buy_price = []
    sell_price = []
    gain_loss = []

    for index, row in train_data.iterrows():
        if index == len(train_data)-lag:
            break
            
        elif train_data.loc[index, "BTC"] > train_data.loc[index,"BTC"+"_"+str(lag)]  * 1:
            buy_price.append(train_data.loc[index, crypto])
            sell_price.append(train_data.loc[index+lag, crypto])
            
        else:
            pass
    
    zip_object = zip(buy_price, sell_price)
    for buy_price_i, sell_price_i in zip_object:
        gain_loss.append(buy_price_i-sell_price_i)
        
    gain_df = pd.DataFrame(
    {'buy_price': buy_price,
     'sell_price': sell_price,
     'gain/loss': gain_loss,
    })
    
    value =  [gain_df["gain/loss"].sum()]
    
    returns_df["BTC"+"_"+str(lag)] = value

                                                                                

In [15]:
# looping through top cryptos for correlation and filling top 5 returns dfs
for crypto in corr_pairs: 
    for lag in BTC_lags:
        if crypto == corr_pairs[0]:
            lag_strategy_returns(crypto, lag, first_df)
            first_df.rename(index={0:crypto}, inplace = True)
        elif crypto == corr_pairs[1]:
            lag_strategy_returns(crypto, lag, second_df)
            second_df.rename(index={0:crypto}, inplace = True)
        elif crypto == corr_pairs[2]:
            lag_strategy_returns(crypto, lag, third_df)
            third_df.rename(index={0:crypto}, inplace = True)
        elif crypto == corr_pairs[3]:
            lag_strategy_returns(crypto, lag, fourth_df)
            fourth_df.rename(index={0:crypto}, inplace = True)
        elif crypto == corr_pairs[4]:
            lag_strategy_returns(crypto, lag, fifth_df)
            fifth_df.rename(index={0:crypto}, inplace = True)

In [16]:
# putting everything together to get an overview of which strategy is most successful
master_returns = pd.concat([first_df, second_df, third_df, fourth_df, fifth_df])

master_returns

Unnamed: 0,BTC_1,BTC_2,BTC_3,BTC_4,BTC_5
VET,8e-05,-0.00054,-0.00058,-0.00187,-0.00238
ZRX,-0.0017,-0.0069,-0.0121,-0.026,-0.028
REP,-0.05,-0.24,-0.24,-0.31,-0.33
ICX,-0.002,-0.013,-0.016,-0.02,-0.02
WAXP,-0.0007,-0.0028,-0.004,-0.0029,-0.0033


### Strategy to use

In [17]:
# allowing output for decision-making - which crypto to buy and how long to hold
lag_to_sell = (master_returns.max().idxmax()).replace("BTC_","")
lag_val = master_returns.max().idxmax()
crypto_to_buy = master_returns[lag_val].idxmax()
trend_symbol = ['BTCUSDT']
lag_to_sell = int(lag_to_sell)
buy_symbol = crypto_to_buy+"USDT"

print(f"when BTC grows > 0.025% in any minute, We buy: {crypto_to_buy} and sell it after: {lag_to_sell} minutes")

when BTC grows > 0.025% in any minute, We buy: VET and sell it after: 1 minutes


## Stream

1) Initialize kafka in terminal

    - sudo service kafka start

2) call in kafka the producer:

    python3 binancestreaming_producer-project.py kline_1m "btcusdt,ethusdt,xrpusdt,adausdt,maticusdt,bnbusdt,vetusdt,linkusdt,yfiusdt,umausdt,waxpusdt,icxusdt,zrxusdt,sushiusdt,betausdt,repusdt,vthousdt,xvgusdt,voxelusdt,clvusdt" -b localhost:9092 -t klines
    


    

### Read Streams from Kafka

In [18]:
klines = (spark.readStream
                .format("kafka") 
                .option("kafka.bootstrap.servers", "localhost:9092") 
                .option("subscribe", "klines") 
                .option("startingOffsets", "latest") 
                .option("kafka.group.id", "IE") 
                .load())

### Process in two streams datasets

One detecting the trend in a crypto, and one selecting the crypto to buy and sell

In [19]:
#Read the files for Organizing

"""TrenSetter: createing a stream dataframe that looks for the performamnce of BTC in each k_line and apllys the strategy 
    to use. for every minute it says if buy = 'yes' or buy= 'No' """


trend_setter = (klines.selectExpr("CAST(value AS STRING)") 
                .select(split("value",'\|').alias("fields")) 
                .withColumn("trend_detection_time",col("fields").getItem(0).cast(DoubleType())) 
                .withColumn("trend_symbol",col("fields").getItem(1)) 
                .withColumn("open",col("fields").getItem(3).cast(DoubleType())) 
                .withColumn("close",col("fields").getItem(4).cast(DoubleType())) 
                .withColumn("trend_detection_time", col("trend_detection_time")/1000)
                .withColumn("operation_time", col("trend_detection_time")+60*lag_to_sell)
                .withColumn("operation_time",f.date_format(col('operation_time').cast(dataType=t.TimestampType()), "yyyy-MM-dd-hh:mm"))
                .withColumn("trend_detection_time",f.date_format(col('trend_detection_time').cast(dataType=t.TimestampType()), "yyyy-MM-dd-hh:mm"))
                .withColumn('operation_time',to_timestamp(col("operation_time"),"yyyy-MM-dd-hh:mm"))
                .withColumn('trend_detection_time',to_timestamp(col("trend_detection_time"),"yyyy-MM-dd-hh:mm"))
                .drop("fields")
                .filter(col("trend_symbol").isin(trend_symbol))
                .withWatermark("trend_detection_time", "20 minutes") #Do not accept information that has a 20 minute delay
                .dropDuplicates(subset=['operation_time']) #Stream Deduplication
                .withColumn("trend_symbol_gain", col("close")/col("open")-1)
                .withColumn("buy", when(col("trend_symbol_gain") >= 0,"Yes").otherwise("No"))
                .select("trend_symbol","trend_detection_time","operation_time","trend_symbol_gain","buy"))

"""money_earner: createing a stream dataframe that contains the gains for each minute (close - open) for the coin
                selected in the strategy"""

money_earner = (klines.selectExpr("CAST(value AS STRING)") 
                .select(split("value",'\|').alias("fields")) 
                .withColumn("operation_time",col("fields").getItem(0).cast(DoubleType())) 
                .withColumn("buy_symbol",col("fields").getItem(1)) 
                .withColumn("open",col("fields").getItem(3).cast(DoubleType())) 
                .withColumn("close",col("fields").getItem(4).cast(DoubleType())) 
                .withColumn("operation_time", col("operation_time")/1000)
                .withColumn("operation_time",f.date_format(col('operation_time').cast(dataType=t.TimestampType()), "yyyy-MM-dd-hh:mm"))
                .withColumn('operation_time',to_timestamp(col("operation_time"),"yyyy-MM-dd-hh:mm"))
                .drop("fields")
                .filter(col("buy_symbol")== buy_symbol)
                .withWatermark("operation_time", "20 minutes") #Do not accept information that has a 20 minute delay
                .dropDuplicates(subset=['operation_time']) #Stream Deduplication
                .withColumn("money_gain", col("close")-col("open"))) 


### Join streams dataframes

In [20]:
#Stream Dataframe with information of the gains when the strategy was executed
#We buy in trend_detection time and sell in the operation time

account = money_earner.join(trend_setter, "operation_time")   

## Serving

### Load the stream dataframe in MariaDB
We will query the table to know the results of the strategy

1) Initialize MariaDB in terminal 
    
        - sudo service mariadb start

2) Create the table storing the stream dataframe: 

        - mariadb -u osbdet -p < binance_gains.sql
   

In [21]:

def foreach_batch_function(df, epoch_id):
    print ("Batch %d received" % epoch_id)
    
    # databases connection properties
    url = "jdbc:mariadb://localhost:3306/binance"
    table = "gains"
    mode = "append"
    props = {"user": "osbdet",
             "password":"osbdet123$", 
             "driver":"org.mariadb.jdbc.Driver"}
    (df.select("trend_symbol",
               "trend_detection_time",
               "trend_symbol_gain",
               "buy",
               "buy_symbol",  
               "operation_time", 
               "open", 
               "close", 
               "money_gain")
        .write
        .jdbc(url,table,mode,props)
     )

In [22]:
query = (account.writeStream.foreachBatch(lambda df,epochId:foreach_batch_function(df, epochId))).trigger(processingTime='60 seconds').start()

Batch 0 received


                                                                                

Batch 1 received


                                                                                

Batch 2 received


                                                                                

Batch 3 received


                                                                                

Batch 4 received


                                                                                

Batch 5 received


                                                                                

Batch 6 received


                                                                                

Batch 7 received


                                                                                

Batch 8 received


                                                                                

Batch 9 received


                                                                                

Batch 10 received


                                                                                

Batch 11 received


                                                                                

Batch 12 received


                                                                                

Batch 13 received


                                                                                

3) to see the results of the strategy put in other terminal: 

        - mariadb -u osbdet -p
        - use binance
        - SELECT * FROM gains;