In [0]:
import os
import cryptocompare

import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql import DataFrame
from delta.tables import *
from pyspark.sql.types import *

import yfinance as yahooFinance
import pandas as pd
import datetime

import requests
import json
from functools import reduce
import matplotlib.pyplot as plt
import numpy as np
%matplotlib inline

import tweepy
import pandas as pd
from datetime import datetime, timedelta

In [0]:
AZURE_KEY = dbutils.secrets.getBytes(scope="demo_secrets", key="azure_key").decode("utf-8")
spark.conf.set("fs.azure.account.key.databrickstore.blob.core.windows.net", AZURE_KEY)

In [0]:
container_name = "ud108519"
storage_account_name = "kosmobiker"
dbutils.fs.ls(f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net/data")

In [0]:
# dbutils.fs.mount(
#   source = f"wasbs://{container_name}@{storage_account_name}.blob.core.windows.net",
#   mount_point = "/mnt/data",
#   extra_configs = {f"fs.azure.account.key.{storage_account_name}.blob.core.windows.net": AZURE_KEY})
dbutils.fs.refreshMounts()

In [0]:
#setup cryptocompare
crypto_compare_key = dbutils.secrets.getBytes(scope="demo_secrets", key="CRYPTO_COMPARE_KEY").decode("utf-8")
cryptocompare.cryptocompare._set_api_key_parameter(crypto_compare_key)

#setup twitter

consumer_key = dbutils.secrets.getBytes(scope="demo_secrets", key="CONSUMER_KEY").decode("utf-8")
consumer_secret = dbutils.secrets.getBytes(scope="demo_secrets", key="CONSUMER_SECRET").decode("utf-8")

auth = tweepy.AppAuthHandler(consumer_key, consumer_secret)
api = tweepy.API(auth, wait_on_rate_limit=True)

In [0]:
%sql

CREATE SCHEMA IF NOT EXISTS delta_lake
COMMENT 'main schema to be used'
LOCATION 'dbfs:/mnt/data/delta_lake';

USE delta_lake;

# Bronze Level

## Coin list

In [0]:
coin_info_schema = StructType([
        StructField("Id", StringType(), True),
        StructField("Name",StringType(),True),
        StructField("FullName", StringType(),True),
        StructField("CoinName", StringType(),True),
        StructField("Symbol", StringType(),True),
        StructField("Description", StringType(),True),
        StructField("ContentCreatedOn", LongType(), True),
        StructField("Algorithm",StringType(),True),
        StructField("ProofType",StringType(),True),
        StructField("AssetTokenStatus", StringType(),True),
        StructField("ImageUrl", StringType(),True),
        StructField("Url", StringType(),True),
        StructField("Sponsored", BooleanType(), True),
        StructField("Taxonomy", MapType(StringType(), StringType(), True), True),
        StructField("Rating", MapType(StringType(), MapType(StringType(), StringType(), True), True), True)        
])


coin_info = cryptocompare.get_coin_list()
coin_info_df = spark.createDataFrame(coin_info.values(), schema=coin_info_schema)

In [0]:
coin_info_df\
    .coalesce(1)\
    .write\
    .format('delta')\
    .mode('overwrite')\
    .saveAsTable("delta_lake.coin_list")

In [0]:
display(spark.read.table("coin_list").orderBy('ContentCreatedOn').limit(10))

Id,Name,FullName,CoinName,Symbol,Description,ContentCreatedOn,Algorithm,ProofType,AssetTokenStatus,ImageUrl,Url,Sponsored,Taxonomy,Rating
1182,BTC,Bitcoin (BTC),Bitcoin,BTC,"Bitcoin uses peer-to-peer technology to operate with no central authority or banks; managing transactions and the issuing of bitcoins is carried out collectively by the network. Although other cryptocurrencies have come before, Bitcoin is the first decentralized cryptocurrency - Its reputation has spawned copies and evolution in the space.With the largest variety of markets and the biggest value, Bitcoin is here to stay. As with any new invention, there can be improvements or flaws in the initial model however the community and a team of dedicated developers are pushing to overcome any obstacle they come across. It is also the most traded cryptocurrency and one of the main entry points for all the other cryptocurrencies. The price is as unstable as always and it can go up or down by 10%-20% in a single day.Bitcoin is an SHA-256 POW coin with almost 21,000,000 total minable coins. The block time is 10 minutes. See below for a full range of Bitcoin markets where you can trade US Dollars for Bitcoin, crypto to Bitcoin and many other fiat currencies too.Bitcoin Whitepaper PDF - A Peer-to-Peer Electronic Cash SystemBlockchain data provided by: Blockchain (main source), Blockchair (backup)",1417635237,SHA-256,PoW,,/media/37746251/btc.png,/coins/btc/overview,False,"Map(CollateralType -> , FCA -> Exchange, CollateralInfo -> , CollateralizedAsset -> No, Industry -> Financial and Insurance Activities, Access -> Permissionless, FINMA -> Payment, CollateralizedAssetType -> )","Map(Weiss -> Map(TechnologyAdoptionRating -> A-, MarketPerformanceRating -> D+, Rating -> B+))"
1183,NXT,Nxt (NXT),Nxt,NXT,"Nxt is an open-source blockchain platform and the first to rely entirely on a proof-of-stake consensus protocol. Launched in November 2013 and written from scratch in Java, Nxt is proof that blockchain technology is not only about simple transfer of value but also has the potential to revolutionize many aspects of our lives with the various decentralized applications that can be built with it. Today, Nxt remains one of the most tested and reliable platforms in the industry, influencing numerous other projects. With its many easy to use modular built-in features, Nxt covers most of the dApp use cases and at the same time is perfectly suitable for private blockchain implementations.Where did NXT come from? Nxt is an iconic project in the history of blockchain development. Launched as early as November 2013 it represents the first pure proof of stake blockchain ever deployed. Started by an anonymous developer who disappeared shortly thereafter, with a crowdsale that raised only a symbolic amount, the Nxt project formed a wonderful community around it, and gave birth to many successful follow-up projects, including own Ardor platform.What are the advantages of NXT?NXT has simple but flexible architecture makes blockchain adoption easy, andcan be extended to fit any public or private use case.Nxt was the first blockchain to launch a user-friendly and yet fully decentralizedasset exchange. Anyone can issue and trade tokens on top of Nxt, and orders arematched and executed directly on the blockchain itself.The Nxt blockchain has been live in production for more than 6 years withvirtually no downtime, no major bugs, and no successful exploits. Millions of Nxttransactions have been conducted reliably, establishing its reputation as one ofthe most secure platforms in the blockchain industry.",1417635253,PoS,PoS/LPoS,Finished,/media/38553729/nxt.png,/coins/nxt/overview,False,"Map(CollateralType -> , FCA -> Utility, CollateralInfo -> , CollateralizedAsset -> No, Industry -> Blockchain-Specific Application, Access -> Permissionless, FINMA -> Utility, CollateralizedAssetType -> )","Map(Weiss -> Map(TechnologyAdoptionRating -> E-, MarketPerformanceRating -> D-, Rating -> E-))"
2349,PPC,PeerCoin (PPC),PeerCoin,PPC,A peer-to-peer crypto-currency design derived from Satoshi Nakamoto's Bitcoin. Proof-of-Stake replaces Proof-of-Work to provide most of the network security. Under this hybrid design proof-of-work mainly provides initial minting and is largely non-essential in the long run. The security level of the network is not highly energy-dependent thus providing an energy-efficient and more cost-competitive peer-to-peer crypto-currency. Proof-of-Stake is based on coin age and generated by each node via a hashing scheme bearing similarity to Bitcoins but over limited search space. Blockchain history and transaction settlement are further protected by a centrally broadcasted checkpoint mechanism.,1424105258,,PoS/PoW,,/media/19864/peercoin-logo.png,/coins/ppc/overview,False,"Map(CollateralType -> , FCA -> Exchange, CollateralInfo -> , CollateralizedAsset -> No, Industry -> Financial and Insurance Activities, Access -> Permissionless, FINMA -> Payment, CollateralizedAssetType -> )","Map(Weiss -> Map(TechnologyAdoptionRating -> D+, MarketPerformanceRating -> D, Rating -> D+))"
3638,PRC,ProsperCoin (PRC),ProsperCoin,PRC,"ProsperCoin is a PoW cryptocurrency with a modified Scrypt algorithm. Created to bring to bring prosperity to its users, PRC can be sent to anyone for a small fee and almost instantly.",1424886122,Scrypt,PoW,,/media/20393/prc.png,/coins/prc/overview,False,"Map(CollateralType -> , FCA -> , CollateralInfo -> , CollateralizedAsset -> , Industry -> , Access -> , FINMA -> , CollateralizedAssetType -> )","Map(Weiss -> Map(TechnologyAdoptionRating -> , MarketPerformanceRating -> , Rating -> ))"
3639,YBC,YbCoin (YBC),YbCoin,YBC,YB coin is a Chinese clone of YACcoin. It uses scrypt N+1 and ChaCha. The coin has a relatively low block time at 60 seconds but being a clone hasnt really brought much to the table in terms of innovation. Again like Yacoin there is a dynamically adjusted economic model related to the network power that balances the incentivisation to mine and use it depending on its popularity. An inverse relationship.,1424886292,Multiple,PoS,,/media/19975/ybc.png,/coins/ybc/overview,False,"Map(CollateralType -> , FCA -> , CollateralInfo -> , CollateralizedAsset -> , Industry -> , Access -> , FINMA -> , CollateralizedAssetType -> )","Map(Weiss -> Map(TechnologyAdoptionRating -> , MarketPerformanceRating -> , Rating -> ))"
3640,DANK,DarkKush (DANK),DarkKush,DANK,"DarkKush is a cryptocurrency built for the marijuana community, it's a PoW/PoS hybrid coin.",1424886507,X13,PoW/PoS,,/media/20247/dank.png,/coins/dank/overview,False,"Map(CollateralType -> , FCA -> , CollateralInfo -> , CollateralizedAsset -> , Industry -> , Access -> , FINMA -> , CollateralizedAssetType -> )","Map(Weiss -> Map(TechnologyAdoptionRating -> , MarketPerformanceRating -> , Rating -> ))"
3641,GIVE,GiveCoin (GIVE),GiveCoin,GIVE,"GiveCoin is a CryptoCurrency with a charitable angle - the idea being to emPoWer charities and encourage giving. The coin is based on DarkCoins ASIC resistant X11 algorithm, has a block time of 60 seconds - block reward of 1000 coins per block - halving every six months - a total supply of 500 million and a Kimoto gravity well difficulty retargetting. There was a premine of 5%, 2% will be donated to charities on a most voted basis each month, 1% for disaster relief, 1% for bugs and bounties and 1% for development and marketing.",1424889416,X11,PoW,,/media/20297/give.png,/coins/give/overview,False,"Map(CollateralType -> , FCA -> , CollateralInfo -> , CollateralizedAsset -> , Industry -> , Access -> , FINMA -> , CollateralizedAssetType -> )","Map(Weiss -> Map(TechnologyAdoptionRating -> , MarketPerformanceRating -> , Rating -> ))"
3642,KOBO,KoboCoin (KOBO),KoboCoin,KOBO,"Kobocoin is a digital currency and payment system similar to Bitcoin, with an African heritage. All nodes verify transactions in a public distributed ledger calledthe blockchain. The ledger uses its own unit of account, also called KoboCoin(s). KOBO can be sent to anyone in the world for small fees and almost instantly and can also be used for Micropayments.",1424944750,X15,PoW/PoS,,/media/35521133/kobo.png,/coins/kobo/overview,False,"Map(CollateralType -> , FCA -> , CollateralInfo -> , CollateralizedAsset -> , Industry -> , Access -> , FINMA -> , CollateralizedAssetType -> )","Map(Weiss -> Map(TechnologyAdoptionRating -> , MarketPerformanceRating -> , Rating -> ))"
3643,DT,DarkToken (DT),DarkToken,DT,"DarkToken aims to offer a refreshing new take on what a crypto-currency should be by detailing every aspect of the coin, from start to finish. DarkToken takes the next step in coin development by fixing it from the start. Rigorous, energy intensive mining over long periods of time, or even worse, short PoW flash-mines, are extremely detrimental to not only the environment, but the crypto community as a whole. Networks can be maintained perfectly using Proof of Stake.",1424944815,NIST5,PoW/PoS,,/media/20031/dt.png,/coins/dt/overview,False,"Map(CollateralType -> , FCA -> , CollateralInfo -> , CollateralizedAsset -> , Industry -> , Access -> , FINMA -> , CollateralizedAssetType -> )","Map(Weiss -> Map(TechnologyAdoptionRating -> , MarketPerformanceRating -> , Rating -> ))"
3644,CETI,CETUS Coin (CETI),CETUS Coin,CETI,Cetus Coin is a Scrypt Proof of Work and Proof of Stake hybrid cryptocurrency.,1424944878,Scrypt,PoW/PoS,,/media/20228/ceti.png,/coins/ceti/overview,False,"Map(CollateralType -> , FCA -> , CollateralInfo -> , CollateralizedAsset -> , Industry -> , Access -> , FINMA -> , CollateralizedAssetType -> )","Map(Weiss -> Map(TechnologyAdoptionRating -> , MarketPerformanceRating -> , Rating -> ))"


## Historical data

In [0]:
df = (spark.read.table("coin_list"))
list_of_coins = [
    "BTC",
    "ETH",
    "BUSD",
    "USDT",
    "XRP",
    "SOL",
    "BNB",
    "DOT",
    "SHIB",
    "LTC"
  ]
# list_of_currencies = [
#     "USD",
#     "EUR",
#     "JPY",
#     "BTC"
#   ]
list_of_currencies = ['USD']
created_on = {row['Name']:row['ContentCreatedOn'] for row in df.collect() if row['Name'] in list_of_coins}
created_on

In [0]:
list_of_currencies = ["USD", "BTC"]
dataframe_schema = StructType([
        StructField("time",LongType(), True),
        StructField("high",DoubleType(),True),
        StructField("low", DoubleType(),True),
        StructField("open", DoubleType(),True),
        StructField("volumefrom", DoubleType(),True),
        StructField("volumeto", DoubleType(),True),
        StructField("close", DoubleType(),True),
        StructField("conversionType", StringType(),True),
        StructField("conversionSymbol", StringType(),True),
        StructField("coin_currency", StringType(),True)
])

In [0]:
def get_historical_data(coin:str,
                        cur:str,
                        created_on:dict,
                        schema,
                        ts=int(datetime.now().timestamp()),
                        limit=2000):
    data = []
    done = False
    while not done:
        try:
            tmp_json = cryptocompare.get_historical_price_hour(coin, cur, limit=limit, toTs=ts)
            if ts < created_on[coin]:
                done = True
            else:
                data.extend(tmp_json)
                ts -= 3600000
                done = False
        except Exception as err:
            print(err)
            done = True
    return sqlContext.read.json(sc.parallelize(data), schema=schema)

In [0]:
for coin in list_of_coins:
    for cur in list_of_currencies:
        if coin != cur:
            try:
                df = get_historical_data(coin, cur, created_on, dataframe_schema)
                df = df.fillna(value=f"{coin}_{cur}", subset=['coin_currency']) 
                (df.coalesce(1)
                    .write
                    .format('delta')
                    .mode('append')
                    .saveAsTable("BRONZE_OHLC_DATA")
                )
                print(f"{coin}_{cur} was uploaded!")
            except Exception as err:
                print(err)

In [0]:
df = spark.read.table('BRONZE_OHLC_DATA')
display(df.limit(10))

time,high,low,open,volumefrom,volumeto,close,conversionType,conversionSymbol,coin_currency
1653609600,1805.97,1769.51,1791.0,19431.98,34751485.65,1772.02,direct,,ETH_USD
1653613200,1777.77,1735.56,1772.02,33619.18,59000573.1,1740.62,direct,,ETH_USD
1653616800,1765.66,1728.67,1740.62,33691.07,58824140.36,1751.8,direct,,ETH_USD
1653620400,1776.98,1750.14,1751.8,24136.92,42591091.88,1766.85,direct,,ETH_USD
1653624000,1775.07,1739.89,1766.85,18602.43,32644752.31,1742.64,direct,,ETH_USD
1653627600,1756.14,1711.37,1742.64,26496.34,45972944.52,1733.52,direct,,ETH_USD
1653631200,1774.29,1721.37,1733.52,25296.92,44181954.85,1769.11,direct,,ETH_USD
1653634800,1795.96,1761.65,1769.11,26503.89,47061450.39,1762.42,direct,,ETH_USD
1653638400,1786.56,1749.79,1762.42,24186.27,42866225.2,1763.2,direct,,ETH_USD
1653642000,1784.36,1760.53,1763.2,13539.77,23986899.42,1783.19,direct,,ETH_USD


In [0]:
%sql
DESCRIBE HISTORY BRONZE_OHLC_DATA
LIMIT 5

--RESTORE TABLE BRONZE_OHLC_DATA TO VERSION AS OF 38


version,timestamp,userId,userName,operation,operationParameters,job,notebook,clusterId,readVersion,isolationLevel,isBlindAppend,operationMetrics,userMetadata
18,2022-08-18T08:21:43.000+0000,7874556634270339,u.darhevich@godeltech.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1687249738423510),0712-125157-gg80rrpi,17,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 132066, numOutputBytes -> 2940542)",
17,2022-08-18T08:20:52.000+0000,7874556634270339,u.darhevich@godeltech.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1687249738423510),0712-125157-gg80rrpi,16,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 132066, numOutputBytes -> 3377575)",
16,2022-08-18T08:19:52.000+0000,7874556634270339,u.darhevich@godeltech.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1687249738423510),0712-125157-gg80rrpi,15,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 24012, numOutputBytes -> 304013)",
15,2022-08-18T08:19:37.000+0000,7874556634270339,u.darhevich@godeltech.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1687249738423510),0712-125157-gg80rrpi,14,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 24012, numOutputBytes -> 529182)",
14,2022-08-18T08:19:18.000+0000,7874556634270339,u.darhevich@godeltech.com,WRITE,"Map(mode -> Append, partitionBy -> [])",,List(1687249738423510),0712-125157-gg80rrpi,13,WriteSerializable,True,"Map(numFiles -> 1, numOutputRows -> 38019, numOutputBytes -> 849124)",


