In [None]:
import datetime
import os
import re
import statistics
import time

import pandas as pd
import pyodbc
from faker import Faker
from faker.exceptions import UniquenessException
from IPython.display import display, FileLink
from tqdm.notebook import tqdm


In [2]:
# Connection parameters.
# Each value can be overridden through an environment variable so the notebook
# can target another server without editing (and committing) this cell.
SERVER   = os.environ.get('MSSQL_SERVER', 'localhost,1433')
USER     = os.environ.get('MSSQL_USER', 'sa')
# NOTE(security): the literal below is kept only as a backward-compatible
# fallback for the local test instance. Prefer exporting MSSQL_PASSWORD, and
# rotate this password since it has been stored in the notebook.
PASSWORD = os.environ.get('MSSQL_PASSWORD', 'youness-Plebicom')
DRIVER   = os.environ.get('MSSQL_DRIVER', '{ODBC Driver 17 for SQL Server}')

In [3]:
# Connect to the master database and create TestDB if it does not exist yet.
# The connection is opened in autocommit mode so the DDL is not wrapped in an
# implicit pyodbc transaction.
master_conn = pyodbc.connect(
    f"Driver={DRIVER};"
    f"Server={SERVER};"
    "Database=master;"
    f"UID={USER};PWD={PASSWORD};",
    autocommit=True
)
try:
    master_cur = master_conn.cursor()
    try:
        # DB_ID returns NULL for an unknown database, which makes this cell
        # safe to re-run.
        master_cur.execute("""
IF DB_ID('TestDB') IS NULL
    CREATE DATABASE TestDB;
""")
    finally:
        master_cur.close()
finally:
    # Always release the connection, even when the DDL fails.
    master_conn.close()

In [None]:
# Create the USER table in TestDB by replaying the batches of schema.sql.

# Read the script first so that a missing file does not leave a connection open.
with open('schema.sql', 'r', encoding='utf-8') as f:
    content = f.read()

# GO is a client-side batch separator, not T-SQL: it must be stripped before
# sending the script to the server. Split only on lines that consist of GO
# alone (any case, optional ';'); a plain str.split('GO') would also cut
# identifiers or literals that merely contain the letters "GO".
batches = re.split(r'^[ \t]*GO[ \t]*;?[ \t]*$', content, flags=re.IGNORECASE | re.MULTILINE)

schema_conn = pyodbc.connect(
    f"Driver={DRIVER};"
    f"Server={SERVER};"
    "Database=TestDB;"
    f"UID={USER};PWD={PASSWORD};",
    autocommit=True
)
try:
    schema_cur = schema_conn.cursor()
    try:
        for batch in batches:
            sql = batch.strip()
            if sql:  # skip empty batches (e.g. after a trailing GO)
                schema_cur.execute(sql)
    finally:
        schema_cur.close()
finally:
    # Always release the connection, even when one batch fails.
    schema_conn.close()

Pour tester les performances, je vais générer des données de test fictives.

In [7]:
# Faker instance (French locale), seeded so that a "Restart & Run All"
# regenerates exactly the same test data set.
SEED = 42
fake = Faker('fr_FR')
Faker.seed(SEED)

# Target data volumes
N_USER        = 10_000     # number of users
HISTORY_PER   = 5           # history rows per user
N_MERCHANT    = 1_000       # number of merchants
PURCHASE_PER  = 20          # purchases per user
BATCH_SIZE    = 1_000       # batch size passed to executemany


In [8]:
# Generate N_USER users with unique usernames and insert them into dbo.[USER]
# in batches of BATCH_SIZE rows.
# NOTE: get_conn is defined in a later cell of this notebook; that cell must be
# executed before this one on a fresh kernel.
# NOTE(review): SQL Server rejects TRUNCATE TABLE on system-versioned tables,
# so this cell is expected to run before the temporal-table setup — confirm.
INSERT_USER_SQL = (
    "INSERT INTO dbo.[USER] (USERNAME, INSERT_DATE, STATE, COMMENT) VALUES (?, ?, ?, ?);"
)

