# 🕵️ Forensische Analyse von Weblogs mit DuckLake und DuckDB## Einführung: DuckLake als Unveränderliches Archiv (Immutable Ledger)Dieses Notebook demonstriert die Kernfunktionen der **DuckLake-Erweiterung** für DuckDB, indem es eine forensische Untersuchung von Weblog-Daten durchführt. DuckLake speichert jeden Zustand als **versionierten Snapshot**, was die Rekonstruktion des Originalzustands (Time Travel) ermöglicht.Wir simulieren eine **Log-Manipulation** (Löschen von Fehlereinträgen) und nutzen DuckLake, um:1.  **Time Travel** zu nutzen, um den Originalzustand wiederherzustellen.2.  **Statistische Analysen** (Statuscodes, Top-IPs, URLs) auf dem Original-Log durchzuführen.3.  Die Manipulation durch die **Änderungsverfolgung** (`ducklake_table_deletions`) zu beweisen.

## 1. Vorbereitung und Daten-Setup

In [None]:
import duckdb
import pandas as pd
import matplotlib.pyplot as plt
import os
import time
import glob
from datetime import datetime

# --- Daten-Setup: Laden der Weblogs vom unzippten Pfad ---
LOG_DIR = './data/web-log-dataset/'

try:
    # Suche nach der CSV-Datei im unzippten Verzeichnis
    log_files = glob.glob(LOG_DIR + '*.csv')
    if not log_files:
        raise FileNotFoundError(f"Keine CSV-Datei in '{LOG_DIR}' gefunden.")
    
    # Lade die gefundene Datei
    df = pd.read_csv(log_files[0])
    print(f"Original-Daten geladen von: {log_files[0]} ({len(df)} Zeilen).")
    
    # Spaltenumbenennung gemäß der Struktur: Ip, Time, URL, Status
    df.rename(columns={'Ip': 'ip', 'Time': 'timestamp', 'URL': 'url', 'Status': 'status_code'}, errors='raise', inplace=True)
    
    # Datenbereinigung und Typumwandlung
    df['status_code'] = df['status_code'].astype(int)
    df['timestamp'] = pd.to_datetime(df['timestamp'])
    # Nur die relevanten Spalten behalten, um Fehlern vorzubeugen
    df = df[['timestamp', 'ip', 'url', 'status_code']]
    
except Exception as e:
    print(f"Fehler beim Laden der Daten: {e}")
    print("Bitte stellen Sie sicher, dass `!unzip web-log-dataset.zip -d ./data` ausgeführt wurde und die Spalten Ip, Time, URL, Status in der CSV vorhanden sind.")
    raise
    
print(f"Verwendete Spalten: {list(df.columns)}")

## 2. DuckLake-Initialisierung und Snapshot 1 (Basis-Beweis)

In [None]:
# --- Forensische Pfade definieren ---
DUCKLAKE_METADATA_PATH = 'forensic_evidence.ducklake'
DUCKLAKE_DATA_PATH = 'forensic_data_files'
CATALOG_NAME = 'forensic_log_archive'
table_name = 'access_logs'

# Lokale Ordner erstellen 
os.makedirs(DUCKLAKE_DATA_PATH, exist_ok=True)

# --- DuckDB initialisieren und DuckLake Extension laden ---
con = duckdb.connect(database='forensic_duckdb.db')
con.sql("INSTALL ducklake;")
con.sql("LOAD ducklake;")

# --- DuckLake Katalog anhängen und verwenden ---
attach_query = f"""
    ATTACH 'ducklake:{DUCKLAKE_METADATA_PATH}' AS {CATALOG_NAME} (DATA_PATH '{DUCKLAKE_DATA_PATH}');
"""
con.sql(attach_query)
con.sql(f"USE {CATALOG_NAME};")

print(f"✅ Forensisches Log-Archiv '{CATALOG_NAME}' initialisiert.")

# --- Snapshot 1: Basis-Log (Original-Daten) ---
# 1. Leere Tabelle mit Schema aus dem Pandas DF erstellen
con.sql(f"CREATE OR REPLACE TABLE {table_name} AS SELECT * FROM df LIMIT 0;")

# 2. Daten aus dem Pandas DataFrame einfügen (erzeugt Snapshot 1)
con.sql(f"INSERT INTO {table_name} SELECT * FROM df;")

# 3. Snapshot-ID und Zeitstempel erfassen (WICHTIG für Time Travel)
snapshot_1_id = con.sql(f"SELECT max(snapshot_id) FROM ducklake_snapshots('{CATALOG_NAME}')").fetchone()[0]
snapshot_1_time = con.sql(f"SELECT snapshot_time FROM ducklake_snapshots('{CATALOG_NAME}') WHERE snapshot_id = {snapshot_1_id}").fetchone()[0]

