# Cryptocurrency Analysis & Trading Bot

Note: this notebook was run on a Databricks Runtime 8.3 ML cluster in a Databricks Community Edition environment.

In [None]:
from requests import Request, Session
import json

import pyspark.sql.functions as f
from pyspark.sql import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

import datetime
import time
import pprint

!/databricks/python3/bin/python -m pip install --upgrade pip
!pip install yfinance
import yfinance as yf

In [None]:
# Select the conversion currencies available
# Currencies offered in the dropdown widget, and the symbol printed next to amounts in each of them
CONVERSION_CURRENCIES = ['USD', 'EUR', 'GBP', 'JPY', 'AUD']
CONVERSION_CURRENCIES_SYM = {'USD': 'US$', 'EUR': '€', 'GBP': '£', 'JPY': '¥', 'AUD': 'A$'} 

# Databricks dropdown widget (default 'USD'); its current value is read once into SEL_CONV_CURR,
# which later cells use for API requests, column values and printed labels
dbutils.widgets.dropdown('Select the conversion currency', 'USD', CONVERSION_CURRENCIES)
SEL_CONV_CURR = dbutils.widgets.get('Select the conversion currency')


# Select the coins/tokens to work with
COINS = ['BTC', 'ETH', 'BNB', 'USDT', 'ADA', 'SOL', 'XRP', 'DOT', 'SHIB', 'DOGE']
# We are here working with the top 10 market cap coins (as of November 1st, 2021) for demonstration purposes

# 1. Data Gathering

## 1.1. Historical Quotes

In [None]:
def yfinance_tickerlist(coins = None, conversion_currency = None):
    '''Build the space-separated ticker string that ``yf.download`` expects.

    Each coin symbol becomes a Yahoo Finance pair such as ``BTC-USD``. Solana (SOL)
    and Polkadot (DOT) are listed on Yahoo Finance as ``SOL1`` and ``DOT1``, so a
    ``1`` is appended to those two symbols.

    Args:
        coins: iterable of coin symbols; defaults to the notebook-level ``COINS``.
        conversion_currency: currency code used as the quote side of each pair;
            defaults to the widget selection ``SEL_CONV_CURR``.

    Returns:
        str: the tickers joined by single spaces, e.g. ``'BTC-USD ETH-USD'``
        (an empty string for no coins).
    '''
    # Defaults are resolved at call time rather than when the function is defined, so a
    # re-run of the widget cell is picked up without re-running this definition.
    if coins is None:
        coins = COINS
    if conversion_currency is None:
        conversion_currency = SEL_CONV_CURR

    # Symbols that carry a '1' suffix on Yahoo Finance
    renamed_on_yahoo = {'SOL', 'DOT'}

    tickers = [
        f'{coin}1-{conversion_currency}' if coin in renamed_on_yahoo else f'{coin}-{conversion_currency}'
        for coin in coins
    ]
    return ' '.join(tickers)


def new_price_coin_columns(coins = None):
    '''Build flat column names for the wide price table returned by ``yf.download``.

    Names have the form ``<Category>99<Coin>`` (e.g. ``Close99BTC``). They are produced
    category-major (AdjClose, Close, High, Low, Open, Volume) with the coins sorted
    alphabetically. This order is assumed to match the column order of the
    ``yf.download`` frame, which is renamed with them via ``toDF``; verify it if
    yfinance changes its layout.

    Args:
        coins: list of Yahoo Finance tickers such as ``'BTC-USD'`` or ``'SOL1-EUR'``;
            defaults to ``yfinance_tickerlist().split(' ')``.

    Returns:
        list[str]: the composite column names (6 per coin).
    '''
    # Resolved at call time instead of calling yfinance_tickerlist() when the function is defined
    if coins is None:
        coins = yfinance_tickerlist().split(' ')

    # Drop the '-<currency>' suffix: 'BTC-USD' -> 'BTC', 'SOL1-EUR' -> 'SOL1'
    coin_symbols = sorted(coin.rsplit('-', 1)[0] for coin in coins)

    price_categories = ['AdjClose', 'Close', 'High', 'Low', 'Open', 'Volume']

    # The separator can not be a special character at the stacking stage (the names are used
    # as bare Spark SQL identifiers in stack()), hence the number 99
    return [
        f'{price_category}99{coin}'
        for price_category in price_categories
        for coin in coin_symbols
    ]


def stacking_string(coins = None, out_columns_name = "(Coin, CoinData)"):
    '''Build the Spark SQL ``stack(...)`` expression that unpivots one column per coin.

    Spark dataframes built from the yfinance/CoinMarketCap data hold one column per coin
    (or per composite price column). ``stack`` turns those N columns into N rows of
    (label, value).

    Args:
        coins: column names to unpivot; defaults to ``COINS``. Each name is used both as
            the row label (string literal) and as the column reference.
        out_columns_name: SQL alias list for the two output columns.

    Returns:
        str: e.g. ``"stack(2, 'BTC', BTC, 'ETH', ETH) as (Coin, CoinData)"``.
    '''
    # Resolved at call time instead of being bound when the function is defined
    if coins is None:
        coins = COINS

    label_value_pairs = ', '.join(f"'{coin}', {coin}" for coin in coins)
    return f"stack({len(coins)}, {label_value_pairs}) as {out_columns_name}"

In [None]:
# yfinance returns a pandas DataFrame: convert it to Spark straight away, give its
# (price category, ticker) columns flat names, reshape it with Spark and register it as a temp view.
df_historicalquotes = (
    spark.createDataFrame(
        yf.download(tickers = yfinance_tickerlist(), period = '10y', interval = '1d').reset_index()
    )
    .toDF('Date', *new_price_coin_columns())
)

# Cast every price column to float, then unpivot the wide '<Category>99<Coin>' columns into
# (Category, Value) rows. The numerical '99' separator is used since special characters are
# apparently not allowed in the stack() column references.
df_historicalquotes = (
    df_historicalquotes
    .select('Date', *[f.col(c).cast("float").alias(c) for c in df_historicalquotes.columns[1:]])
    .selectExpr('Date', stacking_string(coins = new_price_coin_columns(),
                                        out_columns_name = "(Category, Value)"))
    # 'Close99BTC' -> PriceCategory 'Close', Coin 'BTC'
    .withColumn('PriceCategory', f.split(f.col('Category'), '99').getItem(0))
    .withColumn('Coin', f.split(f.col('Category'), '99').getItem(1))
    .drop('Category')
    # Pivot back to one row per (Coin, Date). 'first' is as good as any aggregate here, since
    # there is only one value for each combination of Coin, Date and Price Category.
    .groupBy('Coin', 'Date')
    .pivot('PriceCategory')
    .agg(f.first('Value'))
    .withColumn('ConversionCurrency', f.lit(SEL_CONV_CURR))
    .where((f.col('Volume').isNotNull()) & (f.col('ConversionCurrency') == SEL_CONV_CURR))
    .orderBy(f.col('Date').asc(), f.col('Coin').desc())
)

