In [None]:
from pyspark.sql.functions import count
from datetime import datetime

# Run date stamp formatted as YYYYMMDD.
date_str = datetime.now().strftime("%Y%m%d")

# --- Configuration -----------------------------------------------------------
# Storage account and medallion containers (silver = source, gold = target).
storage_account = "datalakemedallion"
container_source_name = "silver"
container_destinataire_name = "gold"

# The account key is supplied through a notebook text widget (empty default).
dbutils.widgets.text("AZURE_STORAGE_KEY", "")
storage_key = dbutils.widgets.get("AZURE_STORAGE_KEY")

# Folder names inside the silver container.
folder_source = "control_qualite"
# NOTE(review): the valid transactions are read from ".../control_qualite/valid_data/",
# which does not match this "data_valid" value — confirm which name is intended.
folder_dv = "data_valid"
folder_dinv = "invalid_data"

# --- Spark configuration for blob (wasbs) access ------------------------------
# Applied in this exact order; keys are taken verbatim from the original settings.
_blob_spark_conf = [
    (f"fs.azure.account.key.{storage_account}.blob.core.windows.net", storage_key),
    ("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false"),
    ("parquet.enable.summary-metadata", "false"),
    ("fs.azure.account.oauth2.client.endpoint", ""),
]
for _conf_key, _conf_value in _blob_spark_conf:
    spark.conf.set(_conf_key, _conf_value)

# --- Read inputs ---------------------------------------------------------------
# Root URLs of the containers read here; every path below is built from these.
_silver_root = f"wasbs://{container_source_name}@{storage_account}.blob.core.windows.net"
_bronze_root = f"wasbs://bronze@{storage_account}.blob.core.windows.net"

# Client and account datasets from the silver "transformations" area.
df_clients = spark.read.parquet(f"{_silver_root}/transformations/clients_trsf/")
df_comptes = spark.read.parquet(f"{_silver_root}/transformations/comptes_trsf/")

# Valid and invalid transactions from the silver "control_qualite" folder.
df_transactions_valid = spark.read.parquet(f"{_silver_root}/control_qualite/valid_data/")
df_transactions_invalid = spark.read.parquet(f"{_silver_root}/control_qualite/invalid_data/")

# Raw transactions from the bronze container.
df_raw_tx = spark.read.parquet(f"{_bronze_root}/transactions/")

# --- Data-quality metrics ------------------------------------------------------
total_raw_tx = df_raw_tx.count()
valid = df_transactions_valid.count()
invalid = df_transactions_invalid.count()
total = valid + invalid

# Raw rows that appear in neither the valid nor the invalid output.
# NOTE(review): reported as "duplicates" — this assumes de-duplication is the
# only step dropping rows between bronze and silver; confirm.
nb_doublons = total_raw_tx - total

# Validity rate: percentage of VALID transactions among all classified ones.
# The numerator must be `valid`; dividing `invalid` gives the anomaly rate,
# not the validity rate this name/column advertises.
# Guarded so an empty run yields 0.0 instead of raising ZeroDivisionError;
# always a float so the inferred summary column type stays the same.
taux_validite = round(valid / total * 100, 2) if total else 0.0

# Number of invalid transactions per `anomaly_type` label.
anomaly_counts = (
    df_transactions_invalid.groupBy("anomaly_type")
    .agg(count("*").alias("count"))
    .collect()
)
anomaly_dict = {row["anomaly_type"]: row["count"] for row in anomaly_counts}


# --- Build the one-row summary DataFrame ---------------------------------------
# Each entry pairs an output column with its value; list order is column order,
# so names and values can no longer drift apart.
_summary_fields = [
    ("total_transactions_avant_transformations", total_raw_tx),
    ("total_transactions_apres_transformations", total),
    ("valid_transactions", valid),
    ("invalid_transactions", invalid),
    ("taux_validite", taux_validite),
    ("nb_doublons", nb_doublons),
]

# anomaly_type label (as it appears in the invalid data) -> summary column.
# Labels absent from the data are reported as 0.
_anomaly_columns = {
    "Date future invalide": "nb_date_future",
    "Montant négatif ou nul": "nb_montant_negatif",
    "Devise invalide": "nb_devise_invalide",
    "Statut inconnu": "nb_statut_inconnu",
    "Compte inconnu": "nb_account_inconnu",
    "Transaction ID inconnu": "nb_transaction_id_inconnu",
    "Type de transaction inconnu": "nb_transaction_type_inconnu",
    "Canal inconnu": "nb_channel_inconnu",
    "Catégorie commerçant inconnue": "nb_merchant_category_inconnue",
    "Autre anomalie": "nb_autre_anomalie",
}
_summary_fields.extend(
    (column_name, anomaly_dict.get(label, 0))
    for label, column_name in _anomaly_columns.items()
)

columns = [name for name, _ in _summary_fields]
data_summary = [tuple(value for _, value in _summary_fields)]

df_summary = spark.createDataFrame(data_summary, columns)


# --- Spark configuration for ADLS Gen2 (abfss) writes -----------------------
# Applied in this exact order; values are taken verbatim from the original settings.
_dfs_spark_conf = [
    (f"fs.azure.account.key.{storage_account}.dfs.core.windows.net", storage_key),
    ("mapreduce.fileoutputcommitter.marksuccessfuljobs", "false"),
    ("parquet.enable.summary-metadata", "false"),
    ("fs.azure.account.oauth2.client.endpoint", ""),
    ("mapreduce.fileoutputcommitter.algorithm.version", "1"),
]
for _setting, _setting_value in _dfs_spark_conf:
    spark.conf.set(_setting, _setting_value)

# --- Load into the gold container ---------------------------------------------
# Each DataFrame is written as a single uncompressed parquet output (coalesce(1)),
# overwriting the target folder; writes run in list order.
_gold_root = f"abfss://{container_destinataire_name}@{storage_account}.dfs.core.windows.net"
_gold_outputs = [
    (df_summary, "gold-summary/"),
    (df_clients, "gold-clients/"),
    (df_comptes, "gold-comptes/"),
    (df_transactions_valid, "gold-transactions/valid-tx/"),
    (df_transactions_invalid, "gold-transactions/invalid-tx/"),
]
for _frame, _subpath in _gold_outputs:
    (
        _frame.coalesce(1)
        .write.option("compression", "uncompressed")
        .mode("overwrite")
        .parquet(f"{_gold_root}/{_subpath}")
    )

print("---- processus terminé ----")