#####1. Define mount configs using service principal

In [0]:
# storage_account_name = "not shown here"
# container_name = "not shown here"
# mount_point = "not shown here"
# client_id = "not shown here"
# tenant_id = "not shown here"
# client_secret ="not shown here"

In [0]:
# configs = {"fs.azure.account.auth.type": "OAuth",
#         "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
#         "fs.azure.account.oauth2.client.id": f"{client_id}",
#         "fs.azure.account.oauth2.client.secret": f"{client_secret}",
#         "fs.azure.account.oauth2.client.endpoint": f"https://login.microsoftonline.com/{tenant_id}/oauth2/token"}

#####2. Mount the container

In [0]:
# dbutils.fs.mount(
#   source = f"abfss://{container_name}@{storage_account_name}.dfs.core.windows.net/",
#   mount_point = f"/mnt/{mount_point}",
#   extra_configs = configs)

In [0]:
# dbutils.fs.refreshMounts()

#####3. List contents of the mount point

In [0]:
# %fs
# ls mnt/cryptotrader/trading_logging/

#####4. Unmount /mnt/files directory

In [0]:
#%fs
#unmount /mnt/files

In [0]:
#%fs mounts

#####5. [Bronze Layer] COPY INTO: Incremental Ingestion in batch daily

In [0]:
%sql
CREATE TABLE IF NOT EXISTS logging_raw(
    logging STRING,
    account STRING, 
    date_end date
) USING DELTA;

In [0]:
%sql
COPY INTO logging_raw
FROM (
  SELECT value as logging, 
        regexp_extract(_metadata.file_path, r"/mainac/([^/]+)/", 1) as account,
        to_date(regexp_extract(_metadata.file_path, r"/(\d{8})\.log$", 1), 'yyyyMMdd') as date_end
  FROM 'dbfs:/mnt/cryptotrader/trading_logging/skinnydew'
)
FILEFORMAT = TEXT
PATTERN = '*/*/*/*.log'
COPY_OPTIONS ('mergeSchema' = 'true');

num_affected_rows,num_inserted_rows,num_skipped_corrupt_files
9579,9579,0


In [0]:
%sql
select * from logging_raw 
order by account, date_end desc
LIMIT 20;

logging,account,date_end
"2024-06-06 23:55:05,800: INFO - Using Bybit",ML0001,2024-06-06
"2024-06-07 00:00:22,172: INFO - ------Saved records------",ML0001,2024-06-06
"2024-06-06 23:55:05,800: INFO - Sandbox: False",ML0001,2024-06-06
"2024-06-06 23:55:08,635: INFO - Account [realtrade][ML0001_prod_azure_vm] begins trading",ML0001,2024-06-06
"2024-06-06 23:55:08,635: INFO - Start trading",ML0001,2024-06-06
"2024-06-06 23:55:10,482: INFO - Current Leverage: 3.0x",ML0001,2024-06-06
"2024-06-06 23:55:10,482: INFO - Get most recent data for RUNE/USDT:USDT",ML0001,2024-06-06
"2024-06-07 00:00:01,453: INFO - defining strategy...",ML0001,2024-06-06
"2024-06-07 00:00:14,531: INFO - USDT amount: 1342.11683891",ML0001,2024-06-06
"2024-06-07 00:00:14,609: INFO - Bid price: 6.017",ML0001,2024-06-06


#####6. [Silver Layer] Output first record of each account daily 

In [0]:
%sql
CREATE OR REPLACE TABLE account_records
USING delta AS
WITH extracted_logging AS (
    SELECT 
        account,
        date_end,
        to_timestamp(regexp_extract(logging, r"^([^,]+)", 1), "yyyy-MM-dd HH:mm:ss") as update_time,
        regexp_extract(logging, r'INFO - USDT amount: ([0-9]+\.[0-9]+)', 1) AS usdt_amount
    FROM logging_raw
    WHERE logging LIKE '%INFO - USDT amount:%'
),
ranked_logging AS (
    SELECT 
        account,
        date_end,
        update_time,
        usdt_amount,
        ROW_NUMBER() OVER (PARTITION BY account, date_end ORDER BY update_time) AS row_num
    FROM extracted_logging
)
SELECT 
    date_end,
    account,
    update_time,
    usdt_amount,
    row_num
FROM ranked_logging
WHERE row_num = 1
ORDER BY date_end, account, update_time

num_affected_rows,num_inserted_rows


In [0]:
# Read the Delta table into a DataFrame
account_records_df = spark.read.table("account_records")

In [0]:
display(account_records_df)

