# Playground

## Initialize globals

In [1]:
import sys
import os
import io
import shutil
import time

from uuid import uuid4
from typing import Any
from contextlib import redirect_stdout
from pyspark.sql import SparkSession

sys.path.append("../src")

from fabricengineer.transform.mlv.mlv import MaterializedLakeView
from fabricengineer.transform.silver.insertonly import SilverIngestionInsertOnlyService, ConstantColumn, LakehouseTable as LakehouseTableIO, get_mock_table_path
from fabricengineer.logging import TimeLogger, logger

mlv: MaterializedLakeView
timer: TimeLogger

In [2]:
class NotebookUtilsFSMock:
    def _get_path(self, file: str) -> str:
        return os.path.join(os.getcwd(), file)

    def exists(self, path: str) -> bool:
        return os.path.exists(self._get_path(path))

    def put(
        self,
        file: str,
        content: str,
        overwrite: bool = False
    ) -> None:
        path = self._get_path(file)
        os.makedirs(os.path.dirname(path), exist_ok=True)

        if os.path.exists(path) and not overwrite:
            raise FileExistsError(f"File {path} already exists and overwrite is set to False.")
        with open(path, 'w') as f:
            f.write(content)


class NotebookUtilsMock:
    def __init__(self):
        self.fs = NotebookUtilsFSMock()

global spark
spark: SparkSession = SparkSession.builder.appName("PlaygroundSparkSession").getOrCreate()

global notebookutils
notebookutils = NotebookUtilsMock()

25/08/05 10:50:46 WARN Utils: Your hostname, MacBook-Air-von-Enrico.local resolves to a loopback address: 127.0.0.1; using 192.168.0.7 instead (on interface en0)
25/08/05 10:50:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/05 10:50:46 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/08/05 10:50:47 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [3]:
import io
import logging
from typing import Any, Callable
from contextlib import contextmanager

@contextmanager
def capture_logs(logger: logging.Logger):
    log_stream = io.StringIO()
    handler = logging.StreamHandler(log_stream)
    handler.setLevel(logging.DEBUG)  # Fang alles ab
    formatter = logging.Formatter("[%(asctime)s] [%(levelname)s] %(filename)s %(message)s", "%d.%m.%Y %H:%M:%S,%f")
    handler.setFormatter(formatter)

    logger.addHandler(handler)
    try:
        yield log_stream
    finally:
        logger.removeHandler(handler)

def sniff_logs(logger: logging.Logger, fn: Callable[[], Any]) -> tuple[Any, list[str]]:
    with capture_logs(logger) as log_stream:
        result = fn()
    logs = log_stream.getvalue().splitlines()
    return result, logs


In [4]:
import os
import shutil


def cleanup_fs():
    path_Files = notebookutils.fs._get_path("Files")
    path_tmp = notebookutils.fs._get_path("tmp")
    path_tmp_2 = "../tmp"
    path_tmp_3 = "../Files"

    rm_paths = [path_Files, path_tmp, path_tmp_2, path_tmp_3]
    for path in rm_paths:
        if os.path.exists(path):
            shutil.rmtree(path)

cleanup_fs()

## TimeLogger

In [5]:
with open("../src/fabricengineer/logging/timer.py") as f:
    code = f.read()
exec(code, globals())

timer

TimeLogger(start_time=None, end_time=None, elapsed_time=None)

In [6]:
timer.start().log()
time.sleep(1)
timer.stop().log()

[05.08.2025 10:50:47] [INFO] fabricengineer TIMER-START:	2025-08-05 10:50:47
[05.08.2025 10:50:48] [INFO] fabricengineer TIMER-END:	2025-08-05 10:50:48, ELAPSED: 1.0036s


## MaterializedLakeView

In [7]:
with open("../src/fabricengineer/transform/mlv/mlv.py") as f:
    code = f.read()
exec(code, globals())


mlv.init(
    lakehouse="Lakehouse",
    schema="schema",
    table="table",
    table_suffix=None,
    is_testing_mock=True
)

mlv.to_dict()

{'lakehouse': 'Lakehouse',
 'schema': 'schema',
 'table': 'table',
 'table_path': 'Lakehouse.schema.table'}

In [8]:
mlv.init(
    lakehouse=str(uuid4()),
    schema="schema",
    table="table",
    table_suffix=None,
    is_testing_mock=True
)

sql = """
SELECT * FROM Lakehouse.schema.table
"""
is_existing = False
for i in range(0, 4):
    if i > 0:
        is_existing = True
    if i == 2:
        sql = """
        SELECT * FROM Lakehouse.schema.table WHERE 1=0
        """
    result, logs = sniff_logs(
        logger,
        lambda: mlv.create_or_replace(sql, mock_is_existing=is_existing),
    )
    print(f"Logs-{i+1}")
    display(logs)

[05.08.2025 10:50:48] [INFO] fabricengineer CREATE SCHEMA IF NOT EXISTS b889b89c-d0a4-4ae2-be2b-aa31db3f0afc.schema
[05.08.2025 10:50:48] [INFO] fabricengineer CREATE MLV: b889b89c-d0a4-4ae2-be2b-aa31db3f0afc.schema.table


