In [1]:
# %%
# ✅ Cell 1: Setup & Start Spark Session

import sys, os
sys.path.append("/app")  # Mount path for container

import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, trim, regexp_replace, coalesce, lit,
    pandas_udf, udf, when, concat_ws
)
from pyspark.sql.types import StringType
import unicodedata

from utils.notebook_setup import enable_project_imports
enable_project_imports()

from utils.io_helpers import list_files_with_aliases, load_file_with_alias

# Create (or reuse) the Spark session for this notebook, then restrict
# Spark's own logging to ERROR so cell output stays readable.
spark = (
    SparkSession.builder
    .appName("Pyspark - Multi-file Clean & Transform")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")

✅ [notebook_setup] Project root already in sys.path.


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/29 14:25:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
25/03/29 14:25:59 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
25/03/29 14:25:59 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [2]:
# %%
# ✅ Cell 2: Load ALL files into a single DataFrame

data_type = "json"
input_folder = f"/app/data/input/practice/{data_type}"

# Mapping of alias -> file path for every file with the chosen extension.
aliases = list_files_with_aliases(input_folder, ext=data_type)

# Fail early with a clear message: without this guard an empty folder leaves
# `combined_df` as None and the final `.show()` dies with an AttributeError.
if not aliases:
    raise FileNotFoundError(f"No .{data_type} files found in {input_folder}")

print("📄 Loaded files:")

# Load each file and union them by column name; columns missing from a file
# are filled with nulls (allowMissingColumns=True).
combined_df = None

for alias, path in aliases.items():
    df = load_file_with_alias(spark, input_folder, alias, ext=data_type)
    print(f"✅ Loaded {alias}: {path}")
    combined_df = df if combined_df is None else combined_df.unionByName(df, allowMissingColumns=True)

combined_df.show(5, truncate=False)

📄 Loaded files:
📥 Loading file: file1 → /app/data/input/practice/json/part-00000-46e2d8a4-96d4-4b0e-a4a9-992dbceb1653-c000.json


                                                                                

✅ Loaded file1: /app/data/input/practice/json/part-00000-46e2d8a4-96d4-4b0e-a4a9-992dbceb1653-c000.json
📥 Loading file: file2 → /app/data/input/practice/json/part-00001-46e2d8a4-96d4-4b0e-a4a9-992dbceb1653-c000.json
✅ Loaded file2: /app/data/input/practice/json/part-00001-46e2d8a4-96d4-4b0e-a4a9-992dbceb1653-c000.json
📥 Loading file: file3 → /app/data/input/practice/json/part-00002-46e2d8a4-96d4-4b0e-a4a9-992dbceb1653-c000.json
✅ Loaded file3: /app/data/input/practice/json/part-00002-46e2d8a4-96d4-4b0e-a4a9-992dbceb1653-c000.json
+-----------------------+-------+---------+--------------------+------+-------+
|extra                  |inactiv|nume     |ocupatie            |varsta|vechime|
+-----------------------+-------+---------+--------------------+------+-------+
|[PV, EV]               |NULL   |Andrei   |Specialist marketing|38    |13     |
|[3D Printer, WII]      |NULL   |Alexandru|Specialist HR       |34    |8      |
|[AC, EV, 5G Router]    |NULL   |Adrian   |Inginer civil       |

In [3]:
# ✅ Cell 3: Data Cleaning functions

@pandas_udf(StringType())
def remove_accents(series: pd.Series) -> pd.Series:
    """Strip combining accent marks from every string in ``series``.

    Each non-null value is decomposed with Unicode NFD normalisation and all
    characters of category ``Mn`` (non-spacing marks) are dropped. Null
    values are returned unchanged.
    """
    def _strip_marks(text):
        # Leave missing values exactly as they came in.
        if pd.isnull(text):
            return text
        decomposed = unicodedata.normalize('NFD', text)
        return ''.join(ch for ch in decomposed if unicodedata.category(ch) != 'Mn')

    return series.apply(_strip_marks)

@udf(StringType())
def remove_duplicates(val):
    """Return the distinct items of ``val`` as a sorted, ``', '``-joined string.

    Args:
        val: A list of items, a comma-separated string, or any other value.

    Returns:
        ``None`` when ``val`` is falsy (None, empty list, empty string).
        For a list or a comma-separated string: the unique, whitespace-trimmed,
        non-empty items sorted and joined with ``', '``.
        For any other type: ``str(val).strip()``.
    """
    if not val:
        return None
    if isinstance(val, list):
        tokens = (str(x).strip() for x in val if x)
    elif isinstance(val, str):
        tokens = (x.strip() for x in val.split(','))
    else:
        return str(val).strip()
    # Filter AFTER stripping: a whitespace-only token such as the middle one in
    # "a, , b" is truthy before the strip and would otherwise survive as an
    # empty entry, producing output like ", a, b".
    return ', '.join(sorted({t for t in tokens if t}))


In [4]:
# %%
# ✅ Cell 4: Apply Cleaning & Transformation

# Building blocks for `text_descriptiv`: the condition for "no extras" and the
# sentence fragments shared by both variants of the description.
has_no_extra = col("extra").isNull() | (col("extra") == "")
base_sentence = [col("nume"), lit("în vârstă de"), col("varsta"), lit("ani este"), col("ocupatie")]

clean_df = (
    combined_df
    # Trim the name and strip accent marks.
    .withColumn("nume", remove_accents(trim(col("nume"))))
    # Collapse whitespace runs to one space, trim, then strip accent marks.
    .withColumn("ocupatie", remove_accents(trim(regexp_replace(col("ocupatie"), r"\s+", " "))))
    # Turn `extra` into a de-duplicated, sorted, comma-separated string.
    .withColumn("extra", remove_duplicates(col("extra")))
    # Missing `inactiv` flags default to False.
    .withColumn("inactiv", coalesce(col("inactiv"), lit(False)))
    # Derived as varsta - vechime; `vechime` is dropped right after.
    .withColumn("varsta_la_contractare", col("varsta") - col("vechime"))
    .drop("vechime")
    # Append the "și deține: ..." suffix only when `extra` has content.
    .withColumn(
        "text_descriptiv",
        when(has_no_extra, concat_ws(" ", *base_sentence))
        .otherwise(concat_ws(" ", *base_sentence, lit("și deține:"), col("extra"))),
    )
)

clean_df.cache()
clean_df.show(5, truncate=False)



+---------------------+-------+---------+--------------------+------+---------------------+-------------------------------------------------------------------------------+
|extra                |inactiv|nume     |ocupatie            |varsta|varsta_la_contractare|text_descriptiv                                                                |
+---------------------+-------+---------+--------------------+------+---------------------+-------------------------------------------------------------------------------+
|EV, PV               |false  |Andrei   |Specialist marketing|38    |25                   |Andrei în vârstă de 38 ani este Specialist marketing și deține: EV, PV         |
|3D Printer, WII      |false  |Alexandru|Specialist HR       |34    |26                   |Alexandru în vârstă de 34 ani este Specialist HR și deține: 3D Printer, WII    |
|5G Router, AC, EV    |false  |Adrian   |Inginer civil       |45    |22                   |Adrian în vârstă de 45 ani este Inginer civil și 

                                                                                

In [7]:
# %%
# ✅ Cell 5: Validation (in Pandas)

pdf = clean_df.toPandas()
# NOTE: this is not the last expression of the cell, so Jupyter renders nothing for it.
pdf.head(10)


def _no_combining_marks(text):
    """Return True when ``text`` has no combining accent marks (category Mn) after NFD."""
    return all(unicodedata.category(c) != "Mn" for c in unicodedata.normalize("NFD", text))


# ✅ Ensure no diacritics and trimmed values.
# Nulls are skipped (dropna) because `remove_accents` passes them through and
# calling .strip() on None would raise AttributeError instead of a clean failure.
for text_col in ("nume", "ocupatie"):
    values = pdf[text_col].dropna()
    assert values.apply(lambda x: x == x.strip()).all(), f"'{text_col}' has untrimmed values"
    assert values.apply(_no_combining_marks).all(), f"'{text_col}' still contains diacritics"

# ✅ 'extra' checks
assert pdf['extra'].notna().sum() > 0, "'extra' is null for every row"


def _has_unique_items(joined):
    """Return True when the comma-separated items of ``joined`` are all distinct."""
    # Items are joined with ", ", so each token must be stripped before comparing;
    # otherwise "EV, EV" splits into "EV" and " EV" and the duplicate is never seen.
    items = [item.strip() for item in joined.split(",")]
    return len(items) == len(set(items))


assert pdf['extra'].dropna().apply(_has_unique_items).all(), "'extra' contains duplicate items"

# ✅ 'inactiv' checks
assert pdf['inactiv'].apply(lambda x: isinstance(x, bool)).all(), "'inactiv' has non-boolean values"

print("✅ All validation checks passed!")

# %%
# ✅ Cell 6: Analysis examples

# Both examples look only at active clients (inactiv is False).
active_clients = clean_df.filter(col("inactiv") == False)

# Example 1: the 10 oldest active clients
active_clients.orderBy("varsta", ascending=False).show(10, truncate=False)

# Example 2: the 5 active clients with the highest varsta_la_contractare
active_clients.orderBy("varsta_la_contractare", ascending=False).show(5, truncate=False)


✅ All validation checks passed!
+---------------------+-------+--------+---------------+------+---------------------+----------------------------------------------------------------------------------+
|extra                |inactiv|nume    |ocupatie       |varsta|varsta_la_contractare|text_descriptiv                                                                   |
+---------------------+-------+--------+---------------+------+---------------------+----------------------------------------------------------------------------------+
|3D Printer, AC, PC   |false  |Bogdan  |Farmacist      |50    |18                   |Bogdan în vârstă de 50 ani este Farmacist și deține: 3D Printer, AC, PC           |
|3D Printer, PC, PS5  |false  |Viorel  |Farmacist      |50    |18                   |Viorel în vârstă de 50 ani este Farmacist și deține: 3D Printer, PC, PS5          |
|3D Printer, AC       |false  |Carmen  |Farmacist      |50    |18                   |Carmen în vârstă de 50 ani este Farmac