1. Azure Storage와 Databricks 마운트 

In [0]:
dbutils.fs.unmount("/mnt/my-mount")

/mnt/my-mount has been unmounted.


True

In [0]:
service_credential = dbutils.secrets.get(scope="analyticssecretscope",key="analyticsecret")

spark.conf.set("fs.azure.account.auth.type.candlestickstr.dfs.core.windows.net", "OAuth")
spark.conf.set("fs.azure.account.oauth.provider.type.candlestickstr.dfs.core.windows.net", "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider")
spark.conf.set("fs.azure.account.oauth2.client.id.candlestickstr.dfs.core.windows.net", "682f15f2-5550-44d1-a25b-51fec5774443")
spark.conf.set("fs.azure.account.oauth2.client.secret.candlestickstr.dfs.core.windows.net", service_credential)
spark.conf.set("fs.azure.account.oauth2.client.endpoint.candlestickstr.dfs.core.windows.net", "https://login.microsoftonline.com/785087ba-1e72-4e7d-b1d1-4a9639137a66/oauth2/token")

print("CONNECTION OK")

CONNECTION OK


In [0]:
configs = {"fs.azure.account.auth.type": "OAuth",
          "fs.azure.account.oauth.provider.type": "org.apache.hadoop.fs.azurebfs.oauth2.ClientCredsTokenProvider",
          "fs.azure.account.oauth2.client.id": "682f15f2-5550-44d1-a25b-51fec5774443",
          "fs.azure.account.oauth2.client.secret": dbutils.secrets.get(scope="analyticssecretscope",key="analyticsecret"),
          "fs.azure.account.oauth2.client.endpoint": "https://login.microsoftonline.com/785087ba-1e72-4e7d-b1d1-4a9639137a66/oauth2/token"}

dbutils.fs.unmount('/mnt/my-mount')
dbutils.fs.mount(
  source = "abfss://candlestick2024@candlestickstr.dfs.core.windows.net/",
  mount_point = "/mnt/my-mount",
  extra_configs = configs)

/mnt/my-mount has been unmounted.


True

In [0]:
files = dbutils.fs.ls("/mnt/my-mount")
print(files)

[FileInfo(path='dbfs:/mnt/my-mount/KRW-BTC/', name='KRW-BTC/', size=0, modificationTime=1750155851000), FileInfo(path='dbfs:/mnt/my-mount/KRW-ETH/', name='KRW-ETH/', size=0, modificationTime=1750155851000), FileInfo(path='dbfs:/mnt/my-mount/KRW-XRP/', name='KRW-XRP/', size=0, modificationTime=1750155851000)]


2. bronze, silver, gold 스키마 생성

In [0]:
%sql
CREATE SCHEMA IF NOT EXISTS bronze;
CREATE SCHEMA IF NOT EXISTS silver;
CREATE SCHEMA IF NOT EXISTS gold;

3. market별로 bronze table 생성

In [0]:
btc_df = spark.read.option("multiline", "true").json("/mnt/my-mount/KRW-BTC/*.json")

btc_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("bronze.bronze_krw_btc")

In [0]:
eth_df = spark.read.option("multiline", "true").json("/mnt/my-mount/KRW-ETH/*.json")

eth_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("bronze.bronze_krw_eth")

In [0]:
xrp_df = spark.read.option("multiline", "true").json("/mnt/my-mount/KRW-XRP/*.json")

xrp_df.write.format("delta") \
    .mode("overwrite") \
    .saveAsTable("bronze.bronze_krw_xrp")

In [0]:
# 방법 1: DELETE SQL 사용
spark.sql("DELETE FROM silver.silver_krw_xrp")

# 또는 방법 2: DataFrame API 사용 (필터 없이 전부 삭제)
from delta.tables import DeltaTable

delta_table = DeltaTable.forName(spark, "silver.silver_krw_xrp")
delta_table.delete()  # 조건 없으면 전체 삭제


In [0]:
%sql

select * from bronze.bronze_krw_xrp;

4. market별로 silver table 생성

In [0]:
from pyspark.sql.functions import col, to_date, year, month, dayofmonth, current_timestamp

