In [1]:
%run "./nb_logging"

StatementMeta(, e9e8d624-10e0-4f06-9fbd-8070c400c7e5, 5, Finished, Available, Finished)

Config Loaded
Logging System Loaded


In [2]:
from pyspark.sql import SparkSession, DataFrame
from pyspark.sql import DataFrame 
from pyspark.sql.functions import *
from pyspark.sql.types import *
from delta.tables import *
import re

# Reuse the active Spark session if one exists, otherwise create one.
spark = SparkSession.builder.getOrCreate()

StatementMeta(, e9e8d624-10e0-4f06-9fbd-8070c400c7e5, 6, Finished, Available, Finished)

In [3]:
# ===============================
# COLUMN CLEANING
# ===============================
def clean_column_names(df: DataFrame) -> DataFrame:
    """
    Converts DataFrame column names to snake_case.

    Each name is normalised by:
      1. removing every character that is neither a word character
         (letters, digits, underscore - Unicode-aware) nor whitespace,
      2. trimming leading/trailing whitespace, so "Name " or "Amount ($)"
         become "name" / "amount" instead of "name_" / "amount_",
      3. collapsing each run of inner whitespace into a single "_",
      4. lower-casing the result.

    Note: camelCase is not split ("firstName" -> "firstname"). Two source
    columns may normalise to the same name; ``toDF`` accepts that, but later
    references to the duplicated name will be ambiguous.

    Args:
        df: Input DataFrame.

    Returns:
        A DataFrame with the same data and the renamed columns, in the same order.
    """
    def to_snake_case(name: str) -> str:
        name = re.sub(r"[^\w\s]", "", name)
        # Trim after the punctuation filter so whitespace that preceded a
        # removed suffix such as "($)" does not turn into a trailing "_".
        name = name.strip()
        name = re.sub(r"\s+", "_", name)
        return name.lower()

    new_columns = [to_snake_case(c) for c in df.columns]
    return df.toDF(*new_columns)

StatementMeta(, e9e8d624-10e0-4f06-9fbd-8070c400c7e5, 7, Finished, Available, Finished)

In [4]:
# ===============================
# EXPLODE & FLATTEN
# ===============================

def explode_and_flatten(df: DataFrame) -> DataFrame:
    """
    Explodes array columns and promotes their elements to top-level columns.

    Only the top-level columns of ``df`` as they exist at call time are
    inspected, and only one level of nesting is handled:

    * ``array<struct>`` column ``x``: one output row per array element
      (``explode``), each struct field ``f`` copied into a column ``x_f``,
      and ``x`` itself dropped.
    * ``struct`` column ``s`` containing array fields: those arrays are zipped
      element-wise (``arrays_zip``) and exploded together; each array field
      ``a`` becomes a column ``s_a`` holding the element. The helper column
      ``s_zip`` is dropped, but ``s`` itself is kept unchanged.
    * Structs without array fields, and all other columns, pass through untouched.

    NOTE(review): ``explode`` produces no rows for a null or empty array, so
    such input rows disappear, and each additional exploded column multiplies
    the row count. Confirm both are intended.
    NOTE(review): ``<s>_zip.<field>`` relies on ``arrays_zip`` naming its
    struct fields after the source array fields - verify on the Spark version in use.
    NOTE(review): this function is redefined further down in this notebook; when
    that later cell runs, its version shadows this one.

    Args:
        df: Input DataFrame.

    Returns:
        The exploded / flattened DataFrame.
    """
    # `df.schema.fields` is evaluated once when the loop starts, so columns
    # added to `df` inside the loop are not revisited.
    for field in df.schema.fields:
        if isinstance(field.dataType, ArrayType) and isinstance(field.dataType.elementType, StructType):
            # array<struct>: one row per element, then copy every struct field
            # into `<name>_<field>` and drop the struct column.
            struct_name = field.name
            df = df.withColumn(struct_name, explode(col(struct_name)))
            for f in field.dataType.elementType.fields:
                df = df.withColumn(f"{struct_name}_{f.name}", col(f"{struct_name}.{f.name}"))
            df = df.drop(struct_name)
        elif isinstance(field.dataType, StructType):
            # struct: collect the dotted paths of its array-typed fields.
            struct_name = field.name
            array_fields = [
                f"{struct_name}.{f.name}"
                for f in field.dataType.fields
                if isinstance(f.dataType, ArrayType)
            ]
            # Structs without array fields are left as-is (not flattened).
            if not array_fields:
                continue
            # Zip the arrays element-wise, then explode so that the i-th
            # elements of every array land on the same output row.
            df = df.withColumn(
                f"{struct_name}_zip",
                arrays_zip(*[col(c) for c in array_fields])
            ).withColumn(
                f"{struct_name}_zip",
                explode(col(f"{struct_name}_zip"))
            )
            for f in field.dataType.fields:
                if isinstance(f.dataType, ArrayType):
                    df = df.withColumn(
                        f"{struct_name}_{f.name}",
                        col(f"{struct_name}_zip.{f.name}")
                    )
            # Only the helper column is dropped; the original struct stays.
            df = df.drop(f"{struct_name}_zip")
    return df

StatementMeta(, e9e8d624-10e0-4f06-9fbd-8070c400c7e5, 8, Finished, Available, Finished)

In [5]:
# ===============================
# SAVE TO LAKEHOUSE
# ===============================
def save_to_lakehouse(
    df: DataFrame,
    path: str,
    source: str,
    mode: str = "overwrite",
    file_format: str = "parquet"
):
    """
    Saves a DataFrame to a Lakehouse path, logging start, end and failure.

    Args:
        df: DataFrame to write.
        path: Target location passed to the Spark writer.
        source: Source/system name forwarded to the logging helpers.
        mode: Spark save mode, e.g. "overwrite" (default) or "append".
        file_format: Spark data source name. Defaults to "parquet", which is
            the previously hard-coded behaviour; e.g. "delta" writes a Delta table.

    Raises:
        Exception: any error raised while logging or writing is reported via
            ``log_error`` and then re-raised unchanged.
    """
    try:
        log_process_start(source, "save_to_lakehouse")
        # `.format("parquet").save(path)` is equivalent to `.parquet(path)`.
        df.write.mode(mode).format(file_format).save(path)
        log_process_end(source, "save_to_lakehouse")
    except Exception as e:
        log_error(source, "save_to_lakehouse", f"Error saving to {path}", str(e))
        raise

StatementMeta(, e9e8d624-10e0-4f06-9fbd-8070c400c7e5, 9, Finished, Available, Finished)

In [6]:
# ===============================
# SURROGATE KEY
# ===============================
def generate_surrogate_key(
    df: DataFrame,
    columns: list,
    key_name: str = "surrogate_key",
    null_placeholder: str = "__NULL__"
) -> DataFrame:
    """
    Generates an MD5 hash surrogate key from the given columns.

    Each input column is cast to string, and nulls are replaced by
    ``null_placeholder`` before the values are joined with "||" and hashed.
    Without that replacement ``concat_ws`` silently skips nulls, so e.g.
    ("a", null, "b") and ("a", "b", null) would both hash "a||b" and collide.
    Rows of scalar columns without nulls hash exactly as before.

    Args:
        df: Input DataFrame.
        columns: Column names (str) and/or Column expressions to hash, in order.
        key_name: Name of the output key column (replaced if it already exists).
        null_placeholder: Text substituted for null values before hashing.

    Returns:
        ``df`` with a ``key_name`` column holding the hex MD5 digest.

    Raises:
        ValueError: if ``columns`` is empty (every row would get the same key).
    """
    if not columns:
        raise ValueError("generate_surrogate_key requires at least one column")

    # NOTE: values that themselves contain "||" or equal the placeholder can
    # still collide; pick a placeholder that cannot occur in the data.
    key_parts = [
        coalesce((col(c) if isinstance(c, str) else c).cast("string"), lit(null_placeholder))
        for c in columns
    ]
    return df.withColumn(key_name, md5(concat_ws("||", *key_parts)))

StatementMeta(, e9e8d624-10e0-4f06-9fbd-8070c400c7e5, 10, Finished, Available, Finished)

In [7]:
# ===============================
# TIME SERIES FRAME
# ===============================
def create_time_series_frame(
    df: DataFrame,
    time_column: str = "timestamp"
) -> DataFrame:
    """
    Derives a calendar ``date`` column from ``time_column``.

    Args:
        df: Input DataFrame.
        time_column: Column converted with ``to_date`` (default "timestamp").

    Returns:
        ``df`` with a ``date`` column added, or replaced if one already exists.
    """
    date_expr = to_date(col(time_column))
    return df.withColumn("date", date_expr)

StatementMeta(, e9e8d624-10e0-4f06-9fbd-8070c400c7e5, 11, Finished, Available, Finished)

In [8]:
# ===============================
# JOIN HELPER
# ===============================
def join_dataframes(
    df1: DataFrame,
    df2: DataFrame,
    join_cols: list,
    how: str = "inner"
) -> DataFrame:
    """
    Joins ``df1`` (left) with ``df2`` (right) on ``join_cols``.

    Thin wrapper over ``DataFrame.join``. When the keys are given as column
    names, each key column appears only once in the result.

    Args:
        df1: Left DataFrame.
        df2: Right DataFrame.
        join_cols: Key column name(s) present in both frames.
        how: Spark join type, e.g. "inner" (default), "left", "outer".

    Returns:
        The joined DataFrame.
    """
    left, right = df1, df2
    return left.join(right, join_cols, how)

# Marker line, in the style of the "... Loaded" messages printed by the
# %run'd nb_logging notebook.
# NOTE(review): later cells still define more helpers, so this prints before
# every function is actually loaded.
print("Functions Loaded")

StatementMeta(, e9e8d624-10e0-4f06-9fbd-8070c400c7e5, 12, Finished, Available, Finished)

Functions Loaded


In [9]:
def set_columns_safe(df: DataFrame, cols: list) -> DataFrame:
    """
    Projects ``df`` onto the requested columns that actually exist.

    Entries of ``cols`` that are not in ``df.columns`` are silently skipped;
    the requested order is preserved. Membership is an exact
    (case-sensitive) match against ``df.columns``.
    """
    present = df.columns
    kept = []
    for name in cols:
        if name in present:
            kept.append(name)
    return df.select(*kept)

StatementMeta(, e9e8d624-10e0-4f06-9fbd-8070c400c7e5, 13, Finished, Available, Finished)

In [10]:
def filter_last_7_days(df: DataFrame, timestamp_col: str) -> DataFrame:
    """
    Restricts the DataFrame to rows from the last 7 days, based on ``timestamp_col``.

    The cut-off is ``date_sub(current_date(), 7)``. Rows with
    ``timestamp_col >= cut-off`` are kept; for a timestamp column the cut-off
    date compares as its midnight, so the window covers today plus the seven
    preceding calendar days. There is no upper bound, so future-dated rows are
    kept too. Rows whose ``timestamp_col`` is null are dropped, because the
    comparison evaluates to null.
    """
    cutoff = date_sub(current_date(), 7)
    return df.where(col(timestamp_col) >= cutoff)

StatementMeta(, e9e8d624-10e0-4f06-9fbd-8070c400c7e5, 14, Finished, Available, Finished)

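# -------------------------------
# DataFrame helper functions
# NOTE: this cell redefines clean_column_names, explode_and_flatten,
# create_time_series_frame, generate_surrogate_key and save_to_lakehouse
# from the cells above; when run after them, these versions take effect.
# -------------------------------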
def clean_column_names(df: DataFrame) -> DataFrame:
    """
    Converts DataFrame column names to ASCII snake_case.

    Characters other than ASCII letters, digits, "_" and whitespace are
    removed, surrounding whitespace is trimmed, each inner whitespace run
    becomes a single "_", and the result is lower-cased,
    e.g. "Order Date (UTC)" -> "order_date_utc".

    Args:
        df: Input DataFrame.

    Returns:
        A DataFrame with the same data and the renamed columns, in the same order.
    """
    def to_snake_case(name: str) -> str:
        # Keep whitespace through the character filter; previously it was
        # deleted here, so the "_" substitution below could never fire
        # ("First Name" -> "firstname").
        name = re.sub(r'[^0-9a-zA-Z_\s]', '', name)
        name = name.strip()
        name = re.sub(r'\s+', '_', name)
        return name.lower()
    new_columns = [to_snake_case(c) for c in df.columns]
    return df.toDF(*new_columns)

def explode_and_flatten(df: DataFrame) -> DataFrame:
    """
    Recursively flattens struct columns and explodes array columns.

    Repeats until no top-level column is a StructType or ArrayType:

    * every struct column ``s`` is replaced by one column per field, named
      ``s_<field>`` and placed after the non-struct columns;
    * every array column is exploded in place (one row per element).

    Nested combinations (arrays of structs, structs of arrays, arrays of
    arrays) are unwound by the repeated passes.

    NOTE(review): ``explode`` drops rows whose array is null or empty, and
    exploding several array columns in the same pass yields their cross
    product; switch to ``explode_outer`` / ``arrays_zip`` if either is unwanted.

    Args:
        df: Input DataFrame.

    Returns:
        A DataFrame whose top-level columns contain no structs or arrays.
    """
    while True:
        struct_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, StructType)]
        array_cols = [f.name for f in df.schema.fields if isinstance(f.dataType, ArrayType)]
        if not struct_cols and not array_cols:
            break

        if struct_cols:
            # Flatten every struct of this pass in a single projection. The
            # previous filter `c != c` reused the loop variable name inside the
            # comprehension, was always False, and so dropped all other columns.
            kept = [col(name) for name in df.columns if name not in struct_cols]
            flattened = [
                col(struct_name).getField(child.name).alias(f"{struct_name}_{child.name}")
                for struct_name in struct_cols
                for child in df.schema[struct_name].dataType.fields
            ]
            df = df.select(*kept, *flattened)

        # Array columns are not structs, so they survived the projection above.
        for name in array_cols:
            df = df.withColumn(name, explode(col(name)))
    return df

def create_time_series_frame(df: DataFrame, time_column: str = "timestamp") -> DataFrame:
    """
    Adds a ``date`` column holding ``to_date(time_column)``.

    Args:
        df: Input DataFrame.
        time_column: Column to convert. Defaults to "timestamp", the same
            default as the earlier definition of this helper, so calls that
            omit it keep working when this definition shadows that one.

    Returns:
        ``df`` with a ``date`` column added, or replaced if one already exists.
    """
    return df.withColumn("date", to_date(col(time_column)))

def generate_surrogate_key(df: DataFrame, columns: list, key_name="surrogate_key", null_placeholder="__NULL__") -> DataFrame:
    """
    Generates an MD5 hash surrogate key from the given columns.

    Each column is cast to string, and nulls are replaced by ``null_placeholder``
    before joining with "||" and hashing. Otherwise ``concat_ws`` skips nulls and
    e.g. ("a", null, "b") / ("a", "b", null) collide on "a||b". Rows of scalar
    columns without nulls hash exactly as before.

    Args:
        df: Input DataFrame.
        columns: Column names (str) and/or Column expressions to hash, in order.
        key_name: Name of the output key column (replaced if it already exists).
        null_placeholder: Text substituted for null values before hashing.

    Returns:
        ``df`` with a ``key_name`` column holding the hex MD5 digest.

    Raises:
        ValueError: if ``columns`` is empty (every row would get the same key).
    """
    if not columns:
        raise ValueError("generate_surrogate_key requires at least one column")
    key_parts = [
        coalesce((col(c) if isinstance(c, str) else c).cast("string"), lit(null_placeholder))
        for c in columns
    ]
    return df.withColumn(key_name, md5(concat_ws("||", *key_parts)))

def save_to_lakehouse(df: DataFrame, path: str, source: str, mode="overwrite", file_format="parquet"):
    """
    Saves a DataFrame to a Lakehouse path, logging start, end and failure.

    Args:
        df: DataFrame to write.
        path: Target location passed to the Spark writer.
        source: Source/system name forwarded to the logging helpers.
        mode: Spark save mode, e.g. "overwrite" (default) or "append".
        file_format: Spark data source name; "parquet" (default) keeps the
            previous behaviour, e.g. "delta" writes a Delta table.

    Raises:
        Exception: write (or logging) errors are reported via ``log_error`` and
            re-raised; previously they were swallowed, so a failed save looked
            successful to the caller.
    """
    try:
        log_process_start(source, "save_to_lakehouse")
        df.write.mode(mode).format(file_format).save(path)
        log_process_end(source, "save_to_lakehouse")
    except Exception as e:
        # Pass the message as text, matching the other log_error call site.
        log_error(source, "save_to_lakehouse", f"Error saving {path}", str(e))
        raise
        
