## Task 1

Flatten JSON: load 10 selected attributes from the file brzydki.json (the `features` column).

In [0]:
from pyspark.sql.functions import explode, col

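json_path = "/FileStore/tables/brzydki.json"
# Multiline mode lets Spark parse a JSON document that spans several lines
raw_df = spark.read.option("multiline", "true").json(json_path)

# explode() turns each element of the "features" array into its own row
features_df = raw_df.select(explode(col("features")).alias("feature"))

# Pull 10 nested fields into flat, aliased columns
selected_df = features_df.select(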
    col("feature.properties.featureId").alias("id_cechy"),
    col("feature.properties.toid").alias("obiekt_docelowy"),
    col("feature.properties.changeEventType").alias("typ_zmiany"),
    col("feature.properties.jobReference").alias("referencja_zadania"),
    col("feature.properties.validFromTimestamp").alias("obowiazuje_od"),
    col("feature.geometry.type").alias("typ_geometrii"),
    col("feature.geometry.coordinates").alias("koordynaty"),
    col("feature.properties.baseFormComponent.form").alias("forma_podstawowa"),
    col("feature.properties.lifecycleStatusComponent.lifecycleStatus").alias("status_obiegu"),
    # Only the first element of the baseFunctionComponents array is used
    col("feature.properties.baseFunctionComponents")[0]["function"].alias("funkcja_podstawowa")
)

selected_df.show(truncate=False)


+------------------------------------+--------------------+----------+------------------+--------------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+-------------+------------------+
|id_cechy                            |obiekt_docelowy     |typ_zmiany|referencja_zadania|obowiazuje_od       |typ_geometrii|koordynaty                                                                                                                                                 |forma_podstawowa  |status_obiegu|funkcja_podstawowa|
+------------------------------------+--------------------+----------+------------------+--------------------+-------------+-----------------------------------------------------------------------------------------------------------------------------------------------------------+------------------+-------------+------------------+
|
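
The flattening step can also be illustrated without a Spark session. The sketch below mimics `explode` plus dotted-path selection in plain Python. The `sample` record is a hypothetical, heavily trimmed stand-in (its values are invented and do not come from brzydki.json), and `get_path` is a helper introduced only for this illustration.

```python
import json

# Hypothetical stand-in for the file content; all values are invented.
sample = json.loads("""
{"features": [
  {"geometry": {"type": "Polygon", "coordinates": [[[0, 0], [1, 0], [1, 1], [0, 0]]]},
   "properties": {"featureId": "a-1", "baseFunctionComponents": [{"function": "Road"}]}},
  {"geometry": {"type": "Point", "coordinates": [2, 3]},
   "properties": {"featureId": "b-2", "baseFunctionComponents": []}}
]}
""")

def get_path(record, path, default=None):
    """Walk a dotted path; segments applied to a list are treated as indices."""
    current = record
    for segment in path.split("."):
        try:
            if isinstance(current, list):
                current = current[int(segment)]
            else:
                current = current[segment]
        except (KeyError, IndexError, ValueError, TypeError):
            return default  # missing field or empty array
    return current

# One output row per element of "features", analogous to explode()
rows = [
    {
        "id_cechy": get_path(feature, "properties.featureId"),
        "typ_geometrii": get_path(feature, "geometry.type"),
        "funkcja_podstawowa": get_path(feature, "properties.baseFunctionComponents.0.function"),
    }
    for feature in sample["features"]
]
```

The second record has an empty `baseFunctionComponents` array, so its `funkcja_podstawowa` comes back as `None` instead of raising.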

## Task 2

Poka-yoke

Write 5 methods that can be used in a Pipeline so that they are resilient to user error and as production-ready as possible. You may reuse what you have already built and improve it to be more robust against errors, following good practices.
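
One possible reading of "usable in a Pipeline" is chaining steps with pandas `DataFrame.pipe`. A validator that raises on bad input but returns nothing would break such a chain, so a small adapter can pass the frame through. This is a minimal sketch under that assumption; `as_step` and `no_negative_prices` are hypothetical names, not part of the task.

```python
import functools
import pandas as pd

def as_step(check):
    """Wrap a validator that returns None so it hands the DataFrame onward."""
    @functools.wraps(check)
    def step(df, *args, **kwargs):
        check(df, *args, **kwargs)  # raises on invalid input
        return df                   # unchanged frame keeps the chain going
    return step

def no_negative_prices(df):
    # Hypothetical validator used only for this illustration
    if (df["price"] < 0).any():
        raise ValueError("Column 'price' contains negative values")

df = pd.DataFrame({"price": [10.0, 12.5]})
result = (
    df.pipe(as_step(no_negative_prices))
      .assign(price_doubled=lambda d: d["price"] * 2)
)
```

Keeping validation and transformation in one chain means a failed check stops the pipeline before any later step runs.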


**`validate_column_ranges(df, ranges)`**

Checks whether numeric values in the given columns fall within the specified bounds.

In [0]:
def validate_column_ranges(df, ranges: dict):
    """
    Check whether numerical values in columns fall within the specified ranges.

    :param df: pd.DataFrame
    :param ranges: dict – mapping column name to (min, max) allowed values
    """
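    for column, (low, high) in ranges.items():
        # Columns listed in `ranges` but absent from df are skipped silently
        if column in df.columns:
            # between() is inclusive on both ends; NaN counts as out of range
            if not df[column].between(low, high).all():
                raise ValueError(f"Column '{column}' has values outside expected range {low}-{high}")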
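
The function above skips columns that are missing from the frame, which can hide a typo in a column name. A stricter variant might treat that as an error and report how many values failed. The sketch below is one such variant, not the original solution; the name `validate_column_ranges_strict` is hypothetical.

```python
import pandas as pd

def validate_column_ranges_strict(df: pd.DataFrame, ranges: dict) -> pd.DataFrame:
    """Hypothetical stricter variant: missing columns and NaN values are errors too."""
    missing = [c for c in ranges if c not in df.columns]
    if missing:
        raise KeyError(f"Columns to validate are missing: {missing}")
    for column, (low, high) in ranges.items():
        if low > high:
            raise ValueError(f"Invalid range for '{column}': {low} > {high}")
        # between() is inclusive on both ends and evaluates to False for NaN
        bad = ~df[column].between(low, high)
        if bad.any():
            raise ValueError(
                f"Column '{column}': {int(bad.sum())} value(s) outside [{low}, {high}] or missing"
            )
    return df  # returned unchanged so the call can be chained

frame = pd.DataFrame({"age": [25, 40, 130]})
try:
    validate_column_ranges_strict(frame, {"age": (0, 120)})
except ValueError as err:
    message = str(err)
```

Reporting the count of offending values gives the caller a sense of scale without dumping the whole column into the error message.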


**`require_columns(df, expected_cols)`**

Ensures that the required columns are present in the DataFrame.

In [0]:
def require_columns(df, expected_cols: list):
    """
    Ensure that all specified columns are present in the DataFrame.

    :param df: pd.DataFrame or pyspark.sql.DataFrame
    :param expected_cols: list of column names that must exist
    """
    present = df.columns
    missing = [c for c in expected_cols if c not in present]
    if missing:
        raise ValueError(f"Missing required columns: {missing}")
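
A common user slip is passing a single column name as a string instead of a list. The function above would then iterate over the characters of that string and report each one as a missing column. The sketch below guards against this; `require_columns_checked` is a hypothetical name, and it assumes a pandas DataFrame.

```python
import pandas as pd

def require_columns_checked(df, expected_cols):
    """Hypothetical variant that also guards against a bare string argument."""
    if isinstance(expected_cols, str):
        # "id" would otherwise be iterated as the characters "i" and "d"
        expected_cols = [expected_cols]
    present = set(df.columns)
    missing = [c for c in expected_cols if c not in present]
    if missing:
        raise ValueError(
            f"Missing required columns: {missing}; available: {list(df.columns)}"
        )
    return df  # returned unchanged so the call can be chained

frame = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
checked = require_columns_checked(frame, "id")  # treated as ["id"]
```

Listing the available columns in the message makes a misspelled name easier to spot.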


**`rename_columns_safely(df, mapping)`**

Safely renames columns; raises an error if any column to be renamed is missing.

In [0]:
def rename_columns_safely(df, mapping: dict):
    """
    Rename columns only if all old names are present.

    :param df: pd.DataFrame
    :param mapping: dict with old -> new column names
    :return: DataFrame with renamed columns
    """
    absent = [k for k in mapping.keys() if k not in df.columns]
    if absent:
        raise KeyError(f"Cannot rename missing columns: {absent}")
    return df.rename(columns=mapping)
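
Checking that the old names exist does not prevent a rename from producing two columns with the same name, for example when the new name is already taken. A possible extension is sketched below; `rename_columns_no_collisions` is a hypothetical name, and the collision rule is an assumption about what counts as unsafe.

```python
import pandas as pd

def rename_columns_no_collisions(df: pd.DataFrame, mapping: dict) -> pd.DataFrame:
    """Hypothetical variant: refuse renames that would produce duplicate column names."""
    absent = [old for old in mapping if old not in df.columns]
    if absent:
        raise KeyError(f"Cannot rename missing columns: {absent}")
    # Column names as they would look after the rename
    final_names = [mapping.get(name, name) for name in df.columns]
    clashes = [n for n in dict.fromkeys(final_names) if final_names.count(n) > 1]
    if clashes:
        raise ValueError(f"Rename would create duplicate columns: {clashes}")
    return df.rename(columns=mapping)

frame = pd.DataFrame({"a": [1], "b": [2]})
renamed = rename_columns_no_collisions(frame, {"a": "x"})
```

Swapping two names, such as `{"a": "b", "b": "a"}`, still passes because the final set of names stays unique.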


**`convert_columns(df, type_schema)`**

Converts columns to the specified types and raises a `TypeError` if a conversion fails.

In [0]:
def convert_columns(df, type_schema: dict):
    """
    Try to convert column types to those provided in the schema.

    :param df: pd.DataFrame
    :param type_schema: dict {column: type}
    :return: DataFrame with converted columns
    """
    result = df.copy()
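    for col_name, col_type in type_schema.items():
        if col_name in df.columns:  # columns absent from df are skipped
            try:
                result[col_name] = df[col_name].astype(col_type)
            except Exception as e:
                # Chain the original exception so the root cause stays in the traceback
                raise TypeError(f"Column '{col_name}' could not be cast to {col_type}: {e}") from e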
    return result
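
When a cast fails, it often helps to know which values caused the failure. For numeric targets, one option is `pd.to_numeric` with `errors="coerce"`, which turns unparseable values into NaN so they can be located. The helper below is a sketch of that idea; `find_unconvertible` is a hypothetical name and covers only numeric conversion.

```python
import pandas as pd

def find_unconvertible(series: pd.Series) -> pd.Series:
    """Return the values that cannot be parsed as numbers (hypothetical helper)."""
    parsed = pd.to_numeric(series, errors="coerce")
    # A value failed if it was present before parsing but is NaN afterwards
    failed = parsed.isna() & series.notna()
    return series[failed]

raw = pd.Series(["1", "2.5", "abc", None, "7"])
bad_values = find_unconvertible(raw)
```

Values that were already missing are not reported, since they are absent, not malformed.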


**`check_unique_key(df, key_cols)`**

Verifies that the combination of key columns is unique.

In [0]:
def check_unique_key(df, key_cols: list):
    """
    Validate uniqueness of records based on given key columns.

    :param df: pd.DataFrame
    :param key_cols: list of columns forming a composite key
    """
    if df.duplicated(subset=key_cols).any():
        raise ValueError(f"Duplicate entries found for keys: {key_cols}")
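
The error above says that duplicates exist but not where. A companion helper could return the offending rows for inspection before the pipeline fails. This is a minimal sketch; `find_duplicate_keys` and the `orders` frame are hypothetical.

```python
import pandas as pd

def find_duplicate_keys(df: pd.DataFrame, key_cols: list) -> pd.DataFrame:
    """Return every row whose key occurs more than once (hypothetical helper)."""
    # keep=False marks all members of a duplicate group, not only the repeats
    mask = df.duplicated(subset=key_cols, keep=False)
    return df[mask].sort_values(key_cols)

orders = pd.DataFrame({
    "order_id": [1, 1, 2, 3],
    "line_no": [1, 1, 1, 1],
    "amount": [10, 10, 20, 30],
})
duplicates = find_duplicate_keys(orders, ["order_id", "line_no"])
```

Here the two rows with `order_id` 1 and `line_no` 1 are returned, while the rows for orders 2 and 3 are not.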