bronze_df = spark.table("bronze.bronze_krw_btc")

silver_df = bronze_df.withColumn("date", to_date(col("candle_date_time_kst"))) \
    .withColumn("volume", col("candle_acc_trade_volume")) \
    .withColumn("value", col("candle_acc_trade_price")) \
    .withColumn("year", year(to_date(col("candle_date_time_kst")))) \
    .withColumn("month", month(to_date(col("candle_date_time_kst")))) \
    .withColumn("day", dayofmonth(to_date(col("candle_date_time_kst")))) \
    .withColumn("load_timestamp", current_timestamp()) \
    .select(
        "market", "date", "year", "month", "day",
        "opening_price", "high_price", "low_price", "trade_price",
        "prev_closing_price", "change_price", "change_rate",
        "volume", "value", "load_timestamp"
    )

silver_df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("market", "year", "month") \
    .saveAsTable("silver.silver_krw_btc")


In [0]:
%sql
select * from silver.silver_krw_btc;

market,date,year,month,day,opening_price,high_price,low_price,trade_price,prev_closing_price,change_price,change_rate,volume,value,load_timestamp
KRW-BTC,2024-01-01,2024,1,1,57045000.0,58847000.0,57045000.0,58839000.0,57047000.0,1792000.0,0.0314126948,3165.32092908,182491217914.30185,2025-06-17T10:30:25.353Z
KRW-BTC,2024-01-02,2024,1,2,58839000.0,60997000.0,58814000.0,60206000.0,58839000.0,1367000.0,0.0232328897,8221.19868374,495038780463.7768,2025-06-17T10:30:25.353Z
KRW-BTC,2023-12-31,2023,12,31,56639000.0,57437000.0,56452000.0,57047000.0,56639000.0,408000.0,0.007203517,2308.38316024,131519480295.09232,2025-06-17T10:30:25.353Z
KRW-BTC,2024-01-03,2024,1,3,60205000.0,60799000.0,54346000.0,58581000.0,60206000.0,-1625000.0,-0.0269906654,16310.30692586,945841183535.25,2025-06-17T10:30:25.353Z


In [0]:
bronze_df = spark.table("bronze.bronze_krw_eth")

silver_df = bronze_df.withColumn("date", to_date(col("candle_date_time_kst"))) \
    .withColumn("volume", col("candle_acc_trade_volume")) \
    .withColumn("value", col("candle_acc_trade_price")) \
    .withColumn("year", year(to_date(col("candle_date_time_kst")))) \
    .withColumn("month", month(to_date(col("candle_date_time_kst")))) \
    .withColumn("day", dayofmonth(to_date(col("candle_date_time_kst")))) \
    .withColumn("load_timestamp", current_timestamp()) \
    .select(
        "market", "date", "year", "month", "day",
        "opening_price", "high_price", "low_price", "trade_price",
        "prev_closing_price", "change_price", "change_rate",
        "volume", "value", "load_timestamp"
    )

silver_df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("market", "year", "month") \
    .saveAsTable("silver.silver_krw_eth")

In [0]:
%sql
select * from silver.silver_krw_eth;

market,date,year,month,day,opening_price,high_price,low_price,trade_price,prev_closing_price,change_price,change_rate,volume,value,load_timestamp
KRW-ETH,2024-01-03,2024,1,3,3154000.0,3190000.0,2851000.0,3024000.0,3154000.0,-130000.0,-0.0412175016,57149.37569621,173530569460.93912,2025-06-17T10:30:36.061Z
KRW-ETH,2024-01-02,2024,1,2,3134000.0,3230000.0,3128000.0,3154000.0,3136000.0,18000.0,0.0057397959,29124.21085963,92574730778.70412,2025-06-17T10:30:36.061Z
KRW-ETH,2024-01-01,2024,1,1,3082000.0,3140000.0,3070000.0,3136000.0,3082000.0,54000.0,0.0175210902,11918.79208026,36923910457.09838,2025-06-17T10:30:36.061Z
KRW-ETH,2023-12-31,2023,12,31,3081000.0,3109000.0,3060000.0,3082000.0,3081000.0,1000.0,0.0003245699,12170.32394322,37497055063.56301,2025-06-17T10:30:36.061Z