df_historicalquotes.createOrReplaceTempView("HistoricalQuotes")

## 1.2. Latest Quotes - Streaming

A solution for simulating a streaming scenario is to call the CoinMarketCap 'latest quotes' endpoint every 260 seconds and save the returned JSON for the set of coins we are using. Every JSON file is stored in a folder in the Databricks File System (DBFS), which is then read by a streaming dataframe that appends each new block of data to the data from the previous calls.

If we were using the paid version of Databricks, we could schedule a job to call the API every 260 seconds. Since Jobs are not available in Databricks Community Edition, a workaround is to have another notebook "indefinitely" running a while loop that calls the API every 260 seconds. This is not very elegant, though, so we leave this piece of code at the end of the notebook. That way, all other cells run before it, and it keeps providing new datapoints to our streaming dataframe for as long as one wants it to run.

#### 1.2.1. API Requests preparation

In [None]:
# API Request config - latest quotes endpoint
URL_ENDPOINT = 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/quotes/latest'
# Comma-separated list of the selected symbols, quoted in the currency chosen in the widget
REQUEST_PARAMETERS = {'symbol': ','.join(COINS),
                      'convert': SEL_CONV_CURR}
# NOTE(review): `configfile` is not defined anywhere in this notebook; presumably a dict loaded
# elsewhere (e.g. another notebook or a secrets file) holding the CoinMarketCap API key — confirm.
SESSION_HEADERS = {'Accepts': 'application/json',
                   'X-CMC_PRO_API_KEY': configfile['api_key']}


def cmc_api_request(endpoint = URL_ENDPOINT, parameters = REQUEST_PARAMETERS, headers = SESSION_HEADERS, timeout = 30):
    '''Call a CoinMarketCap API endpoint with a GET request and return the decoded JSON body.

    Args:
        endpoint: full URL of the endpoint (defaults to the latest-quotes endpoint).
        parameters: query-string parameters sent with the request.
        headers: session headers, including the ``X-CMC_PRO_API_KEY`` key.
        timeout: seconds to wait for the server before ``requests`` raises a timeout
            error, so a stalled call cannot block the notebook (or the polling loop) forever.

    Returns:
        dict: the parsed JSON response. HTTP error statuses are not raised here; the body
        is returned as-is and callers read its ``status``/``data`` entries.
    '''
    # The context manager closes the session and its pooled connections after the call
    with Session() as session:
        session.headers.update(headers)
        response = session.get(endpoint, params = parameters, timeout = timeout)

    return response.json()


def parsing_response_json(resp_dict = None, conversion_currency = None):
    '''Flatten a CoinMarketCap latest-quotes response into the layout read by the stream.

    For every coin, the fields of ``quote[<conversion currency>]`` are moved to the coin's
    top level. The nested quote and the descriptive/unneeded fields are removed; the
    descriptive fields are stored in the metadata dataframe instead. The supply fields are
    cast to float so every JSON file has a consistent schema. The response ``status`` block
    is kept under the ``'status'`` key. The input dictionary is not modified.

    Args:
        resp_dict: an already-fetched response; when None, ``cmc_api_request()`` is called.
        conversion_currency: quote currency to flatten; defaults to ``SEL_CONV_CURR``.

    Returns:
        dict: ``{<coin key>: {...flat coin fields...}, ..., 'status': {...}}``.
    '''
    if resp_dict is None:
        resp_dict = cmc_api_request()
    if conversion_currency is None:
        conversion_currency = SEL_CONV_CURR

    # Original quote key + unnecessary fields + dimensional fields (kept in the metadata dataframe)
    unnecessary_columns = ['quote', 'is_fiat', 'is_active', 'platform', 'date_added', 'name',
                           'num_market_pairs', 'slug', 'tags', 'max_supply']
    # Fields converted to float to ensure schema consistency across files
    inconsistent_fields = ['circulating_supply', 'total_supply']

    parsed = {}
    for key, coin_data in resp_dict['data'].items():
        # Bringing quote keys to the outer structure (quote values win on name clashes)
        flat = {**coin_data, **coin_data['quote'][conversion_currency]}

        # pop(..., None): a field the API stops sending must not break the stream
        for unnecessary_column in unnecessary_columns:
            flat.pop(unnecessary_column, None)

        # A null supply stays null instead of crashing float()
        for inconsistent_field in inconsistent_fields:
            value = flat.get(inconsistent_field)
            flat[inconsistent_field] = None if value is None else float(value)

        parsed[key] = flat

    parsed['status'] = resp_dict['status']
    return parsed
  


# Defining the schema from the first call (it's immutable, according to the API Documentation)
# One parsed response is serialized to a JSON string, wrapped in a 1-element RDD and read with
# spark.read.json so that Spark infers the schema; json_schema is then passed to the stream reader.
# NOTE(review): this response is not written to the streaming input folder — it only shapes the schema.
rdd = spark.sparkContext.parallelize([json.dumps(parsing_response_json())])
json_df = spark.read.json(rdd)
json_schema = json_df.schema

#### 1.2.2. StreamingDF

In [None]:
path_latestquotes = '/FileStore/Dados_Input_ABD/LatestQuotes/'
path_streamingcheckpoint = '/tmp/Dados_Input_ABD/StreamingCheckpoint'
path_streamingoutput = '/FileStore/Dados_Input_ABD/StreamingOutput'

dbutils.fs.mkdirs(path_latestquotes)
dbutils.fs.mkdirs(path_streamingcheckpoint)
dbutils.fs.mkdirs(path_streamingoutput)


