
# Great Expectations (GX) Setup Notebook

Dieses Notebook dient zur Konfiguration und Definition der Datenvalidierung mit Hilfe der Bibliothek Great Expectations. Die Ergebnisse des Notebooks (Validation_Definition, Checkpoint) werden für die Simulation zur automatisierten Validierung weiterverwendet

## Voraussetzungen
- Abhängigkeiten aus requirements.txt installiert
- Datenbanksystem installiert (siehe README)

## Inhalt
1. **Setup und Datenbankverbindung**  
2. **Definition der Datenvalidierung**  
3. **Definition des Checkpoints**  
4. **Test-Validierung** 

### 1. Setup und Datenbankverbindung

#### Imports

In [None]:
from datetime import datetime, timedelta

# Great Expectations Funktionen importieren
import great_expectations as gx
from great_expectations.checkpoint import UpdateDataDocsAction

# Konstanten für die Datenbankverbindung importieren
from constants import (
    DB_HOST, DB_PORT, DB_USER, DB_PASSWORD, DB_NAME,
    DATA_SOURCE, DATA_ASSET, DB_TABLE, BATCH_DEFINITION,
    EXPECTATION_SUITE, VALIDATION_DEFINITION, CHECKPOINT_NAME
)

#### Kontext laden

In [None]:
# GX-Verzeichnis mit allen relevanten Dateien erstellen
context = gx.get_context(mode="file", project_root_dir="../")

#### Connection-String für Datenbankverbindung generieren

In [None]:
db_connection_string = f"postgresql+psycopg2://{DB_USER}:{DB_PASSWORD}@{DB_HOST}:{DB_PORT}/{DB_NAME}"

#### Datenquelle hinzufügen

In [None]:
data_source = context.data_sources.add_or_update_postgres(
    name = DATA_SOURCE, 
    connection_string = db_connection_string
)

#### Daten-Asset (Tabelle) hinzufügen

In [None]:
if not data_source.assets:
    # Asset existiert noch nicht – daher wird es hinzugefügt
    data_asset = data_source.add_table_asset(table_name=DB_TABLE, name=DATA_ASSET)    
else:
    # Asset existiert bereits
    data_asset = data_source.assets[DATA_ASSET]

#### Batch-Definition hinzufügen

In [None]:
if not data_asset.batch_definitions:
   # Batch-Definition existiert noch nicht – daher wird es hinzugefügt
   batch_definition = data_asset.add_batch_definition_daily(name=BATCH_DEFINITION, column="timestamp")
else:
   # Batch existiert bereits
   batch_definition = data_asset.get_batch_definition(name=BATCH_DEFINITION)

#### Verbindung testen

In [None]:
# Batch laden und die ersten 10 Datensätze ausgeben
daily_batch = batch_definition.get_batch()
daily_batch.head()

### Definition der Datenvalidierung

#### Expectation Suite hinzufügen

In [None]:
gx_suite = context.suites.add_or_update(gx.ExpectationSuite(name=EXPECTATION_SUITE))

#### Expectations definieren

In [None]:
# Validierung von NULL-Werten
exp_not_null_timestamp = gx.expectations.ExpectColumnValuesToNotBeNull(
    column="timestamp", description="Stellt sicher, dass das Zeitstempel-Feld nicht NULL ist."
)
exp_not_null_sensorId = gx.expectations.ExpectColumnValuesToNotBeNull(
    column="sensor_id", description="Stellt sicher, dass das Sensor-ID-Feld nicht NULL ist."
)
exp_not_null_location = gx.expectations.ExpectColumnValuesToNotBeNull(
    column="location", description="Stellt sicher, dass das Standort-Feld nicht NULL ist."
)
exp_not_null_temperature = gx.expectations.ExpectColumnValuesToNotBeNull(
    column="temperature", description="Stellt sicher, dass das Temperatur-Feld nicht NULL ist."
)
exp_not_null_humidity = gx.expectations.ExpectColumnValuesToNotBeNull(
    column="humidity", description="Stellt sicher, dass das Luftfeuchtigkeits-Feld nicht NULL ist."
)
exp_not_null_windSpeed = gx.expectations.ExpectColumnValuesToNotBeNull(
    column="wind_speed", description="Stellt sicher, dass das Windgeschwindigkeits-Feld nicht NULL ist."
)

# Validierung von Eindeutigkeit (Prüfung auf doppelte Datensätze)
exp_unique_measurement = gx.expectations.ExpectCompoundColumnsToBeUnique(
    column_list=["timestamp", "sensor_id", "location"],
    description="Stellt sicher, dass keine Messung redundant vorliegt.",
    
)