In [0]:
bronze_df = spark.table("bronze.bronze_krw_xrp")

silver_df = bronze_df.withColumn("date", to_date(col("candle_date_time_kst"))) \
    .withColumn("volume", col("candle_acc_trade_volume")) \
    .withColumn("value", col("candle_acc_trade_price")) \
    .withColumn("year", year(to_date(col("candle_date_time_kst")))) \
    .withColumn("month", month(to_date(col("candle_date_time_kst")))) \
    .withColumn("day", dayofmonth(to_date(col("candle_date_time_kst")))) \
    .withColumn("load_timestamp", current_timestamp()) \
    .select(
        "market", "date", "year", "month", "day",
        "opening_price", "high_price", "low_price", "trade_price",
        "prev_closing_price", "change_price", "change_rate",
        "volume", "value", "load_timestamp"
    )

silver_df.write.format("delta") \
    .mode("overwrite") \
    .partitionBy("market", "year", "month") \
    .saveAsTable("silver.silver_krw_xrp")

In [0]:
%sql
select * from silver.silver_krw_xrp;

market,date,year,month,day,opening_price,high_price,low_price,trade_price,prev_closing_price,change_price,change_rate,volume,value,load_timestamp
KRW-XRP,2023-12-31,2023,12,31,835.0,841.0,826.0,832.0,835.0,-3.0,-0.0035928144,99746154.07295448,83199980794.70201,2025-06-17T10:31:03.667Z
KRW-XRP,2024-01-03,2024,1,3,838.0,854.0,700.0,796.0,838.0,-42.0,-0.0501193317,692341035.2517552,547795948799.3515,2025-06-17T10:31:03.667Z
KRW-XRP,2024-01-02,2024,1,2,839.0,852.0,834.0,838.0,838.0,,0.0,202232664.20174855,170155409776.905,2025-06-17T10:31:03.667Z
KRW-XRP,2024-01-01,2024,1,1,832.0,840.0,825.0,838.0,832.0,6.0,0.0072115385,109778469.832643,91297902557.62538,2025-06-17T10:31:03.667Z


5. Bronze 테이블 증분 데이터 코드 

In [0]:

# bronze 계층 증분 데이터 로드
from pyspark.sql.functions import col, to_date

execution_date = "2024-01-04"

for market in ["KRW-BTC", "KRW-ETH", "KRW-XRP"]:
    table_name = f"bronze.bronze_{market.lower().replace('-', '_')}"
    json_path = f"/mnt/my-mount/{market}/{market}-{execution_date}.json"

    print(table_name)
    print(json_path)

    try:
        # 원본 JSON 읽기
        bronze_df = spark.read.option("multiline", "true").json(json_path)
        bronze_df.show(truncate=False)
        print("*" * 100)

        # 날짜 존재 여부 확인
        is_exist = (
            spark.table(table_name)
            .filter(to_date(col("candle_date_time_kst")) == execution_date)
            .limit(1)
            .count()
            > 0
        )

        if is_exist:
            print(f"✅ {market} - 날짜 {execution_date} 삭제 후 append")

            # 먼저 해당 날짜 삭제
            spark.sql(
                f"""
                DELETE FROM {table_name}
                WHERE to_date(candle_date_time_kst) = DATE('{execution_date}')
                """
            )

        else:
            print(f"🆕 {market} - 날짜 {execution_date} append")

        # 공통: 데이터 추가
        bronze_df.write.format("delta").mode("append").saveAsTable(table_name)

    except Exception as e:
        print(f"❌ {market} 처리 실패: {e}")


