In [None]:
%load_ext pycodestyle_magic
%pycodestyle_on
# Run %pycodestyle_off to turn the style checker off

In [32]:
from minio import Minio
from pyspark import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_timestamp, month, year,monotonically_increasing_id,row_number,concat, udf,lit
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, LongType,TimestampNTZType,StringType
from pyspark.sql.window import Window
from pyspark.conf import SparkConf
import requests
import json
import pandas as pd
from binance import Client
from datetime import datetime, date
import os
from dotenv import load_dotenv, dotenv_values
from binance.helpers import date_to_milliseconds, interval_to_milliseconds
from binance.exceptions import BinanceRequestException, BinanceAPIException
from dateutil.relativedelta import relativedelta
from dateutil.parser import parse
import time
import logging
import psycopg2
load_dotenv()

True

In [2]:
# Binance API key pair, read from the process environment.
API_KEY = os.environ.get("API_KEY")
SECRET_KEY = os.environ.get("SECRET_KEY")

# MinIO root credentials, read from the process environment.
MINIO_USER = os.environ.get("MINIO_ROOT_USER")
MINIO_PASSWORD = os.environ.get("MINIO_ROOT_PASSWORD")

# Binance client built from the key pair above.
client_binance = Client(API_KEY, SECRET_KEY)

In [3]:
# Spark session for the ETL job. The PostgreSQL JDBC jar is put on the classpath
# so the "jdbc" reader/writer can load org.postgresql.Driver.
spark = (
    SparkSession.builder
    .appName("CryptoETL")
    .config("spark.jars", "/Users/hamza/Desktop/projects/postgresql-42.7.5.jar")
    .getOrCreate()
)
# Get the SparkContext from the SparkSession
sc = spark.sparkContext

# Configure the S3A filesystem to talk to the local MinIO server.
hadoop_conf = sc._jsc.hadoopConfiguration()
# TODO: turn into a dedicated access key in the future (currently the MinIO root user)
hadoop_conf.set("fs.s3a.access.key", MINIO_USER)
hadoop_conf.set("fs.s3a.secret.key", MINIO_PASSWORD)
hadoop_conf.set("fs.s3a.endpoint", "http://localhost:9000")
# The endpoint is plain HTTP and the Minio client in this notebook is created
# with secure=False, so SSL is switched off here as well to keep the two consistent.
hadoop_conf.set("fs.s3a.connection.ssl.enabled", "false")
# Bucket is addressed in the path (host/bucket/key), not as a sub-domain.
hadoop_conf.set("fs.s3a.path.style.access", "true")
# Fail fast instead of retrying when the server is unreachable.
hadoop_conf.set("fs.s3a.attempts.maximum", "1")
hadoop_conf.set("fs.s3a.connection.establish.timeout", "5000")
hadoop_conf.set("fs.s3a.connection.timeout", "10000")

25/04/25 11:32:35 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [4]:
# Read the data for one crypto.
# Schema check/validation (still to do): create the Spark DataFrame using a StructType schema.
# Implement the transformations.
# Add a unique id for crypto, price and time.
# Split into 3 tables.
# Upload to the Postgres database.

In [None]:
# Notebook logger: records of every level are accepted (DEBUG is the lowest),
# and nothing is passed on to ancestor loggers.
logger = logging.getLogger(__name__)
logger.setLevel(logging.DEBUG)
# Forget handlers attached by an earlier run of the notebook (no-op when there are none).
logger.handlers.clear()
logger.propagate = False

In [None]:
# Stream handler that lets DEBUG-and-above records through.
handler = logging.StreamHandler()
handler.setLevel(logging.DEBUG)
# Re-running this cell must not stack a second StreamHandler on the logger,
# otherwise every record is emitted once per run of the cell.
for stale in [h for h in logger.handlers if type(h) is logging.StreamHandler]:
    logger.removeHandler(stale)
logger.addHandler(handler)

In [4]:
# Trading pairs handled by this notebook.
# TODO: consider improving maintainability
cryptos = [
    'BTCUSDT',
    'ETHUSDT',
    'LTCUSDT',
    'BNBUSDT',
    'DOGEUSDT',
]

# MinIO client for the local server. Port 9000 is the S3 API port; SSL is
# disabled because no SSL certificates are in use.
client_minio = Minio(
    "localhost:9000",
    access_key=MINIO_USER,
    secret_key=MINIO_PASSWORD,
    secure=False,
)

