In [0]:
import re
import json
from pyspark.sql.functions import udf, col, concat_ws, lit, when, current_date
from pyspark.sql.types import StringType
from delta.tables import DeltaTable
from pyspark.sql import functions as F
from datetime import datetime
import traceback

In [0]:
print("Read data")

# Raw Parquet exports land in this S3 folder; the cleaned rows are merged
# into the table named below (it is looked up by name, not by path).
S3_RAW_PATH = "s3://cvee-bucket-eu-north-1/jobs_raw/"
DELTA_TABLE_PATH = "cvee.jobs_silver"

# List the folder once, then keep only the paths of Parquet objects.
files = dbutils.fs.ls(S3_RAW_PATH)
parquet_files = [
    file_path
    for file_path in (entry.path for entry in files)
    if file_path.endswith(".parquet")
]


def extract_date_from_name(path):
    """Return the timestamp embedded in a raw-export file name.

    The name is expected to contain ``jobs_raw_YYYYMMDD_HHMMSS.parquet``.

    Args:
        path: File path (or bare file name) to inspect.

    Returns:
        The parsed ``datetime``. ``datetime.min`` is returned when the name
        does not follow the pattern, or when the matched digits do not form
        a valid date/time, so that such files always rank below properly
        named ones when used as a sort key.
    """
    match = re.search(r"jobs_raw_(\d{8}_\d{6})\.parquet", path)
    if match is None:
        return datetime.min
    try:
        return datetime.strptime(match.group(1), "%Y%m%d_%H%M%S")
    except ValueError:
        # The digits matched the pattern but are not a real date/time
        # (e.g. month 13); treat the file like any other unrecognised name
        # instead of aborting the whole selection.
        return datetime.min

# Fail with an explicit message instead of max()'s generic
# "arg is an empty sequence" when the folder holds no Parquet export.
if not parquet_files:
    raise FileNotFoundError(f"No .parquet files found under {S3_RAW_PATH}")

# The newest export is the file whose name carries the latest timestamp.
latest_file_path = max(parquet_files, key=extract_date_from_name)

print(f"Reading data from S3 {latest_file_path} ")
df_raw = spark.read.parquet(latest_file_path)
print(f"Number of records read: {df_raw.count()}")
df_raw.printSchema()


In [0]:
print("Transform data")

# User-defined functions: plain Python helpers that are wrapped as Spark UDFs
# before being applied to DataFrame columns.

def clean_html(text):
    """Convert an HTML fragment into single-spaced plain text.

    Args:
        text: Raw string that may contain HTML tags and character
            references (``&amp;``, ``&#233;``, ...), or ``None``.

    Returns:
        The text with tags removed, character references decoded to the
        characters they stand for, and every run of whitespace collapsed to
        a single space with no leading/trailing space. ``""`` when ``text``
        is ``None``.
    """
    # Local import so the function is self-contained; the file's import cell
    # does not bring ``html`` into scope.
    import html

    if text is None:
        return ""
    # Replace tags with a space (not an empty string) so that words separated
    # only by markup, e.g. "<li>a</li><li>b</li>", are not glued together.
    text = re.sub(r"<.*?>", " ", text)
    # Decode entities instead of deleting them: dropping "&eacute;" or
    # "&amp;" would silently remove characters from the text.
    text = html.unescape(text)
    # \s covers newlines, tabs and the non-breaking space that "&nbsp;"
    # decodes to, so one pass normalises all whitespace.
    return re.sub(r"\s+", " ", text).strip()


# Wrap the Python cleaner so it can be applied to a DataFrame column.
clean_html_udf = udf(clean_html, StringType())

# Keep a single row per job id *before* deriving any column, so the Python
# UDF and the aggregations are not evaluated for rows that would be thrown
# away afterwards.
df_deduplicated = df_raw.dropDuplicates(["id"])

# Parenthesised chains are used instead of trailing backslashes: a dangling
# "\" before a blank line silently swallows whatever statement is added next.
df_cleaned = (
    df_deduplicated
    .withColumn("description_clean", clean_html_udf(col("description")))
    # Flatten each array of structs into one space-separated string built
    # from a single field of every element.
    .withColumn(
        "competences_aggregated",
        F.expr("concat_ws(' ', transform(competences, x -> x.libelle))"),
    )
    .withColumn(
        "formations_aggregated",
        F.expr("concat_ws(' ', transform(formations, x -> x.domaineLibelle))"),
    )
    .withColumn(
        "qualites_aggregated",
        F.expr("concat_ws(' ', transform(qualitesProfessionnelles, x -> x.libelle))"),
    )
)

df_final = (
    df_cleaned
    # Single text field combining the title, the cleaned description and the
    # three aggregated strings.
    .withColumn(
        "vector_text_input",
        concat_ws(
            " ",
            col("intitule"),
            col("description_clean"),
            col("competences_aggregated"),
            col("formations_aggregated"),
            col("qualites_aggregated"),
        ),
    )
    # The raw description and the intermediate aggregates are no longer
    # needed once they have been folded into vector_text_input.
    .drop("description", "competences_aggregated", "formations_aggregated", "qualites_aggregated")
    .withColumnRenamed("id", "job_id")
    .withColumnRenamed("description_clean", "description")
    .withColumn("ingestion_date", current_date())
)




In [0]:
print("Translation with ai_translate")

# Compute the translated text in a temporary column (SQL function
# ai_translate, target language 'en'), drop the untranslated column, then
# give the translation the original column name.
df_translated = (
    df_final
    .withColumn(
        "vector_text_input_en",
        F.expr("ai_translate(vector_text_input, 'en')"),
    )
    .drop("vector_text_input")
    .withColumnRenamed("vector_text_input_en", "vector_text_input")
)

print(df_translated.columns)

In [0]:

print(f"Merging into Delta table {DELTA_TABLE_PATH}")
try:
    # Handle column mismatch: serialise every source row to JSON and parse it
    # back with the target table's schema, so the source exposes exactly the
    # target's columns.
    # NOTE(review): from_json presumably nulls fields it cannot parse into the
    # target type - confirm this is acceptable for all columns.
    target_schema = spark.table(DELTA_TABLE_PATH).schema
    df_source_aligned = df_translated.select(
        F.from_json(F.to_json(F.struct("*")), target_schema).alias("data")
    ).select("data.*")

    delta_table = DeltaTable.forName(spark, DELTA_TABLE_PATH)
    old_count = delta_table.toDF().count()

    # Insert-only merge: only a not-matched clause is defined, so rows whose
    # job_id already exists in the target are left untouched.
    delta_table.alias("target").merge(
        df_source_aligned.alias("source"),
        "target.job_id = source.job_id"
    ).whenNotMatchedInsertAll().execute()

    new_count = delta_table.toDF().count()
    print(f"Number of rows added: {new_count - old_count} / {df_source_aligned.count()} read")
except Exception as e:
    print("\n Merge failed")
    print(f"Error Type : {type(e).__name__}")
    print(f"Error Message : {e}")
    print("\nStacktrace")
    traceback.print_exc()
    # Re-raise after logging: swallowing the error would let the notebook
    # finish normally even though nothing was merged.
    raise
    