In [None]:
import os

from pyspark.sql.functions import col, collect_list, date_format, expr, lag, mean, when
from pyspark.sql.window import Window

# Azure Data Lake Storage settings come from the environment; any variable
# that is not set is left as None here.
# NOTE(review): storage_account_key is not referenced by the visible cells —
# confirm whether authentication actually relies on it.
storage_account_name, storage_account_key, container_name = (
    os.environ.get(env_var)
    for env_var in ("storage_account_name", "storage_account_key", "container_name")
)

# Trading pairs whose silver-layer parquet files are processed below.
symbols = "BTCUSDT ETHUSDT BNBUSDT ETHBTC BNBETH BNBBTC".split()


def calculate_ema(df, column, period, order_by="Close time"):
    """Add an ``ema`` column holding the exponential moving average of ``column``.

    Uses the recursive definition with smoothing factor
    ``alpha = 2 / (period + 1)``, seeded with the first observed value::

        ema[0] = x[0]
        ema[t] = alpha * x[t] + (1 - alpha) * ema[t - 1]

    Spark window functions cannot reference their own previous output, so each
    row folds over the full ordered history of ``column`` up to and including
    itself. That is O(n^2) work over the series. This is fine for daily candles,
    but reconsider it for long high-frequency series.

    Args:
        df: Spark DataFrame containing ``column`` and ``order_by``.
        column: Name of the numeric column to smooth (cast to double).
        period: EMA span in rows; must be >= 1.
        order_by: Column defining row order. Defaults to ``"Close time"``.
            Assumed unique per row; ties would make the fold order undefined.

    Returns:
        ``df`` with an added double column ``ema``. Nulls in ``column`` are
        skipped by the recursion, so the EMA carries forward unchanged on
        those rows.

    Raises:
        ValueError: If ``period`` is less than 1.
    """
    if period < 1:
        raise ValueError(f"period must be >= 1, got {period}")
    alpha = 2 / (period + 1)

    history_col = "__ema_history"
    history_window = Window.orderBy(order_by).rowsBetween(
        Window.unboundedPreceding, Window.currentRow
    )
    # Fold the ordered history left to right. The NULL accumulator means
    # "no value yet", so the first element seeds the EMA. CAST keeps the
    # weights DOUBLE (a bare 0.25 literal is parsed as DECIMAL by Spark SQL).
    fold = (
        f"aggregate(`{history_col}`, CAST(NULL AS DOUBLE), "
        f"(acc, x) -> CASE WHEN acc IS NULL THEN x "
        f"ELSE CAST({alpha!r} AS DOUBLE) * x "
        f"+ CAST({1 - alpha!r} AS DOUBLE) * acc END)"
    )
    return (
        df.withColumn(
            history_col,
            collect_list(col(column).cast("double")).over(history_window),
        )
        .withColumn("ema", expr(fold))
        .drop(history_col)
    )


def calculate_rsi(df, column, period, order_by="Close time"):
    """Add an ``rsi`` column: the Relative Strength Index of ``column``.

    Gains and losses are averaged with a simple trailing mean over the last
    ``period`` row-to-row changes (Cutler's variant, not Wilder's smoothing)::

        rsi = 100 * avg_gain / (avg_gain + avg_loss)

    This is algebraically equal to ``100 - 100 / (1 + avg_gain / avg_loss)``.
    Unlike that form, it stays defined when ``avg_loss`` is 0, giving 100.

    Args:
        df: Spark DataFrame containing ``column`` and ``order_by``.
        column: Name of the numeric column whose changes are measured.
        period: Number of changes averaged per row; must be >= 1.
        order_by: Column defining row order. Defaults to ``"Close time"``.

    Returns:
        ``df`` with an added ``rsi`` column. It is null on the first row and
        wherever the trailing window saw no movement at all (gain + loss == 0).
        The first ``period`` rows average fewer than ``period`` changes
        (warm-up).

    Raises:
        ValueError: If ``period`` is less than 1.
    """
    if period < 1:
        raise ValueError(f"period must be >= 1, got {period}")

    delta_col = "__rsi_delta"
    ordered = Window.orderBy(order_by)
    # ``period`` changes = the current row plus the previous ``period - 1`` rows.
    trailing = ordered.rowsBetween(-(period - 1), Window.currentRow)

    delta = col(delta_col)
    # No ``otherwise``: the first row's null delta stays null, so mean() skips
    # it instead of counting it as a zero change.
    gain = when(delta > 0, delta).when(delta <= 0, 0.0)
    loss = when(delta < 0, -delta).when(delta >= 0, 0.0)
    avg_gain = mean(gain).over(trailing)
    avg_loss = mean(loss).over(trailing)
    total = avg_gain + avg_loss

    return (
        df.withColumn(delta_col, col(column) - lag(column, 1).over(ordered))
        # Guarding on total > 0 avoids dividing by zero (an error under ANSI mode).
        .withColumn("rsi", when(total > 0, 100 * avg_gain / total))
        .drop(delta_col)
    )

In [None]:
# Build one RSI column per trading pair, aligned on candle close time, then
# derive a per-asset strength score from those pair RSIs.
EMA_PERIOD = 8
RSI_PERIOD = 14