25/04/25 11:32:48 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [8]:
# Column layout of the empty DataFrame that the parquet files are unioned onto.
# NOTE(review): the price/volume fields are declared LongType, while the values
# displayed later in this notebook carry fractional digits; confirm the intended type.
schema = StructType([
    StructField('datetime', TimestampNTZType(), False),
    StructField('Open Price', LongType(), False),
    StructField('Close Price', LongType(), False),
    StructField('Volume', LongType(), False),
])

In [51]:
def parquet_to_df(client, crypto, schema):
    """Read every parquet object stored for one crypto and stack them into one DataFrame.

    Args:
        client: MinIO client used to list the objects of the ``binancedata`` bucket.
        crypto: Object-name prefix to list (called with a trading pair such as "BNBUSDT").
        schema: StructType of the empty DataFrame the files are unioned onto.

    Returns:
        The empty ``schema`` DataFrame unioned, one file at a time, with each
        parquet object found under the prefix.
    """
    listing = client.list_objects("binancedata", prefix=crypto, recursive=True)
    # Objects whose name contains "_SUCCESS" are markers, not data files.
    object_names = [
        item.object_name
        for item in listing
        if "_SUCCESS" not in item.object_name
    ]
    combined = spark.createDataFrame(data=[], schema=schema)
    for object_name in object_names:
        combined = combined.union(
            spark.read.parquet(f"s3a://binancedata/{object_name}")
        )
    return combined

In [None]:
def data_cleaning():
    """Placeholder for the cleaning stage; does nothing yet and returns None.

    Planned steps (not implemented):
        * drop duplicates
        * convert data types
        * drop null values
    """
    return None

In [29]:
# Load the whole `crypto` table from the local PostgreSQL database over JDBC.
# NOTE(review): the database credentials are hard-coded here.
read_sql = "SELECT * FROM crypto"

df_crypto = (
    spark.read
    .format("jdbc")
    .options(
        url="jdbc:postgresql://localhost:5432/crypto",
        user="postgres",
        password="postgres",
        query=read_sql,
        driver="org.postgresql.Driver",
    )
    .load()
)

In [39]:
def add_crypto_id(df_crypto, crypto, currency, df_prices=None):
    """Tag every price row with the id of its trading pair from the crypto table.

    Args:
        df_crypto: Lookup DataFrame; its ``ticker`` and ``id`` columns are read.
        crypto: Trading pair to look up, e.g. "BNBUSDT".
        currency: Suffix appended to ``ticker`` to rebuild the trading pair, e.g. "USDT".
        df_prices: DataFrame to tag. When omitted, the notebook-level ``df`` is
            used, so existing three-argument calls keep working unchanged.

    Returns:
        The price DataFrame with a constant ``crypto_id`` column added.

    Raises:
        ValueError: If no row of ``df_crypto`` matches ``crypto``.
    """
    # Prefer the explicit argument; fall back to the global for old call sites.
    target = df if df_prices is None else df_prices
    # Obtain the trading pair by concatenating ticker and currency, then keep
    # only the row(s) for the requested pair.
    matches = (
        df_crypto
        .withColumn("trading pair", concat(df_crypto.ticker, lit(currency)))
        .filter(col("trading pair") == crypto)
        .collect()
    )
    if not matches:
        raise ValueError(
            f"no row in the crypto table matches trading pair {crypto!r}"
        )
    return target.withColumn("crypto_id", lit(matches[0]['id']))

In [45]:
def generate_time_id(dt_value):
    """Build the integer YYYYMM key for a date or datetime.

    Args:
        dt_value: Object exposing ``year`` and ``month`` (``datetime``/``date``), or None.

    Returns:
        ``year * 100 + month`` as an int (201901 for January 2019), or None when
        ``dt_value`` is None, so a null timestamp produces a null id instead of
        raising AttributeError.
    """
    if dt_value is None:
        return None
    # Same number as int(dt_value.strftime("%Y%m")), without the string round-trip.
    return dt_value.year * 100 + dt_value.month

In [43]:
def add_time_id(generate_time_id, df):
    """Append an integer ``time_id`` column computed from the ``datetime`` column.

    Args:
        generate_time_id: Python callable applied to each ``datetime`` value; it
            is wrapped as a Spark UDF whose return type is IntegerType.
        df: DataFrame with a ``datetime`` column.

    Returns:
        ``df`` with the extra ``time_id`` column.
    """
    to_time_id = udf(generate_time_id, IntegerType())
    return df.withColumn("time_id", to_time_id(df["datetime"]))

