In [2]:
import sys
sys.path.append('/home/jovyan/work')

In [44]:
from pyspark.sql import SparkSession
import nbimporter
from utils.vault_scripts import read_root_token, get_secret_from_vault
from pyspark.sql import functions as F
from pyspark.sql.functions import explode, col, when, lit, expr
from pyspark.sql.types import LongType
import datetime
from datetime import datetime, timedelta

In [5]:
spark = SparkSession.builder.appName("ExpDataAnalysisExchangeRates").getOrCreate()

In [6]:
hadoopConf = spark._jsc.hadoopConfiguration()

In [7]:
AWS_KEY_ID = get_secret_from_vault("aws1", "keyid")
AWS_ACCESS_KEY = get_secret_from_vault("aws2", "accesskey")
AWS_S3_BUCKET = get_secret_from_vault("aws3", "s3bucket")

In [8]:
hadoopConf.set("fs.s3a.access.key", AWS_KEY_ID)
hadoopConf.set("fs.s3a.secret.key", AWS_ACCESS_KEY)
hadoopConf.set("spark.hadoop.fs.s3a.aws.credentials.provider", "org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider")

In [13]:
def read_exchange_data(bucket, layer, path, symbol):
    exchange_rate_data_path = f"s3a://{bucket}/{layer}/{path}/{symbol}/*.json"
    df = spark.read.json(exchange_rate_data_path)
    return df
    

In [14]:
def transform_exchange_data(df):
    df_exploded = df.select(
        F.explode("Data.Data").alias("data"),
        "Data.TimeFrom",
        "Data.TimeTo"
    )
    
    df_exploded = df_exploded.select(
        F.col("data.open").alias("open_price"),
        F.col("data.close").alias("close_price"), 
        F.col("data.high").alias("high_price"),
        F.col("data.low").alias("low_price"),
        F.col("data.conversionSymbol").alias("conversion_symbol"),
        F.col("data.conversionType").alias("conversion_type"), 
        F.col("data.time").alias("time"),
        F.col("data.volumefrom").alias("volume_from"),
        F.col("data.volumeto").alias("volume_to"),
        F.col("TimeFrom").alias("data_time_from"),
        F.col("TimeTo").alias("data_time_to"),
    )
    
    return df_exploded

In [34]:
def filter_exchange_data_df(transformed_df):

    start_date = datetime.datetime(2024, 7, 1)
    end_date = datetime.datetime(2024, 10, 1)

    # Convert to milliseconds
    start_timestamp = int(start_date.timestamp())
    end_timestamp = int(end_date.timestamp())

    filtered_df = transformed_df.filter(
        (transformed_df.time >= start_timestamp) & 
        (transformed_df.time <= end_timestamp)
    )

    return filtered_df

In [38]:
def save_exchange_data_to_s3(bucket, layer, path, symbol, data):

    exchange_rate_data_path = f"s3a://{bucket}/{layer}/{path}/{symbol}/"
    data.write.mode("overwrite").json(exchange_rate_data_path)
    
    print(f"Data successfully saved to: {exchange_rate_data_path}")

In [39]:
# SYMBOLS = ["ETH", "POL", "USDC", "WETH"]
SYMBOLS = ["ETH"]

In [40]:
for symbol in SYMBOLS:
    df = read_exchange_data(AWS_S3_BUCKET, "raw", "exchange_rates_hourly_usdt", symbol)
    transformed_df = transform_exchange_data(df)
    filtered_df = filter_exchange_data_df(transformed_df)
    save_exchange_data_to_s3(AWS_S3_BUCKET, "prepared", "exchange_rates_hourly", symbol, filtered_df)

Data successfully saved to: s3a://nft-framework-storage-ca3f7714-6047-47c1-940e-72b72f0c97ff/prepared/exchange_rates_hourly/ETH/


In [36]:
filtered_df.head()

Row(open_price=3165.07, close_price=3146.45, high_price=3165.76, low_price=3140.41, conversion_symbol='', conversion_type='direct', time=1722574800, volume_from=12500.54, volume_to=39420155.2, data_time_from=1722574800, data_time_to=1729774800)

In [37]:
min_max_timestamps = filtered_df.agg(
    F.from_unixtime(F.min("time")).alias("min_event_timestamp"),
    F.from_unixtime(F.max("time")).alias("max_event_timestamp")
)
min_max_timestamps.show()

+-------------------+-------------------+
|min_event_timestamp|max_event_timestamp|
+-------------------+-------------------+
|2024-07-01 00:00:00|2024-10-01 00:00:00|
+-------------------+-------------------+



In [41]:
path = f"s3a://{AWS_S3_BUCKET}/prepared/exchange_rates_hourly/ETH/*.json"
df = spark.read.json(path)

In [42]:
df.printSchema()

root
 |-- close_price: double (nullable = true)
 |-- conversion_symbol: string (nullable = true)
 |-- conversion_type: string (nullable = true)
 |-- data_time_from: long (nullable = true)
 |-- data_time_to: long (nullable = true)
 |-- high_price: double (nullable = true)
 |-- low_price: double (nullable = true)
 |-- open_price: double (nullable = true)
 |-- time: long (nullable = true)
 |-- volume_from: double (nullable = true)
 |-- volume_to: double (nullable = true)



In [46]:
filtered_df.head()

Row(open_price=3165.07, close_price=3146.45, high_price=3165.76, low_price=3140.41, conversion_symbol='', conversion_type='direct', time=1722574800, volume_from=12500.54, volume_to=39420155.2, data_time_from=1722574800, data_time_to=1729774800)

In [48]:
# Sample checks - > records by each hour
# TODO

df = filtered_df

start_date = datetime(2024, 7, 1)
end_date = datetime(2024, 10, 1)

start_timestamp = int(start_date.timestamp())
end_timestamp = int(end_date.timestamp())

total_hours = (end_timestamp - start_timestamp) // 3600

# Check if the DataFrame contains the expected number of records
actual_hours = df.filter((df.time >= start_timestamp) & (df.time <= end_timestamp)).count()

if actual_hours == total_hours:
    print("All hourly records are present.")
else:
    print(f"Missing records: Expected {total_hours} records, but found {actual_hours} records.")


Missing records: Expected 2208 records, but found 2210 records.


In [None]:
# Check for duplicate records based on `data_time_from` and `data_time_to`
df_duplicates = df.groupBy("data_time_from", "data_time_to").count().filter("count > 1")

duplicate_count = df_duplicates.count()
if duplicate_count == 0:
    print("No duplicate records found.")
else:
    print(f"Found {duplicate_count} duplicate records.")


In [None]:
# List of critical columns to check for missing values
critical_columns = ["open_price", "close_price", "high_price", "low_price", 
                    "conversion_symbol", "conversion_type", "volume_from", "volume_to"]

# Check for rows with null values in critical columns
missing_values_df = df.filter(
    F.reduce(
        lambda a, b: a | b,
        [F.col(col).isNull() | (F.col(col) == '') for col in critical_columns]
    )
)

missing_values_count = missing_values_df.count()
if missing_values_count == 0:
    print("No missing values in critical columns.")
else:
    print(f"Found {missing_values_count} rows with missing values in critical columns.")
