In [0]:
import json
import time
from datetime import date
from io import BytesIO

import boto3
import pandas as pd
import pyspark.sql.functions as F
import requests
from botocore.client import Config
from botocore.exceptions import BotoCoreError, ClientError
from pyspark.sql import SparkSession
from pyspark.sql.functions import *

In [0]:
# Extract traders data from: coinapi.io 
def extract_traders_data():
    """Fetch the latest trades from CoinAPI and store them as today's bronze JSON in S3.

    The decoded API response is re-serialised with ``json.dumps`` and written to
    ``crypto/data/bronze/<today>_crypto_.json`` in the
    ``crypto-currency-data-prosimplee`` bucket with a private ACL.

    Raises:
        requests.RequestException: if the HTTP call fails, times out or
            answers with a non-2xx status, so that an error payload is never
            stored as bronze data.
        botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError,
        ValueError: if the upload fails; the failure message is printed
            first and the exception is then re-raised.
    """
    url = "https://rest.coinapi.io/v1/trades/latest"
    headers = {"X-CoinAPI-Key": dbutils.secrets.get(scope="crypto_key", key="key_api")}
    # Bound the wait so a hung connection cannot block the notebook forever.
    response = requests.get(url, headers=headers, timeout=30)
    # Stop here on 4xx/5xx instead of persisting the error body as trade data.
    response.raise_for_status()
    data = response.json()

    bucket_name = "crypto-currency-data-prosimplee"
    file_name = "crypto/data/bronze/" + str(date.today()) + "_crypto_.json"
    s3 = boto3.resource("s3")
    try:
        s3.Bucket(bucket_name).put_object(Key=file_name, Body=json.dumps(data), ACL='private')
    except (BotoCoreError, ClientError, ValueError):
        # boto3 reports S3 failures through botocore exceptions, not ValueError.
        print("Extract Traders Data: FAILED!")
        raise
    print("Success")


extract_traders_data()

In [0]:
# Get bronze values from S3 Bucket & Transform & Put silver data into S3 Bucket 
def silver_crypto():
    """Build today's silver trades Parquet file from today's bronze JSON in S3.

    Reads ``crypto/data/bronze/<today>_crypto_.json``, keeps five fields of
    every trade (``uuid`` as ``user_id``, ``symbol_id``, ``taker_side`` as
    ``action``, ``size`` and ``time_exchange``) and writes the result to
    ``crypto/data/silver/<today>_crypto_.parquet`` in the same bucket.

    Raises:
        botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError,
        ValueError: if reading/decoding the bronze object or writing the
            silver object fails; the matching failure message is printed
            first and the exception is then re-raised.
        KeyError: if a trade record lacks one of the expected fields.
    """
    bucket_name = "crypto-currency-data-prosimplee"
    # Resolve the date once so the bronze and silver keys always refer to the same day.
    run_date = str(date.today())
    bronze_key = "crypto/data/bronze/" + run_date + "_crypto_.json"
    silver_key = "crypto/data/silver/" + run_date + "_crypto_.parquet"
    s3 = boto3.resource("s3")

    try:
        file_content = s3.Object(bucket_name, bronze_key).get()["Body"].read().decode("utf-8")
        json_content = json.loads(file_content)
    except (BotoCoreError, ClientError, ValueError):
        # boto3 reports S3 failures through botocore exceptions; ValueError
        # covers undecodable bytes and malformed JSON.
        print("Connection to S3 (traders data): FAILED!")
        raise

    crypto_silver_data = [
        {
            "user_id": crypto_raw["uuid"],
            "symbol_id": crypto_raw["symbol_id"],
            "action": crypto_raw["taker_side"],
            "size": crypto_raw["size"],
            "time_exchange": crypto_raw["time_exchange"],
        }
        for crypto_raw in json_content
    ]
    crypto_silver_df = pd.DataFrame(crypto_silver_data)

    # Serialise to an in-memory buffer so the Parquet bytes can be uploaded directly.
    out_buffer = BytesIO()
    try:
        crypto_silver_df.to_parquet(out_buffer, index=False)
        s3.Bucket(bucket_name).put_object(Key=silver_key, Body=out_buffer.getvalue(), ACL="private")
    except (BotoCoreError, ClientError, ValueError):
        print("Parquet traders data (silver) values into S3: FAILED!")
        raise