# Streaming dataframe over the folder of latest-quotes JSON files, read with the schema inferred
# above. Each file holds one column per coin plus 'status'; stack() turns it into one
# (Coin, CoinData) row per coin, and the struct fields are then exposed as flat columns.
streamingDF = spark.readStream \
    .schema(json_schema) \
    .json(path = path_latestquotes) \
    .selectExpr(stacking_string(), "status") \
    .withColumn("CoinID", f.col('CoinData').getItem('id')) \
    .withColumn("Timestamp", f.col('status').getItem('timestamp')) \
    .withColumn("Price", f.col('CoinData').getItem("price")) \
    .withColumn("ConversionCurrency", f.lit(SEL_CONV_CURR)) \
    .withColumn("TotalSupply", f.col('CoinData').getItem("total_supply")) \
    .withColumn("MarketCap", f.col('CoinData').getItem("market_cap")) \
    .withColumn("FullyDilutedMarketCap", f.col('CoinData').getItem("fully_diluted_market_cap")) \
    .withColumn("MarketCapRank", f.col('CoinData').getItem("cmc_rank")) \
    .withColumn("MarketCapDominance", f.col('CoinData').getItem("market_cap_dominance")) \
    .withColumn("PriceChange1h", f.col('CoinData').getItem("percent_change_1h")) \
    .withColumn("PriceChange24h", f.col('CoinData').getItem("percent_change_24h")) \
    .withColumn("PriceChange7d", f.col('CoinData').getItem("percent_change_7d")) \
    .withColumn("PriceChange30d", f.col('CoinData').getItem("percent_change_30d")) \
    .withColumn("PriceChange60d", f.col('CoinData').getItem("percent_change_60d")) \
    .withColumn("PriceChange90d", f.col('CoinData').getItem("percent_change_90d")) \
    .withColumn("VolumeTraded24h", f.col('CoinData').getItem("volume_24h")) \
    .withColumn("VolumeTradedChange24h", f.col('CoinData').getItem("volume_change_24h")) \
    .drop('CoinData', 'status') \
    .filter(f.col('ConversionCurrency') == SEL_CONV_CURR)

display(streamingDF)

In [None]:
# Persist the stream to parquet. Since previous data is not overwritten, we do not need a
# 'Complete' output mode - 'Append' is faster and enough in this case.
query_latestquotes = streamingDF.writeStream \
    .format('parquet') \
    .option('checkpointLocation', path_streamingcheckpoint) \
    .option('path', path_streamingoutput) \
    .outputMode('append') \
    .trigger(processingTime = '1 second') \
    .start()

# Wait until the JSON files already present in the input folder have been written out, rather than
# sleeping a fixed 2 seconds and hoping the first micro-batch finished in time. Files added later
# by the API loop are processed in later micro-batches and show up on the next read.
query_latestquotes.processAllAvailable()

static_streamingDF = spark.read.parquet(path_streamingoutput)
static_streamingDF.display()

**Quick Insight:** Bitcoin represents more than 41% of the whole crypto market! That's insane!

## 1.3. Coin Metadata

Let's now build a dataframe with metadata about the selected coins. For that, we will call the CoinMarketCap and Yahoo Finance APIs again:
* the Yahoo Finance API provides each coin's description;
* the CoinMarketCap API provides the rest of the metadata we want, including URLs for each coin's website and social media.
After merging both tables on the coins' symbols, we are left with a fully detailed dataset on each coin.

#### 1.3.1. yfinance for descriptions
Since we are only grabbing coin descriptions, any conversion pair (EUR, USD, JPY, etc.) is fine to call the API. Let's use USD.

In [None]:
path_coinmetadata_yf = '/FileStore/Dados_Input_ABD/CoinMetadata_yf/'
dbutils.fs.mkdirs(path_coinmetadata_yf)

# Only the symbol and the description are kept from each ticker's info dictionary
yf_metadata_of_interest = ['symbol', 'description']

for coin in yfinance_tickerlist(conversion_currency = 'USD').split(' '):
    print(coin[:-4] + ':')

    ticker_info = yf.Ticker(coin).info

    # .get: a key missing from Yahoo's info dictionary is written as null instead of
    # aborting the loop for every remaining coin
    new_dict = {key: ticker_info.get(key) for key in yf_metadata_of_interest}

    # Writing one json file per coin to dbfs (overwriting any previous one)
    dbutils.fs.put(path_coinmetadata_yf + coin[:-4] + '.json', json.dumps(new_dict), True)

In [None]:
# Read every per-coin JSON file in the folder in a single pass (each file holds one JSON object on
# one line). This replaces reading the files one by one and running an rdd.isEmpty() Spark job on
# every iteration to decide between copying and unioning.
df_meta_yf = spark.read \
    .json(path_coinmetadata_yf) \
    .select('symbol', 'description') \
    .toDF('Symbol', 'Description')

# Removing the 1's in SOL and DOT and the '-USD' suffix (the descriptions were fetched against USD)
# with regular expressions
df_meta_yf = df_meta_yf.select(f.regexp_replace(f.regexp_replace('Symbol', r'[0-9]', ''), '-USD', '').alias('Symbol'), 'Description')

#### 1.3.2. CoinMarketCap for all the rest

In [None]:
# Presumably a previous output location, kept for reference
#path_coinmetadata_cmc = '/FileStore/CoinMetadata_cmc/'
path_coinmetadata_cmc = '/FileStore/Dados_Input_ABD/CoinMetadata_cmc/'
dbutils.fs.mkdirs(path_coinmetadata_cmc)

# API Request config - metadata endpoint
# Same comma-separated symbol list as the quotes call; no 'convert' parameter is sent here
METAD_URL_ENDPOINT = 'https://pro-api.coinmarketcap.com/v1/cryptocurrency/info'
METAD_REQUEST_PARAMETERS = {'symbol': ','.join(COINS)}



def parsing_cmc_metadata_response_json(resp_dict = None):
    '''Reduce a CoinMarketCap ``/cryptocurrency/info`` response to one flat record per coin.

    Each record keeps the descriptive fields (id, symbol, name, slug, date_added, logo,
    platform, tag-names) plus the first entry of each URL list (website, twitter, reddit,
    source_code, announcement, technical_doc).

    Args:
        resp_dict: an already-fetched response; when None, the metadata endpoint is called
            through ``cmc_api_request``.

    Returns:
        dict: ``{<coin key>: {field: value, ...}}``. ``platform`` holds the platform's
        ``name`` or ``''`` when it is None; a missing or empty URL list becomes ``''``.
    '''
    if resp_dict is None:
        resp_dict = cmc_api_request(endpoint = METAD_URL_ENDPOINT, parameters = METAD_REQUEST_PARAMETERS)

    descriptive_fields = ['id', 'symbol', 'name', 'slug', 'date_added', 'logo', 'platform', 'tag-names']
    url_fields = ['website', 'twitter', 'reddit', 'source_code', 'announcement', 'technical_doc']

    outer_alt_resp_dict = {}
    for key, coin_data in resp_dict['data'].items():
        alt_resp_dict = {field: coin_data[field] for field in descriptive_fields}

        # 'platform' is None for almost every coin in the top 10 market cap list;
        # otherwise keep only its name
        platform = alt_resp_dict['platform']
        alt_resp_dict['platform'] = platform['name'] if platform is not None else ''

        # URL values come wrapped in a list under 'urls': keep the first entry, or '' if there is none
        urls = coin_data.get('urls') or {}
        for url_field in url_fields:
            values = urls.get(url_field)
            alt_resp_dict[url_field] = values[0] if values else ''

        outer_alt_resp_dict[key] = alt_resp_dict

    return outer_alt_resp_dict

