## 1. Configuration

In [1]:
from dotenv import load_dotenv
import os

# Load variables from a .env file (if one is found) into the process
# environment, so the credentials below never have to be written in the notebook.
load_dotenv()

# Object-store endpoint and the two buckets this notebook explores.
S3_ENDPOINT = "https://minio-dev.service.dc1.consul:9000"
S3_BUCKET = "dlh-lab"
S3_BUCKET_SOURCE = "dlh-source-data-lab"

# Fail fast with an actionable message when credentials are absent.
# KeyError is kept as the exception type, matching a plain os.environ[...] lookup.
_missing_env_vars = [
    name
    for name in ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    if name not in os.environ
]
if _missing_env_vars:
    raise KeyError(
        "Missing required environment variable(s): "
        + ", ".join(_missing_env_vars)
        + " (define them in the environment or in a .env file)"
    )

AWS_ACCESS_KEY = os.environ["AWS_ACCESS_KEY_ID"]
AWS_SECRET_KEY = os.environ["AWS_SECRET_ACCESS_KEY"]

# Only non-secret settings are echoed; credentials must never reach cell output,
# because saved notebook outputs get shared and committed.
print(f"Endpoint: {S3_ENDPOINT}")
print(f"Bucket:   {S3_BUCKET}")

Endpoint: https://minio-dev.service.dc1.consul:9000
Bucket:   dlh-lab


In [2]:
import s3fs
import urllib3
# Silence urllib3's InsecureRequestWarning; certificate verification is
# switched off for the client created below.
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)

# NOTE(review): verify=False disables TLS certificate checks for every request
# made through this filesystem object - confirm this is acceptable outside dev.
s3_client_options = {"verify": False}

fs = s3fs.S3FileSystem(
    key=AWS_ACCESS_KEY,
    secret=AWS_SECRET_KEY,
    endpoint_url=S3_ENDPOINT,
    client_kwargs=s3_client_options,
)

# Header line, a blank line, then one indented line per top-level bucket entry.
print(f"Bucket: {S3_BUCKET}")
print()
top_level_entries = fs.ls(S3_BUCKET)
for entry in top_level_entries:
    print(f"  {entry}")

Bucket: dlh-lab

  dlh-lab/DEMO
  dlh-lab/dane_ow_plan_pracy_jg_4525d1e9-afa5-48a0-936f-26ee1be101be
  dlh-lab/default
  dlh-lab/gold
  dlh-lab/iceberg-rammi
  dlh-lab/iceberg_demo
  dlh-lab/new_nessie_setup_a2d9707c-3d2f-4fcc-a6d4-70400fda60ff
  dlh-lab/normalized
  dlh-lab/parquet
  dlh-lab/poc
  dlh-lab/raw
  dlh-lab/semantic
  dlh-lab/stor_tab03
  dlh-lab/stor_table
  dlh-lab/stor_table01
  dlh-lab/stor_table02
  dlh-lab/stor_table05
  dlh-lab/system
  dlh-lab/t1
  dlh-lab/t5_0e127df0-b752-4f76-b8c7-86354f6171d5
  dlh-lab/t6_bde786e4-2be7-43f7-a628-3b0033b1a8d7
  dlh-lab/t7_49cb24a1-5a77-4605-aa51-522c72db1534
  dlh-lab/t8_46973d81-47dd-44b6-8814-b532f3a7c684
  dlh-lab/t9_9e19e1c0-4365-4e00-9ad0-4434612daf16
  dlh-lab/tab10_48c867be-466c-47c0-b35e-5dce4d6bab43
  dlh-lab/tab11_fdfb1e34-0c76-4f6a-a2c3-9aec16b08372
  dlh-lab/tab12_3523601c-42dc-42d0-8fa5-e333bc7b4c65
  dlh-lab/tab1_51855741-7768-4c58-a091-df9874f4d3e6
  dlh-lab/tab1_5a28d8b6-c551-4d64-95d2-6f84a1e71f4e
  dlh-lab/tab2_