silver_crypto()

In [0]:
# Create SparkSession
# `spark` is the session object used by every Spark read/write in the cells below.
spark = SparkSession.builder.getOrCreate()

In [0]:
# Get today's traders data (silver layer) from the S3 bucket & create the temp view "crypto_data".
# createOrReplaceTempView() returns None, so the DataFrame is bound first and the
# view is registered in a separate statement; chaining both would leave
# `golden_crypto` set to None.
# No schema option is passed: Parquet files carry their own schema.
golden_crypto = spark.read.parquet(
    "s3a://crypto-currency-data-prosimplee/crypto/data/silver/" + str(date.today()) + "_crypto_.parquet"
)
golden_crypto.createOrReplaceTempView("crypto_data")

spark.conf.set("spark.databricks.io.cache.enabled", False)

golden_crypto_table = spark.table("crypto_data")

# Preview only: pull ten rows to the driver.
golden_crypto_table.limit(10).toPandas()

Unnamed: 0,user_id,symbol_id,action,size,time_exchange
0,3eee35f7-49d8-40bf-8821-ae29bb959ba2,OKEX_SPOT_LAT_USDT,BUY,484.1335,2022-08-03T14:28:09.3170000Z
1,61bf0b3f-587e-4f2d-8f99-f8b35e262a94,DIGIFINEX_SPOT_BCH_USDT_566CCD,SELL,0.01303,2022-08-03T14:27:22.0000000Z
2,995eec52-f4ba-4c85-ab36-7b1afb8dbb86,DIGIFINEX_SPOT_BCH_USDT_566CCD,SELL,0.01639,2022-08-03T14:27:21.0000000Z
3,3c13ef9d-7aef-4292-9563-bd77413dc990,DIGIFINEX_SPOT_BCH_USDT_566CCD,SELL,0.87628,2022-08-03T14:27:21.0000000Z
4,0cf0ebeb-c5b9-40af-86d9-ff61314f04d2,DIGIFINEX_SPOT_CRV_USDT,BUY,5.0,2022-08-03T14:27:34.0000000Z
5,06b7b220-c186-4be3-a51b-2dca7ab93367,DIGIFINEX_SPOT_CRV_USDT,SELL,395.047156,2022-08-03T14:27:15.0000000Z
6,a666bb1c-b0d4-4c16-8083-594e58be90a9,DIGIFINEX_SPOT_BCH_USDT_566CCD,BUY,0.01375,2022-08-03T14:27:21.0000000Z
7,1ad3f14c-b162-4a8c-98a0-c3fb8a7bf83c,DIGIFINEX_SPOT_CRV_USDT,SELL,7.473777,2022-08-03T14:24:39.0000000Z
8,645d2643-590b-462a-989d-8addd8babcc7,DIGIFINEX_SPOT_BCH_USDT_566CCD,BUY,0.01445,2022-08-03T14:27:20.0000000Z
9,3d03c253-bcf8-43e8-a682-0480f4684b05,DIGIFINEX_SPOT_CRV_USDT,SELL,788.762449,2022-08-03T14:24:36.0000000Z


In [0]:
# Functions to extract symbol_from and symbol_to from symbol_id
def extract_symbol_from(column):
    """Return the ``symbol_from`` part of a ``symbol_id`` value.

    The id is split on ``"_"`` and the third token is returned, e.g.
    ``"OKEX_SPOT_LAT_USDT"`` -> ``"LAT"``.

    Args:
        column: the ``symbol_id`` string of a single row, or ``None``.

    Returns:
        The third ``"_"``-separated token, or ``None`` when the value is
        ``None`` or has fewer than three tokens, so one malformed row yields
        a null instead of raising.
    """
    if column is None:
        return None
    tokens = column.split("_")
    return tokens[2] if len(tokens) > 2 else None

# Wrap extract_symbol_from as a Spark UDF so it can be applied to a DataFrame column.
sym_from_udf = F.udf(extract_symbol_from)