## Daily data

In [0]:
path_to_daily_data = "dbfs:/mnt/data/data/daily_crypto_data"
list_of_pathes = [path for path in dbutils.fs.ls(path_to_daily_data)]
def get_minutes_data(path):
    data = []
    for path in list_of_pathes:
        name = path.name[:-1]
        path_json = path.path
        tmp = spark.read.json(path_json, schema=dataframe_schema)
        tmp = tmp.fillna(value=f"{name}", subset=['coin_currency']) 
        data.append(tmp)
    return reduce(DataFrame.unionAll, data)

In [0]:
per_minute_data = get_minutes_data(list_of_pathes).createOrReplaceTempView('per_minute_data')

In [0]:
%sql
MERGE INTO BRONZE_OHLC_DATA AS target
USING per_minute_data AS source
ON target.time = source.time AND target.coin_currency = source.coin_currency
WHEN NOT MATCHED
  THEN INSERT *

num_affected_rows,num_updated_rows,num_deleted_rows,num_inserted_rows
1895262,0,0,1895262


## Twitter data

In [0]:
path_to_twitter_data = "dbfs:/mnt/data/data/daily_tweets"

twitter_schema = StructType(fields=[
    StructField('id', LongType(), True),
    StructField('created_at', TimestampType(), True),
    StructField('ceil_datetime', TimestampType(), True),
    StructField('month', IntegerType(), True),
    StructField('weekday', IntegerType(),True),
    StructField('day', IntegerType(), True),
    StructField('hour', IntegerType(),True),
    StructField('minute', IntegerType(), True),
    StructField('QUERY', StringType(), True),
    StructField('ticker', StringType(), True),
    StructField('text', StringType(), True),
    StructField('favorite_count', IntegerType(), True),
    StructField('result_type', StringType(), True),
    StructField('user_name', StringType(), True),
    StructField('followers_count', IntegerType(), True),
    StructField('retweet_count', IntegerType(), True),
])