date_end,account,update_time,usdt_amount,row_num
2024-05-03,ML0001,2024-05-04T00:00:00Z,3999.93716499,1
2024-05-04,ML0001,2024-05-05T00:00:00Z,3971.69011612,1
2024-05-05,ML0001,2024-05-06T00:00:01Z,4009.15950378,1
2024-05-05,ML0002,2024-05-05T12:30:41Z,3999.92099743,1
2024-05-05,ML0003,2024-05-05T14:36:01Z,100.0,1
2024-05-06,ML0001,2024-05-07T00:00:01Z,3898.98112158,1
2024-05-06,ML0002,2024-05-07T00:00:01Z,4038.57959257,1
2024-05-06,ML0003,2024-05-07T00:00:01Z,4026.148304,1
2024-05-06,ML0004,2024-05-06T13:11:01Z,1272.9092,1
2024-05-07,ML0001,2024-05-08T00:00:00Z,3958.14142712,1


#####7. Pivot the table for each account and total equity time series

In [0]:
from pyspark.sql.functions import sum as spark_sum, col, round
from pyspark.sql.types import FloatType

In [0]:
# Pivot the data
pivot_df = account_records_df.groupBy('date_end').pivot('account').agg(spark_sum('usdt_amount')).fillna(0).orderBy('date_end')

In [0]:
# Calculate the total daily equity, summing up all USDT amounts
total_equity_df = pivot_df.withColumn('USDT_Sum', sum(pivot_df[col] for col in pivot_df.columns if col != 'date_end'))

In [0]:
display(total_equity_df)

date_end,ML0001,ML0002,ML0003,ML0004,USDT_Sum
2024-05-03,3999.93716499,0.0,0.0,0.0,3999.93716499
2024-05-04,3971.69011612,0.0,0.0,0.0,3971.69011612
2024-05-05,4009.15950378,3999.92099743,100.0,0.0,8109.08050121
2024-05-06,3898.98112158,4038.57959257,4026.148304,1272.9092,13236.61821815
2024-05-07,3958.14142712,3896.33282792,3922.78046537,1300.1295969,13077.38431731
2024-05-08,3638.02986578,4005.91548042,4022.84405033,1321.68321383,12988.47261036
2024-05-09,1500.00005411,3873.80826381,4088.94753034,3136.65086414,12599.4067124
2024-05-10,1414.31595301,3708.66822244,4171.77948345,3033.79865953,12328.562318429998
2024-05-11,1441.77724811,3617.21361293,4224.28725026,3050.80927273,12334.08738403
2024-05-12,1375.29964204,3604.23516625,4247.54533666,3026.65852965,12253.7386746


#####8. [Gold Layer] Create final delta table to view

In [0]:
total_equity_df = total_equity_df.withColumn("ML0001", round(col("ML0001").cast(FloatType()), 3))\
                                 .withColumn("ML0002", round(col("ML0002").cast(FloatType()), 3))\
                                 .withColumn("ML0003", round(col("ML0003").cast(FloatType()), 3))\
                                 .withColumn("ML0004", round(col("ML0004").cast(FloatType()), 3))\
                                 .withColumn("USDT_Sum", round(col("USDT_Sum").cast(FloatType()), 3))

In [0]:
total_equity_df.write.mode("overwrite").saveAsTable("equity_time_series")

In [0]:
# %sql
# DESCRIBE EXTENDED equity_time_series

#####9. [visualization] Equity Curve Time Series

In [0]:
%sql
SELECT *
FROM equity_time_series
WHERE date_end >= '2024-05-09'

date_end,ML0001,ML0002,ML0003,ML0004,USDT_Sum
2024-05-09,1500.0,3873.808,4088.948,3136.651,12599.406
2024-05-10,1414.316,3708.668,4171.779,3033.799,12328.563
2024-05-11,1441.777,3617.214,4224.287,3050.809,12334.087
2024-05-12,1375.3,3604.235,4247.545,3026.658,12253.738
2024-05-13,1445.948,3606.911,4223.893,2993.732,12270.483
2024-05-14,1374.796,3689.843,4003.218,3051.893,12119.75
2024-05-15,1556.24,3932.094,4151.029,3217.273,12856.636
2024-05-16,1598.374,3853.268,4257.894,3255.344,12964.879
2024-05-17,1696.3,4071.521,4368.205,3344.578,13480.604
2024-05-18,1700.275,4069.062,4363.777,3323.225,13456.339


Databricks visualization. Run in Databricks to view.

&copy; 2021-2023 ScholarNest Technologies Pvt. Ltd. All rights reserved.<br/>
Apache, Apache Spark, Spark and the Spark logo are trademarks of the <a href="https://www.apache.org/">Apache Software Foundation</a>.<br/>
Databricks, Databricks Cloud and the Databricks logo are trademarks of the <a href="https://www.databricks.com/">Databricks Inc</a>.<br/>
<br/>
<a href="https://www.scholarnest.com/privacy/">Privacy Policy</a> | 
<a href="https://www.scholarnest.com/terms/">Terms of Use</a> | <a href="https://www.scholarnest.com/contact/">Contact Us</a>