def extract_symbol_to(column):
    """Return the ``symbol_to`` part of a ``symbol_id`` value.

    The id is split on ``"_"`` and the fourth token is returned, e.g.
    ``"OKEX_SPOT_LAT_USDT"`` -> ``"USDT"``.

    Args:
        column: the ``symbol_id`` string of a single row, or ``None``.

    Returns:
        The fourth ``"_"``-separated token, or ``None`` when the value is
        ``None`` or has fewer than four tokens, so one malformed row yields
        a null instead of raising.
    """
    if column is None:
        return None
    tokens = column.split("_")
    return tokens[3] if len(tokens) > 3 else None

# Wrap extract_symbol_to as a Spark UDF so it can be applied to a DataFrame column.
sym_to_udf = F.udf(extract_symbol_to)
    

In [0]:
# Apply Functions on our columns
golden_crypto = (
    golden_crypto_table
    # Derive both sides of the trading pair from symbol_id via the UDFs.
    .withColumn("symbol_from", sym_from_udf(golden_crypto_table.symbol_id))
    .withColumn("symbol_to", sym_to_udf(golden_crypto_table.symbol_id))
    # One cast turns the string column into a timestamp; wrapping the cast in
    # to_timestamp() as well would be a no-op on an already-timestamp value.
    .withColumn("time_exchange", F.col("time_exchange").cast("timestamp"))
)


In [0]:
# Preview only: limit in Spark first so just ten rows are collected to the
# driver, rather than converting the whole DataFrame to pandas and then slicing.
golden_crypto.limit(10).toPandas()

Unnamed: 0,user_id,symbol_id,action,size,time_exchange,symbol_from,symbol_to
0,3eee35f7-49d8-40bf-8821-ae29bb959ba2,OKEX_SPOT_LAT_USDT,BUY,484.1335,2022-08-03 14:28:09.317,LAT,USDT
1,61bf0b3f-587e-4f2d-8f99-f8b35e262a94,DIGIFINEX_SPOT_BCH_USDT_566CCD,SELL,0.01303,2022-08-03 14:27:22.000,BCH,USDT
2,995eec52-f4ba-4c85-ab36-7b1afb8dbb86,DIGIFINEX_SPOT_BCH_USDT_566CCD,SELL,0.01639,2022-08-03 14:27:21.000,BCH,USDT
3,3c13ef9d-7aef-4292-9563-bd77413dc990,DIGIFINEX_SPOT_BCH_USDT_566CCD,SELL,0.87628,2022-08-03 14:27:21.000,BCH,USDT
4,0cf0ebeb-c5b9-40af-86d9-ff61314f04d2,DIGIFINEX_SPOT_CRV_USDT,BUY,5.0,2022-08-03 14:27:34.000,CRV,USDT
5,06b7b220-c186-4be3-a51b-2dca7ab93367,DIGIFINEX_SPOT_CRV_USDT,SELL,395.047156,2022-08-03 14:27:15.000,CRV,USDT
6,a666bb1c-b0d4-4c16-8083-594e58be90a9,DIGIFINEX_SPOT_BCH_USDT_566CCD,BUY,0.01375,2022-08-03 14:27:21.000,BCH,USDT
7,1ad3f14c-b162-4a8c-98a0-c3fb8a7bf83c,DIGIFINEX_SPOT_CRV_USDT,SELL,7.473777,2022-08-03 14:24:39.000,CRV,USDT
8,645d2643-590b-462a-989d-8addd8babcc7,DIGIFINEX_SPOT_BCH_USDT_566CCD,BUY,0.01445,2022-08-03 14:27:20.000,BCH,USDT
9,3d03c253-bcf8-43e8-a682-0480f4684b05,DIGIFINEX_SPOT_CRV_USDT,SELL,788.762449,2022-08-03 14:24:36.000,CRV,USDT


In [0]:
# Loading traders data into MSSQL database
# Project the six columns that are loaded into the database table.
db_crypto_data = golden_crypto.select(
    "user_id",
    "action",
    "size",
    "symbol_from",
    "symbol_to",
    "time_exchange",
)