In [0]:
tweets_df = spark.read.json(path_to_twitter_data, schema=twitter_schema)
tweets_df.limit(10).display()

id,created_at,ceil_datetime,month,weekday,day,hour,minute,QUERY,ticker,text,favorite_count,result_type,user_name,followers_count,retweet_count
1546465252623241216,2022-07-11T12:03:42.000+0000,2022-07-11T12:15:00.000+0000,7,1,11,12,15,bitcoin,BTC,Bullish on #Bitcoin,18047,popular,saylor,2619208,2619
1546203207646564355,2022-07-10T18:42:26.000+0000,2022-07-10T18:45:00.000+0000,7,0,10,18,45,bitcoin,BTC,"Q: how is @binance doing now #bitcoin price is $20k? Me: when @binance started, #bitcoin price was $2k.",12179,popular,cz_binance,6609831,1526
1546308105427832834,2022-07-11T01:39:16.000+0000,2022-07-11T01:45:00.000+0000,7,1,11,1,45,bitcoin,BTC,"FUN FACT: If you invested $1,000 in #Bitcoin 10 years ago, youd have $2,856,000 today.",6032,popular,WatcherGuru,1223599,933
1546736081684508675,2022-07-12T05:59:53.000+0000,2022-07-12T06:00:00.000+0000,7,2,12,6,0,bitcoin,BTC,RT @tonimoral23: Still early #HODL #Bitcoin https://t.co/0yFRlwNQvM,0,recent,djshankvidya,16,292
1546736081667768320,2022-07-12T05:59:53.000+0000,2022-07-12T06:00:00.000+0000,7,2,12,6,0,bitcoin,BTC,Do you believe that #Bitcoin and #vechain has found it bottom. https://t.co/kHosDXzSeS,0,recent,ighosotu_ovie,410,0
1546736080669802498,2022-07-12T05:59:53.000+0000,2022-07-12T06:00:00.000+0000,7,2,12,6,0,bitcoin,BTC,RT @SocialGood_Inc: Win $100 in $BTC Follow us & RT this for a chance to win a cool $100 worth of #Bitcoin !! We'll tag the lucky 39t,0,recent,MongeeRebeka,146,1346
1546736078094213120,2022-07-12T05:59:52.000+0000,2022-07-12T06:00:00.000+0000,7,2,12,6,0,bitcoin,BTC,"Basically Bitcoin permabull maxis, but for RW politics https://t.co/b9EFbtYr5C",0,recent,DirleKamael,118,0
1546736078027390977,2022-07-12T05:59:52.000+0000,2022-07-12T06:00:00.000+0000,7,2,12,6,0,bitcoin,BTC,RT @GRDecter: Peter Schiff 2021: Bitcoin is worth zero Peter Schiff 2022: I will sell my bank for bitcoin https://t.co/tvUECnyXTh,0,recent,PumpHope_Exit,260,28
1546736074986344448,2022-07-12T05:59:51.000+0000,2022-07-12T06:00:00.000+0000,7,2,12,6,0,bitcoin,BTC,"RT @Newtonmarvin1: #MarvinInu gonna reach the universe , #marvin is gonna be the top best coin , dont doubt the best token , make sure you",0,recent,cryptoalkoeko,65,4
1546736071068762112,2022-07-12T05:59:50.000+0000,2022-07-12T06:00:00.000+0000,7,2,12,6,0,bitcoin,BTC,Who have the balls of steel to push #Bitcoin back o $13k? https://t.co/4VnBqklIql,0,recent,CYBERPUNK0NEWS,163,0