print(f"\n✅ Snapshot 1 (Original-Log) erfasst. ID: {snapshot_1_id}, Time: {snapshot_1_time}")

## 3. Simulierte Log-Manipulation und Snapshot 2 (Manipuliertes Log)

In [None]:
# Kurze Pause für eindeutigen Snapshot-Zeitstempel
time.sleep(1)

# --- Simulierte Log-Manipulation (Beweisvertuschung) ---

# Löschen verdächtiger Einträge (Alle Fehler-Einträge >= 400 werden gelöscht)
deleted_count = con.execute(f"DELETE FROM {table_name} WHERE status_code >= 400;").fetchall()[0][0]
print(f"   -> {deleted_count} Fehler-Einträge (>= 400) wurden gelöscht (Simulierte Manipulation).")

# Hinzufügen eines unverdächtigen Eintrags (Ablenkung)
new_log_df = pd.DataFrame({
    'timestamp': [pd.to_datetime(datetime.now())], 
    'ip': ['203.0.113.5'], 
    'url': ['/normal/status'],
    'status_code': [200]
})
con.sql(f"INSERT INTO {table_name} SELECT * FROM new_log_df;")
print(f"   -> Ein neuer 200-Eintrag wurde hinzugefügt (Ablenkung).")

# Snapshot 2 erfassen
snapshot_2_id = con.sql(f"SELECT max(snapshot_id) FROM ducklake_snapshots('{CATALOG_NAME}')").fetchone()[0]
print(f"\n✅ Snapshot 2 (Manipuliertes Log) erfasst. ID: {snapshot_2_id}")

con.sql(f"SELECT snapshot_id, strftime(snapshot_time, '%Y-%m-%d %H:%M:%S') AS time FROM ducklake_snapshots('{CATALOG_NAME}') ORDER BY snapshot_id;").show()

## 4. Forensische Analyse: Time Travel & Änderungsverfolgung

### 4.1. Direkter Beweis: Gelöschte und hinzugefügte Einträge (`ducklake_table_deletions`)

In [None]:
print("\n[Forensischer Beweis: Gelöschte Einträge (Vertuschung) zwischen Snapshot 1 und 2]\n")
con.sql(f"""
    SELECT 
        ip, 
        url,
        status_code 
    FROM 
        ducklake_table_deletions('{CATALOG_NAME}', 'main', '{table_name}', {snapshot_1_id}, {snapshot_2_id}); -- **DuckLake Deletions**
""").show()

print("\n[Kontrolle: Neu hinzugefügte Einträge (Ablenkung)]\n")
con.sql(f"""
    SELECT 
        ip, 
        url,
        status_code 
    FROM 
        ducklake_table_insertions('{CATALOG_NAME}', 'main', '{table_name}', {snapshot_1_id}, {snapshot_2_id}); -- **DuckLake Insertions**
""").show()

print("Die `deletions`-Funktion rekonstruiert exakt die von der Manipulation entfernten Beweismittel (alle Statuscodes >= 400).")

### 4.2. Time Travel: Rekonstruktion der Top-IPs vor der Manipulation

In [None]:
# --- 1. Analyse des aktuellen (manipulierten) Zustands (Snapshot 2) ---
top_ips_current = con.execute(f"""
    SELECT 
        ip, 
        COUNT(*) AS cnt
    FROM 
        {table_name}
    GROUP BY 
        ip
    ORDER BY 
        cnt DESC
    LIMIT 10
""").fetchdf()

# --- 2. Analyse des historischen (Original-) Zustands (Snapshot 1) mit Time Travel ---
top_ips_historic = con.execute(f"""
    SELECT 
        ip, 
        COUNT(*) AS cnt
    FROM 
        {table_name} FOR TIMESTAMP AS OF '{snapshot_1_time}' -- **DuckLake Time Travel**
    GROUP BY 
        ip
    ORDER BY 
        cnt DESC
    LIMIT 10
""").fetchdf()

### Visualisierung des Beweises
fig, axes = plt.subplots(1, 2, figsize=(16, 6))

# Plot 1: Aktueller, manipulierter Zustand
top_ips_current.plot.bar(x="ip", y="cnt", legend=False, ax=axes[0], color='skyblue')
axes[0].set_title("Top IPs (Aktueller Zustand - Snapshot 2)")
axes[0].set_ylabel("Anzahl der Anfragen")
axes[0].set_xlabel("IP-Adresse")
axes[0].tick_params(axis='x', rotation=45)

# Plot 2: Historischer, rekonstruierter Zustand (Beweis)
top_ips_historic.plot.bar(x="ip", y="cnt", legend=False, ax=axes[1], color='darkred')
axes[1].set_title("Top IPs (Zustand vor Manipulation - Snapshot 1)")
axes[1].set_ylabel("Anzahl der Anfragen")
axes[1].set_xlabel("IP-Adresse")
axes[1].tick_params(axis='x', rotation=45)