In [None]:
# One metadata snapshot per day, e.g. 'Metadata_01-11-2021.json' (overwritten on re-runs that day)
filename = f"Metadata_{datetime.datetime.now():%d-%m-%Y}.json"
dbutils.fs.put(path_coinmetadata_cmc + filename, json.dumps(parsing_cmc_metadata_response_json()), True)

# Unpivot the one-column-per-coin JSON into (Coin, CoinData) rows, then expose each struct field
# as its own column: (output column name, field name in the JSON written above)
df_meta_cmc = (
    spark.read
    .json(path_coinmetadata_cmc + filename)
    .selectExpr(stacking_string())
    .select(*[
        f.col('CoinData').getItem(source_field).alias(column_name)
        for column_name, source_field in [
            ("ID", 'id'),
            ("Symbol", 'symbol'),
            ("Name", 'name'),
            ("Slug", "slug"),
            ("DateAdded", "date_added"),
            ("Logo", "logo"),
            ("Platform", "platform"),
            ("Tags", "tag-names"),
            ("URL_Website", "website"),
            ("URL_Twitter", "twitter"),
            ("URL_Reddit", "reddit"),
            ("URL_SourceCode", "source_code"),
            ("URL_Announcement", "announcement"),
            ("URL_TechnicalDoc", "technical_doc"),
        ]
    ])
)

#### 1.3.3. Merging them

In [None]:
# Attach the Yahoo Finance description to each CoinMarketCap metadata row by symbol;
# a coin present on only one side is dropped by the inner join
df_meta = df_meta_cmc.join(df_meta_yf, 'Symbol', 'inner')

df_meta.createOrReplaceTempView("CoinMetadata")

# 2. Quick Market Assessment
**Before we get too excited about predicting the market** and hopefully making some money for ourselves, let's first plot some important KPIs and "measure the market temperature".

## 2.1. Computing Returns and Volatilities

In [None]:
# Monthly returns and volatilities per coin.
# 1. Rows with Open = 0 are dropped first. They occur for SHIBA INU, whose price was for a long time
#    so small that the API does not show enough digits for it to be above 0. The daily return is
#    log(Close / Open), which is undefined when Open is zero.
# 2. Volatility is the standard deviation of the daily returns within a month. A month with a
#    single datapoint (e.g. on the first day of the month) has no standard deviation, so the
#    missing value is filled with 0.
df_market_assessm = (
    df_historicalquotes
    .filter(f.col('Open') != 0)
    .withColumn('DailyReturn', f.log(f.col('Close') / f.col('Open')))
    .withColumn("Year", f.year("Date"))
    .withColumn("Month", f.month("Date"))
    .groupBy(f.col("Year"), f.col("Month"), f.col("Coin"))
    .agg(
        f.round(f.avg("Close"), 4).alias("AvgPrice"),
        f.round(f.avg("DailyReturn"), 4).alias("AvgReturn"),
        f.round(f.stddev("DailyReturn"), 4).alias("Volatility"),
    )
    .fillna(0, "Volatility")
    .orderBy(f.col('Coin'), f.col("Year").desc(), f.col("Month").desc())
    .select('Coin', 'Year', 'Month', 'AvgPrice', 'AvgReturn', 'Volatility')
)

df_market_assessm.display()

To compare the indicators side by side, let's normalize them using SparkML VectorAssembler and Pipeline objects.

In [None]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinMaxScaler, VectorAssembler
from pyspark.ml.functions import vector_to_array
from pyspark.sql.types import DoubleType

cols_to_scale = ['AvgPrice', 'AvgReturn', 'Volatility']

# Min-max scale each indicator to [0, 1] across all coins and months. MinMaxScaler works on vector
# columns, so each indicator is wrapped into a 1-element vector, scaled, and unwrapped again with
# vector_to_array (a native Spark expression, avoiding a Python UDF call per row).
for column in cols_to_scale:

    assembler = VectorAssembler(inputCols = [column], outputCol = column + '_Vect')
    scaler = MinMaxScaler(inputCol = column + '_Vect', outputCol = column + '_Scaled')
    pipeline = Pipeline(stages = [assembler, scaler])

    df_market_assessm = pipeline.fit(df_market_assessm) \
        .transform(df_market_assessm) \
        .withColumn(column + "_Scaled", vector_to_array(f.col(column + "_Scaled")).getItem(0)) \
        .drop(column + "_Vect")


# Per coin, average of the scaled monthly returns and volatilities for the side-by-side comparison
df_returns_volatilities = df_market_assessm \
    .groupby('Coin') \
    .agg(f.avg('AvgReturn_Scaled').alias('Return'), f.avg('Volatility_Scaled').alias('Volatility'))

Apparently, Solana (SOL) has been providing its investors with the highest daily average returns, followed by Polkadot (DOT) and Binance Coin (BNB). This makes sense since all of them are relatively recent coins and therefore experienced steeper upward trends before stabilizing just recently.   
As for volatilities, three coins are worth mentioning:
* USDT clearly has the lowest volatility because it is a stablecoin - meaning it is pegged to a fiat currency like the US dollar or the euro. USDT, in this case, is pegged to the US dollar and thus hardly varies at all.
* SHIB (Shiba Inu) has the highest volatility. While there may be other factors influencing it, the fact that it is a memecoin (without any technology intent behind) explains most of it, since its price is almost exclusively ruled by investor speculation. While DOGE (Dogecoin) also started as a memecoin, there is currently a developing project behind it, which may explain why it has a lower volatility.
* BTC has the second lowest volatility. Again, other factors may explain this fact, but one of them is that Bitcoin is the oldest cryptocurrency and, as we have seen, represents more than 40% of the whole crypto market, and therefore has a higher inertia, and consequently a lower variability/volatility.

## 2.2. Comparing Crypto with the Stock Market
After having handled this crypto data for a while now, one curious detail naturally emerged from it. Indeed, despite all the hype around cryptocurrency lately, the whole crypto market cap does not seem to be that high. Have a look:

In [None]:
# Whole crypto market cap = BTC market cap / BTC dominance, taken from the most recent BTC quote.
# Since Bitcoin is currently at near all-time highs, it is a good time to do this calculation,
# but we could also use the absolute maximum.
whole_crypto_marketcap = (
    static_streamingDF
    .withColumn('MaxTimestamp', f.max('Timestamp').over(Window.partitionBy('CoinID')))
    .filter(f.col('Coin') == 'BTC')
    .where(f.col('Timestamp') == f.col('MaxTimestamp'))
    .withColumn('WholeMarketCap', f.col('MarketCap') / (f.col('MarketCapDominance') / 100))
    .first()['WholeMarketCap']
)