In [0]:
tweets_df\
    .write\
    .format('delta')\
    .mode('append')\
    .saveAsTable("BRONZE_TWEET_DATA")

# SILVER LEVEL

## Enrichment of daily OHLC data

In [0]:
bronze_ohlc = spark.read.table("BRONZE_OHLC_DATA").dropDuplicates(['time', 'coin_currency'])
def bronze_to_silver_ohlc(df):
    df = (df.withColumn('date_time', F.from_unixtime(F.col('time'), 'yyyy-MM-dd HH:mm:ss'))
                             .withColumn('year', F.from_unixtime(F.col("time"),"yyyy").cast(IntegerType()))
                             .withColumn('month', F.from_unixtime(F.col("time"),"MM").cast(IntegerType()))
                             .withColumn('day', F.from_unixtime(F.col("time"),"dd").cast(IntegerType()))
                             .withColumn('hour', F.from_unixtime(F.col("time"),"HH").cast(IntegerType()))
                             .withColumn('minute', F.from_unixtime(F.col("time"),"mm").cast(IntegerType()))
                             .withColumn('coin', F.split(F.col('coin_currency'), '_').getItem(0))
                             .withColumn('currency', F.split(F.col('coin_currency'), '_').getItem(1))
                             .withColumn('delta', (F.col('close') - F.col('open'))*100/F.col('open'))
                             .withColumnRenamed('time', 'time_stamp')
                             .withColumnRenamed('volumefrom', 'volume_fsym')
                             .withColumnRenamed('volumeto', 'volume_tsym')
                             .withColumnRenamed('coin', 'ticker')
                    )
    return df.select('ticker', 'date_time', 'open', 'high', 'low', 'close', 'volume_fsym', 'volume_tsym',
                     'currency', 'delta', 'time_stamp', 'year', 'month', 'day', 'hour', 'minute')