# Fail fast: unset variables would otherwise produce "abfss://None@None..." and
# a confusing error at read time.
_missing_env = [
    name
    for name, value in (
        ("storage_account_name", storage_account_name),
        ("container_name", container_name),
    )
    if not value
]
if _missing_env:
    raise RuntimeError(
        f"Missing required environment variable(s): {', '.join(_missing_env)}"
    )

# No trailing slash: every path below is joined with an explicit "/".
abfss_url = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net"

symbols_df = None
for symbol in symbols:
    blob_name = f"layers/silver/{symbol}.parquet"
    df = spark.read.parquet(f"{abfss_url}/{blob_name}")
    df = calculate_ema(df, "Close", EMA_PERIOD)
    df = calculate_rsi(df, "ema", RSI_PERIOD)

    # One RSI column per pair, named after the pair (e.g. "BTCUSDT").
    rsi = df.select("Close time", col("rsi").alias(symbol))
    # Inner join: only close times present for every pair are kept.
    symbols_df = rsi if symbols_df is None else symbols_df.join(rsi, on="Close time")

if symbols_df is None:
    raise ValueError("`symbols` is empty; nothing to aggregate")

# Per-asset score: the mean RSI of every pair involving the asset. A pair's RSI
# is inverted (100 - rsi) when the asset is the second symbol in the pair name
# (e.g. BTC in ETHBTC), since that pair rising means the asset is weakening.
symbols_df = (
    symbols_df.withColumn(
        "BTC", (col("BTCUSDT") + (100 - col("ETHBTC")) + (100 - col("BNBBTC"))) / 3
    )
    .withColumn("ETH", (col("ETHUSDT") + col("ETHBTC") + (100 - col("BNBETH"))) / 3)
    .withColumn("BNB", (col("BNBUSDT") + col("BNBETH") + col("BNBBTC")) / 3)
    .withColumn(
        "USDT",
        ((100 - col("BTCUSDT")) + (100 - col("ETHUSDT")) + (100 - col("BNBUSDT"))) / 3,
    )
    .dropna()
)

In [None]:
blob_name_to_write = "layers/gold/symbols.parquet"

# The gold layer stores "Close time" as a "yyyy-MM-dd HH:mm" string rather
# than a timestamp; seconds and sub-seconds are dropped by this pattern.
symbols_df = symbols_df.withColumn(
    "Close time", date_format(col("Close time"), "yyyy-MM-dd HH:mm")
)

# rstrip guards against a base URL ending in "/", which would otherwise give
# "...windows.net//layers/...". The CSV-only "header" option is not passed to
# the parquet writer.
gold_path = f"{abfss_url.rstrip('/')}/{blob_name_to_write}"
symbols_df.write.mode("overwrite").parquet(gold_path)

display(symbols_df)

Close time,BTCUSDT,ETHUSDT,BNBUSDT,ETHBTC,BNBETH,BNBBTC,BTC,ETH,BNB,USDT
2023-11-25 23:59,65.69962294194286,76.71837028591997,32.4342324342324,94.32114882506517,1.606086221470889,4.604231974921518,55.59141404731872,89.81114429650476,43.14536222692726,41.71592477930159
2023-11-26 23:59,65.31320435748557,75.74236163563928,33.865710560625786,91.51361621279304,5.596107055961156,8.027956176803755,55.25721065596292,87.21995693082373,43.8112871465944,41.69290781541645
2023-11-27 23:59,60.36561732973657,68.47510276219421,31.042724828204328,82.80802292263618,5.484896661367316,7.615122737860446,56.64749055641332,81.93274300782103,42.97083291723707,46.705518359954965
2023-11-28 23:59,59.90100828348381,62.79869348461408,28.74930824571108,64.59544032185966,5.42452830188688,6.816359262229199,62.829736233131655,73.98986850152896,42.45249242845625,49.51699666206368
2023-11-29 23:59,63.95662881564539,64.34310463875458,31.076017130620983,56.62225705329149,5.416012558869781,6.441345862382391,66.96434196665717,71.8497830443921,43.35022794236946,46.87474980499301
2023-11-30 23:59,63.93934601650623,62.17204896189119,29.838087895142635,51.86647523330953,5.303612605688002,6.069694373036157,68.66772547005351,69.5783038631709,43.02400204259816,48.01683904215332
2023-12-01 23:59,64.8082786911256,65.23733879564408,30.53435114503813,58.751153491233495,4.9180327868853055,6.024096385542052,66.67767627145001,73.02348649999742,43.14276251546047,46.47334378939741
2023-12-02 23:59,70.6300284932065,69.50717179927517,31.801149138146386,56.559076103050046,4.353312302839171,5.214084161452462,69.61895607623467,73.90431186649535,43.64679242651104,42.68721685645732
2023-12-03 23:59,73.98068575127584,74.22445279042014,32.08955223880598,63.47111553784865,3.6977491961415296,4.694576383519177,68.60499794330268,77.99927304404243,43.697575017142775,39.90176973983268
2023-12-04 23:59,76.52862082533127,75.49404105568968,32.30845524423502,58.97676601736688,3.542094455852208,4.31384490458781,71.07933663445885,76.97623753906812,43.84556826516647,38.55629429158135


Databricks visualization. Run in Databricks to view.