# Connection settings are read from Databricks secret scopes; nothing is hard-coded.
server_name = dbutils.secrets.get(scope="server", key="name")
database = dbutils.secrets.get(scope="database", key="name")
user = dbutils.secrets.get(scope="username", key="usr")
password = dbutils.secrets.get(scope="mssql", key="password")
table = "dbo.crypto_data"

# Append the rows to the target table over JDBC.
# NOTE(review): mode "append" means re-running this cell inserts the same rows again.
(
    db_crypto_data.write
    .format("jdbc")
    .mode("append")
    .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    .option("url", f"jdbc:sqlserver://{server_name};databaseName={database};")
    .option("dbtable", table)
    .option("user", user)
    .option("password", password)
    .save()
)

In [0]:
# Extract (bronze) a directory of all cryptocurrency names
def via_api_extract_crypto_names():
    """Fetch the CoinAPI asset list and store it as the staged catalog JSON in S3.

    The decoded API response is re-serialised with ``json.dumps`` and written to
    ``crypto/catalog/stag/crypto_catalog.json`` in the
    ``crypto-currency-data-prosimplee`` bucket with a private ACL.

    Raises:
        requests.RequestException: if the HTTP call fails, times out or
            answers with a non-2xx status, so that an error payload is never
            stored as the catalog.
        botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError,
        ValueError: if the upload fails; the failure message is printed
            first and the exception is then re-raised.
    """
    url = "https://rest.coinapi.io/v1/assets"
    headers = {"X-CoinAPI-Key": dbutils.secrets.get(scope="crypto_key", key="key_api")}
    # Bound the wait so a hung connection cannot block the notebook forever.
    response = requests.get(url, headers=headers, timeout=30)
    # Stop here on 4xx/5xx instead of persisting the error body as the catalog.
    response.raise_for_status()
    data = response.json()

    bucket_name = "crypto-currency-data-prosimplee"
    file_name = "crypto/catalog/stag/crypto_catalog.json"
    s3 = boto3.resource("s3")
    try:
        s3.Bucket(bucket_name).put_object(Key=file_name, Body=json.dumps(data), ACL="private")
    except (BotoCoreError, ClientError, ValueError):
        # boto3 reports S3 failures through botocore exceptions, not ValueError.
        print("Extract Crypto Names via Api: FAILED!")
        raise
    print("Success")


via_api_extract_crypto_names()

In [0]:
# Extract (parquet) a directory of all cryptocurrency names
def from_s3_extract_crypto_names():
    """Convert the staged asset catalog JSON in S3 into a clean Parquet lookup file.

    Reads ``crypto/catalog/stag/crypto_catalog.json``, keeps ``asset_id`` (as
    ``symbol_id``) and ``name`` (as ``symbol_name``) of every record and
    writes the result to ``crypto/catalog/clean/crypto_catalog.parquet`` in
    the same bucket. A record that lacks one of the two fields is reported
    and skipped rather than aborting the whole run.

    Raises:
        botocore.exceptions.BotoCoreError, botocore.exceptions.ClientError,
        ValueError: if reading/decoding the staged object or writing the
            clean object fails; the matching failure message is printed
            first and the exception is then re-raised.
    """
    bucket_name = "crypto-currency-data-prosimplee"
    s3 = boto3.resource("s3")

    try:
        staged_object = s3.Object(bucket_name, "crypto/catalog/stag/crypto_catalog.json")
        file_content = staged_object.get()["Body"].read().decode("utf-8")
        json_content = json.loads(file_content)
    except (BotoCoreError, ClientError, ValueError):
        # boto3 reports S3 failures through botocore exceptions; ValueError
        # covers undecodable bytes and malformed JSON.
        print("Connection to S3 (crypto names): FAILED!")
        raise

    crypto_names = []
    for cr_n in json_content:
        try:
            crypto_names.append({"symbol_id": cr_n["asset_id"],
                                 "symbol_name": cr_n["name"]})
        except (KeyError, ValueError):
            # A missing field raises KeyError; skip just this record.
            print("Crypto Name ValueError!")

    crypto_name_dictionary = pd.DataFrame(crypto_names)
    file_name = "crypto/catalog/clean/crypto_catalog.parquet"
    # Serialise to an in-memory buffer so the Parquet bytes can be uploaded directly.
    out_buffer = BytesIO()
    try:
        crypto_name_dictionary.to_parquet(out_buffer, index=False)
        s3.Bucket(bucket_name).put_object(Key=file_name, Body=out_buffer.getvalue(), ACL="private")
    except (BotoCoreError, ClientError, ValueError):
        print("Parquet Crypto Names (silver) values into S3: FAILED!")
        raise