Logs-1


['[05.08.2025 10:50:48,f] [INFO] <string> CREATE SCHEMA IF NOT EXISTS b889b89c-d0a4-4ae2-be2b-aa31db3f0afc.schema',
 '[05.08.2025 10:50:48,f] [INFO] <string> CREATE MLV: b889b89c-d0a4-4ae2-be2b-aa31db3f0afc.schema.table']

[05.08.2025 10:50:48] [INFO] fabricengineer Nothing has changed.


Logs-2


['[05.08.2025 10:50:48,f] [INFO] <string> Nothing has changed.']

[05.08.2025 10:50:48] [INFO] fabricengineer REPLACE MLV: b889b89c-d0a4-4ae2-be2b-aa31db3f0afc.schema.table
[05.08.2025 10:50:48] [INFO] fabricengineer DROP MATERIALIZED LAKE VIEW IF EXISTS b889b89c-d0a4-4ae2-be2b-aa31db3f0afc.schema.table
[05.08.2025 10:50:48] [INFO] fabricengineer CREATE SCHEMA IF NOT EXISTS b889b89c-d0a4-4ae2-be2b-aa31db3f0afc.schema
[05.08.2025 10:50:48] [INFO] fabricengineer CREATE MLV: b889b89c-d0a4-4ae2-be2b-aa31db3f0afc.schema.table


Logs-3


['[05.08.2025 10:50:48,f] [INFO] <string> REPLACE MLV: b889b89c-d0a4-4ae2-be2b-aa31db3f0afc.schema.table',
 '[05.08.2025 10:50:48,f] [INFO] <string> DROP MATERIALIZED LAKE VIEW IF EXISTS b889b89c-d0a4-4ae2-be2b-aa31db3f0afc.schema.table',
 '[05.08.2025 10:50:48,f] [INFO] <string> CREATE SCHEMA IF NOT EXISTS b889b89c-d0a4-4ae2-be2b-aa31db3f0afc.schema',
 '[05.08.2025 10:50:48,f] [INFO] <string> CREATE MLV: b889b89c-d0a4-4ae2-be2b-aa31db3f0afc.schema.table']

[05.08.2025 10:50:48] [INFO] fabricengineer Nothing has changed.


Logs-4


['[05.08.2025 10:50:48,f] [INFO] <string> Nothing has changed.']

25/08/05 10:51:01 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors


In [9]:
mlv.file_path
os.remove(mlv.file_path)

In [10]:
result, logs = sniff_logs(
    lambda: mlv.create_or_replace(sql, mock_is_existing=True)
)

logs

['WARN: file=None, is_existing=True. RECREATE.',
 'DROP MATERIALIZED LAKE VIEW IF EXISTS d0f584a9-ad2b-41b5-92f8-e4d8dddce28a.schema.table',
 'CREATE SCHEMA IF NOT EXISTS d0f584a9-ad2b-41b5-92f8-e4d8dddce28a.schema',
 'CREATE MLV: d0f584a9-ad2b-41b5-92f8-e4d8dddce28a.schema.table']

In [11]:
result, logs = sniff_logs(
    lambda: mlv.refresh(full_refresh=True)
)

logs

['REFRESH MATERIALIZED LAKE VIEW d0f584a9-ad2b-41b5-92f8-e4d8dddce28a.schema.table FULL']

In [12]:
result, logs = sniff_logs(
    lambda: mlv.refresh(full_refresh=False)
)

logs

['REFRESH MATERIALIZED LAKE VIEW d0f584a9-ad2b-41b5-92f8-e4d8dddce28a.schema.table ']

## Clean up the file system

In [13]:
cleanup_fs()

## SilverIngestionInsertOnlyService

In [14]:
src_table = LakehouseTableIO(
    lakehouse="BronzeLakehouse",
    schema="schema",
    table="table1"
)
dest_table = LakehouseTableIO(
    lakehouse="SilverLakehouse",
    schema=src_table.schema,
    table=src_table.table
)

etl = SilverIngestionInsertOnlyService()
etl.init(
    spark_=spark,
    source_table=src_table,
    destination_table=dest_table,
    nk_columns=["id"],
    constant_columns=[],
    is_delta_load=False,
    delta_load_use_broadcast=True,
    transformations={},
    exclude_comparing_columns=None,
    include_comparing_columns=None,
    historize=True,
    partition_by_columns=None,
    is_testing_mock=True
)

In [None]:
from pyspark.sql import functions as F, types as T

schema = T.StructType([
    T.StructField("id", T.IntegerType(), False),
    T.StructField("name", T.StringType(), False),
    T.StructField("department_id", T.IntegerType(), False),
    T.StructField("created_at", T.StringType(), False),
    T.StructField("updated_at", T.StringType(), False),
])