conn = get_conn('TestDB')
try:
    cur = conn.cursor()

    # Reset Faker's memory of already-issued unique values.
    fake.unique.clear()

    # Empty dbo.[USER] before regenerating the data set.
    cur.execute("TRUNCATE TABLE dbo.[USER];")
    conn.commit()

    batch = []
    for i in range(N_USER):
        try:
            # Unique username, cut to 50 characters.
            username = fake.unique.user_name()[:50]
        except UniquenessException:
            # Faker ran out of unique values: fall back to an index-based name.
            # Any other error is a real failure and is allowed to propagate.
            username = f"user_{i}"
        insert_date = fake.date_time_between(start_date='-2y', end_date='now')
        state       = fake.random_element([1, 2, 3])
        comment     = fake.sentence(nb_words=10)[:500]
        batch.append((username, insert_date, state, comment))

        # Flush a full batch.
        if len(batch) >= BATCH_SIZE:
            cur.executemany(INSERT_USER_SQL, batch)
            conn.commit()
            batch.clear()

    # Flush the last, partially filled batch.
    if batch:
        cur.executemany(INSERT_USER_SQL, batch)
        conn.commit()
        batch.clear()
finally:
    # Closing the connection also releases its cursor; the original cell left
    # both open.
    conn.close()


In [None]:
# Display the generated users.
# pandas emits a UserWarning when handed a raw pyodbc connection (it only
# officially supports SQLAlchemy connectables); the query still runs.
conn = get_conn('TestDB')
try:
    df_users = pd.read_sql("SELECT * FROM dbo.[USER];", conn)
finally:
    # Release the connection even if the query fails.
    conn.close()
display(df_users)


  df_users = pd.read_sql("SELECT * FROM dbo.[USER];", conn)


Unnamed: 0,user_id,username,insert_date,state,comment
0,1,valentineroy,2024-05-03 03:04:43,1,Complètement repas atteindre escalier répondre...
1,2,henrilemonnier,2023-12-26 20:40:22,1,Herbe aucun début son de remarquer sable reche...
2,3,anais91,2025-06-03 22:05:21,1,Lendemain préférer marché fleur imposer honneu...
3,4,gilbert56,2023-09-13 20:42:10,2,Commander centre vieil absolument donner impor...
4,5,normandrene,2025-04-21 04:56:49,2,Fin ceci façon solitude falloir race tôt voile.
...,...,...,...,...,...
9995,9996,martheroussel,2024-09-19 13:37:04,2,Enfin dominer dieu parler aujourd'hui pierre s...
9996,9997,thibaultelodie,2024-01-28 10:14:15,1,Colon ruine soit poussière signe hésiter agite...
9997,9998,lucie26,2025-06-27 09:48:42,2,Votre tapis prétendre voix palais pourquoi ser...
9998,9999,elodieguillaume,2024-10-25 14:48:45,1,Tour claire bout visite promettre rendre.


In [45]:
def get_conn(database: str):
    """Open an autocommit pyodbc connection to ``database``.

    The connection string is assembled from the notebook-level DRIVER, SERVER,
    USER and PASSWORD settings; only the database name varies per call.
    """
    connection_string = "".join((
        f"Driver={DRIVER};",
        f"Server={SERVER};",
        f"Database={database};",
        f"UID={USER};PWD={PASSWORD};",
    ))
    return pyodbc.connect(connection_string, autocommit=True)