from_s3_extract_crypto_names()

In [0]:
# Read the clean crypto catalog from S3 & create the temp view "crypto_names".
# createOrReplaceTempView() returns None, so the DataFrame is bound first and the
# view is registered in a separate statement; chaining both would leave
# `crypto_names` set to None.
# No schema option is passed: Parquet files carry their own schema.
crypto_names = spark.read.parquet(
    "s3a://crypto-currency-data-prosimplee/crypto/catalog/clean/crypto_catalog.parquet"
)
crypto_names.createOrReplaceTempView("crypto_names")

cr_names_table = spark.table("crypto_names")

# Preview only: pull ten rows to the driver.
cr_names_table.limit(10).toPandas()


Unnamed: 0,symbol_id,symbol_name
0,USD,US Dollar
1,BTC,Bitcoin
2,PLN,Zloty
3,EUR,Euro
4,CNY,Yuan Renminbi
5,JPY,Yen
6,AUD,Australian Dollar
7,CHF,Swiss Franc
8,SEK,Swedish Krona
9,GBP,Pound Sterling


In [0]:
# Search for the average sale size of cryptocurrencies
# Average trade size per (symbol_from, symbol_to, action) group, keeping only the SELL groups.
avg_size_sell = (
    golden_crypto
    .groupBy("symbol_from", "symbol_to", "action")
    .agg(F.avg("size").alias("avg_size"))
    .filter(F.col("action") == "SELL")
)

# Preview only: pull ten rows to the driver.
avg_size_sell.limit(10).toPandas()

Unnamed: 0,symbol_from,symbol_to,action,avg_size
0,BCH,USDT,SELL,1.645177
1,JST,USDT,SELL,1464.9219
2,ROOM,ETH,SELL,702.095
3,FTM,USDT,SELL,43.562683
4,ETH,USDT,SELL,97.0
5,ATOM,USDC,SELL,0.011
6,MKR,ETH,SELL,0.0001
7,CRV,USDT,SELL,397.094461
8,WE,USDT,SELL,0.176183
9,LEOS,USDT,SELL,207.0


In [0]:
# First lookup: attach the catalog name of the source symbol (symbol_from).
result_sell = (
    avg_size_sell
    .join(cr_names_table, on=avg_size_sell.symbol_from == cr_names_table.symbol_id, how="inner")
    .select(
        "symbol_from",
        F.col("symbol_name").alias("symbol_from_name"),
        "symbol_to",
        "avg_size",
    )
)

# Second lookup: attach the catalog name of the target symbol (symbol_to).
# Both joins are inner joins, so pairs with a symbol missing from the catalog are dropped.
sell_df = (
    result_sell
    .join(cr_names_table, on=avg_size_sell.symbol_to == cr_names_table.symbol_id, how="inner")
    .select(
        "symbol_from",
        "symbol_from_name",
        "symbol_to",
        F.col("symbol_name").alias("symbol_to_name"),
        "avg_size",
    )
)

# Preview only: pull ten rows to the driver.
sell_df.limit(10).toPandas()

Unnamed: 0,symbol_from,symbol_from_name,symbol_to,symbol_to_name,avg_size
0,BTC,Bitcoin,USDT,Tether,0.003
1,ETH,Ethereum,USDT,Tether,97.0
2,ATOM,Cosmos,USDC,USDC,0.011
3,NMR,Numeraire,BUSD,Binance USD,0.52
4,BCH,Bitcoin Cash,USDT,Tether,1.645177
5,MKR,Maker,ETH,Ethereum,0.0001
6,FTM,FTM,USDT,Tether,43.562683
7,CRV,CRV,USDT,Tether,397.094461
8,JST,JST,USDT,Tether,1464.9219
9,FLOW,FLOW,USDC,USDC,0.04