data = [
    (1, "Alice", 1, "2023-01-01", "2023-01-01"),
    (2, "u-Bob", 2, "2023-01-01", "2023-01-01"),
    (3, "u-Charlie", 3, "2023-01-01", "2023-01-01"),
    (4, "David", 1, "2023-01-01", "2023-01-01"),
    (5, "Eve", 2, "2023-01-01", "2023-01-01"),
    (6, "Frank", 3, "2023-01-01", "2023-01-01"),
    # (7, "Grace", 1, "2023-01-01", "2023-01-01"),
    (8, "Heidi", 2, "2023-01-01", "2023-01-01"),
    (9, "Ivan", 3, "2023-01-01", "2023-01-01"),
    (10, "Judy", 1, "2023-01-01", "2023-01-01")
    ,(11, "Judy-2", 1, "2023-01-01", "2023-01-01")
    ,(12, "Judy-3", 1, "2023-01-01", "2023-01-01")
    ,(13, "Judy-4", 1, "2023-01-01", "2023-01-01")
]

df_bronze = spark.createDataFrame(data, schema)
df_bronze = df_bronze \
    .withColumn("created_at", F.to_timestamp("created_at")) \
    .withColumn("updated_at",F.to_timestamp("updated_at"))

df_bronze.show(truncate=False)
bronze_path = get_mock_table_path(etl._src_table)
df_bronze.write \
    .format("parquet") \
    .mode("overwrite") \
    .save(bronze_path)

                                                                                

+---+---------+-------------+-------------------+-------------------+
|id |name     |department_id|created_at         |updated_at         |
+---+---------+-------------+-------------------+-------------------+
|1  |Alice    |1            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|2  |u-Bob    |2            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|3  |u-Charlie|3            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|4  |David    |1            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|5  |Eve      |2            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|6  |Frank    |3            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|8  |Heidi    |2            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|9  |Ivan     |3            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|10 |Judy     |1            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|11 |Judy-2   |1            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|12 |Judy-3   |1            |2023-01-01 00:00:00|2023-01-01 00:00:00|
|13 |Judy-4   |1    

25/08/01 14:51:22 WARN MemoryManager: Total allocation exceeds 95,00% (1.020.054.720 bytes) of heap memory
Scaling row group sizes to 95,00% for 8 writers
                                                                                

In [16]:
new_data = etl.ingest()
new_data.orderBy("id").show(truncate=False)

DROP MATERIALIZED LAKE VIEW IF EXISTS SilverLakehouse.table1_h
MLV: CREATE MLV SilverLakehouse.table1_h
+------------------------------------+---+---+---------+-------------+-------------------+-------------------+--------------------------+--------------+
|PK                                  |NK |id |name     |department_id|created_at         |updated_at         |ROW_LOAD_DTS              |ROW_DELETE_DTS|
+------------------------------------+---+---+---------+-------------+-------------------+-------------------+--------------------------+--------------+
|48489805-b151-4e0d-aef0-273e7a42d2e7|1  |1  |Alice    |1            |2023-01-01 00:00:00|2023-01-01 00:00:00|2025-08-01 14:51:19.276059|NULL          |
|dd189728-04c8-4224-b99c-b4f8d0782eeb|2  |2  |u-Bob    |2            |2023-01-01 00:00:00|2023-01-01 00:00:00|2025-08-01 14:51:19.276059|NULL          |
|89837d41-61fe-48f1-a533-85518e4707d0|3  |3  |u-Charlie|3            |2023-01-01 00:00:00|2023-01-01 00:00:00|2025-08-01 14:51:19.2

In [None]:
silver_path = get_mock_table_path(etl._dest_table)
df = spark.read.format("parquet").load(silver_path).orderBy(F.col("id").asc(), F.col("ROW_LOAD_DTS").asc())

df.show(truncate=False)

+------------------------------------+---+---+---------+-------------+-------------------+-------------------+--------------------------+--------------+
|PK                                  |NK |id |name     |department_id|created_at         |updated_at         |ROW_LOAD_DTS              |ROW_DELETE_DTS|
+------------------------------------+---+---+---------+-------------+-------------------+-------------------+--------------------------+--------------+
|d85b9efb-4c1b-47af-9ab6-00a68518278c|1  |1  |Alice    |1            |2023-01-01 00:00:00|2023-01-01 00:00:00|2025-08-01 14:51:19.276059|NULL          |
|a6ab88ad-8d0c-490f-9ef5-6c4581543a1d|2  |2  |u-Bob    |2            |2023-01-01 00:00:00|2023-01-01 00:00:00|2025-08-01 14:51:19.276059|NULL          |
|2e2885c8-d923-4034-8a1a-5e62fb4a4d5d|3  |3  |u-Charlie|3            |2023-01-01 00:00:00|2023-01-01 00:00:00|2025-08-01 14:51:19.276059|NULL          |
|8b5020b8-7226-4673-a167-77b121001f64|4  |4  |David    |1            |2023-01-01 0

25/08/01 14:51:29 WARN GarbageCollectionMetrics: To enable non-built-in garbage collector(s) List(G1 Concurrent GC), users should configure it(them) to spark.eventLog.gcMetrics.youngGenerationGarbageCollectors or spark.eventLog.gcMetrics.oldGenerationGarbageCollectors