# Express the amount in trillions, as text with two decimals
whole_crypto_marketcap = f'{round(whole_crypto_marketcap / 1000000000000, 2):.2f}'

print(f'Whole Crypto Market Cap: {whole_crypto_marketcap} T {CONVERSION_CURRENCIES_SYM[SEL_CONV_CURR]}')

Alright, we are talking in the order of trillions. Let's now compare it with the highest market cap stocks in the S&P 500:

In [None]:
STOCKS_FOR_ASSESSMENT = {'Apple': 'AAPL', 'Microsoft': 'MSFT', 'Google': 'GOOG'}

# Yahoo Finance returns market caps in USD. For another selected currency, fetch the
# '<CCY>USD=X' quote once (assumed to be the USD price of one unit of that currency) and divide
# by it. For USD no conversion is needed, so the factor is 1.
if SEL_CONV_CURR != 'USD':
    conv = yf.Ticker(f'{SEL_CONV_CURR}USD=X').info['previousClose']
else:
    conv = 1.0

for stock in STOCKS_FOR_ASSESSMENT:

    mc = yf.Ticker(STOCKS_FOR_ASSESSMENT[stock]).info['marketCap']
    mc /= conv

    # Trillions, as text with two decimals
    mc = '%.2f' % round(mc / 1000000000000, 2)
    print(f'{stock} Market Cap: {mc} T {CONVERSION_CURRENCIES_SYM[SEL_CONV_CURR]}')

Wow! **Apple market cap is still higher than the whole crypto market cap!**
(Had we delivered this work 3 days earlier, Microsoft market cap would be higher too!)

# 3. Some Technical Analysis
**Now that we are feeling comfortable with this data, let's dig into some fundamentals and perform some technical analysis.** Technical analysis is used especially by day traders to help define the best moment to enter the market to buy (or to sell). The number of techniques used by different traders is probably in the order of hundreds or thousands, so let's focus our work on some of the most commonly used.

To explain the techniques and assess our calculations, let's grab Cardano (ADA) as our use case (using Bitcoin would be extremely cliché).

In [None]:
# Coin used as the worked example in the technical-analysis and forecasting sections below
TECH_ANALYSIS_COIN = 'ADA'

## 3.1. Crossover with Moving Averages

Moving Averages are one of the most fundamental and useful indicators in technical analysis, since they allow the analyst to identify the general trend of the stock (or crypto) by filtering out short-term noise and (likely) meaningless fluctuations. They can be calculated as **Simple Moving Averages (SMA)**, by simply averaging prices on a defined rolling period, or as Exponential Moving Averages (EMA), by placing a greater weight on more recent datapoints (the closer the higher).

Moving Averages have several applications in technical analysis. Crossover, for instance, is a strategy that makes use of two Moving Averages, one for a shorter term and another for a longer term. It advises buying when the short-term MA rises above the long-term MA. We typically call this cross a **golden cross**. Conversely, when the short-term MA falls below the long-term MA, it advises selling, as this indicates (according to this strategy) a downward trend - a **death cross**.

Let's use 20 days for the shorter term and 100 for the longer one.

In [None]:
# Simple moving averages of Close per coin, using SQL RANGE windows on the date. For daily rows, a
# frame from INTERVAL n DAYS PRECEDING to CURRENT ROW spans n + 1 days, so 19 and 99 are used to
# average over 20 and 100 days respectively.
query_moving_averages = """
    SELECT *,
           mean(Close) OVER (
                PARTITION BY Coin
                ORDER BY CAST(Date AS timestamp) 
                RANGE BETWEEN INTERVAL 19 DAYS PRECEDING AND CURRENT ROW) AS MovingAvg_20Days,
           mean(Close) OVER (
                PARTITION BY Coin
                ORDER BY CAST(Date AS timestamp) 
                RANGE BETWEEN INTERVAL 99 DAYS PRECEDING AND CURRENT ROW) AS MovingAvg_100Days
    FROM HistoricalQuotes
  """

df_moving_averages = spark.sql(query_moving_averages)
df_moving_averages_ex = df_moving_averages.filter(f.col('Coin') == TECH_ANALYSIS_COIN)

**What do we see?** Focusing on recent times:
* We have a golden cross around the beginning of August 2021, which in retrospect was a great buying signal, since Cardano went on to gain roughly 100% over the following month.
* Furthermore, we can see a death cross around the end of October / beginning of November 2021, which again proved to be a good selling tip, since Cardano is still struggling today to come back to the prices of August/ September.

Anyway, why look for crosses "manually" (in the line plot above) when we can calculate them with spark? Let's do it.

In [None]:
# Each coin's rows in date order (also used by the RSI and autocorrelation cells)
lag_window = Window.partitionBy('Coin').orderBy('Date')

# A cross happens when the comparison "short-term MA > long-term MA" flips from one day to the
# next: 0 -> 1 is a golden cross, 1 -> 0 a death cross. The first row of each coin has no previous
# day, so it is never flagged.
df_crosses = (
    df_moving_averages
    .withColumn('ShortTermIsHigher', f.when(f.col('MovingAvg_20Days') > f.col('MovingAvg_100Days'), 1).otherwise(0))
    .withColumn('ShortTermIsHigher_Lag', f.lag(f.col('ShortTermIsHigher'), 1).over(lag_window))
    .withColumn(
        'Cross',
        f.when(f.col('ShortTermIsHigher') > f.col('ShortTermIsHigher_Lag'), 'Golden Cross')
         .when(f.col('ShortTermIsHigher') < f.col('ShortTermIsHigher_Lag'), 'Death Cross')
         .otherwise('No'),
    )
    .filter(f.col('Cross') != 'No')
    .select('Coin', 'Date', 'Cross')
)

df_crosses_ex = df_crosses.filter(f.col('Coin') == TECH_ANALYSIS_COIN)

## 3.2. RSI (Relative Strength Index)

RSI is arguably one of the most used KPIs in technical analysis in the cryptocurrency world (although it is also used for stocks) to optimize the timing of entering the market to buy (or sell). It is fundamentally an indicator of price momentum, ranging from 0 to 100. It typically classifies an asset (in this case, a coin) as overbought (i.e., overvalued and, consequently, likely to be corrected downwards) when RSI > 70, and as oversold when RSI < 30.

It is formally calculated as:

\\( RSI = 100 - \frac{100}{1 + \frac{\text{Average Gain}}{\text{Average Loss}}} \\)   where the average gain and the average loss are usually calculated over a rolling period of 14 (days, for instance).