bronze.bronze_krw_btc
/mnt/my-mount/KRW-BTC/KRW-BTC-2024-01-04.json
+----------------------+-----------------------+--------------------+--------------------+------------+------------+----------+---------+-------+-------------+------------------+-------------+-----------+
|candle_acc_trade_price|candle_acc_trade_volume|candle_date_time_kst|candle_date_time_utc|change_price|change_rate |high_price|low_price|market |opening_price|prev_closing_price|timestamp    |trade_price|
+----------------------+-----------------------+--------------------+--------------------+------------+------------+----------+---------+-------+-------------+------------------+-------------+-----------+
|3.660256480668456E11  |6254.53768656          |2024-01-04T09:00:00 |2024-01-04T00:00:00 |896000.0    |0.0152950615|6.0162E7  |5.7319E7 |KRW-BTC|5.8581E7     |5.8581E7          |1704412798132|5.9477E7   |
+----------------------+-----------------------+--------------------+--------------------+------------+---------

6. Silver 테이블 증분 데이터 코드

In [0]:
# silver 계층 증분 데이터 로드

from pyspark.sql.functions import col, to_date, year, month, dayofmonth, current_timestamp

execution_date = "2024-01-04"

for market in ["KRW-BTC", "KRW-ETH", "KRW-XRP"]:
    bronze_table = f"bronze.bronze_{market.lower().replace('-', '_')}"
    silver_table = f"silver.silver_{market.lower().replace('-', '_')}"
    
    # ✅ Step 1. Bronze에서 해당 날짜 데이터 필터링
    bronze_df = spark.table(bronze_table)
    bronze_filtered = bronze_df.filter(to_date(col("candle_date_time_kst")) == execution_date)

    # ✅ Step 2. Silver로 가공
    silver_df = bronze_filtered.withColumn("date", to_date(col("candle_date_time_kst"))) \
        .withColumn("volume", col("candle_acc_trade_volume")) \
        .withColumn("value", col("candle_acc_trade_price")) \
        .withColumn("year", year(col("date"))) \
        .withColumn("month", month(col("date"))) \
        .withColumn("day", dayofmonth(col("date"))) \
        .withColumn("load_timestamp", current_timestamp()) \
        .select(
            "market", "date", "year", "month", "day",
            "opening_price", "high_price", "low_price", "trade_price",
            "prev_closing_price", "change_price", "change_rate",
            "volume", "value", "load_timestamp"
        )

    # ✅ Step 3. Silver 테이블에 해당 날짜 존재 여부 확인 및 삭제
    from pyspark.sql.utils import AnalysisException

    try:
        is_exist = (
            spark.table(silver_table)
            .filter((col("market") == market) & (col("date") == execution_date))
            .limit(1)
            .count() > 0
        )
    except AnalysisException:
        is_exist = False  # 테이블이 아직 없으면

    if is_exist:
        print(f"✅ {market} - 날짜 {execution_date} 삭제 후 append")
        spark.sql(
            f"""
            DELETE FROM {silver_table}
            WHERE market = '{market}' AND date = DATE('{execution_date}')
            """
        )
    else:
        print(f"🆕 {market} - 날짜 {execution_date} append")

    # ✅ Step 4. Append
    silver_df.write.format("delta") \
        .mode("append") \
        .partitionBy("market", "year", "month") \
        .saveAsTable(silver_table)


🆕 KRW-BTC - 날짜 2024-01-04 append
🆕 KRW-ETH - 날짜 2024-01-04 append
🆕 KRW-XRP - 날짜 2024-01-04 append


In [0]:
%sql

SELECT * FROM silver.silver_krw_btc

market,date,year,month,day,opening_price,high_price,low_price,trade_price,prev_closing_price,change_price,change_rate,volume,value,load_timestamp
KRW-BTC,2024-01-01,2024,1,1,57045000.0,58847000.0,57045000.0,58839000.0,57047000.0,1792000.0,0.0314126948,3165.32092908,182491217914.30185,2025-06-17T10:30:25.353Z
KRW-BTC,2024-01-02,2024,1,2,58839000.0,60997000.0,58814000.0,60206000.0,58839000.0,1367000.0,0.0232328897,8221.19868374,495038780463.7768,2025-06-17T10:30:25.353Z
KRW-BTC,2023-12-31,2023,12,31,56639000.0,57437000.0,56452000.0,57047000.0,56639000.0,408000.0,0.007203517,2308.38316024,131519480295.09232,2025-06-17T10:30:25.353Z
KRW-BTC,2024-01-04,2024,1,4,58581000.0,60162000.0,57319000.0,59477000.0,58581000.0,896000.0,0.0152950615,6254.53768656,366025648066.8456,2025-06-17T10:38:02.131Z
KRW-BTC,2024-01-03,2024,1,3,60205000.0,60799000.0,54346000.0,58581000.0,60206000.0,-1625000.0,-0.0269906654,16310.30692586,945841183535.25,2025-06-17T10:30:25.353Z


