# Intro

This notebook gives an overview of the library. After running each thoth command, you can check the results in the dashboard to better understand the flow and the behavior of the system.

# Setup

In [1]:
%load_ext autoreload
%autoreload 2

In [2]:
# fix working dir
import pathlib
import os

# The rest of the notebook opens "sample_datasets/..." relative to the current
# directory. Only move up one level when that folder is not already visible, so
# re-running this cell does not keep climbing the directory tree.
if not os.path.isdir("sample_datasets"):
    path = os.path.join(pathlib.Path().absolute(), "../")
    os.chdir(path)

In [3]:
# imports
from pyspark.sql import SparkSession
import pydeequ
import json
import datetime
import os

Please set env variable SPARK_VERSION


In [4]:
# spark context
# Configure the builder one option at a time, then start (or reuse) the session.
spark_builder = SparkSession.builder.config("spark.sql.session.timeZone", "UTC")
# Pull in the Deequ jar and exclude the coordinate pydeequ marks for exclusion.
spark_builder = spark_builder.config("spark.jars.packages", pydeequ.deequ_maven_coord)
spark_builder = spark_builder.config("spark.jars.excludes", pydeequ.f2j_maven_coord)
spark = spark_builder.appName("thoth").getOrCreate()

22/11/05 19:11:06 WARN Utils: Your hostname, rleinio-pc resolves to a loopback address: 127.0.1.1; using 192.168.1.132 instead (on interface enp8s0)
22/11/05 19:11:06 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Ivy Default Cache set to: /home/rleinio/.ivy2/cache
The jars for the packages stored in: /home/rleinio/.ivy2/jars
:: loading settings :: url = jar:file:/home/rleinio/.pyenv/versions/3.9.13/envs/thoth-3.9.13/lib/python3.9/site-packages/pyspark/jars/ivy-2.4.0.jar!/org/apache/ivy/core/settings/ivysettings.xml
com.amazon.deequ#deequ added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-f698148b-05a2-486d-afab-7802abc0d3b2;1.0
	confs: [default]
	found com.amazon.deequ#deequ;1.2.2-spark-3.0 in central
	found org.scalanlp#breeze_2.12;0.13.2 in central
	found org.scalanlp#breeze-macros_2.12;0.13.2 in central
	found org.scala-lang#scala-reflect;2.12.1 in central
	found com.github.fommil.netlib#core;1.1.2 in central
	found net.sf

In [5]:
# metrics repository connection
# Keep any DATABASE_URL already exported in the environment; otherwise fall back
# to the local Postgres instance.
os.environ.setdefault(
    "DATABASE_URL",
    "postgresql+pg8000://postgres:postgres@localhost:5432/metrics_repository",
)

# Load Dataset

In [6]:
def _with_parsed_ts(record):
    """Return a copy of `record` whose ISO-format `ts` string is parsed into a datetime."""
    parsed = dict(record)
    parsed["ts"] = datetime.datetime.fromisoformat(record.get("ts"))
    return parsed


# Load the sample records and parse their timestamps while the file is open.
with open("sample_datasets/temperatures_extended.json") as f:
    json_data = list(map(_with_parsed_ts, json.load(f)))

print("Dataset head: ", json_data[:5], "\n")
print("Dataset tail: ", json_data[-5:], "\n")
print("Dataset number of records: ", len(json_data), "\n")

# One partition per distinct calendar day present in the `ts` field.
daily_partitions = {record.get("ts").date() for record in json_data}
print("Dataset number of ts daily partitions: ", len(daily_partitions))

Dataset head:  [{'ts': datetime.datetime(1981, 1, 1, 7, 23, 33, tzinfo=datetime.timezone.utc), 'value': 22.1467670458884, 'sensor': 'Sensor E'}, {'ts': datetime.datetime(1981, 1, 1, 21, 57, 57, tzinfo=datetime.timezone.utc), 'value': 22.8849008762327, 'sensor': 'Sensor C'}, {'ts': datetime.datetime(1981, 1, 1, 12, 11, 56, tzinfo=datetime.timezone.utc), 'value': 22.618233805151977, 'sensor': 'Sensor B'}, {'ts': datetime.datetime(1981, 1, 1, 20, 5, 54, tzinfo=datetime.timezone.utc), 'value': 25.770158591638953, 'sensor': 'Sensor E'}, {'ts': datetime.datetime(1981, 1, 1, 10, 45, 2, tzinfo=datetime.timezone.utc), 'value': 23.005804204490918, 'sensor': 'Sensor B'}] 