In [8]:
# Walk the source bucket: print each top-level entry, followed by a preview of
# at most `preview_limit` of its children (csv, zip and parquet inputs).
preview_limit = 5
for top_entry in fs.ls(S3_BUCKET_SOURCE):
    print(top_entry)
    children = fs.ls(top_entry)
    for child in children[:preview_limit]:
        print(f"  {child}")

dlh-source-data-lab/reporting
  dlh-source-data-lab/reporting/hip
dlh-source-data-lab/tmp
  dlh-source-data-lab/tmp/PB_PBALL_20250428__20250423000000687.csv.ZIP
  dlh-source-data-lab/tmp/PB_TYPPARTNERAPB_20250428__20250423000000689.csv.ZIP
  dlh-source-data-lab/tmp/RBUS_ROZLREZEOPER_20250420_S_20250419145110873.csv
  dlh-source-data-lab/tmp/chunk.parquet
  dlh-source-data-lab/tmp/extract_for_connection_test.csv


In [11]:
# List the Iceberg warehouse prefix: every top-level entry, each followed by up
# to five of its children. If the prefix does not exist, print a hint instead.
warehouse_path = f"{S3_BUCKET}/iceberg-rammi"
if fs.exists(warehouse_path):
    for item in fs.ls(warehouse_path):
        print(item)
        for sub in fs.ls(item)[:5]:
            print(f"  {sub}")
else:
    # The dash in this message is a real em dash (it was previously stored as
    # mis-encoded bytes and rendered as garbage).
    print("No warehouse directory yet — run spark_ingest.py first")

dlh-lab/iceberg-rammi/hip
  dlh-lab/iceberg-rammi/hip/
  dlh-lab/iceberg-rammi/hip/HISKSE_ROZLMOCYOS
  dlh-lab/iceberg-rammi/hip/OMSR_PLANGEN
  dlh-lab/iceberg-rammi/hip/PBM_QN
  dlh-lab/iceberg-rammi/hip/RBES_ZASGENMAG


---
## 2. Parquet

In [13]:
import polars as pl

# Connection settings passed to Polars when it reads objects from the S3 endpoint.
# NOTE(review): "allow_invalid_certificates" turns off TLS certificate
# validation - confirm this is only meant for the dev endpoint.
storage_options = {
    "endpoint_url": S3_ENDPOINT,
    "aws_access_key_id": AWS_ACCESS_KEY,
    "aws_secret_access_key": AWS_SECRET_KEY,
    "aws_region": "PSEIDEV",
    "allow_invalid_certificates": "true",
}

# Collect an s3:// URI for every *.parquet object below the bucket's parquet/
# prefix (searched recursively) and preview the first ten.
parquet_prefix = f"{S3_BUCKET}/parquet"
if not fs.exists(parquet_prefix):
    parquet_files = []
    print("No parquet")
else:
    matched_keys = fs.glob(f"{parquet_prefix}/**/*.parquet")
    parquet_files = [f"s3://{key}" for key in matched_keys]
    print(f"Found {len(parquet_files)} parquet files:")
    for uri in parquet_files[:10]:
        print(f"  {uri}")

Found 9874 parquet files:
  s3://dlh-lab/parquet/HISKSE_ROZLMOCYOS/HISKSE_ROZLMOCYOS_20250501__20250501040330309.parquet
  s3://dlh-lab/parquet/HISKSE_ROZLMOCYOS/HISKSE_ROZLMOCYOS_20250501__20250502040307971.parquet
  s3://dlh-lab/parquet/HISKSE_ROZLMOCYOS/HISKSE_ROZLMOCYOS_20250502__20250502040310839.parquet
  s3://dlh-lab/parquet/HISKSE_ROZLMOCYOS/HISKSE_ROZLMOCYOS_20250502__20250503040313880.parquet
  s3://dlh-lab/parquet/HISKSE_ROZLMOCYOS/HISKSE_ROZLMOCYOS_20250503__20250503040316747.parquet
  s3://dlh-lab/parquet/HISKSE_ROZLMOCYOS/HISKSE_ROZLMOCYOS_20250503__20250504040409839.parquet
  s3://dlh-lab/parquet/HISKSE_ROZLMOCYOS/HISKSE_ROZLMOCYOS_20250504__20250504040413116.parquet
  s3://dlh-lab/parquet/HISKSE_ROZLMOCYOS/HISKSE_ROZLMOCYOS_20250504__20250505040318762.parquet
  s3://dlh-lab/parquet/HISKSE_ROZLMOCYOS/HISKSE_ROZLMOCYOS_20250505__20250505040321602.parquet
  s3://dlh-lab/parquet/HISKSE_ROZLMOCYOS/HISKSE_ROZLMOCYOS_20250505__20250506040316635.parquet