def summarize(res):
    """Print p50 / p95 / mean / standard deviation for one benchmark result.

    Args:
        res: mapping with the keys 'method' and 'operation' (used for the
            header) and 'duration_ms', 'cpu_ms', 'mem_kb', each holding a
            sequence of numeric samples.

    A metric with no samples is reported as such instead of raising. With a
    single sample, p95 is that sample and the standard deviation is 0.0,
    because neither statistic is defined for fewer than two points.
    """
    print(f"--- {res['method']} / {res['operation']} ---")
    for key in ('duration_ms', 'cpu_ms', 'mem_kb'):
        vals = list(res[key])
        if not vals:
            print(f"{key:12} no samples")
            continue
        p50 = statistics.median(vals)
        mean = statistics.mean(vals)
        if len(vals) >= 2:
            # 'inclusive' interpolates between observed values, so p95 never
            # exceeds the maximum sample and matches pandas' Series.quantile;
            # the default 'exclusive' method extrapolates on small samples.
            p95 = statistics.quantiles(vals, n=100, method='inclusive')[94]
            std = statistics.stdev(vals)
        else:
            p95 = vals[0]
            std = 0.0
        print(f"{key:12} p50={p50:.1f}  p95={p95:.1f}  mean={mean:.1f}  std={std:.1f}")
    print()

Maintenant, j'implémente les différentes méthodes d'historisation et je mesure leurs performances.

In [None]:
# Create the audit trigger that records inserts and updates of dbo.[USER]
# into dbo.USER_HISTORY.
#
# Changes compared with the naive "d.col <> i.col" filter:
#   * the change test uses EXISTS (SELECT ... EXCEPT SELECT ...), which treats
#     NULL as a comparable value, so a COMMENT going from NULL to a value (or
#     back) is recorded; with "<>" such a comparison is UNKNOWN and the row
#     was silently skipped;
#   * inserted is LEFT JOINed to deleted: for AFTER INSERT, UPDATE every audited
#     row is present in inserted, so the FULL OUTER JOIN was unnecessary.
trigger_ddl = """
-- 1) Création de la table USER_HISTORY si nécessaire
IF OBJECT_ID('dbo.USER_HISTORY','U') IS NULL
BEGIN
  CREATE TABLE dbo.USER_HISTORY (
    USER_HISTORY_ID BIGINT       IDENTITY PRIMARY KEY,
    USER_ID         INT          NOT NULL,
    CHANGE_DATE     DATETIME2    NOT NULL DEFAULT SYSUTCDATETIME(),
    OPERATION       CHAR(1)      NOT NULL,  -- 'I' ou 'U'
    OLD_STATE       INT          NULL,
    NEW_STATE       INT          NULL,
    OLD_COMMENT     NVARCHAR(500) NULL,
    NEW_COMMENT     NVARCHAR(500) NULL
  );
END
GO

IF OBJECT_ID('dbo.trg_USER_Audit','TR') IS NOT NULL
  DROP TRIGGER dbo.trg_USER_Audit;
GO

CREATE TRIGGER dbo.trg_USER_Audit
ON dbo.[USER]
AFTER INSERT, UPDATE
AS
BEGIN
  SET NOCOUNT ON;

  INSERT INTO dbo.USER_HISTORY
    (USER_ID, OPERATION, OLD_STATE, NEW_STATE, OLD_COMMENT, NEW_COMMENT)
  SELECT
    i.USER_ID,
    CASE WHEN d.USER_ID IS NULL THEN 'I' ELSE 'U' END,
    d.STATE,
    i.STATE,
    d.COMMENT,
    i.COMMENT
  FROM inserted AS i
  LEFT JOIN deleted AS d
    ON i.USER_ID = d.USER_ID
  WHERE
    d.USER_ID IS NULL
    OR EXISTS (SELECT i.STATE, i.COMMENT EXCEPT SELECT d.STATE, d.COMMENT);
END;
GO
"""

# Run the DDL (exec_sql is defined outside this cell).
exec_sql(trigger_ddl)
print("Trigger d’historisation créé avec succès.")


Trigger d’historisation créé avec succès.