In [71]:
def upload_time(df):
    """Append the time columns of ``df`` to the PostgreSQL ``time`` table.

    The written rows have the columns ``id`` (taken from ``time_id``),
    ``datetime``, ``year`` and ``month``. A failed write is printed, not raised.

    Args:
        df: DataFrame with ``time_id`` and ``datetime`` columns.
    """
    # TODO: need to futureproof
    df_time = df.select(
        col("time_id").alias("id"),
        "datetime",
        year(df["datetime"]).alias("year"),
        month(df["datetime"]).alias("month"),
    )
    jdbc_options = {
        "url": "jdbc:postgresql://localhost:5432/crypto",
        "dbtable": "time",
        "user": "postgres",
        "password": "postgres",
        "driver": "org.postgresql.Driver",
    }
    try:
        df_time.write.format("jdbc").options(**jdbc_options).mode("append").save()
        print("successfully uploaded to time table")
    except Exception as e:
        # Best-effort upload: report the failure and carry on.
        print(e)

In [52]:
# Run the pipeline for one trading pair: load -> tag with crypto id -> tag with time id.
df = parquet_to_df(client=client_minio, crypto="BNBUSDT", schema=schema)
df_id = add_crypto_id(df_crypto=df_crypto, crypto="BNBUSDT", currency="USDT")
df_time_id = add_time_id(generate_time_id=generate_time_id, df=df_id)

In [75]:
# Preview the first five tagged rows.
df_time_id.show(n=5)

+-------------------+-----------+-----------+------------------+---------+-------+
|           datetime| Open Price|Close Price|            Volume|crypto_id|time_id|
+-------------------+-----------+-----------+------------------+---------+-------+
|2019-01-01 00:00:00| 6.11390000| 6.22000000| 72126339.77000000|        5| 201901|
|2019-10-01 01:00:00|15.84930000|19.90990000| 53481014.91000000|        5| 201910|
|2019-11-01 00:00:00|19.90870000|15.71180000| 46441747.10000000|        5| 201911|
|2019-12-01 00:00:00|15.70300000|13.71610000| 38189582.55000000|        5| 201912|
|2019-02-01 00:00:00| 6.22500000|10.27790000|110685041.86000000|        5| 201902|
+-------------------+-----------+-----------+------------------+---------+-------+
only showing top 5 rows



In [86]:
# Append the time rows to the `time` table.
upload_time(df=df_time_id)

successfully uploaded to time table


                                                                                

In [84]:
def upload_price(df):
    """Append the price rows of ``df`` to the PostgreSQL ``price`` table.

    The written rows have the columns ``crypto_id``, ``time_id``, ``open``,
    ``close`` and ``volume``. A failed write is printed, not raised.

    Args:
        df: DataFrame with ``crypto_id``, ``time_id``, ``Open Price``,
            ``Close Price`` and ``Volume`` columns.
    """
    # Cast the measures to double rather than integer: prices carry fractional
    # digits (e.g. 6.1139) that an integer cast would silently drop, turning
    # sub-unit prices into 0.
    # NOTE(review): confirm the `price` table columns are a fractional type.
    df_prices = df.select(
        "crypto_id",
        "time_id",
        col("Open Price").cast("double").alias("open"),
        col("Close Price").cast("double").alias("close"),
        col("Volume").cast("double").alias("volume"),
    )

    try:
        df_prices.write \
            .format("jdbc") \
            .option("url", "jdbc:postgresql://localhost:5432/crypto") \
            .option("dbtable", "price") \
            .option("user", "postgres") \
            .option("password", "postgres") \
            .option("driver", "org.postgresql.Driver") \
            .mode("append") \
            .save()
        print("successfully uploaded to price table")
    except Exception as e:
        # Best-effort upload: report the failure and carry on.
        print(e)
    

In [87]:
# Append the price rows to the `price` table.
upload_price(df=df_time_id)

25/04/25 15:33:13 WARN DAGScheduler: Broadcasting large task binary with size 1000.5 KiB


successfully uploaded to price table


                                                                                

In [None]:
# TODO: apply functional programming
# NOTE: time is universal, so it doesn't need to be re-added
