# Soda Python Scan - Demo

- Das Notebook enthält Demo Codes für Soda, die sich hauptsächlich mit Python Scans mit Soda Core in Kombination mit Soda Cloud und Postgres als Data Source fokussiert (am Ende des Notebooks werden noch einige Features von Soda Cloud präsentiert)

In [None]:
from soda.scan import Scan
from soda.sampler.sampler import Sampler
from soda.sampler.sample_context import SampleContext
import pandas as pd
import psycopg2
from psycopg2 import sql
from IPython.display import Code, Image
from datetime import datetime
import json
import sys


In [None]:
host = 'localhost'
database = 'demo'
user = 'postgres'
password = 'test'
schema = 'public'
table_name = 'bus_breakdown_and_delays'

def connect(database='demo'):
    """Connect to database"""
    conn = None
    try:
        print('Connecting...')
        conn = psycopg2.connect(
            host=host,
            database=database,
            user=user,
            password=password)
    except (Exception, psycopg2.DatabaseError) as error:
        print(error)
        sys.exit(1)
    print(f'All good, Connection to {database} successful!')
    return conn



def sql_to_dataframe(conn, query, column_names):
    """
    Import data from a PostgreSQL database using a SELECT query 
    """
    cursor = conn.cursor()
    try:
        cursor.execute(query)
    except (Exception, psycopg2.DatabaseError) as error:
        print('Error: %s' % error)
        cursor.close()
        return 1
    # The execute returns a list of tuples:
    tuples_list = cursor.fetchall()
    cursor.close()
    # Now we need to transform the list into a pandas DataFrame:
    df = pd.DataFrame(tuples_list, columns=column_names)
    return df

## Visualisierung von Demo-Daten
- Verbindung zur PostgreSQL Datenbank aufbauen und Laden der Daten in ein Pandas Dataframe

In [None]:
columns = ['school_year', 'busbreakdown_id', 'run_type', 'bus_no', 'route_number',
           'reason', 'schools_serviced', 'occured_on', 'created_on', 'boro',
           'bus_company_name', 'how_long_delayed', 'number_of_students_on_the_bus',
           'has_contractor_notified_schools', 'has_contractor_notified_parents',
           'have_you_alerted_opt', 'informed_on', 'incident_number',
           'last_updated_on', 'breakdown_or_running_late', 'school_age_or_prek']

query = """ SELECT * 
            FROM bus_breakdown_and_delays
        """

conn = connect()
bus_df = sql_to_dataframe(conn, query, columns)
conn.close()

bus_df

## Soda Cloud einrichten und API key erstellen
- Erstelle einen Soda Account (free 45-day trial) unter https://cloud.soda.io/signup -> Eingloggen
- Navigiere zum **your avatar** > **Profile** und klicke auf **API keys** Tab
- Clicke auf das **+** Icon und generiere ein neues API key, secret Paar und speichere die Information local ab

## Soda Core mit Demo-Datenbank und Soda Cloud verbinden
- `configuration.yml` Datei enthält alle Information um sich mit der Datenbank (`data_source`) mit Soda zu verbinden
- Auch Information, um sich mit der Soda Cloud zu verbinden können hier eingetragen werden (`soda_cloud`)
- Ersetze `api_key_id` und `api_key_secret` in `configuration.yml` mit den eigenen erstellten Werten

In [None]:
(bus_df.head(4))

In [None]:
# Display connection configuration yml file
Code(filename='./configuration.yml', language='YAML')

## Definiere Metrics und Checks mit Soda Checks Language (SodaCL)
- Checks werden in eine YAML Datei defniert: Es gibt verschiedene Arten von Checks, die definiert werden können
- `Built-in Metrics` (zusammen mit ein Threshold Wert) können verwendet werden, um Checks zu definieren (es gibt über 25 verschiedene Built-in Metrics)
    - Im Bsp. werden 5 verschiedene Build-in Metrics verwendet
        - `row_count`: überprüft die Daten auf ausreichende Einträge
        - `missing_count`: überprüft eine Spalte auf fehlende Werte
        - `duplicate_count`: überprüft eine Spalte auf duplikate Werte
        - `freshness`: überprüft die Daten auf Aktualität mit Referenzvariable `NOW` (default: current_timestamp) unter Berücksichtigung einer Date/Timestamp Spalte
        - `invalid count`: testet, ob eine Spalte nicht-valide Werte enthält (valid values können definiert werden)
        - auch interessant `Anomaly checks`: analysiert Trends und Pattern in Daten über die Zeit; kann auch für Outlier Detection verwendet werden
