In [0]:
# 1. Initialisation : Création d'une table Delta
# Créons une table Delta avec un schéma initial et insérons quelques données.

from pyspark.sql import SparkSession

# Chemin de la table Delta
delta_table_path = "/tmp/delta_table_schema"

# Création initiale de la table Delta
data = [
    (1, "Produit A", 100),
    (2, "Produit B", 200)
]
columns = ["id", "product", "price"]

df = spark.createDataFrame(data, columns)

# Écriture initiale des données
df.write.format("delta").mode("overwrite").save(delta_table_path)

print("Table Delta initiale :")
spark.read.format("delta").load(delta_table_path).show()


In [0]:
# 2. Ajout d’une colonne avec MERGE
'''
Ajoutons une nouvelle colonne (category) via un MERGE avec un jeu de données source. Cela nécessite l’activation de l’option mergeSchema.
'''

# Nouveaux données avec une colonne supplémentaire "category"
new_data = [
    (1, "Produit A", 100, "Catégorie 1"),
    (3, "Produit C", 300, "Catégorie 2")
]
new_columns = ["id", "product", "price", "category"]

new_df = spark.createDataFrame(new_data, new_columns)

# Activer l'évolution automatique du schéma lors d'un MERGE
from delta.tables import DeltaTable

delta_table = DeltaTable.forPath(spark, delta_table_path)

delta_table.alias("target").merge(
    new_df.alias("source"),
    "target.id = source.id"
).whenMatchedUpdateAll(
).whenNotMatchedInsertAll(
).execute()

print("Table Delta après ajout de la colonne 'category' :")
spark.read.format("delta").load(delta_table_path).show()


In [0]:
# 3. Suppression d’une colonne avec ALTER TABLE
# Supprimons une colonne (category) de la table Delta.

# Suppression de la colonne "category"
spark.sql(f"""
ALTER TABLE delta.`{delta_table_path}`
DROP COLUMN category
""")

print("Table Delta après suppression de la colonne 'category' :")
spark.read.format("delta").load(delta_table_path).show()


In [0]:
# 4. Détection et gestion des incompatibilités de schéma
# Essayons d'insérer des données avec un schéma incompatible sans activer l’option mergeSchema.

# Données incompatibles (colonne supplémentaire "discount")
incompatible_data = [
    (4, "Produit D", 400, 10)
]
incompatible_columns = ["id", "product", "price", "discount"]

incompatible_df = spark.createDataFrame(incompatible_data, incompatible_columns)

# Tentative d'écriture sans autoriser l'évolution du schéma
try:
    incompatible_df.write.format("delta").mode("append").save(delta_table_path)
except Exception as e:
    print("Erreur détectée en raison d'un schéma incompatible :")
    print(e)


In [0]:
# 5. Résolution : Forcer l’évolution du schéma
# Pour résoudre l’incompatibilité, utilisons l’option mergeSchema.

# Réécriture avec mergeSchema pour autoriser l'évolution
incompatible_df.write.format("delta").option("mergeSchema", "true").mode("append").save(delta_table_path)

print("Table Delta après résolution et ajout de la colonne 'discount' :")
spark.read.format("delta").load(delta_table_path).show()