# Berechnung des Zeitfensters für die Aktualitätsprüfung
min_timestamp = datetime.now() - timedelta(days=1)  # 24 Stunden zurück

# Validierung der Aktualität
exp_timestamp = gx.expectations.ExpectColumnValuesToBeBetween(
    column="timestamp",
    min_value=min_timestamp,
    description="Stellt sicher, dass der Zeitstempel innerhalb der letzten 24 Stunden liegt."
)

# Validierung von Wertebereichen
exp_temperature = gx.expectations.ExpectColumnValuesToBeBetween(
    column="temperature", min_value=-20, max_value=50,
    description="Stellt sicher, dass die Temperatur zwischen -20 und 50 Grad liegt."
)
exp_humidity = gx.expectations.ExpectColumnValuesToBeBetween(
    column="humidity", min_value=0, max_value=100,
    description="Stellt sicher, dass die Luftfeuchtigkeit zwischen 0 und 100 Prozent liegt."
)
exp_wind_speed = gx.expectations.ExpectColumnValuesToBeBetween(
    column="wind_speed", min_value=0, max_value=50,
    description="Stellt sicher, dass die Windgeschwindigkeit zwischen 0 und 50 km/h liegt."
)

# SQL-Abfrage für Genauigkeitsvalidierung definieren (am Beispiel temperature)
my_query = """
    SELECT
        *
    FROM
        {batch}
    WHERE
        LENGTH(SPLIT_PART(CAST(temperature AS TEXT), '.', 2)) < 1
    """

# Validierung der Genauigkeit (am Beispiel der Spalte temperature)
exp_temperature_precision = gx.expectations.UnexpectedRowsExpectation(
    unexpected_rows_query=my_query,
    description="Stellt sicher, dass die Temperaturwerte mindestens eine Dezimalstelle enthalten."
)

# Validierung der Konsistenz
exp_sensor_id_consistency = gx.expectations.ExpectColumnValuesToBeInSet(
    column="sensor_id", value_set=["WS_001", "WS_002", "WS_003", "WS_004", "WS_005"],
    description="Stellt sicher, dass die Sensor-ID einem definierten Wert entspricht."
)

#### Expectations zum GX-Kontext hinzufügen

In [None]:
context.suites.get(name=EXPECTATION_SUITE).add_expectation(exp_not_null_timestamp)
context.suites.get(name=EXPECTATION_SUITE).add_expectation(exp_not_null_sensorId)
context.suites.get(name=EXPECTATION_SUITE).add_expectation(exp_not_null_location)
context.suites.get(name=EXPECTATION_SUITE).add_expectation(exp_not_null_temperature)
context.suites.get(name=EXPECTATION_SUITE).add_expectation(exp_not_null_humidity)
context.suites.get(name=EXPECTATION_SUITE).add_expectation(exp_not_null_windSpeed)
context.suites.get(name=EXPECTATION_SUITE).add_expectation(exp_unique_measurement)
context.suites.get(name=EXPECTATION_SUITE).add_expectation(exp_timestamp)
context.suites.get(name=EXPECTATION_SUITE).add_expectation(exp_temperature)
context.suites.get(name=EXPECTATION_SUITE).add_expectation(exp_humidity)
context.suites.get(name=EXPECTATION_SUITE).add_expectation(exp_wind_speed)
context.suites.get(name=EXPECTATION_SUITE).add_expectation(exp_temperature_precision)
context.suites.get(name=EXPECTATION_SUITE).add_expectation(exp_sensor_id_consistency)



#### Validation-Definition zum Kontext hinzufügen

In [None]:
validation_definition = gx.ValidationDefinition(
    data=batch_definition, 
    suite=context.suites.get(name=EXPECTATION_SUITE), 
    name=VALIDATION_DEFINITION
)
validation_definition = context.validation_definitions.add_or_update(validation_definition)

### Definition des Checkpoints

In [None]:
action_list = [    
    # Diese Aktion aktualisiert die Data Docs statische Website mit den Validierungsergebnissen,
    # nachdem der Checkpoint ausgeführt wurde.
    UpdateDataDocsAction(
        name="update_all_data_docs",
    ),
]

# Checkpoint erstellen
checkpoint = gx.Checkpoint(
    name=CHECKPOINT_NAME,
    validation_definitions=[context.validation_definitions.get(name=VALIDATION_DEFINITION)],
    actions=action_list,
    result_format={"result_format": "COMPLETE"},
)

# Checkpoint zum Kontext hinzufügen
context.checkpoints.add_or_update(checkpoint)

### Test-Validierung

In [None]:
# Vorhandenen Checkpoint anhand seines Namens abrufen und ausführen
checkpoint_test = context.checkpoints.get(name=CHECKPOINT_NAME)
checkpoint_test.run()