In [None]:
# System-versioned temporal table setup for dbo.[USER].
#
# The script is re-runnable: it first switches SYSTEM_VERSIONING off when
# dbo.[USER] is already temporal, because SQL Server refuses to drop a table
# that is currently attached as a history table. The history table is then
# dropped and recreated empty, so previously captured history is discarded.
# NOTE(review): the history table columns must match dbo.[USER] exactly; the
# types below are taken on trust from the inline comments since schema.sql is
# not visible here — confirm.
temporal_ddl = """
-- 0) Detach the history table when versioning is already enabled (re-run)
IF EXISTS (
    SELECT 1
    FROM sys.tables
    WHERE object_id = OBJECT_ID('dbo.[USER]')
      AND temporal_type = 2
)
  ALTER TABLE dbo.[USER] SET (SYSTEM_VERSIONING = OFF);
GO

-- 1) Supprimer l'ancienne history si elle existe
IF OBJECT_ID('dbo.USER_HISTORY_Temporal','U') IS NOT NULL
  DROP TABLE dbo.USER_HISTORY_Temporal;
GO

IF COL_LENGTH('dbo.[USER]', 'ValidFrom') IS NULL
BEGIN
  ALTER TABLE dbo.[USER]
    ADD
      ValidFrom datetime2(7) GENERATED ALWAYS AS ROW START NOT NULL
        DEFAULT SYSUTCDATETIME(),
      ValidTo   datetime2(7) GENERATED ALWAYS AS ROW END   NOT NULL
        DEFAULT CONVERT(datetime2(7),'9999-12-31 23:59:59.9999999'),
      PERIOD FOR SYSTEM_TIME (ValidFrom, ValidTo);
END
GO


CREATE TABLE dbo.USER_HISTORY_Temporal (
    user_id      INT             NOT NULL,       -- identique à dbo.[USER].user_id
    USERNAME     NVARCHAR(50)    NOT NULL,       -- identique à dbo.[USER].USERNAME
    INSERT_DATE  DATETIME2(7)    NOT NULL,       -- identique à dbo.[USER].INSERT_DATE
    STATE        TINYINT         NOT NULL,       -- identique à dbo.[USER].STATE
    COMMENT      NVARCHAR(500)   NULL,           -- identique à dbo.[USER].COMMENT
    ValidFrom    DATETIME2(7)    NOT NULL,       -- ajouté pour temporal
    ValidTo      DATETIME2(7)    NOT NULL        -- ajouté pour temporal
);
GO

ALTER TABLE dbo.[USER]
  SET (SYSTEM_VERSIONING = ON (HISTORY_TABLE = dbo.USER_HISTORY_Temporal));
GO
"""
exec_sql(temporal_ddl)
print("Temporal Tables activées avec USER_HISTORY_Temporal correctement alignée.")

Temporal Tables activées avec USER_HISTORY_Temporal correctement alignée.


In [None]:
# Change Data Capture (CDC)
# The script has three GO-separated batches:
#   1. enable CDC on the database unless sys.databases already reports
#      is_cdc_enabled = 1 for TestDB;
#   2. enable CDC on dbo.[USER] unless cdc.change_tables already has an entry
#      for it (no gating role, net-changes support disabled);
#   3. select the cdc.change_tables row(s) for dbo.[USER].
# NOTE(review): exec_sql is defined outside this view; whether it returns or
# prints the rows of the final SELECT cannot be told from here — confirm.
cdc_ddl = """
IF NOT EXISTS (
    SELECT 1
    FROM sys.databases
    WHERE name = 'TestDB'
      AND is_cdc_enabled = 1
)
BEGIN
    EXEC sys.sp_cdc_enable_db;
END
GO

IF NOT EXISTS (
    SELECT 1
    FROM cdc.change_tables
    WHERE source_object_id = OBJECT_ID('dbo.[USER]')
)
BEGIN
    EXEC sys.sp_cdc_enable_table
        @source_schema           = N'dbo',
        @source_name             = N'USER',
        @role_name               = NULL,
        @supports_net_changes    = 0;
END
GO

SELECT * 
FROM cdc.change_tables
WHERE source_object_id = OBJECT_ID('dbo.[USER]');
"""
exec_sql(cdc_ddl)
# The message is printed unconditionally once exec_sql returns.
print("CDC activé sur dbo.[USER] et table cdc.dbo_USER_CT prête.")  