In [14]:
# Read the first discovered parquet file with Polars and summarise it.
# Skipped entirely when no parquet files were found.
if parquet_files:
    df = pl.read_parquet(parquet_files[0], storage_options=storage_options)
    print(f"File: {parquet_files[0]}")
    print(f"Rows: {len(df)}, Columns: {df.columns}")
    # A bare expression is only auto-rendered when it is the last top-level
    # statement of a cell; inside this `if` body it is silently discarded, so
    # the preview has to be displayed explicitly.
    display(df.head())

File: s3://dlh-lab/parquet/HISKSE_ROZLMOCYOS/HISKSE_ROZLMOCYOS_20250501__20250501040330309.parquet
Rows: 1428, Columns: ['UTC_ZAP', 'UTCTIME', 'MRID_ZAS', 'OW_KOD', 'ZN_BR_NET_KOD', 'ID_PALIWA', 'TYP_AKT', 'TYP_DYSP', 'ZIARNO_CZ', 'DOD_PRZEC', 'DOD_INW', 'UB_REMONT_KAP', 'UBREMSR', 'UBREMBIEZ', 'UBREMAW', 'UBCIEP', 'UBEKSPL', 'UBINWEST', 'SUMUB', 'P_OS', 'UBSIEC', 'OBCIAZ', 'REZIM', 'P_DYSP', 'REZWIR', 'P_DYSP_RUCH', 'RERUCHZIM', 'RERUCHWIR', 'P_INST', 'BO']


In [None]:
# Read every discovered parquet file and stack them into one frame.
# how="diagonal" unions the columns of all inputs, filling columns a file lacks
# with nulls, so files with differing schemas can be combined.
# NOTE: this loads each file eagerly, one after another; with the thousands of
# files found under the prefix it is slow and memory-hungry.
if parquet_files:
    dfs = [pl.read_parquet(f, storage_options=storage_options) for f in parquet_files]
    combined = pl.concat(dfs, how="diagonal")
    print(f"Combined: {len(combined)} rows, {len(combined.columns)} columns")
    # An expression nested in the `if` body is not auto-rendered by the
    # notebook, so show the preview explicitly.
    display(combined.head(10))

---
## 3. Iceberg Spark

In [2]:
from pyspark.sql import SparkSession

# Maven coordinates resolved at session start-up via spark.jars.packages.
# NOTE(review): the Iceberg runtime artifact targets Spark 4.0 / Scala 2.13 -
# confirm it matches the installed PySpark version.
ICEBERG_SPARK_JAR = "org.apache.iceberg:iceberg-spark-runtime-4.0_2.13:1.10.0"
HADOOP_AWS_JAR = "org.apache.hadoop:hadoop-aws:3.4.1"

# Root of the Iceberg warehouse, addressed through the s3a:// filesystem.
warehouse_path = f"s3a://{S3_BUCKET}/iceberg-rammi"

# All session settings in one mapping, applied to the builder in a loop below.
spark_settings = {
    "spark.jars.packages": f"{ICEBERG_SPARK_JAR},{HADOOP_AWS_JAR}",
    # Iceberg catalog named "iceberg_test", backed by a Hadoop (path-based) warehouse
    "spark.sql.catalog.iceberg_test": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.iceberg_test.type": "hadoop",
    "spark.sql.catalog.iceberg_test.warehouse": warehouse_path,
    # S3A access to the object store (path-style bucket addressing, TLS on)
    "spark.hadoop.fs.s3a.endpoint": S3_ENDPOINT,
    "spark.hadoop.fs.s3a.access.key": AWS_ACCESS_KEY,
    "spark.hadoop.fs.s3a.secret.key": AWS_SECRET_KEY,
    "spark.hadoop.fs.s3a.path.style.access": "true",
    "spark.hadoop.fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
    "spark.hadoop.fs.s3a.connection.ssl.enabled": "true",
    # Iceberg SQL extensions; unqualified names resolve against iceberg_test
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions",
    "spark.sql.defaultCatalog": "iceberg_test",
}