In [0]:
%sql

SELECT * FROM silver.silver_krw_eth

market,date,year,month,day,opening_price,high_price,low_price,trade_price,prev_closing_price,change_price,change_rate,volume,value,load_timestamp
KRW-ETH,2024-01-04,2024,1,4,3024000.0,3078000.0,2967000.0,3053000.0,3024000.0,29000.0,0.0095899471,24389.84136858,73593717136.83714,2025-06-17T10:38:04.145Z
KRW-ETH,2024-01-03,2024,1,3,3154000.0,3190000.0,2851000.0,3024000.0,3154000.0,-130000.0,-0.0412175016,57149.37569621,173530569460.93912,2025-06-17T10:30:36.061Z
KRW-ETH,2024-01-02,2024,1,2,3134000.0,3230000.0,3128000.0,3154000.0,3136000.0,18000.0,0.0057397959,29124.21085963,92574730778.70412,2025-06-17T10:30:36.061Z
KRW-ETH,2024-01-01,2024,1,1,3082000.0,3140000.0,3070000.0,3136000.0,3082000.0,54000.0,0.0175210902,11918.79208026,36923910457.09838,2025-06-17T10:30:36.061Z
KRW-ETH,2023-12-31,2023,12,31,3081000.0,3109000.0,3060000.0,3082000.0,3081000.0,1000.0,0.0003245699,12170.32394322,37497055063.56301,2025-06-17T10:30:36.061Z


In [0]:
%sql

SELECT * FROM silver.silver_krw_eth

market,date,year,month,day,opening_price,high_price,low_price,trade_price,prev_closing_price,change_price,change_rate,volume,value,load_timestamp
KRW-ETH,2024-01-04,2024,1,4,3024000.0,3078000.0,2967000.0,3053000.0,3024000.0,29000.0,0.0095899471,24389.84136858,73593717136.83714,2025-06-17T11:07:38.87Z
KRW-ETH,2024-01-03,2024,1,3,3154000.0,3190000.0,2851000.0,3024000.0,3154000.0,-130000.0,-0.0412175016,57149.37569621,173530569460.93912,2025-06-17T10:30:36.061Z
KRW-ETH,2024-01-02,2024,1,2,3134000.0,3230000.0,3128000.0,3154000.0,3136000.0,18000.0,0.0057397959,29124.21085963,92574730778.70412,2025-06-17T10:30:36.061Z
KRW-ETH,2024-01-01,2024,1,1,3082000.0,3140000.0,3070000.0,3136000.0,3082000.0,54000.0,0.0175210902,11918.79208026,36923910457.09838,2025-06-17T10:30:36.061Z
KRW-ETH,2023-12-31,2023,12,31,3081000.0,3109000.0,3060000.0,3082000.0,3081000.0,1000.0,0.0003245699,12170.32394322,37497055063.56301,2025-06-17T10:30:36.061Z


In [0]:
# Silver Incremental Load Notebook 시작
dbutils.widgets.text("execution_date", "2024-01-04")
execution_date = dbutils.widgets.get("execution_date")

from pyspark.sql.functions import col, to_date, year, month, dayofmonth, current_timestamp
from pyspark.sql.utils import AnalysisException

# 마켓 목록
markets = ["KRW-BTC", "KRW-ETH", "KRW-XRP"]

