# Detox usecase Notebook

This notebook is divided in two sections. The first section creates an user defined size Dataframe with columns in order to emulate a generic DE/DS/ML pipeline result. The second session calls rules from pyDeequ to generate logs 

In [1]:
import os
os.environ["SPARK_VERSION"] = "3.3"  # Change this to your version if needed

In [2]:
!pip install pydeequ

[0m

In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType
from pyspark.sql import functions as F
import pydeequ

# Initialize Spark session with Deequ package
spark = (SparkSession
    .builder
    .appName("Data Quality Checks Example")
    .config("spark.jars.packages", "com.amazon.deequ:deequ:2.0.1-spark-3.2")  # Specify Deequ version
    .config("spark.jars.packages", pydeequ.deequ_maven_coord)
    .config("spark.jars.excludes", pydeequ.f2j_maven_coord)
    .getOrCreate())


:: loading settings :: url = jar:file:/usr/local/lib/python3.9/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
com.amazon.deequ#deequ added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-1056773b-c0e1-4ddd-a645-ca58a1eb9689;1.0
	confs: [default]
	found com.amazon.deequ#deequ;2.0.3-spark-3.3 in central
	found org.scala-lang#scala-reflect;2.12.10 in central
	found org.scalanlp#breeze_2.12;0.13.2 in central
	found org.scalanlp#breeze-macros_2.12;0.13.2 in central
	found com.github.fommil.netlib#core;1.1.2 in central
	found net.sf.opencsv#opencsv;2.3 in central
	found com.github.rwl#jtransforms;2.4.0 in central
	found junit#junit;4.8.2 in central
	found org.apache.commons#commons-math3;3.2 in central
	found org.spire-math#spire_2.12;0.13.0 in central
	found org.spire-math#spire-macros_2.12;0.13.0 in central
	found org.typelevel#machinist_2.12;0.6.1 in central
	found com.chuusai#shapeless_2.12;2.3.2 in central
	found org.typelevel#macro-compat_2.12;1.1.1 in central
	fo

24/09/23 23:50:05 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


24/09/23 23:50:06 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [4]:
from pyspark.sql import SparkSession
import random
import time
def create_data_table(spark):
    """
    Generate a DataFrame containing randomly generated data.

    The DataFrame will have the following columns:
    - 'Scores': Random integers between 0 and 10.
    - 'Temperature': Random integers between -10 and 50.
    - 'RandomValues': Random floating-point numbers between 0 and 1, with some values randomly set to None (NaN).
    - 'NormallyDistributed': Random values generated from a normal distribution with a mean of 10 and a standard deviation of 4.

    Returns:
    DataFrame: A PySpark DataFrame containing the generated data with four columns.
    """
    num_rows = random.randint(0, 1000)
    scores = [random.randint(0, 10) for _ in range(num_rows)]
    temperature = [random.randint(-10, 50) for _ in range(num_rows)]
    random_values = [random.uniform(0, 1) for _ in range(num_rows)]
    normally_distributed = [random.gauss(0, 1) for _ in range(num_rows)]


    # Create a DataFrame
    data = {
        'Scores': scores,
        'Temperature': temperature,
        'RandomValues': random_values,
        'NormallyDistributed': normally_distributed
    }
    
    # Convert to DataFrame
    df = spark.createDataFrame([(s, t, r, n) for s, t, r, n in zip(scores, temperature, random_values, normally_distributed)],
                                schema=['Scores', 'Temperature', 'RandomValues', 'NormallyDistributed'])
    return df


def bump_df(df, spark):
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, when, rand, expr
    import numpy as np

    if np.random.random() < 0.20:
 
        # Insert random values greater or lower than 10 into random positions in the 'Scores' column
        df = df.withColumn(
            'Scores',
            when(rand() < 0.50, (rand() * 20) - 10).otherwise(col('Scores'))
        )

        # Insert NaNs into random positions in the 'RandomValues' column
        df = df.withColumn(
            'RandomValues',
            when(rand() < 0.30, None).otherwise(col('RandomValues'))
        )
        
        # Insert NaNs into random positions in the 'RandomValues' column
        df = df.withColumn(
            'Temperature',
            when(rand() < 0.40, -999).otherwise(col('RandomValues'))
        )

    
    return df


df = create_data_table(spark)
bdf = bump_df(df, spark)
bdf.show()



[Stage 0:>                                                          (0 + 1) / 1]

+-------------------+--------------------+--------------------+--------------------+
|             Scores|         Temperature|        RandomValues| NormallyDistributed|
+-------------------+--------------------+--------------------+--------------------+
|                1.0|  0.9520685788946797|  0.9520685788946797| 0.37753894771242275|
|  4.186511062244069| 0.41224462431392883| 0.41224462431392883| -1.2359142700919765|
|  7.933151473106406|              -999.0|  0.9839312708387886|-0.16365980985727172|
|               10.0|                null|                null|  -1.225107123424144|
|               10.0|                null|                null|  -0.131061343635309|
|  3.283370012164891|  0.9190952257084841|  0.9190952257084841|-0.04047789955820...|
|               10.0|              -999.0|                null|  0.6523931848660555|
|   9.97876857428571|                null|                null| -1.8501500456629973|
| -1.515260283681803| 0.19511099669962084| 0.19511099669962084| -

                                                                                

## Second step

In this second phase, the pydeequ implements some checks in the columns of the matrices and yields logs about their results 

In [5]:
def pydeequ_check(df, spark):

    from pydeequ.checks import Check, CheckLevel
    from pydeequ.verification import VerificationSuite, VerificationResult
    from pyspark.sql import SparkSession

    # Define checks using PyDeequ
    check = Check(spark, CheckLevel.Error, "Data Quality Checks")

    # Add checks for the DataFrame
    check_result = (
            check
            .hasSize(lambda x: x == 300)  # Check for DataFrame size
            .hasMin("Scores", lambda x: x == 0.0)  # Scores should have a minimum of 0
            .hasMax("Scores", lambda x: x == 10.0)  # Scores should have a maximum of 10
            .isComplete("RandomValues")  # Check for completeness in RandomValues
            .isNonNegative("Scores")  # Check Temperature for non-negativity
            .hasMean("NormallyDistributed", lambda x: x > 0.0)  # Check Normally Distributed values
            .hasStandardDeviation("NormallyDistributed", lambda x: x >  1.0)

    )
    
    result = VerificationSuite(spark) \
    .onData(df) \
    .addCheck(check) \
    .run()
    
    return VerificationResult.checkResultsAsDataFrame(spark, result, pandas=True)

## Third step

Finally, Detox log each result into the Cloudwatch

In [None]:
import boto3
import json
import time
import os

import time
import json

def create_watchtower_log(log_group_name, log_stream_name, log_message, client):
    """
    Creates a log group and log stream if they do not exist, and writes a log event with status.

    Parameters:
    - log_group_name: Name of the CloudWatch log group.
    - log_stream_name: Name of the CloudWatch log stream.
    - log_message: Message to log.
    - log_status: Status to log (e.g., 'INFO', 'ERROR').
    - client: Boto3 CloudWatch Logs client.
    """
    
    # Create the log group if it doesn't exist
    try:
        client.create_log_group(logGroupName=log_group_name)
    except client.exceptions.ResourceAlreadyExistsException:
        pass

    # Create the log stream if it doesn't exist
    try:
        client.create_log_stream(logGroupName=log_group_name, logStreamName=log_stream_name)
    except client.exceptions.ResourceAlreadyExistsException:
        pass

    # Get the current timestamp
    timestamp = int(time.time() * 1000)

    # Prepare the log event with status
    log_event = {
        'timestamp': timestamp,
        'message': str(log_message)

    }

    # Format the log event as JSON
    json_log_event = json.dumps(log_event)

    # Get the sequence token for the log stream
    response = client.describe_log_streams(
        logGroupName=log_group_name,
        logStreamNamePrefix=log_stream_name
    )
    
    sequence_token = response['logStreams'][0].get('uploadSequenceToken')

    # Put the log event to CloudWatch
    if sequence_token:
        client.put_log_events(
            logGroupName=log_group_name,
            logStreamName=log_stream_name,
            logEvents=[{
                'timestamp': timestamp,
                'message': json_log_event
            }],
            sequenceToken=sequence_token
        )
    else:
        client.put_log_events(
            logGroupName=log_group_name,
            logStreamName=log_stream_name,
            logEvents=[{
                'timestamp': timestamp,
                'message': json_log_event
            }]
        )
    
# Main function. Make sure you have a credential file accessible for boto3
# This main function makes 10 trials for pydeequ + cloudwatch
if __name__ == "__main__":

    # Create a session with your access keys
    os.environ['AWS_SHARED_CREDENTIALS_FILE'] = '/opt/workspace/.aws/credentials'
    session = boto3.Session(profile_name='hackathon', region_name='us-west-2')
    
    # Create a CloudWatch Logs client
    client = session.client('logs')
    log_group = 'hackathon'
    log_stream = 'detox_usecase'
    
    
    for i in range(1,100):
        print(f"step {i}")
        df = create_data_table(spark)
        bdf = bump_df(df, spark)
        checkResult_df = pydeequ_check(df, spark)
        print(checkResult_df)
        checkResult_df.apply(lambda row: create_watchtower_log(log_group, log_stream, row['constraint_message'], client), axis=1)
        # Wait for 3 minutes (180 seconds)
        time.sleep(1.5)

step 1
Python Callback server started!




                 check check_level check_status  \
0  Data Quality Checks       Error        Error   
1  Data Quality Checks       Error        Error   
2  Data Quality Checks       Error        Error   
3  Data Quality Checks       Error        Error   
4  Data Quality Checks       Error        Error   
5  Data Quality Checks       Error        Error   
6  Data Quality Checks       Error        Error   

                                          constraint constraint_status  \
0                         SizeConstraint(Size(None))           Failure   
1            MinimumConstraint(Minimum(Scores,None))           Success   
2            MaximumConstraint(Maximum(Scores,None))           Success   
3  CompletenessConstraint(Completeness(RandomValu...           Success   
4  ComplianceConstraint(Compliance(Scores is non-...           Success   
5     MeanConstraint(Mean(NormallyDistributed,None))           Failure   
6  StandardDeviationConstraint(StandardDeviation(...           Failure   

In [None]:
checkResult_df