In [0]:
silver_df = bronze_to_silver_ohlc(bronze_ohlc)
silver_df.write\
    .partitionBy('year')\
    .format('delta')\
    .mode('overwrite')\
    .saveAsTable("SILVER_OHLC_DATA")

In [0]:
%sql
-- MERGE INTO SILVER_OHLC_DATA AS target
-- USING tmp_silver AS source
-- ON source.time_stamp=target.time_stamp AND source.ticker=target.ticker AND source.currency=target.currency
-- WHEN NOT MATCHED
--    THEN INSERT *

## Cleaning of Twitter Data

In [0]:
tweets_df = spark.read.table('BRONZE_TWEET_DATA')
def transform_daily_tweets(df):
    df = df.withColumn('year', F.year(F.col("created_at")).cast(IntegerType()))
    w = Window.partitionBy("id").orderBy(*[F.desc(c) for c in ["favorite_count","followers_count", "retweet_count"]])
    return df.withColumn("row_num", F.row_number().over(w))\
                        .filter(F.col('row_num') == 1)\
                        .drop(F.col('row_num'))

In [0]:
output = transform_daily_tweets(tweets_df)
output.write\
    .format('delta')\
    .mode('overwrite')\
    .option("overwriteSchema", "true") \
    .saveAsTable("SILVER_TWEET_DATA")