- `Custom Checks`: Checks, die nicht mit Built-in Metrics definiert werden können, können als Custom Checks durch Verwendung von `User-defined checks` und `Failed rows checks` definiert werden
    - `User-defined checks`: ermöglichen common-table expressions (CTE) oder SQL queries zu erstellen, die Soda Library während des Scans verwendet
    - `Failed rows checks`: können verwendet werden, um Einträge, die im User-defined check fehlgeschlagen sind an Soda Cloud zu senden
    - Im Bsp. wird als Custom Check definiert, der prüft, ob für alle Einträge jeweil der Datum-Wert von `last_updated_on` Spalte größer oder gleich der Datum-Wert von `created_on` Spalte ist (fehlgeschlagene Einträge werden an Soda Cloud gesendet)
- `Schema Checks`: können verwendet werden, um das Schema der Daten (z.B. Präsenz von bestimmten Spalten) zu prüfen
- `warn` oder `fail`: Für die definierten Checks können Alert Levels definiert werden

In [None]:
# Display checks configuration yml file
Code(filename='./demo_tests.yml', language='YAML')

## Scans programmatisch mit Python ausführen
- Soda library, die als Python library zur Verfügung steht, kann benutzt werden, um Scans auszuführen
- Erstelle dafür ein `scan` Objekt der Klasse `soda.scan.Scan`
- scan Objekte können durch Verwendung verschiedener Methoden konfiguriert werden:
    - `add_configuration_yaml_file`: Mit der Methode kann eine `configuration.yml` (Connection Konfiguration) Datei für das scan Objekt gesetzt werden
        - Alternativ `add_configuration_yaml_str`: Definiere den Inhalt von `configuration.yml` Datei in ein String und übergebe den Wert an dieser Methode
    - `set_data_source_name`: Übergebe ein Name von data_source, welches in der `configuration.yml` Datei definiert ist
    - `set_scan_definition_name`: Spezifiziert den Scan Definition Name (wenn verbunden mit Soda Cloud), um Check Ergebnisse separat zu halten (z.B. dev, prod, staging)
    - `add_variables`: Variablen können mit der Methode für das scan Objekt gesetzt werden (hier wird z.B. die `NOW` Variable gesetzt, die für den freshness Check verwendet wird -> hier `"NOW": "2021-09-13 00:00:00"`und `freshness(last_updated_on) < 1d`: Check schlägt fehl, wenn der neueste Timestamp-Eintrag der Spalte älter als "2021-09-12 00:00:00" ist)
    - `add_sodacl_yaml_file`: mit der Methode können Check YAML Files an das scan Objekt übergeben werden
        - Alternativ `add_sodacl_yaml_str`: Definiere Cheks (mit SodaCL) in ein String und übergebe den Wert an dieser Methode
    - `execute`: Führt den Scan aus und gibt ein Exitcode zurück (https://docs.soda.io/soda-library/programmatic.html#scan-exit-codes)
    - `set_verbose`: Führt den Scan in "verbose" Mode (mehr Logging-Information als Standard Mode) durch
    - `get_logs_text`: Gibt den Log-Output des Scans als String zurück

In [None]:
scan = Scan()

# Set scan definition name, equivalent to CLI -s option
scan.set_scan_definition_name("soda_demo")

scan.set_data_source_name("demo_datasource")

# Add config yml file
scan.add_configuration_yaml_file(file_path="./configuration.yml")

# Add variables
scan.add_variables({"NOW": "2021-09-13 00:00:00"})

# Add check YAML files 
scan.add_sodacl_yaml_file("./demo_tests.yml")

# Execute the scan
exit_code = scan.execute()
print("Exit code:", exit_code)

# Set logs to verbose mode, equivalent to CLI -V option
scan.set_verbose(True)

# Print results of scan
print(scan.get_logs_text())

### Checks Results Informationen in Soda Cloud
- Da wir in `configuration.yml` eine Connection mit Soda Cloud konfiguriert haben, können die Check Information in der Soda Cloud eingesehen werden
- Da wir für den Custom Check `Failed rows checks` definiert haben, werden alle Einträge die den Custom Check nicht erfüllen in der Soda Cloud angezeigt (siehe 1. Bild unten)
- Auch Informationen über Schema Checks werden angezeigt (siehe 2. Bild unten)

In [None]:
Image(filename='./images/soda_cloud_failed_rows.png')

In [None]:
Image(filename='./images/soda_cloud_schema_check.png') 

### Scan Results speichern
- Um alle Informationen des Scan Results zu erhalten, kann die Methode `get_scan_results()` verwendet werden
- Diese Informationen können dann an beliebigen Orten gespeichert werden

In [None]:
scan.get_scan_results()

### Weitere Methoden des scan Objekts
- `scan.assert_no_error_logs()`, `scan.assert_no_checks_fail()`, `scan.assert_no_checks_warn_or_fail()`: AssertionError wird geworfen, falls der Scan Error-Logs / Check-Fails /Check-Fails und -Warnings enthält

- `scan.has_error_logs()`: Gibt ein Boolean Wert zurück, der beschreibt, ob das Log Errors beinhaltet (hilfreich, wenn man Code Blöcke ausführen möchte, die nur ausgeführt werden sollte, falls das Log Fehler enthält)
- `scan.get_error_logs_text()`: Gibt den Log Text zurück, falls das Log Errors beinhaltet

- `scan.get_checks_fail()`, `scan.get_checks_warn_or_fail()`: Gibt eine List zurück die alle fehlgeschlagene/warnende Checks als Objekte enthält. Auf ein Check-Objekt kann die Methode `get_dict()` angewendet werden, um alle Information des bestimmten Checks zu erhalten (siehe Beispiele unten)

- `scan.get_checks_fail_text()`, `scan.get_checks_warn_or_fail_text()`: Gibt jeweils eine Beschreibung für alle fehlgeschlagene/warnende Checks als String zurück

- `scan.has_check_fails()`, `scan.has_checks_warn_or_fail()`: Gibt ein Boolean Wert zurück, der beschreibt, ob der Scan fehlgeschlagene/warnende Checks durchgeführt hat (hilfreich, wenn man Code Blöcke ausführen möchte, die nur ausgeführt werden sollte, falls der Scan Checks mit Fehler/Warnungen enthält)

- `scan.get_all_checks_text()`: Gibt jeweils eine Beschreibung für alle durchgeführten Checks als String zurück

### Mehr Informationen aus einzelne fail und warn Ergebnisse bekommen
- auf ein Check Objekt (enthalten in der Liste zurückgegeben durch z.B. `scan.get_checks_warn_or_fail_text()` Methode) kann die Methode `get_dict()` angewendet werden, um alle Information des Checks zu erhalten
- Diese Informationen können an beliebigen Orten gespeichert oder auch weiterverarbeitet werden

#### Beispiel 1: Information des Custom Checks
- hier sind bspw. Information wie `totalRowCount` (Anzahl fehlgeschlagener Einträge) dabei, die im Log nicht angezeigt werden

In [None]:
scan.get_checks_warn_or_fail()[0].get_dict()

#### Beispiel 2: Information des Schema Checks
- `column_type_mismatches` enthält hier ausführliche Information, welche Spalte die definierte Bedingungen nicht erfüllt

In [None]:
scan.get_checks_warn_or_fail()[1].get_dict()

## Failed Row sampes als output visualisieren (statt nur in Soda Cloud anzuzeigen)
- Dateneinträge, die den Check nicht erfüllt haben, können in der Regel nur in der Soda Cloud angesehen werden
- Um Lokal Zugriff auf diese Einträge zu bekommen, ist es möglich eine `CustomSampler` Klasse zu erstellen, die den Standard `Sampler` anpasst
    - Im Beispiel werden die Einträge in ein Pandas Dataframe überführt und danach als eine CSV Datei gespeichert (separat für alle definierten Checks; hier nur eine Datei als Ergebnis, da nur ein Failed rows check definiert wurde)
    - Hier werden 2 weitere Spalten `failed_check` (Name des definierten Checks) und `created_at` hinzugefügt (hilfreich, wenn man alle erstellten CSV in ein Dataframe kombinieren möchte)
    - Mit `scan.sampler = CustomSampler()` wird der CustomSampler statt den Standard-Sampler verwendet

In [None]:
failed_rows_cloud = "false"

class CustomSampler(Sampler):

    def store_sample(self, sample_context: SampleContext):
        rows = sample_context.sample.get_rows()
        json_data = json.dumps(rows) # Convert failed rows to JSON
        exceptions_df = pd.read_json(json_data) #create dataframe with failed rows
        # Define exceptions dataframe
        exceptions_schema = sample_context.sample.get_schema().get_dict()
        exception_df_schema = []
        for n in exceptions_schema:
            exception_df_schema.append(n["name"])
        exceptions_df.columns = exception_df_schema
        check_name = sample_context.check_name
        exceptions_df['failed_check'] = check_name
        exceptions_df['created_at'] = datetime.now()
        exceptions_df.to_csv(check_name+".csv", sep=",", index=False, encoding="utf-8")

scan = Scan()

if failed_rows_cloud == "false":
    scan.sampler = CustomSampler()

scan.set_scan_definition_name("soda_demo")
scan.set_data_source_name("demo_datasource")

# Add config yml file
scan.add_configuration_yaml_file(file_path="./configuration.yml")

# Add variables
scan.add_variables({"NOW": "2021-09-13 00:00:00"})

# Add check YAML files 
scan.add_sodacl_yaml_file("./demo_tests.yml")

# Execute the scan
exit_code = scan.execute()
print("Exit code:", exit_code)

# Set logs to verbose mode, equivalent to CLI -V option
scan.set_verbose(True)

# Print results of scan
print(scan.get_logs_text())

In [None]:
fail_df = pd.read_csv('./last_updated_on_after_created_on = 0.csv')
fail_df

## Soda Cloud

### Soda hosten
- Soda Agents werden benutzt, um Datenvalidierung und -monitoring auszuführen
- Es gibt 2 unterschiedliche Arten, um Soda Agents zu hosten und managen:
- `Soda-hosted Agent` (Standard):
    - Infrastruktur ist verwaltet bei Soda -> Soda kümmert sich um Setup, Maintenance und Skalierung des Agenten
    - **Vorteile**:
        - Einfache Nutzung: nicht nötig um sich um die Infrastruktur zu kümmern
        - Kosten werden berechnet nach Nutzung (möglicherweise mehr Kosten als Self-hosted Agent nach Nutzung)
    - **Nachteile**
        - Infrastruktur nicht anpassbar nach Compliance
- `Self-hosted Agent`:
    - Infrastruktur wird durch den User verwaltet
    - **Vorteile**:
        - Mehr Flexibilität und Kontrolle: Agents beliebig anpassbar
        - Sicherheit: volle Kontrolle über die Infrastruktur (anpassbar nach Compliance)
    - **Nachteile**
        - Kosten um die Infrastruktur zu managen
        - Updates und Maintenance müssen von User verwaltet werden   

### Andere wichtige Features von Soda (Soda Cloud)
- `Data Profiling`: detailliertes Profiling von Daten inkl. Statistiken wie Distribution, Uniqueness, Null Values etc. generieren
- `Alert Notifications` konfigurieren: Alerts können nach konfigurierbaren Bedingungen an verschidene Kanäle (wie Email, Slack etc.) gesendet werden
- `Scheduled Scans`: Automatisieren von Ausführung von Scans
- `Data source support`: support verschiederner Data sources wie BigQuery, Snowflake, Databricks, MYSQL, PostgreSQL etc.
- `Check suggestions`: Für bestimmte Data sources (GCP BigQuery, PostgresSQL, Snowflake) ist es möglich Vorschläge für mögliche Checks, die berücksichtigt werden könnten, zu erhalten
- `Soda in andere Tools integrieren` (Jira, Github, dbt, etc.): z.B. Soda GitHub Action kann in GitHub Workflow integriert werden um automatisch Scans auszuführen
- `Soda GPT` in Soda Cloud Account: Interactiver Chatbot um Soda Checks zu schreiben
- `Anpassbare Dashboards`: bspw. kann das Anomaly Dashboard aktiviert werden mit Funtionen wie:
    - **Proactive Issue Detection**: Frühere Entdeckung pontenzieller Probleme bevor sie auftreten
    - **Montoring von Data Quality**: Monitoring und idenfizieren von Anomalien in Daten
    - **Visualisierung und Interaktionen zwischen Team-Mitglieder**
- `Roles` und `rights` um Zugriffsrechte zu verwalten:
    - **Admin**: Voller Zugriff und Kontrolle (inkl. User und Configuration Management) im Platform
    - **Editor**: Erzeugen und Managen von Data Quality Checks und Dashboards
    - **Viewer**: Read-only Zugriff auf Reports, Dashboards, Data Profiles
    - **Guest**: View-only Zugriff auf nur bestimmte Reports und Dashboards, die mit denen geteilt wurden
    - **Custom**: Flexibel definierbarer Rolle

### Usecases von Soda
#### Daten testen in einer Pipeline
- **Bsp. Ausgangssituation**:
    - Data Engineers verwenden dbt um Models aus Daten zu generieren und diese Models zu transformieren und in ein Reporting- und Visualisierungstool zu pushen 
    - Airflow wird für Scheduling und Monitoring von Workflows (inkl. Data Ingestion und Transformation Events) verwendet
- Scan für Data Quality Checks ausführen nach Events und bevor Informationen in ein Reporting Tool gepusht werden
- Falls Scan Ergebnisse Probleme anzeigen, kann Soda die Entwickler direkt notifizieren und die Pipeline stoppen
- Verhindert, dass die Data Quality Probleme, Probleme in Reporting verursachen

#### Daten testen bevor Migration
- Bsp. Daten werden von einer Data source zu einer anderen Data source migriert (z.B. von PostgreSQL nach Snowflake)
- Soda kann Reconciliation Checks (Abstimmungschecks) verwenden, um Data Quality bevor und nach Migration zu vergleichen

#### Daten testen während Development
- **Bsp. Ausgangssituation**:
    - Entwicklerteam verwenden GitHub für ein dbt Projekt, um Daten Ingestion und Transformation zu managen
    - Für Data Quality Tests wurden YAML Files mit SodaCL Checks erstellt
- In GitHub Action Workflow kann Soda Scans integriert werden, um die Data Quality nach jeden Pull Request, Commit etc. auszuführen