# Build Spark Session

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
import os
import sys
import time

# =============================================================================
# [PART 1] Environment & Spark Init
# =============================================================================
aws_access_key = os.getenv("AWS_ACCESS_KEY_ID")
aws_secret_key = os.getenv("AWS_SECRET_ACCESS_KEY")
aws_region = os.getenv("AWS_REGION", "us-east-1")
s3_endpoint = os.getenv("AWS_S3_ENDPOINT", "http://minio:9000")
nessie_uri = os.getenv("NESSIE_URI", "http://nessie:19120/api/v1")
RAW_DATA_PATH = "/mnt/kkr/iceberg/datasets/nuscenes_v1.0-mini/v1.0-mini" 

if not aws_access_key or not aws_secret_key:
    print("Error: AWS Access Key or Secret Key is missing in environment variables.")
    sys.exit(1)

spark = SparkSession.builder \
    .appName("NessieMinioSpark") \
    .config('spark.sql.extensions', 'org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions,org.projectnessie.spark.extensions.NessieSparkSessionExtensions') \
    .config('spark.sql.catalog.spark_catalog', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.spark_catalog.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog') \
    .config('spark.sql.catalog.spark_catalog.uri', nessie_uri) \
    .config('spark.sql.catalog.spark_catalog.warehouse', 's3://spark1') \
    .config('spark.sql.catalog.spark_catalog.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') \
    .config('spark.sql.catalog.spark_catalog.s3.endpoint', s3_endpoint) \
    .config('spark.sql.catalog.spark_catalog.s3.path-style-access', 'true') \
    .config('spark.sql.defaultCatalog', 'spark_catalog') \
    .config('spark.sql.catalog.nessie', 'org.apache.iceberg.spark.SparkCatalog') \
    .config('spark.sql.catalog.nessie.warehouse', 's3://spark1') \
    .config('spark.sql.catalog.nessie.catalog-impl', 'org.apache.iceberg.nessie.NessieCatalog') \
    .config('spark.sql.catalog.nessie.io-impl', 'org.apache.iceberg.aws.s3.S3FileIO') \
    .config('spark.sql.catalog.nessie.uri', nessie_uri) \
    .config('spark.sql.catalog.nessie.ref', 'main') \
    .config('spark.sql.catalog.nessie.cache-enabled', 'false') \
    .config('spark.sql.catalog.nessie.s3.endpoint', s3_endpoint) \
    .config('spark.sql.catalog.nessie.s3.region', aws_region) \
    .config('spark.sql.catalog.nessie.s3.path-style-access', 'true') \
    .config('spark.sql.catalog.nessie.s3.access-key-id', aws_access_key) \
    .config('spark.sql.catalog.nessie.s3.secret-access-key', aws_secret_key) \
    .config('spark.hadoop.fs.s3a.access.key', aws_access_key) \
    .config('spark.hadoop.fs.s3a.secret.key', aws_secret_key) \
    .config('spark.hadoop.fs.s3a.endpoint', s3_endpoint) \
    .config('spark.hadoop.fs.s3a.path.style.access', 'true') \
    .config('spark.hadoop.fs.s3a.connection.ssl.enabled', 'false') \
    .config('spark.hadoop.fs.s3a.impl', 'org.apache.hadoop.fs.s3a.S3AFileSystem') \
    .config('spark.hadoop.fs.s3a.aws.credentials.provider', 'org.apache.hadoop.fs.s3a.SimpleAWSCredentialsProvider') \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/12/19 12:09:23 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/12/19 12:09:23 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


## Iceberg-Silver Data Load

In [21]:
SCALE_FACTOR = 1

setup_start = time.time()

# Namespace 생성
spark.sql("CREATE NAMESPACE IF NOT EXISTS nessie.nusc_db")

try:
    # 테이블이 이미 있는지 확인 (테스트용)
    # spark.table("nessie.nusc_db.sample_data") # 이 부분은 주석 처리하거나 에러 핸들링을 위해 둠
    # print("Tables might exist. Attempting to overwrite...")
    pass
except:
    pass

# 1. JSON 읽기
df_sample = spark.read.option("multiLine", True).json(f"{RAW_DATA_PATH}/sample.json")
df_sample_data = spark.read.option("multiLine", True).json(f"{RAW_DATA_PATH}/sample_data.json")
df_annotation = spark.read.option("multiLine", True).json(f"{RAW_DATA_PATH}/sample_annotation.json")
df_category = spark.read.option("multiLine", True).json(f"{RAW_DATA_PATH}/category.json")
df_instance = spark.read.option("multiLine", True).json(f"{RAW_DATA_PATH}/instance.json")
df_sensor = spark.read.option("multiLine", True).json(f"{RAW_DATA_PATH}/sensor.json")
df_calibrated = spark.read.option("multiLine", True).json(f"{RAW_DATA_PATH}/calibrated_sensor.json")

# 2. Pre-Join (Denormalization)
df_channel_map = df_calibrated.join(
    df_sensor, 
    df_calibrated["sensor_token"] == df_sensor["token"]
).select(
    df_calibrated["token"].alias("calib_token"), 
    df_sensor["channel"]
)

df_sample_data_enriched = df_sample_data.join(
    df_channel_map,
    df_sample_data["calibrated_sensor_token"] == df_channel_map["calib_token"]
).drop("calib_token")

# =========================================================
# [실험 변수] 데이터 스케일 팩터
# =========================================================

def scale_df(df, factor):
    if factor <= 1: return df
    return df.crossJoin(spark.range(factor)).drop("id")

print(f">>> [Experiment] Scaling Key Tables by {SCALE_FACTOR}x (others keep 1x) ...")

# 1. 참조 테이블 (Reference Tables) - 스케일링 하지 않음 (Python Dictionary 동작 모사)
# samples, instances, category는 1배 유지
df_sample.write.format("iceberg").mode("overwrite").saveAsTable("nessie.nusc_db.samples")
df_category.write.format("iceberg").mode("overwrite").saveAsTable("nessie.nusc_db.category")
df_instance.write.format("iceberg").mode("overwrite").saveAsTable("nessie.nusc_db.instances")

# 2. 팩트 테이블 (Fact Tables) - 스케일링 적용 (데이터 폭증 유발)
# sample_data와 annotations만 늘려서 10 * 10 = 100배 효과를 냄
scale_df(df_sample_data_enriched, SCALE_FACTOR).write.format("iceberg") \
    .partitionBy("channel") \
    .mode("overwrite") \
    .saveAsTable("nessie.nusc_db.sample_data")

scale_df(df_annotation, SCALE_FACTOR).write.format("iceberg").mode("overwrite").saveAsTable("nessie.nusc_db.annotations")

print(f"Data Ingestion Finished: {time.time() - setup_start:.2f}s")

>>> [Experiment] Scaling Key Tables by 1x (others keep 1x) ...
Data Ingestion Finished: 1.63s


### Iceberg-Silver Query Exp

In [24]:
#SF=1
all_time=0
test_count=10
for i in range(test_count):
    query_start = time.time()

    query = """
    SELECT 
        sd.filename as img_path,
        a.translation,
        a.size,
        a.rotation
    FROM nessie.nusc_db.samples s
    JOIN nessie.nusc_db.sample_data sd 
        ON s.token = sd.sample_token
    JOIN nessie.nusc_db.annotations a 
        ON s.token = a.sample_token
    JOIN nessie.nusc_db.instances i 
        ON a.instance_token = i.token
    JOIN nessie.nusc_db.category c 
        ON i.category_token = c.token
    WHERE 
        sd.channel = 'CAM_FRONT' 
        AND c.name = 'human.pedestrian.adult'
    """

    result_df = spark.sql(query)
    count = result_df.count()

    query_end = time.time()
    print(f">>> [Lakehouse] 결과: 보행자 데이터 {count}건 추출 완료")
    print(f">>> [Lakehouse] 데이터셋 구성 소요 시간: {query_end - query_start:.4f}초")

    #spark.stop()
    all_time += query_end - query_start
print(f"Total avg Time Elapsed: {all_time/test_count:.4f} sec")    

>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.2533초
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.2611초
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.2132초
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.2350초
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.2562초
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.2538초
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.2577초
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.2259초
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.2606초
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.2280초
Total avg Time Elapsed: 0.2445 sec


In [None]:
#SF=3
all_time=0
test_count=10
for i in range(test_count):
    query_start = time.time()

    query = """
    SELECT 
        sd.filename as img_path,
        a.translation,
        a.size,
        a.rotation
    FROM nessie.nusc_db.samples s
    JOIN nessie.nusc_db.sample_data sd 
        ON s.token = sd.sample_token
    JOIN nessie.nusc_db.annotations a 
        ON s.token = a.sample_token
    JOIN nessie.nusc_db.instances i 
        ON a.instance_token = i.token
    JOIN nessie.nusc_db.category c 
        ON i.category_token = c.token
    WHERE 
        sd.channel = 'CAM_FRONT' 
        AND c.name = 'human.pedestrian.adult'
    """

    result_df = spark.sql(query)
    count = result_df.count()

    query_end = time.time()
    print(f">>> [Lakehouse] 결과: 보행자 데이터 {count}건 추출 완료")
    print(f">>> [Lakehouse] 데이터셋 구성 소요 시간: {query_end - query_start:.4f}초")

    #spark.stop()
    all_time += query_end - query_start
print(f"Total avg Time Elapsed: {all_time/test_count:.4f} sec")    

>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.4549초
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.4696초
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.4230초
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.3984초
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.4220초
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.4307초
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.3822초
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.4702초
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.4115초
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.4908초
Total avg Time Elapsed: 0.4353 sec


In [18]:
#SF=5
all_time=0
test_count=10
for i in range(test_count):
    query_start = time.time()

    query = """
    SELECT 
        sd.filename as img_path,
        a.translation,
        a.size,
        a.rotation
    FROM nessie.nusc_db.samples s
    JOIN nessie.nusc_db.sample_data sd 
        ON s.token = sd.sample_token
    JOIN nessie.nusc_db.annotations a 
        ON s.token = a.sample_token
    JOIN nessie.nusc_db.instances i 
        ON a.instance_token = i.token
    JOIN nessie.nusc_db.category c 
        ON i.category_token = c.token
    WHERE 
        sd.channel = 'CAM_FRONT' 
        AND c.name = 'human.pedestrian.adult'
    """

    result_df = spark.sql(query)
    count = result_df.count()

    query_end = time.time()
    print(f">>> [Lakehouse] 결과: 보행자 데이터 {count}건 추출 완료")
    print(f">>> [Lakehouse] 데이터셋 구성 소요 시간: {query_end - query_start:.4f}초")

    #spark.stop()
    all_time += query_end - query_start
print(f"Total avg Time Elapsed: {all_time/test_count:.4f} sec")    

>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.6546초
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.6339초
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.8451초
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.7130초
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.6355초
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.6601초
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.6182초
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.5970초
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.6147초
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.6150초
Total avg Time Elapsed: 0.6587 sec


In [20]:
#SF=7
all_time=0
test_count=10
for i in range(test_count):
    query_start = time.time()

    query = """
    SELECT 
        sd.filename as img_path,
        a.translation,
        a.size,
        a.rotation
    FROM nessie.nusc_db.samples s
    JOIN nessie.nusc_db.sample_data sd 
        ON s.token = sd.sample_token
    JOIN nessie.nusc_db.annotations a 
        ON s.token = a.sample_token
    JOIN nessie.nusc_db.instances i 
        ON a.instance_token = i.token
    JOIN nessie.nusc_db.category c 
        ON i.category_token = c.token
    WHERE 
        sd.channel = 'CAM_FRONT' 
        AND c.name = 'human.pedestrian.adult'
    """

    result_df = spark.sql(query)
    count = result_df.count()

    query_end = time.time()
    print(f">>> [Lakehouse] 결과: 보행자 데이터 {count}건 추출 완료")
    print(f">>> [Lakehouse] 데이터셋 구성 소요 시간: {query_end - query_start:.4f}초")

    #spark.stop()
    all_time += query_end - query_start
print(f"Total avg Time Elapsed: {all_time/test_count:.4f} sec")    

                                                                                

>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 1.0477초


                                                                                

>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 1.0171초


                                                                                

>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.9256초


                                                                                

>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 1.0421초


                                                                                

>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.9735초


                                                                                

>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.9722초


                                                                                

>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.9497초


                                                                                

>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.9177초


                                                                                

>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.9279초
>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.9071초
Total avg Time Elapsed: 0.9681 sec


                                                                                

## Iceberg-Gold Data Load

In [35]:
SCALE_FACTOR = 7

print(">>> [Lakehouse] Phase 1: Gold Table 생성 (Join + Denormalization)")
setup_start = time.time()

# 1. JSON 읽기
df_sample = spark.read.option("multiLine", True).json(f"{RAW_DATA_PATH}/sample.json")
df_sample_data = spark.read.option("multiLine", True).json(f"{RAW_DATA_PATH}/sample_data.json")
df_annotation = spark.read.option("multiLine", True).json(f"{RAW_DATA_PATH}/sample_annotation.json")
df_category = spark.read.option("multiLine", True).json(f"{RAW_DATA_PATH}/category.json")
df_instance = spark.read.option("multiLine", True).json(f"{RAW_DATA_PATH}/instance.json")
df_sensor = spark.read.option("multiLine", True).json(f"{RAW_DATA_PATH}/sensor.json")
df_calibrated = spark.read.option("multiLine", True).json(f"{RAW_DATA_PATH}/calibrated_sensor.json")

# 2. 복잡한 조인을 미리 수행 (Denormalization)
# CAM_FRONT 채널 정보 결합
df_channel_map = df_calibrated.join(df_sensor, df_calibrated["sensor_token"] == df_sensor["token"]) \
    .select(df_calibrated["token"].alias("calib_token"), df_sensor["channel"])

# 전체 조인 수행 (Gold Table용 데이터 구성)
df_gold_raw = df_sample_data.join(df_channel_map, df_sample_data["calibrated_sensor_token"] == df_channel_map["calib_token"]) \
    .join(df_annotation, df_sample_data["sample_token"] == df_annotation["sample_token"]) \
    .join(df_instance, df_annotation["instance_token"] == df_instance["token"]) \
    .join(df_category, df_instance["category_token"] == df_category["token"]) \
    .select(
        df_sample_data["filename"].alias("img_path"),
        df_annotation["translation"],
        df_annotation["size"],
        df_annotation["rotation"],
        df_sensor["channel"],
        df_category["name"].alias("category_name")
    )

# 3. 데이터 스케일링 (10배 증강 -> 조인 폭발 시뮬레이션 결과와 맞추기 위해 100배 효과 적용 가능)
def scale_df(df, factor):
    if factor <= 1: return df
    # Baseline과 동일하게 10x10=100배 효과를 내기 위해 factor*factor로 증강
    return df.crossJoin(spark.range(factor * factor)).drop("id")

print(f">>> [Experiment] Scaling Gold Table by {SCALE_FACTOR}x{SCALE_FACTOR}=100x ...")
df_gold_final = scale_df(df_gold_raw, SCALE_FACTOR)

# 4. Iceberg Gold Table 저장
# CAM_FRONT나 Category에 상관없이 일단 저장한 뒤 쿼리에서 필터링하는 방식이 실용적입니다.
df_gold_final.write.format("iceberg") \
    .partitionBy("channel") \
    .mode("overwrite") \
    .saveAsTable("nessie.nusc_db.gold_train_set")

print(f"Gold Table Ingestion Finished: {time.time() - setup_start:.2f}s")

>>> [Lakehouse] Phase 1: Gold Table 생성 (Join + Denormalization)
>>> [Experiment] Scaling Gold Table by 7x7=100x ...




Gold Table Ingestion Finished: 81.85s


                                                                                

### Iceberg-Gold Exp

In [28]:
#SF=1
all_time=0
test_count=10
for i in range(test_count):
  print(">>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리")
  query_start = time.time()

  # 조인이 전혀 없는 단순 필터링 쿼리
  query = """
  SELECT img_path, translation, size, rotation
  FROM nessie.nusc_db.gold_train_set
  WHERE channel = 'CAM_FRONT' 
    AND category_name = 'human.pedestrian.adult'
  """

  result_df = spark.sql(query)
  count = result_df.count()

  query_end = time.time()
  print(f">>> [Lakehouse] 결과: 보행자 데이터 {count}건 추출 완료")
  print(f">>> [Lakehouse] 데이터셋 구성 소요 시간: {query_end - query_start:.4f}초")

  #spark.stop()
  all_time += query_end - query_start
print(f"Total avg Time Elapsed: {all_time/test_count:.4f} sec")     

>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.0961초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.0624초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.0524초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.0635초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.0615초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.0822초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.0737초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 27483건 추출 완료
>>> [Lakehouse]

In [31]:
#SF=3
all_time=0
test_count=10
for i in range(test_count):
  print(">>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리")
  query_start = time.time()

  # 조인이 전혀 없는 단순 필터링 쿼리
  query = """
  SELECT img_path, translation, size, rotation
  FROM nessie.nusc_db.gold_train_set
  WHERE channel = 'CAM_FRONT' 
    AND category_name = 'human.pedestrian.adult'
  """

  result_df = spark.sql(query)
  count = result_df.count()

  query_end = time.time()
  print(f">>> [Lakehouse] 결과: 보행자 데이터 {count}건 추출 완료")
  print(f">>> [Lakehouse] 데이터셋 구성 소요 시간: {query_end - query_start:.4f}초")

  #spark.stop()
  all_time += query_end - query_start
print(f"Total avg Time Elapsed: {all_time/test_count:.4f} sec")     

>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1191초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1186초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1244초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1105초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.0925초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.0991초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.0818초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 247347건 추출 완료
>>> [La

In [34]:
#SF=5
all_time=0
test_count=10
for i in range(test_count):
  print(">>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리")
  query_start = time.time()

  # 조인이 전혀 없는 단순 필터링 쿼리
  query = """
  SELECT img_path, translation, size, rotation
  FROM nessie.nusc_db.gold_train_set
  WHERE channel = 'CAM_FRONT' 
    AND category_name = 'human.pedestrian.adult'
  """

  result_df = spark.sql(query)
  count = result_df.count()

  query_end = time.time()
  print(f">>> [Lakehouse] 결과: 보행자 데이터 {count}건 추출 완료")
  print(f">>> [Lakehouse] 데이터셋 구성 소요 시간: {query_end - query_start:.4f}초")

  #spark.stop()
  all_time += query_end - query_start
print(f"Total avg Time Elapsed: {all_time/test_count:.4f} sec")     

>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1335초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1295초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.0913초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.0959초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1298초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1296초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1442초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 687075건 추출 완료
>>> [La

In [38]:
#SF=7
all_time=0
test_count=10
for i in range(test_count):
  print(">>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리")
  query_start = time.time()

  # 조인이 전혀 없는 단순 필터링 쿼리
  query = """
  SELECT img_path, translation, size, rotation
  FROM nessie.nusc_db.gold_train_set
  WHERE channel = 'CAM_FRONT' 
    AND category_name = 'human.pedestrian.adult'
  """

  result_df = spark.sql(query)
  count = result_df.count()

  query_end = time.time()
  print(f">>> [Lakehouse] 결과: 보행자 데이터 {count}건 추출 완료")
  print(f">>> [Lakehouse] 데이터셋 구성 소요 시간: {query_end - query_start:.4f}초")

  #spark.stop()
  all_time += query_end - query_start
print(f"Total avg Time Elapsed: {all_time/test_count:.4f} sec")     

>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1727초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1768초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1735초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1396초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1388초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1097초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 0.1581초
>>> [Lakehouse] Phase 2: 실험 시작 - 단일 Gold 테이블 쿼리
>>> [Lakehouse] 결과: 보행자 데이터 1346667건 추출 완료

# Lakehouse General

In [15]:
# -----------------------------------------------------------------------------
# 3. Phase 2: Experiment (Query)
# -----------------------------------------------------------------------------
print(">>> [Lakehouse] Phase 2: 실험 시작 - 모델 학습 데이터셋 구성 쿼리")
query_start = time.time()

query = """
SELECT 
    sd.filename as img_path,
    a.translation,
    a.size,
    a.rotation
FROM nessie.nusc_db.samples s
JOIN nessie.nusc_db.sample_data sd 
    ON s.token = sd.sample_token
JOIN nessie.nusc_db.annotations a 
    ON s.token = a.sample_token
JOIN nessie.nusc_db.instances i 
    ON a.instance_token = i.token
JOIN nessie.nusc_db.category c 
    ON i.category_token = c.token
WHERE 
    sd.channel = 'CAM_FRONT' 
    AND c.name = 'human.pedestrian.adult'
"""

result_df = spark.sql(query)
count = result_df.count()

query_end = time.time()
print(f">>> [Lakehouse] 결과: 보행자 데이터 {count}건 추출 완료")
print(f">>> [Lakehouse] 데이터셋 구성 소요 시간: {query_end - query_start:.4f}초")

>>> [Lakehouse] Phase 2: 실험 시작 - 모델 학습 데이터셋 구성 쿼리


[Stage 56:>                                                         (0 + 1) / 1]

>>> [Lakehouse] 결과: 보행자 데이터 2748300건 추출 완료
>>> [Lakehouse] 데이터셋 구성 소요 시간: 1.6202초


                                                                                

In [32]:
spark.stop()