In [None]:
query_rsi_prep = """
    SELECT Coin,
           Date,
           AdjClose
    FROM HistoricalQuotes
  """

# Number of daily price changes averaged for each RSI value (the usual 14-period setting)
RSI_PERIOD = 14

# Trailing frame of exactly RSI_PERIOD rows per coin: the current row and the RSI_PERIOD - 1 before it
rolling_window = Window.orderBy(f.col('Date').cast('long')).partitionBy('Coin').rowsBetween(-(RSI_PERIOD - 1), 0)

# RSI from simple (not Wilder-smoothed) rolling averages of the daily gains and losses.
# - The first row of each coin has no previous close, hence no change, and is removed.
# - With no losses in the window the average loss is 0, and RSI is 100 by definition
#   (x / 0 would otherwise give null in Spark's default, non-ANSI mode).
# - The first RSI_PERIOD - 1 remaining rows of each coin average fewer than RSI_PERIOD changes and are
#   removed. They are counted per coin with row_number over lag_window (the per-coin, date-ordered
#   window from the crossover cell); monotonically_increasing_id is not a per-coin row index.
df_rsi = (
    spark.sql(query_rsi_prep)
    .withColumn('AdjClose_Lag1', f.lag(f.col('AdjClose'), 1).over(lag_window))
    .withColumn('AdjClose_Lag1_Diff', f.col('AdjClose') - f.col('AdjClose_Lag1'))
    .withColumn('MinDate', f.min('Date').over(Window.partitionBy('Coin')))
    .where(f.col('Date') > f.col('MinDate'))
    .withColumn('Up', f.when(f.col('AdjClose_Lag1_Diff') > 0, f.col('AdjClose_Lag1_Diff')).otherwise(0))
    .withColumn('Down', f.when(f.col('AdjClose_Lag1_Diff') < 0, f.col('AdjClose_Lag1_Diff') * (-1)).otherwise(0))
    .withColumn('RollingUp', f.avg('Up').over(rolling_window))
    .withColumn('RollingDown', f.avg('Down').over(rolling_window))
    .withColumn('RS', f.col('RollingUp') / f.col('RollingDown'))
    .withColumn('RSI', f.when(f.col('RollingDown') == 0, f.lit(100.0))
                        .otherwise(100 - (100 / (1 + f.col('RS')))))
    .withColumn('RowNumber', f.row_number().over(lag_window))
    .filter(f.col('RowNumber') >= RSI_PERIOD)
    .select('Coin', 'Date', 'RSI')
)

df_rsi_ex = df_rsi.filter(f.col('Coin') == TECH_ANALYSIS_COIN)

# 4. "Predicting" the market

## 4.1. Assessing autocorrelation

Time series forecasts usually need data to be statistically independent, i.e. non-autocorrelated. Autocorrelation occurs when the values of some (or most) datapoints depend on the values of other datapoints of the same series. Let's check that for different lags: 1, 10 and 100 (days, in this case).

In [None]:
# Copies of the example coin's adjusted close shifted by 1, 10 and 100 rows (days), to eyeball
# autocorrelation against the unshifted series
df_autocorr = df_historicalquotes \
    .filter(f.col('Coin') == TECH_ANALYSIS_COIN) \
    .select('*', *[f.lag(f.col('AdjClose'), n).over(lag_window).alias(f'AdjClose_Lag{n}') for n in (1, 10, 100)])

df_autocorr.select('Coin', 'Date', 'AdjClose', 'AdjClose_Lag1').display()

Short-term data is strongly autocorrelated, it appears!

In [None]:
# Same comparison for the 10-day lag
df_autocorr.select('Coin', 'Date', 'AdjClose', 'AdjClose_Lag10').display()