Dataset tail:  [{'ts': datetime.datetime(1981, 12, 31, 13, 22, 46, tzinfo=datetime.timezone.utc), 'value': 23.434759603073424, 'sensor': 'Sensor D'}, {'ts': datetime.datetime(1981, 12, 31, 12, 55, 17, tzinfo=datetime.timezone.utc), 'value': 25.313363047160777, 'sensor': 'Sensor B'}, {'ts': datetime.datetime(1981, 12, 31, 3, 4, 

## Splitting dataset into history, new scoring batches, and an artificial anomaly batch

In [None]:
# Schema shared by every batch so that all DataFrames have identical column types.
BATCH_SCHEMA = "ts timestamp, value float, sensor string"

# Last day considered part of the trusted history, and the day used for the
# artificial anomalies.
HISTORY_END_DAY = datetime.date(1981, 12, 25)
ANOMALY_DAY = datetime.date(1981, 12, 30)


def records_for_day(day):
    """Return the records of `json_data` whose `ts` falls on the calendar date `day`."""
    return [record for record in json_data if record.get("ts").date() == day]


def batch_df(records):
    """Build a Spark DataFrame with the shared batch schema from a list of record dicts."""
    return spark.createDataFrame(data=records, schema=BATCH_SCHEMA)


def celsius_to_fahrenheit(value):
    """Convert a Celsius reading to Fahrenheit; a missing (None) reading stays None."""
    # Compare against None explicitly: a legitimate 0.0 reading is falsy and a
    # plain truthiness test would wrongly turn it into a null.
    return None if value is None else (value * 9 / 5) + 32


# historical data with fair confidence of good quality
history_df = batch_df(
    [record for record in json_data if record.get("ts").date() <= HISTORY_END_DAY]
)

# new batches of data that need quality validation (normal), one per day
new_batch_1981_12_26_df = batch_df(records_for_day(datetime.date(1981, 12, 26)))
new_batch_1981_12_27_df = batch_df(records_for_day(datetime.date(1981, 12, 27)))
new_batch_1981_12_28_df = batch_df(records_for_day(datetime.date(1981, 12, 28)))
new_batch_1981_12_29_df = batch_df(records_for_day(datetime.date(1981, 12, 29)))
new_batch_1981_12_30_df = batch_df(records_for_day(ANOMALY_DAY))

# Artificial anomaly: temperatures in fahrenheit instead of celsius
new_batch_1981_12_30_anomaly_df = batch_df(
    [
        {
            "ts": record.get("ts"),
            "value": celsius_to_fahrenheit(record.get("value")),
            "sensor": record.get("sensor"),
        }
        for record in records_for_day(ANOMALY_DAY)
    ]
)

# Artificial anomaly: one sensor starts to output only null values
new_batch_1981_12_30_anomaly2_df = batch_df(
    [
        {
            "ts": record.get("ts"),
            "value": None
            if record.get("sensor") == "Sensor B"
            else record.get("value"),
            "sensor": record.get("sensor"),
        }
        for record in records_for_day(ANOMALY_DAY)
    ]
)

# new batch of data that need quality validation (normal)
new_batch_1981_12_31_df = batch_df(records_for_day(datetime.date(1981, 12, 31)))

# Creating the Dataset on the Metrics Repository

In [None]:
import thoth as th

In [None]:
# Setup connection and init the Metrics Repository db
from sqlmodel import Session

# Build the engine first, then bind the session used by every thoth call below.
metrics_engine = th.build_engine()
session = Session(metrics_engine)
# NOTE(review): `clear=True` presumably wipes any existing metrics data —
# confirm before pointing DATABASE_URL at a shared database.
th.init_db(clear=True)

## 3 Steps: Profile the history data, create dataset and optimize models for each metric

In [None]:
# Profile the history data, create the dataset and optimize the models in a
# single call; the arguments are gathered first so they are easy to tune.
history_profiling_builder = th.profiler.SimpleProfilingBuilder()
profile_create_optimize_kwargs = {
    "df": history_df,
    "dataset_uri": "temperatures",
    "ts_column": "ts",
    "profiling_builder": history_profiling_builder,
    "optimize_last_n": 100,
    "optimize_target_confidence": 0.99,
    "session": session,
    "spark": spark,
}
profiling, optimization = th.profile_create_optimize(**profile_create_optimize_kwargs)

You can open [this](http://localhost:8501/?dataset_uri=temperatures&view=%F0%9F%91%A4+Profiling) link to see, in the UI, the profiling metrics calculated for the `temperatures` dataset. Also try changing the option from `profiling` to `optimization` to check the model validations and which model and threshold were automatically chosen for each profiling time series.

## Assessing subsequent new (normal) batches of data

In [None]:
# Pass a timezone-aware UTC timestamp: the dataset's `ts` values and the Spark
# session are in UTC, and the later assessments in this notebook also pass UTC.
th.assess_new_ts(
    df=new_batch_1981_12_26_df,
    ts=datetime.datetime(1981, 12, 26, tzinfo=datetime.timezone.utc),
    dataset_uri="temperatures",
    profiling_builder=th.profiler.SimpleProfilingBuilder(),
    session=session,
)

In [None]:
# Pass a timezone-aware UTC timestamp: the dataset's `ts` values and the Spark
# session are in UTC, and the later assessments in this notebook also pass UTC.
th.assess_new_ts(
    df=new_batch_1981_12_27_df,
    ts=datetime.datetime(1981, 12, 27, tzinfo=datetime.timezone.utc),
    dataset_uri="temperatures",
    profiling_builder=th.profiler.SimpleProfilingBuilder(),
    session=session,
)

In [None]:
# Pass a timezone-aware UTC timestamp: the dataset's `ts` values and the Spark
# session are in UTC, and the later assessments in this notebook also pass UTC.
th.assess_new_ts(
    df=new_batch_1981_12_28_df,
    ts=datetime.datetime(1981, 12, 28, tzinfo=datetime.timezone.utc),
    dataset_uri="temperatures",
    profiling_builder=th.profiler.SimpleProfilingBuilder(),
    session=session,
)

In [None]:
# Pass a timezone-aware UTC timestamp: the dataset's `ts` values and the Spark
# session are in UTC, and the later assessments in this notebook also pass UTC.
th.assess_new_ts(
    df=new_batch_1981_12_29_df,
    ts=datetime.datetime(1981, 12, 29, tzinfo=datetime.timezone.utc),
    dataset_uri="temperatures",
    profiling_builder=th.profiler.SimpleProfilingBuilder(),
    session=session,
)

You can check the `scoring` option in the dashboard to see these last 4 scoring points, which should be marked as OK 🟢 (normal behavior according to the system).

## Assessing anomalous batches of data

In [None]:
# Score the Fahrenheit-valued batch for 1981-12-30 (UTC).
anomaly_ts = datetime.datetime(1981, 12, 30, tzinfo=datetime.timezone.utc)
anomaly_profiling_builder = th.profiler.SimpleProfilingBuilder()
th.assess_new_ts(
    df=new_batch_1981_12_30_anomaly_df,
    ts=anomaly_ts,
    dataset_uri="temperatures",
    profiling_builder=anomaly_profiling_builder,
    session=session,
)

You can check the `scoring` option in the dashboard to see this last scoring point, which should be marked as Anomaly 🔴

In [None]:
# Score the batch for 1981-12-30 (UTC) in which "Sensor B" reports only nulls.
anomaly2_ts = datetime.datetime(1981, 12, 30, tzinfo=datetime.timezone.utc)
anomaly2_profiling_builder = th.profiler.SimpleProfilingBuilder()
th.assess_new_ts(
    df=new_batch_1981_12_30_anomaly2_df,
    ts=anomaly2_ts,
    dataset_uri="temperatures",
    profiling_builder=anomaly2_profiling_builder,
    session=session,
)

You can check the `scoring` option in the dashboard to see this last scoring point, which should be marked as Anomaly 🔴 again

## After "fixing/cleaning" the new batch, continue subsequent assessment of new batches as they arrive at the data platform
Finally, these next two runs should correct the anomalous batch, and all metrics in the dashboard should return to an OK 🟢 state.

In [None]:
# Re-assess 1981-12-30 (UTC) with the original, unmodified batch.
fixed_ts = datetime.datetime(1981, 12, 30, tzinfo=datetime.timezone.utc)
fixed_profiling_builder = th.profiler.SimpleProfilingBuilder()
th.assess_new_ts(
    df=new_batch_1981_12_30_df,
    ts=fixed_ts,
    dataset_uri="temperatures",
    profiling_builder=fixed_profiling_builder,
    session=session,
)

In [None]:
# Assess the final daily batch, 1981-12-31 (UTC).
final_ts = datetime.datetime(1981, 12, 31, tzinfo=datetime.timezone.utc)
final_profiling_builder = th.profiler.SimpleProfilingBuilder()
th.assess_new_ts(
    df=new_batch_1981_12_31_df,
    ts=final_ts,
    dataset_uri="temperatures",
    profiling_builder=final_profiling_builder,
    session=session,
)