# GOLD LEVEL

## Merge to Gold Table

In [0]:
silver_ohlc = spark.read.table("silver_ohlc_data")
silver_tweets = spark.read.table("silver_tweet_data")

gold_df = silver_tweets.join(
                            silver_ohlc,
                            how='inner', 
                            on=['ticker', 'year', 'month', 'day', 'hour', 'minute']
                            )
gold_df.write\
        .partitionBy('year')\
        .format('delta')\
        .mode('overwrite')\
        .saveAsTable("GOLD_TABLE")

In [0]:
%sql
SELECT * FROM GOLD_TABLE
WHERE year = 2022
ORDER BY created_at DESC
LIMIT 25

ticker,month,day,hour,minute,id,created_at,ceil_datetime,weekday,QUERY,text,favorite_count,result_type,user_name,followers_count,retweet_count,date_time,open,high,low,close,volume_fsym,volume_tsym,currency,delta,time_stamp,year
ETH,8,13,6,0,1558332524614787072,2022-08-13T06:00:00.000+0000,2022-08-13T06:00:00.000+0000,6,ethereum,RT @aptozbirds: Aptos Birds Pass will be mintable for FREE on Ethereum. Pass will be a ticket to mint on Aptos once Mainnet is live. Fi,0,recent,matthewmba1,376,98,2022-08-13 06:00:00,0.0807,0.08136,0.08063,0.08136,14355.41,1163.41,BTC,0.8178438661710135,1660370400,2022
ETH,8,13,6,0,1558332521636577282,2022-08-13T06:00:00.000+0000,2022-08-13T06:00:00.000+0000,6,ethereum,"As the Asian trading session ends, the global #cryptocurrency market cap is up 3.17% in the last 24 hours. 91 o https://t.co/5Gg7Mxmb1D",0,recent,CurrencyRush,30,0,2022-08-13 06:00:00,0.0807,0.08136,0.08063,0.08136,14355.41,1163.41,BTC,0.8178438661710135,1660370400,2022
ETH,8,13,6,0,1558332521636577282,2022-08-13T06:00:00.000+0000,2022-08-13T06:00:00.000+0000,6,ethereum,"As the Asian trading session ends, the global #cryptocurrency market cap is up 3.17% in the last 24 hours. 91 o https://t.co/5Gg7Mxmb1D",0,recent,CurrencyRush,30,0,2022-08-13 06:00:00,1995.35,2020.32,1995.21,2007.07,34642.61,69643166.49,USD,0.5873656250783085,1660370400,2022
ETH,8,13,6,0,1558332522198704131,2022-08-13T06:00:00.000+0000,2022-08-13T06:00:00.000+0000,6,ethereum,"Ethereum Price Broke Past $1,800 Despite Higher Demand At Lower Levels https://t.co/jBFzqrsKkM #market #marketnews #bitcoininfo",0,recent,factiive,252,0,2022-08-13 06:00:00,1995.35,2020.32,1995.21,2007.07,34642.61,69643166.49,USD,0.5873656250783085,1660370400,2022
ETH,8,13,6,0,1558332524614787072,2022-08-13T06:00:00.000+0000,2022-08-13T06:00:00.000+0000,6,ethereum,RT @aptozbirds: Aptos Birds Pass will be mintable for FREE on Ethereum. Pass will be a ticket to mint on Aptos once Mainnet is live. Fi,0,recent,matthewmba1,376,98,2022-08-13 06:00:00,1995.35,2020.32,1995.21,2007.07,34642.61,69643166.49,USD,0.5873656250783085,1660370400,2022
ETH,8,13,6,0,1558332522123051008,2022-08-13T06:00:00.000+0000,2022-08-13T06:00:00.000+0000,6,ethereum,RT @ENZA_ETH: The future of gaming is here. The first ever AAA caliber Play2Earn game on the Ethereum Blockchain is here. Get ready for,0,recent,pengabditesti,53,2176,2022-08-13 06:00:00,0.0807,0.08136,0.08063,0.08136,14355.41,1163.41,BTC,0.8178438661710135,1660370400,2022
ETH,8,13,6,0,1558332522198704131,2022-08-13T06:00:00.000+0000,2022-08-13T06:00:00.000+0000,6,ethereum,"Ethereum Price Broke Past $1,800 Despite Higher Demand At Lower Levels https://t.co/jBFzqrsKkM #market #marketnews #bitcoininfo",0,recent,factiive,252,0,2022-08-13 06:00:00,0.0807,0.08136,0.08063,0.08136,14355.41,1163.41,BTC,0.8178438661710135,1660370400,2022
ETH,8,13,6,0,1558332522123051008,2022-08-13T06:00:00.000+0000,2022-08-13T06:00:00.000+0000,6,ethereum,RT @ENZA_ETH: The future of gaming is here. The first ever AAA caliber Play2Earn game on the Ethereum Blockchain is here. Get ready for,0,recent,pengabditesti,53,2176,2022-08-13 06:00:00,1995.35,2020.32,1995.21,2007.07,34642.61,69643166.49,USD,0.5873656250783085,1660370400,2022
SHIB,8,13,6,0,1558332505404612608,2022-08-13T05:59:56.000+0000,2022-08-13T06:00:00.000+0000,6,Shibtoken,@Shibtoken @UnificationUND guys wanna share with you this awesome video which help's me with passive income everyday https://t.co/LkCHOOoOYl,0,recent,JeetuAhuja8,91,0,2022-08-13 06:00:00,1.286e-05,1.296e-05,1.272e-05,1.275e-05,187010336546.71,2405270.07,USD,-0.8553654743390408,1660370400,2022
SHIB,8,13,6,0,1558332505404612608,2022-08-13T05:59:56.000+0000,2022-08-13T06:00:00.000+0000,6,Shibtoken,@Shibtoken @UnificationUND guys wanna share with you this awesome video which help's me with passive income everyday https://t.co/LkCHOOoOYl,0,recent,JeetuAhuja8,91,0,2022-08-13 06:00:00,5e-10,5e-10,5e-10,5e-10,392384000.0,0.204,BTC,0.0,1660370400,2022