**When we look at 10-day lags, though, the autocorrelation starts to vanish, indicating that we can still model our time series reasonably well** (for 100 days, there's virtually zero autocorrelation - go ahead and display the `AdjClose_Lag100` column of `df_autocorr` to check for yourself).

## 4.2. Assessing stationarity

Most time series analyses are built on another assumption: that the series mean and variance do not change much over time, i.e., that the series is stationary. Statistical tests such as the Dickey-Fuller test are formally used to assess stationarity (or the lack of it), but we can do that directly by glancing at the chart of any cryptocurrency. Take ADA, for instance: we have seen in chapter 2.1 that there is a clear upward trend over time (and perhaps some light seasonality too), and therefore **the time series is strongly non-stationary**.

To solve this problem, instead of modelling raw prices, let's model daily log returns, \\( \log(Close / Open) \\), which make the series approximately stationary:

In [None]:
# Example coin only, with its daily log return log(Close / Open) rounded to 4 decimals; modelling
# returns instead of prices makes the data approximately stationary
df_forecast = (
    df_historicalquotes
    .filter(f.col('Coin') == TECH_ANALYSIS_COIN)
    .withColumn('Daily Return', f.round(f.log(f.col('Close') / f.col('Open')), 4))
)
df_forecast.select("Date", "Daily Return").display()

## 4.3. Data preprocessing

In [None]:
def transform(df):
    '''Build the model table for one coin: daily return, rolling mean and next-day target.

    Args:
        df: single-coin price DataFrame with ``Date``, ``Open`` and ``Close`` columns
            (``df_forecast`` in this notebook).

    Returns:
        DataFrame with columns ``Date``, ``Daily_Return_0`` (log(Close / Open) rounded to
        4 decimals), ``rolling_average`` (mean of ``Daily_Return_0`` over the current row and
        the 10 days before it, rounded to 4 decimals) and ``lag_forecast`` (the next row's
        ``Daily_Return_0``, i.e. the model target; null on the last row).
    '''
    # Calendar date used to order the rows for the next-day target.
    # NOTE(review): the 'MM-dd-yyyy' pattern only matters if Date is a string column; for a
    # timestamp column Spark converts the value directly — confirm the type before changing it.
    df = df.withColumn('Date_dt', f.to_date(f.unix_timestamp(f.col('Date'), 'MM-dd-yyyy').cast("timestamp")))

    # Range frame on epoch seconds: the 10 days before the current row, plus the current row
    ten_days_in_seconds = 10 * 86400
    rolling_window = Window.orderBy(f.col("Date").cast("timestamp").cast("long")) \
                           .rangeBetween(-ten_days_in_seconds, 0)

    next_day_window = Window.orderBy("Date_dt")

    # Columns are resolved on `df` itself (not on a global dataframe), so any input frame works
    df = df.withColumn('Daily_Return_0', f.round(f.log(f.col('Close') / f.col('Open')), 4)) \
           .withColumn('rolling_average', f.round(f.avg("Daily_Return_0").over(rolling_window), 4)) \
           .withColumn('lag_forecast', f.lead(f.col('Daily_Return_0'), 1).over(next_day_window))

    return df.select("Date", "Daily_Return_0", "rolling_average", "lag_forecast")



def add_lag(df, n_lags):
    '''Add ``n_lags`` lagged copies of ``Daily_Return_0`` as model inputs.

    Column ``Daily_Return_i`` holds the daily return ``i`` rows earlier (rows ordered by
    ``Date``). Rows with a null in any column are then dropped. This removes the first
    ``n_lags`` rows, whose lags reach before the start of the data, and, for the output of
    ``transform``, the last row, which has no ``lag_forecast`` target.

    Args:
        df: DataFrame with ``Date`` and ``Daily_Return_0`` columns.
        n_lags: number of lagged columns to add.

    Returns:
        tuple: (DataFrame with the lag columns, list of the input column names
        ``['Daily_Return_0', 'Daily_Return_1', ..., f'Daily_Return_{n_lags}']``).
    '''
    w = Window.orderBy("Date")
    inputs = ["Daily_Return_0"]

    for i in range(1, n_lags + 1):
        # Shifting the previous lag column by one row gives Daily_Return_0 shifted by i rows
        df = df.withColumn(f'Daily_Return_{i}', f.lag(f'Daily_Return_{i-1}').over(w))
        inputs.append(f"Daily_Return_{i}")

    # Remove the na's created by the lags added, in a single pass
    df = df.dropna(how = 'any')

    return df, inputs



def train_test_split(df, n_iteration):
    '''Split ``df`` at a percent-rank cut-off for one walk-forward step.

    Args:
        df: DataFrame with a numeric ``rank`` column (percent rank of the row by date).
        n_iteration: cut-off; rows with ``rank <= n_iteration`` form the training set.

    Returns:
        tuple: (train DataFrame, test DataFrame holding only the first row after the
        cut-off), both without the ``rank`` column.
    '''
    # Train Dataset: every row up to the cut-off
    train_df = df.where("rank <= " + str(n_iteration)).drop("rank")

    # Test Dataset: the single next row. Ordering by rank before limit(1) makes the choice
    # explicit; limit() alone does not guarantee which row is returned.
    test_df = df.where("rank > " + str(n_iteration)) \
                .orderBy("rank") \
                .limit(1) \
                .drop("rank")

    return train_df, test_df

# Model table for the example coin: daily return, its rolling mean and the next-day target...
df_prepared = transform(df_forecast)
# ...plus 6 lagged daily returns as inputs (rows with nulls removed)
df_prepared, inputs = add_lag(df_prepared, 6)
# Percent rank of each row by Date over the whole frame (single window, no partition);
# train_test_split uses it as a chronological cut-off
df_prepared = df_prepared.withColumn("rank", f.percent_rank().over(Window.partitionBy().orderBy("Date")))

## 4.4. Building the model

In [None]:
import math as m
from pyspark.ml.regression import RandomForestRegressor
from pyspark.ml.feature import VectorIndexer
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.sql.types import DoubleType 

def random_forest(inputs, df):
    '''Walk-forward evaluation of a random-forest regressor on ``df``.

    For each rank threshold 0.90, 0.91, ..., 0.99, a fresh pipeline
    (VectorAssembler + RandomForestRegressor) is fit on the rows returned as
    training data by ``train_test_split``. That pipeline then predicts the
    single test row that follows them.

    Parameters
    ----------
    inputs : list[str]
        Feature column names. ``rolling_average`` is appended to a copy; the
        caller's list is left unchanged.
    df : pyspark.sql.DataFrame
        Frame with the feature columns, ``rolling_average``, ``lag_forecast``
        (the label) and ``rank``.

    Returns
    -------
    pyspark.sql.DataFrame
        One row per evaluated test point, with ``prediction``, ``lag_forecast``
        and ``test_rmse``. For a single point, ``test_rmse`` equals the
        absolute prediction error.
    '''
    # Local imports: these names are used here but are not imported by the
    # surrounding cell.
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import VectorAssembler

    # Copy instead of appending in place; mutating ``inputs`` would add a
    # duplicate 'rolling_average' feature on every call.
    feature_cols = list(inputs) + ["rolling_average"]
    assembler = VectorAssembler(inputCols=feature_cols, outputCol="features")

    # Train a RandomForest model with usual hyperparameters' values
    rf = RandomForestRegressor(featuresCol="features",
                               labelCol="lag_forecast",
                               maxDepth=5,
                               subsamplingRate=0.8)

    # Chain assembler and forest in a Pipeline
    pipeline = Pipeline(stages=[assembler, rf])

    schema = StructType([StructField('prediction', DoubleType(), True),
                         StructField('lag_forecast', DoubleType(), True),
                         StructField('test_rmse', DoubleType(), True)])

    forecast = spark.createDataFrame([], schema)

    # Integer steps give exactly the thresholds 0.90..0.99 without relying on
    # accumulated float error to terminate the loop.
    for step in range(90, 100):
        train_df, test_df = train_test_split(df, step / 100)

        pipeline_model = pipeline.fit(train_df)

        # Make predictions; the RMSE of a single point is its absolute error.
        predictions = pipeline_model.transform(test_df) \
            .withColumn('test_rmse', f.abs(f.col('prediction') - f.col('lag_forecast'))) \
            .select("prediction", "lag_forecast", "test_rmse")

        forecast = forecast.union(predictions)

    return forecast

# Run the walk-forward evaluation and inspect the per-step predictions.
forecast = random_forest(inputs=inputs, df=df_prepared)
forecast.display()

In [None]:
forecast.select(f.avg('test_rmse')).show()  # mean of the per-step errors

Alright, that's not good news. Our average RMSE on the test set is of the same order as the daily returns we are trying to predict, which is far from optimal. We knew from the beginning, however, that we would hardly be able to build an accurate model for predicting the movements of a system as complex as the cryptocurrency market (or any other live trading market, for that matter). In fact, predicting the market is such a Holy Grail that, if the model were any good, something would be really wrong.

Anyway, again, that is not the point of this work at all. Thus, **we will instead be using the technical analysis indicators to build our trading bot**, right away.

# 5. Live Trading Bot

Alright, so far we have implemented some of the most used techniques to help time the market, but we have only seen the results for the historical dataframe. **Now we want to take advantage of this knowledge to build a trading assistance bot that will tell us what to do with each coin (Sell, Buy or Do Nothing), in real-time.**

Ideally, we would compute these metrics in the streaming DataFrame itself (which would hold at least a few days of history). Unfortunately, row-based window operations (such as the lags used here) are not (yet) supported on Spark streaming DataFrames. We are therefore not able to calculate moving averages, RSIs or time-series forecasts there, since all of them depend on lags. Anyway, since this is for demonstration purposes only, we will use the metrics computed on the historical dataset to provide trading assistance in the streaming DataFrame. We know this means the outcome will only change once per day, even though we have data every 5 minutes.

We will be using a simple model for deciding what to do with each coin:
* **BUY**:
  * Main Condition: If there's been a Golden Cross in the past x days;
  * Secondary Condition: If the RSI has been below 30 at least once in the past y days.

* **SELL**:
  * Main Condition: If there's been a Death Cross in the past x days;
  * Secondary Condition: If the RSI has been above 70 at least once in the past y days.
  
* **DO NOTHING**:
  * If neither BUY nor SELL conditions apply.
  
*Note 1: Splitting the rules into Main and Secondary conditions means that the Main Conditions (for both BUY and SELL) are evaluated first, and only then the Secondary ones. This avoids contradictory cases where, for instance, a coin had a Golden Cross in the past few days, indicating BUY, but also had an RSI above 70, indicating SELL.*   
*Note 2: x and y are number of days defined by the user.*

In [None]:
# Number of days within which a Golden/Death Cross still counts as the main
# BUY/SELL signal (used by the recommendation conditions further below).
CROSS_CUTOFF_DAYS = 7

# Dataframe with the last Golden and Death Crosses by Coin and days passed since.
# Steps:
#   1. Remove every '1' from the coin name (Yahoo tickers such as SOL1/DOT1).
#   2. Keep the latest Date per (Coin, Cross).
#   3. Pivot so each Cross type becomes its own column.
#   4. Count the days from today's date to each latest cross.
# NOTE(review): toDF renames the pivoted columns by position. This assumes the
# pivot yields exactly two Cross values whose sorted order puts the Death Cross
# before the Golden Cross — confirm against the values in df_crosses.
df_lastcross = df_crosses \
    .withColumn('Coin', f.regexp_replace('Coin', '1', '')) \
    .groupBy('Coin', 'Cross') \
    .agg(f.max('Date').alias('LastTime')) \
    .groupBy('Coin') \
    .pivot('Cross') \
    .agg(f.first('LastTime')) \
    .toDF('Coin', 'LastDeathCross', 'LastGoldenCross') \
    .withColumn("CurrentDate", f.current_date()) \
    .withColumn("DaysSinceLastDeathCross", f.datediff(f.col("CurrentDate"), f.col("LastDeathCross"))) \
    .withColumn("DaysSinceLastGoldenCross", f.datediff(f.col("CurrentDate"), f.col("LastGoldenCross"))) \
    .select('Coin', 'DaysSinceLastDeathCross', 'DaysSinceLastGoldenCross')


RSI_CUTOFF_DAYS = 7

# Per-coin maximum, minimum and average RSI over rows dated at most
# RSI_CUTOFF_DAYS days before today. Every '1' is removed from the coin name
# (Yahoo tickers such as SOL1/DOT1).
df_minmaxrsi = (
    df_rsi
    .withColumn('Coin', f.regexp_replace('Coin', '1', ''))
    .filter(f.datediff(f.current_date(), f.col("Date")) <= RSI_CUTOFF_DAYS)
    .groupBy('Coin')
    .agg(
        f.max('RSI').alias(f'MaxRSI_{RSI_CUTOFF_DAYS}days'),
        f.min('RSI').alias(f'MinRSI_{RSI_CUTOFF_DAYS}days'),
        f.avg('RSI').alias(f'AvgRSI_{RSI_CUTOFF_DAYS}days'),
    )
)


# Final Dataframe with the Recommendations.
# For every coin, keep only its most recent streaming quote, then attach the
# cross summary and the RSI summary (inner joins on Coin).
# The raw Timestamp has its last 5 characters cut off and its 'T' separator
# replaced by a space before being parsed as 'yyyy-MM-dd HH:mm:ss'.
df_assistbot = (
    static_streamingDF
    .alias('df_assistbot')
    .withColumn('Timestamp', f.expr("substring(Timestamp, 1, length(Timestamp) - 5)"))
    .withColumn('Timestamp', f.regexp_replace('Timestamp', 'T', ' '))
    .withColumn('Timestamp', f.to_timestamp(f.col('Timestamp'), 'yyyy-MM-dd HH:mm:ss'))
    .withColumn('MaxTimestamp', f.max('Timestamp').over(Window.partitionBy('Coin')))
    .filter(f.col('Timestamp') == f.col('MaxTimestamp'))
    .select('Coin', 'Timestamp', 'Price')
    .join(df_lastcross, on='Coin', how='inner')
    .join(df_minmaxrsi, on='Coin', how='inner')
)

df_assistbot.display()

In [None]:
# Main (cross-based) conditions.
main_condition_to_buy = f.col('DaysSinceLastGoldenCross') <= CROSS_CUTOFF_DAYS
main_condition_to_sell = f.col('DaysSinceLastDeathCross') <= CROSS_CUTOFF_DAYS

# Secondary (RSI-based) conditions.
sec_condition_to_buy = f.col(f'MinRSI_{RSI_CUTOFF_DAYS}days') <= 30
sec_condition_to_sell = f.col(f'MaxRSI_{RSI_CUTOFF_DAYS}days') >= 70

# A chained when() yields the value of the first condition that holds, so both
# main conditions outrank both secondary ones, and BUY is checked before SELL
# within each tier. A null condition (e.g. no cross of that type) does not hold.
# NOTE(review): if both a Golden and a Death Cross fall inside the cutoff, BUY
# wins regardless of which cross is more recent — confirm this is intended.
df_assistbot = (
    df_assistbot
    .withColumn('Recommendation',
                f.when(main_condition_to_buy, 'BUY')
                 .when(main_condition_to_sell, 'SELL')
                 .when(sec_condition_to_buy, 'BUY')
                 .when(sec_condition_to_sell, 'SELL')
                 .otherwise('-'))
    .select('Coin', 'Timestamp', 'Recommendation')
)


# Let us use the show method this time to see all coins at once
df_assistbot.show()

# Appendix - Calling the CoinMarketCap API LatestQuotes endpoint periodically

In [None]:
# Every 260 seconds, serialize the result of parsing_response_json() to JSON
# and write it to a new file under path_latestquotes, named after the current
# local time (minute resolution). The trailing True is passed through to
# dbutils.fs.put, presumably as its overwrite flag. Runs until interrupted.
while True:
    filename = f"Top10MarketCap_{datetime.datetime.now():%d-%m-%Y-%Hh%M}.json"
    dbutils.fs.put(path_latestquotes + filename, json.dumps(parsing_response_json()), True)
    time.sleep(260)