CDC activé sur dbo.[USER] et table cdc.dbo_USER_CT prête.


In [None]:
# Change Tracking
# The script has three GO-separated batches:
#   1. turn Change Tracking on for TestDB (2-day retention, automatic cleanup)
#      unless the database is already listed in sys.change_tracking_databases;
#   2. enable Change Tracking on dbo.[USER], with updated-column tracking,
#      unless the table is already listed in sys.change_tracking_tables;
#   3. select the sys.change_tracking_tables row for dbo.[USER].
# NOTE(review): exec_sql is defined outside this view; whether it returns or
# prints the rows of the final SELECT cannot be told from here — confirm.
ct_ddl = """
IF NOT EXISTS (
    SELECT 1
    FROM sys.change_tracking_databases
    WHERE database_id = DB_ID('TestDB')
)
BEGIN
    ALTER DATABASE TestDB
      SET CHANGE_TRACKING = ON
      (CHANGE_RETENTION = 2 DAYS, AUTO_CLEANUP = ON);
END
GO

IF NOT EXISTS (
    SELECT 1
    FROM sys.change_tracking_tables
    WHERE object_id = OBJECT_ID('dbo.[USER]')
)
BEGIN
    ALTER TABLE dbo.[USER]
      ENABLE CHANGE_TRACKING
      WITH (TRACK_COLUMNS_UPDATED = ON);
END
GO

SELECT 
    * 
FROM sys.change_tracking_tables
WHERE object_id = OBJECT_ID('dbo.[USER]');
"""
exec_sql(ct_ddl)
# The message is printed unconditionally once exec_sql returns.
print("Change Tracking activé sur dbo.[USER].")


Change Tracking activé sur dbo.[USER].


In [None]:
# Benchmark statements, one insert and one update per historisation method.
#
# NOTE(review): the setup cells above enable the trigger, the temporal table,
# CDC and Change Tracking on the same dbo.[USER] table and none of them is
# switched off here, so every measurement below appears to include the
# overhead of all four mechanisms at once. To compare the methods, each one
# should be enabled alone (or on its own copy of the table) — confirm.


def _bench_statements(prefix: str, tag: str) -> dict:
    """Build the insert/update benchmark statements for one method.

    Args:
        prefix: short method code embedded in the generated username
            ('bench_<prefix>_<8 random hex characters>').
        tag: value written to COMMENT on insert; the update writes
            '<tag>upd'.

    Returns:
        A dict with the keys 'insert' and 'update' holding the SQL text.
        The statements are the same as the previous hand-written copies,
        apart from insignificant leading whitespace.
    """
    insert_sql = f"""
        INSERT INTO dbo.[USER] (USERNAME, STATE, COMMENT)
        VALUES ('bench_{prefix}_' + LEFT(CONVERT(varchar(36), NEWID()), 8), 1, '{tag}');
    """
    # Updates the most recent row still in STATE = 1.
    update_sql = f"""
        WITH ToUpdate AS (
          SELECT TOP(1) *
          FROM dbo.[USER]
          WHERE STATE = 1
          ORDER BY USER_ID DESC
        )
        UPDATE ToUpdate
        SET STATE = 2,
            COMMENT = '{tag}upd';
    """
    return {'insert': insert_sql, 'update': update_sql}


sql_tests = {
    'Triggers':       _bench_statements('trig', 't'),
    'Temporal':       _bench_statements('temp', 't'),
    'CDC':            _bench_statements('cdc', 'c'),
    'ChangeTracking': _bench_statements('ct', 'ct'),
}