# Local two-thread session named "iceberg-explorer".
session_builder = SparkSession.builder.appName("iceberg-explorer").master("local[2]")
for setting_name, setting_value in spark_settings.items():
    session_builder = session_builder.config(setting_name, setting_value)
spark = session_builder.getOrCreate()

print(f"Spark: {spark.version}")
print(f"Warehouse: {warehouse_path}")

Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/02/16 14:38:05 WARN Utils: Your hostname, LNCWA1I05373, resolves to a loopback address: 127.0.1.1; using 10.255.255.254 instead (on interface lo)
26/02/16 14:38:05 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
:: loading settings :: url = jar:file:/home/ca743/Work/playground/data-k8s-playground/components/de/iceberg/upload/.venv/lib/python3.12/site-packages/pyspark/jars/ivy-2.5.3.jar!/org/apache/ivy/core/settings/ivysettings.xml
Ivy Default Cache set to: /home/ca743/.ivy2.5.2/cache
The jars for the packages stored in: /home/ca743/.ivy2.5.2/jars
org.apache.iceberg#iceberg-spark-runtime-4.0_2.13 added as a dependency
org.apache.hadoop#hadoop-aws added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-9066c3d5-4ec7-4b34-b4bb-43cadaa6c84a;1.0
	confs: [default]
	found org.apache.iceberg#iceberg-spark-runtime-4.0_2.13;1.10.0 in central
	found org.apache.

Spark: 4.1.1
Warehouse: s3a://dlh-lab/iceberg-rammi


In [3]:
# Show the namespaces visible in the session's current catalog, untruncated.
namespaces_df = spark.sql("SHOW NAMESPACES")
namespaces_df.show(truncate=False)

SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


+---------+
|namespace|
+---------+
|hip      |
+---------+



In [4]:
# For every namespace in the catalog (not only a default one), print a header
# line and then the tables that namespace contains.
namespaces = []
for namespace_row in spark.sql("SHOW NAMESPACES").collect():
    # First column of each result row holds the namespace name.
    namespaces.append(namespace_row[0])

for ns in namespaces:
    print(f"\n=== {ns} ===")
    tables_df = spark.sql(f"SHOW TABLES IN {ns}")
    tables_df.show(truncate=False)


=== hip ===
+---------+-----------------+-----------+
|namespace|tableName        |isTemporary|
+---------+-----------------+-----------+
|hip      |ZRU_USE          |false      |
|hip      |RBES_ZASGENMAG   |false      |
|hip      |OMSR_PLANGEN     |false      |
|hip      |HISKSE_ROZLMOCYOS|false      |
|hip      |PBM_QN           |false      |
+---------+-----------------+-----------+



In [11]:
# Table under inspection; later metadata queries interpolate this name too.
TABLE = "hip.ZRU_USE"

# Print the table name, its row count, its schema, and the first ten rows
# with column values left untruncated.
df_spark = spark.table(TABLE)
print(f"Table: {TABLE}")
table_row_count = df_spark.count()
print(f"Rows: {table_row_count}")
df_spark.printSchema()
df_spark.show(n=10, truncate=False)