In [0]:
# The received data is written to the S3 bucket
# Overwrite today's gold "sell" dataset with the enriched averages.
# No "header" option is set: it is a CSV option and has no meaning for a Parquet write.
(
    sell_df.write
    .mode("OVERWRITE")
    .parquet("s3a://crypto-currency-data-prosimplee/crypto/data/gold/sell_" + str(date.today()) + "_crypto_.parquet")
)

In [0]:
# Search for the average buy size of cryptocurrencies
# Average trade size per (symbol_from, symbol_to, action) group, keeping only the BUY groups.
avg_size_buy = (
    golden_crypto
    .groupBy("symbol_from", "symbol_to", "action")
    .agg(F.avg("size").alias("avg_size"))
    .filter(F.col("action") == "BUY")
)

# Preview only: pull ten rows to the driver.
avg_size_buy.limit(10).toPandas()

Unnamed: 0,symbol_from,symbol_to,action,avg_size
0,GF,USDT,BUY,369.935769
1,LAT,USDT,BUY,484.1335
2,USDC,USDT,BUY,14.920218
3,CELR,USDC,BUY,5.0
4,CRV,USDT,BUY,324.40915
5,CRO,BTC,BUY,0.05
6,WE,USDT,BUY,0.085414
7,CYCLUB,USDT,BUY,161.0002
8,LDO,USDT,BUY,67.32
9,BCH,USDT,BUY,2.136763


In [0]:
# First lookup: attach the catalog name of the source symbol (symbol_from).
# NOTE: avg_size_buy is rebound to the joined result here, so from this point on it
# no longer holds the plain aggregation, and re-running this cell joins the
# already-joined frame again.
avg_size_buy = (
    avg_size_buy
    .join(cr_names_table, on=avg_size_buy.symbol_from == cr_names_table.symbol_id, how="inner")
    .select(
        "symbol_from",
        F.col("symbol_name").alias("symbol_from_name"),
        "symbol_to",
        "avg_size",
    )
)

# Second lookup: attach the catalog name of the target symbol (symbol_to).
# Both joins are inner joins, so pairs with a symbol missing from the catalog are dropped.
buy_df = (
    avg_size_buy
    .join(cr_names_table, on=avg_size_buy.symbol_to == cr_names_table.symbol_id, how="inner")
    .select(
        "symbol_from",
        "symbol_from_name",
        "symbol_to",
        F.col("symbol_name").alias("symbol_to_name"),
        "avg_size",
    )
)

# Preview only: pull ten rows to the driver.
buy_df.limit(10).toPandas()

Unnamed: 0,symbol_from,symbol_from_name,symbol_to,symbol_to_name,avg_size
0,ETH,Ethereum,USDT,Tether,0.119
1,KNC,Kyber Network,USDT,Tether,20.26
2,BCH,Bitcoin Cash,USDT,Tether,2.136763
3,LAT,LAT,USDT,Tether,484.1335
4,THETA,Theta Token,USDC,USDC,0.846988
5,ACH,ACH,USDT,Tether,1854.0
6,USDC,USDC,USDT,Tether,14.920218
7,CRO,Crypto.com Chain,BTC,Bitcoin,0.05
8,CRV,CRV,BTC,Bitcoin,10.131
9,CRV,CRV,USDT,Tether,324.40915


In [0]:
# The received data is written to the S3 bucket
# Overwrite today's gold "buy" dataset with the enriched averages.
# Write buy_df (the result that carries both symbol_from_name and symbol_to_name),
# matching the sell dataset, which is written from sell_df.
# No "header" option is set: it is a CSV option and has no meaning for a Parquet write.
(
    buy_df.write
    .mode("OVERWRITE")
    .parquet("s3a://crypto-currency-data-prosimplee/crypto/data/gold/buy_" + str(date.today()) + "_crypto_.parquet")
)