plt.tight_layout()
plt.show()

## 5. Vertiefende Forensische Analysen mit Time Travel

### 5.1. Anomalie-Erkennung: Statuscode-Verteilung im Vergleich
Der Vergleich der Statuscodes belegt den Manipulationsversuch quantitativ: Die Fehlercodes sind im Original-Log deutlich vorhanden, im aktuellen Log aber fast vollständig verschwunden.

In [None]:
def analyze_status_codes(query_suffix, title_suffix):
    """Führt die Statuscode-Analyse auf dem aktuellen oder historischen Zustand durch."""
    status_analysis = con.execute(f"""
        SELECT 
            CAST(status_code AS VARCHAR) AS status_code, 
            COUNT(*) AS cnt
        FROM 
            {table_name} {query_suffix}
        GROUP BY 
            status_code
        ORDER BY 
            cnt DESC
    """).fetchdf()

    status_analysis.plot.bar(x="status_code", y="cnt", legend=False, 
                             title=f"HTTP-Statuscode Verteilung ({title_suffix})", 
                             figsize=(8, 4), 
                             color=['darkred' if int(s) >= 400 else 'green' for s in status_analysis['status_code']])
    plt.ylabel("Anzahl der Anfragen")
    plt.xlabel("Statuscode")
    plt.xticks(rotation=0)
    plt.show()
    print(f"Statuscode-Analyse ({title_suffix}):")
    print(status_analysis)

# 1. Analyse des manipulierten Zustands (aktuell)
analyze_status_codes("", "Aktueller Zustand (Manipuliert)")

# 2. Analyse des Original-Zustands (Time Travel)
analyze_status_codes(f"FOR TIMESTAMP AS OF '{snapshot_1_time}'", "Historischer Zustand (Original - Beweis)")

### 5.2. Analyse der Häufig aufgerufenen URLs (Entfernte Ziele)
Die Überprüfung der Top-URLs im historischen Log kann fehlende kritische Ziele, die vor der Manipulation aufgerufen wurden, aufdecken.

In [None]:
def analyze_top_urls(query_suffix, title_suffix):
    top_urls = con.execute(f"""
        SELECT 
            url, 
            COUNT(*) AS cnt
        FROM 
            {table_name} {query_suffix}
        GROUP BY 
            url
        ORDER BY 
            cnt DESC
        LIMIT 10
    """).fetchdf()

    top_urls.plot.barh(x="url", y="cnt", legend=False, 
                      title=f"Top 10 Aufgerufene URLs ({title_suffix})", 
                      figsize=(10, 5), color='darkgreen')
    plt.xlabel("Anzahl der Anfragen")
    plt.ylabel("URL")
    plt.gca().invert_yaxis() 
    plt.show()
    print(f"Top-URL-Analyse ({title_suffix}):")
    print(top_urls)

# 1. Analyse des manipulierten Zustands (aktuell)
analyze_top_urls("", "Aktueller Zustand")

# 2. Analyse des Original-Zustands (Time Travel)
analyze_top_urls(f"FOR TIMESTAMP AS OF '{snapshot_1_time}'", "Historischer Zustand (Original)")

### 5.3. Zeitliche Analyse des Gesamttraffics (Requests pro Tag)
Wir nutzen Time Travel, um den ursprünglichen Traffic-Verlauf zu rekonstruieren und festzustellen, ob die Manipulation eine zeitliche Anomalie in den Gesamt-Requests verursacht hat.

In [None]:
# Analyse des Traffic-Verlaufs im unmanipulierten Log (Snapshot 1)
requests_per_day_historic = con.execute(f"""
    SELECT 
        CAST(timestamp AS DATE) AS day, 
        COUNT(*) AS cnt
    FROM 
        {table_name} FOR TIMESTAMP AS OF '{snapshot_1_time}' -- Time Travel
    GROUP BY 
        day
    ORDER BY 
        day
""").fetchdf()

print("Requests pro Tag (Historisch - Vor der Manipulation):\n")
print(requests_per_day_historic)

requests_per_day_historic.plot(x="day", y="cnt", kind="line", title="Requests pro Tag (Historisch)", figsize=(10, 5))
plt.ylabel("Anzahl der Requests")
plt.xlabel("Datum")
plt.show()

## 6. Abschluss und Beweissicherung
Die DuckLake-Funktionen haben es ermöglicht, **unveränderliche Beweise** (gelöschte Fehler-Logs, Statuscode-Anomalie) aus dem historischen Log zu extrahieren. Das Archiv ist nun gesichert.

In [None]:
# Schließe die DuckDB-Verbindung und speichere alle Metadaten final ab
con.close()
print("✅ Forensische Analyse abgeschlossen. Das Archiv ist gesichert.")