Table: hip.ZRU_USE
Rows: 100310304
root
 |-- DOBA: string (nullable = true)
 |-- UDT_DO: string (nullable = true)
 |-- RODZ_RB: string (nullable = true)
 |-- JB_ZGL: string (nullable = true)
 |-- JB_PH: string (nullable = true)
 |-- TYP_DOK_ZRU: string (nullable = true)
 |-- NR_DOK_ZRU: string (nullable = true)
 |-- NR_DOK_ZRD: string (nullable = true)
 |-- NR_PORZ_ZRU: string (nullable = true)
 |-- UDT_DOK: string (nullable = true)
 |-- ZN_AKT_RBN: string (nullable = true)
 |-- ZN_AKT_RBB: string (nullable = true)
 |-- ZN_PRZET: string (nullable = true)
 |-- NR_ITER_WER: string (nullable = true)
 |-- TYP_TRAN_USE: string (nullable = true)
 |-- ZIARNO_CZ: string (nullable = true)
 |-- POZ_USE_QTY: string (nullable = true)
 |-- _source_file: string (nullable = true)



[Stage 15:>                                                         (0 + 1) / 1]

+----------+-------------------+-------+----------------+----------------+-----------+------------------------------------+------------------------------------+-----------+-------------------+----------+----------+--------+-----------+------------+---------+-----------+----------------------------------------------------------------------------+
|DOBA      |UDT_DO             |RODZ_RB|JB_ZGL          |JB_PH           |TYP_DOK_ZRU|NR_DOK_ZRU                          |NR_DOK_ZRD                          |NR_PORZ_ZRU|UDT_DOK            |ZN_AKT_RBN|ZN_AKT_RBB|ZN_PRZET|NR_ITER_WER|TYP_TRAN_USE|ZIARNO_CZ|POZ_USE_QTY|_source_file                                                                |
+----------+-------------------+-------+----------------+----------------+-----------+------------------------------------+------------------------------------+-----------+-------------------+----------+----------+--------+-----------+------------+---------+-----------+----------------------------------

                                                                                

In [7]:

# Snapshot history of the table named by TABLE, read from its `snapshots`
# metadata table and shown without truncating column values.
snapshots_df = spark.sql(f"SELECT * FROM {TABLE}.snapshots")
snapshots_df.show(truncate=False)

+-----------------------+-------------------+---------+---------+-------------------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|committed_at           |snapshot_id        |parent_id|operation|manifest_list                                                                                                                  |summary                                                                                                                               

In [8]:

# Data files that make up the table named by TABLE (path, format, record count
# and size), read from its `files` metadata table.
# NOTE(review): the saved output lists hip/HISKSE_ROZLMOCYOS files, so TABLE
# held a different value when this cell last ran - re-run after the TABLE cell.
files_df = spark.sql(f"SELECT file_path, file_format, record_count, file_size_in_bytes FROM {TABLE}.files")
files_df.show(truncate=False)

+---------------------------------------------------------------------------------------------------------------------+-----------+------------+------------------+
|file_path                                                                                                            |file_format|record_count|file_size_in_bytes|
+---------------------------------------------------------------------------------------------------------------------+-----------+------------+------------------+
|s3a://dlh-lab/iceberg-rammi/hip/HISKSE_ROZLMOCYOS/data/00000-146-b890bf7d-c18c-457d-adb5-2ef52a69a269-0-00001.parquet|PARQUET    |1989118     |20690275          |
|s3a://dlh-lab/iceberg-rammi/hip/HISKSE_ROZLMOCYOS/data/00001-147-b890bf7d-c18c-457d-adb5-2ef52a69a269-0-00001.parquet|PARQUET    |1976690     |20272480          |
|s3a://dlh-lab/iceberg-rammi/hip/HISKSE_ROZLMOCYOS/data/00002-148-b890bf7d-c18c-457d-adb5-2ef52a69a269-0-00001.parquet|PARQUET    |1963008     |19856788          |
|s3a://dlh-lab/i

In [None]:
# Read the raw parquet files directly with Spark (bypassing Iceberg).
# The files live under <bucket>/parquet/<TABLE>/<file>.parquet - the same prefix
# the Polars section globs - not under a "source/" prefix, which does not
# exist in the bucket listing.
parquet_s3_path = f"s3a://{S3_BUCKET}/parquet"

# "/*/*" matches every file one directory level below the prefix;
# mergeSchema asks Spark to reconcile files whose schemas differ.
df_raw = spark.read.option("mergeSchema", "true").parquet(f"{parquet_s3_path}/*/*")
print(f"Raw parquet rows: {df_raw.count()}")
df_raw.printSchema()
df_raw.show(5, truncate=False)