# Run every (method, operation) pair; run_benchmark is defined outside this cell.
all_results = []
for method, ops in tqdm(sql_tests.items(), desc="Méthodes", unit="méthode"):
    for op_name, sql in tqdm(ops.items(), desc=f"{method} ops", leave=False, unit="op"):
        print(f"\nBenchmark: {method} - {op_name}")
        res = run_benchmark(sql, method, op_name, iterations=10)
        all_results.append(res)

# Short recap: number of benchmarks and of recorded executions per benchmark.
print(f"\nTotal benchmarks réalisés : {len(all_results)}")
for r in all_results:
    print(f"{r['method']} - {r['operation']}: {len(r['duration_ms'])} exécutions")


Méthodes:   0%|          | 0/4 [00:00<?, ?méthode/s]

Triggers ops:   0%|          | 0/2 [00:00<?, ?op/s]


Benchmark: Triggers - insert

Benchmark: Triggers - update


Temporal ops:   0%|          | 0/2 [00:00<?, ?op/s]


Benchmark: Temporal - insert

Benchmark: Temporal - update


CDC ops:   0%|          | 0/2 [00:00<?, ?op/s]


Benchmark: CDC - insert

Benchmark: CDC - update


ChangeTracking ops:   0%|          | 0/2 [00:00<?, ?op/s]


Benchmark: ChangeTracking - insert

Benchmark: ChangeTracking - update

Total benchmarks réalisés : 8
Triggers - insert: 10 exécutions
Triggers - update: 10 exécutions
Temporal - insert: 10 exécutions
Temporal - update: 10 exécutions
CDC - insert: 10 exécutions
CDC - update: 10 exécutions
ChangeTracking - insert: 10 exécutions
ChangeTracking - update: 10 exécutions


In [None]:
# Fail early with an explicit message when the benchmark cell has not been run.
try:
    all_results
except NameError:
    raise RuntimeError("exécuter le benchmark.")


def _aggregate(result):
    """Collapse one benchmark result into a flat row of p50 / p95 / mean values."""
    row = {'method': result['method'], 'operation': result['operation']}
    for metric in ('duration_ms', 'cpu_ms', 'mem_kb'):
        samples = pd.Series(result[metric])
        row[f'p50_{metric}'] = samples.quantile(0.50)
        row[f'p95_{metric}'] = samples.quantile(0.95)
        row[f'avg_{metric}'] = samples.mean()
    return row


# One row per (method, operation) pair.
df = pd.DataFrame([_aggregate(entry) for entry in all_results])

# Save as CSV, show the table, and expose a download link as the cell output.
output_path = 'benchmark_results.csv'
df.to_csv(output_path, index=False)
display(df)
FileLink(output_path)


Unnamed: 0,method,operation,p50_duration_ms,p95_duration_ms,avg_duration_ms,p50_cpu_ms,p95_cpu_ms,avg_cpu_ms,p50_mem_kb,p95_mem_kb,avg_mem_kb
0,Triggers,insert,10.1,10.1,10.1,0.0,0.0,0.0,0.0,0.0,0.0
1,Triggers,update,28.9,28.9,28.9,0.0,0.0,0.0,0.0,0.0,0.0
2,Temporal,insert,12.2,12.2,12.2,0.0,0.0,0.0,0.0,0.0,0.0
3,Temporal,update,13.0,13.0,13.0,0.0,0.0,0.0,0.4,0.4,0.4
4,CDC,insert,12.9,12.9,12.9,0.0,0.0,0.0,0.0,0.0,0.0
5,CDC,update,23.8,23.8,23.8,0.0,0.0,0.0,0.0,0.0,0.0
6,ChangeTracking,insert,13.6,13.6,13.6,0.0,0.0,0.0,0.0,0.0,0.0
7,ChangeTracking,update,27.1,27.1,27.1,1.6,1.6,1.6,0.4,0.4,0.4