for market in markets:
    bronze_table = f"bronze.bronze_{market.lower().replace('-', '_')}"
    silver_table = f"silver.silver_{market.lower().replace('-', '_')}"

    try:
        # Step 1. Bronze에서 해당 날짜만 필터링
        bronze_df = spark.table(bronze_table)
        bronze_filtered = bronze_df.filter(to_date(col("candle_date_time_kst")) == execution_date)

        if bronze_filtered.count() == 0:
            print(f"⚠️ {market} - 해당 날짜({execution_date}) 데이터 없음. Skip.")
            continue

        # Step 2. Silver 변환
        silver_df = bronze_filtered.withColumn("date", to_date(col("candle_date_time_kst"))) \
            .withColumn("volume", col("candle_acc_trade_volume")) \
            .withColumn("value", col("candle_acc_trade_price")) \
            .withColumn("year", year(col("date"))) \
            .withColumn("month", month(col("date"))) \
            .withColumn("day", dayofmonth(col("date"))) \
            .withColumn("load_timestamp", current_timestamp()) \
            .select(
                "market", "date", "year", "month", "day",
                "opening_price", "high_price", "low_price", "trade_price",
                "prev_closing_price", "change_price", "change_rate",
                "volume", "value", "load_timestamp"
            )

        # Step 3. 기존 silver 테이블에서 해당 날짜 삭제 (있으면)
        try:
            is_exist = (
                spark.table(silver_table)
                .filter((col("market") == market) & (col("date") == execution_date))
                .limit(1)
                .count() > 0
            )
        except AnalysisException:
            is_exist = False  # 테이블이 아예 없는 경우

        if is_exist:
            print(f"🧹 {market} - {execution_date} 기존 데이터 삭제")
            spark.sql(
                f"""
                DELETE FROM {silver_table}
                WHERE market = '{market}' AND date = DATE('{execution_date}')
                """
            )

        # Step 4. Append 저장
        print(f"💾 {market} - {execution_date} 데이터 저장 중...")
        silver_df.write.format("delta") \
            .mode("append") \
            .partitionBy("market", "year", "month") \
            .saveAsTable(silver_table)

        print(f"✅ {market} - {execution_date} 저장 완료")

    except Exception as e:
        print(f"❌ {market} 처리 실패: {e}")


🧹 KRW-BTC - 2024-01-04 기존 데이터 삭제
💾 KRW-BTC - 2024-01-04 데이터 저장 중...
✅ KRW-BTC - 2024-01-04 저장 완료
🧹 KRW-ETH - 2024-01-04 기존 데이터 삭제
💾 KRW-ETH - 2024-01-04 데이터 저장 중...
✅ KRW-ETH - 2024-01-04 저장 완료
🧹 KRW-XRP - 2024-01-04 기존 데이터 삭제
💾 KRW-XRP - 2024-01-04 데이터 저장 중...
✅ KRW-XRP - 2024-01-04 저장 완료


In [0]:
print("test")

test


In [0]:
%sql

select * from bronze.bronze_krw_btc;

candle_acc_trade_price,candle_acc_trade_volume,candle_date_time_kst,candle_date_time_utc,change_price,change_rate,high_price,low_price,market,opening_price,prev_closing_price,timestamp,trade_price
366025648066.8456,6254.53768656,2024-01-04T09:00:00,2024-01-04T00:00:00,896000.0,0.0152950615,60162000.0,57319000.0,KRW-BTC,58581000.0,58581000.0,1704412798132,59477000.0
945841183535.25,16310.30692586,2024-01-03T09:00:00,2024-01-03T00:00:00,-1625000.0,-0.0269906654,60799000.0,54346000.0,KRW-BTC,60205000.0,60206000.0,1704326399884,58581000.0
182491217914.30185,3165.32092908,2024-01-01T09:00:00,2024-01-01T00:00:00,1792000.0,0.0314126948,58847000.0,57045000.0,KRW-BTC,57045000.0,57047000.0,1704153599562,58839000.0
495038780463.7768,8221.19868374,2024-01-02T09:00:00,2024-01-02T00:00:00,1367000.0,0.0232328897,60997000.0,58814000.0,KRW-BTC,58839000.0,58839000.0,1704239999808,60206000.0
131519480295.09232,2308.38316024,2023-12-31T09:00:00,2023-12-31T00:00:00,408000.0,0.007203517,57437000.0,56452000.0,KRW-BTC,56639000.0,56639000.0,1704067199664,57047000.0


In [0]:
%sql

select * from silver.silver_krw_btc;