In [9]:

# Total number of rows in the hip.HISKSE_ROZLMOCYOS table.
hiskse_count_query = """
    SELECT count(*) as cnt
    FROM hip.HISKSE_ROZLMOCYOS
"""
spark.sql(hiskse_count_query).show(truncate=False)

+-------+
|cnt    |
+-------+
|9101352|
+-------+



In [10]:
# Total number of rows in the hip.PBM_QN table.
pbm_qn_count_query = """
    SELECT count(*) as cnt
    FROM hip.PBM_QN
"""
spark.sql(pbm_qn_count_query).show(truncate=False)

+--------+
|cnt     |
+--------+
|95541312|
+--------+



In [13]:
# Preview (first 25 rows, untruncated) of hip.ZRU_USE projected onto renamed
# target columns. The source columns are all strings, so dates, timestamps,
# integers and decimals are produced with explicit CASTs.
# The SQL comments group the output columns by the target they map to
# ("dokument_zru", "umowy_sprzedazy_energii") and mark the columns that have
# no mapping yet. NR_DOK_ZRU is selected twice, once per target
# (as `mrid` and as `dok_zru_mrid`).
# NOTE(review): the alias `nr_pierszej_iteracji_wer` looks like a misspelling
# of "pierwszej" - confirm against the target schema before changing it.
spark.sql("""
    SELECT
        -- dokument_zru mapping
        CAST(DOBA AS DATE) AS doba_pl,
        JB_ZGL AS jb_zglaszajacego_kod,
        NR_DOK_ZRD AS dok_zrodl_mrid,
        NR_DOK_ZRU AS mrid,
        CAST(NR_ITER_WER AS INT) AS nr_pierszej_iteracji_wer,
        CAST(NR_PORZ_ZRU AS DECIMAL(14,0)) AS nr_porzadkowy_zru,
        RODZ_RB AS rodzaj_rb,
        TYP_DOK_ZRU AS typ_dok_zru_kod,
        CAST(UDT_DOK AS TIMESTAMP) AS dtczas_dok,
        ZN_AKT_RBN AS zn_akt_dok_zru_rbn_kod,
        ZN_PRZET AS zn_przetwarzania_kod,
        -- umowy_sprzedazy_energii mapping
        JB_PH AS jb_partnera_hand_kod,
        NR_DOK_ZRU AS dok_zru_mrid,
        CAST(POZ_USE_QTY AS DECIMAL(9,3)) AS sr_moc_use,
        TYP_TRAN_USE AS typ_transakcji_kod,
        CAST(UDT_DO AS TIMESTAMP) AS czas_do_utc,
        -- unmapped
        ZN_AKT_RBB,
        ZIARNO_CZ
    FROM hip.ZRU_USE
""").show(25, truncate=False)

[Stage 17:>                                                         (0 + 1) / 1]

+----------+--------------------+------------------------------------+------------------------------------+------------------------+-----------------+---------+---------------+-------------------+----------------------+--------------------+--------------------+------------------------------------+----------+------------------+-------------------+----------+---------+
|doba_pl   |jb_zglaszajacego_kod|dok_zrodl_mrid                      |mrid                                |nr_pierszej_iteracji_wer|nr_porzadkowy_zru|rodzaj_rb|typ_dok_zru_kod|dtczas_dok         |zn_akt_dok_zru_rbn_kod|zn_przetwarzania_kod|jb_partnera_hand_kod|dok_zru_mrid                        |sr_moc_use|typ_transakcji_kod|czas_do_utc        |ZN_AKT_RBB|ZIARNO_CZ|
+----------+--------------------+------------------------------------+------------------------------------+------------------------+-----------------+---------+---------------+-------------------+----------------------+--------------------+--------------------

                                                                                

In [None]:
# Stop the Spark session once exploration is finished, then confirm on stdout.
# Cells that use `spark` need the session-creation cell re-run after this.
spark.stop()
print("Spark stopped.")