In [36]:
from pyspark.sql.types import StructType, StructField, TimestampType, IntegerType, DoubleType, DateType
from datetime import datetime

Import the functions under test and define a helper for comparing dataframes

In [25]:
# Import our functions to run our transforms with
# NOTE(review): presumably functions.ipynb supplies extract_rows_with_null_data,
# extract_outliers, generate_summary_statistics_gold_df and generate_anomalies_gold_df
# (none of them are defined in this notebook) - confirm against that notebook.
# NOTE(review): `spark` is used by the tests below but never created here; verify
# whether it comes from functions.ipynb or from the runtime environment.
%run 'functions.ipynb'

In [4]:
def assert_dataframes_equal(df1, df2):
    '''
        Helper function to assert that two dataframes are equal

        Validates that schemas, row counts and row contents are all equivalent,
        regardless of row order. Duplicate rows are significant: a row that
        appears twice in one dataframe must appear twice in the other.

        Args:
            df1: first dataframe; must expose `schema` and `collect()`
            df2: second dataframe; must expose `schema` and `collect()`

        Raises:
            AssertionError: if the schemas, the number of rows, or the
                multiset of rows differ
    '''
    # Imported locally so the helper does not depend on the notebook's import cell
    from collections import Counter

    # Check if the schemas match
    assert df1.schema == df2.schema, (
        f"Schemas differ:\n  first:  {df1.schema}\n  second: {df2.schema}"
    )

    # Collect each dataframe once and reuse the rows for both the count check
    # and the content check
    df1_rows = [tuple(row) for row in df1.collect()]
    df2_rows = [tuple(row) for row in df2.collect()]

    # Compare row count
    assert len(df1_rows) == len(df2_rows), (
        f"Row counts differ: {len(df1_rows)} != {len(df2_rows)}"
    )

    # Compare the rows as multisets: order is ignored but multiplicity is not.
    # A plain set would collapse duplicates, so [a, a, b] and [a, b, b] would
    # wrongly be reported as equal.
    df1_counts = Counter(df1_rows)
    df2_counts = Counter(df2_rows)
    assert df1_counts == df2_counts, (
        "Row contents differ.\n"
        f"  only/extra in first:  {list((df1_counts - df2_counts).elements())}\n"
        f"  only/extra in second: {list((df2_counts - df1_counts).elements())}"
    )

Silver Function Unit Tests

In [5]:
def test_extract_rows_with_null_data():
    """
        Unit test for extract_rows_with_null_data

        Feeds the function a small sample in which some rows contain a null and
        checks that the returned dataframe holds only the fully populated rows
    """

    timestamp_format = '%Y-%m-%d %H:%M:%S'

    def as_timestamp(text):
        # Turn a 'YYYY-MM-DD HH:MM:SS' literal into a datetime
        return datetime.strptime(text, timestamp_format)

    # Every column is nullable so a null can be placed in any field
    turbine_schema = StructType([
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("timestamp", TimestampType()),
            ("turbine_id", IntegerType()),
            ("wind_speed", DoubleType()),
            ("wind_direction", IntegerType()),
            ("power_output", DoubleType()),
        )
    ])

    # Six rows that each carry a null: one per column, with the wind_speed
    # case present twice
    rows_with_nulls = [
        (None, 3, 5.4, 130, 2.6),                                # null timestamp
        (as_timestamp('2022-03-01 00:00:00'), None, 5.4, 130, 2.6),  # null turbine_id
        (as_timestamp('2022-03-01 00:00:00'), 6, None, 112, 4.2),    # null wind_speed
        (as_timestamp('2022-03-01 00:00:00'), 6, None, 112, 4.2),    # null wind_speed (duplicate)
        (as_timestamp('2022-04-22 20:00:00'), 7, 10.8, None, 3.9),   # null wind_direction
        (as_timestamp('2022-04-11 18:00:00'), 9, 12.3, 98, None),    # null power_output
    ]

    # Two fully populated rows; these are the only ones expected back
    complete_rows = [
        (as_timestamp('2022-03-01 16:00:00'), 4, 9.3, 120, 3.2),
        (as_timestamp('2022-04-11 11:00:00'), 5, 8.5, 105, 4.5),
    ]

    # Build the input (null rows followed by complete rows) and the expectation
    input_df = spark.createDataFrame(rows_with_nulls + complete_rows, turbine_schema)
    expected_df = spark.createDataFrame(complete_rows, turbine_schema)

    actual_df = extract_rows_with_null_data(input_df)

    assert_dataframes_equal(actual_df, expected_df)

In [6]:
def test_extract_outliers():
    """
    Unit test for extract_outliers.

    Passes in one row intended to be within range plus three rows intended to be
    outliers (one per measured column) and expects only the in-range row back.
    """

    timestamp_format = '%Y-%m-%d %H:%M:%S'

    def as_timestamp(text):
        # Turn a 'YYYY-MM-DD HH:MM:SS' literal into a datetime
        return datetime.strptime(text, timestamp_format)

    # Schema shared by the input and the expected output
    turbine_schema = StructType([
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("timestamp", TimestampType()),
            ("turbine_id", IntegerType()),
            ("wind_speed", DoubleType()),
            ("wind_direction", IntegerType()),
            ("power_output", DoubleType()),
        )
    ])

    # The single row that is meant to survive the filter
    in_range_rows = [
        (as_timestamp('2022-03-01 00:00:00'), 1, 17.0, 100, 3.0),
    ]

    # Rows that are each meant to be rejected because of one column
    outlier_rows = [
        (as_timestamp('2022-03-01 01:00:00'), 2, 19.0, 150, 4.5),  # wind_speed
        (as_timestamp('2022-03-01 02:00:00'), 3, 15.0, 360, 2.0),  # wind_direction
        (as_timestamp('2022-03-01 03:00:00'), 4, 10.0, 200, 0.5),  # power_output
    ]

    # Input keeps the in-range row first, followed by the outliers
    input_df = spark.createDataFrame(in_range_rows + outlier_rows, turbine_schema)
    expected_df = spark.createDataFrame(in_range_rows, turbine_schema)

    # Run the transform under test and compare against the expectation
    actual_df = extract_outliers(input_df)
    assert_dataframes_equal(actual_df, expected_df)

In [7]:
# Run the null-row test; an AssertionError raised here means the test failed
test_extract_rows_with_null_data()



In [8]:
# Run the outlier test; an AssertionError raised here means the test failed
test_extract_outliers()

Gold Function Unit Tests

In [39]:
def test_generate_summary_statistics_gold_df():
    """
    Unit test for generate_summary_statistics_gold_df.

    Supplies readings for two turbines across two days and expects one output row
    per (turbine_id, date) holding the min, max and average power_output.
    """

    def as_timestamp(text):
        # Turn a 'YYYY-MM-DD HH:MM:SS' literal into a datetime
        return datetime.strptime(text, '%Y-%m-%d %H:%M:%S')

    def as_date(text):
        # Turn a 'YYYY-MM-DD' literal into a date
        return datetime.strptime(text, '%Y-%m-%d').date()

    # Schema of the readings handed to the function under test
    input_schema = StructType([
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("timestamp", TimestampType()),
            ("turbine_id", IntegerType()),
            ("wind_speed", DoubleType()),
            ("wind_direction", IntegerType()),
            ("power_output", DoubleType()),
        )
    ])

    # Schema the summary dataframe is expected to have
    summary_schema = StructType([
        StructField(column_name, column_type, True)
        for column_name, column_type in (
            ("turbine_id", IntegerType()),
            ("date", DateType()),
            ("min_power_output", DoubleType()),
            ("max_power_output", DoubleType()),
            ("avg_power_output", DoubleType()),
        )
    ])

    # Turbine 1 has two readings on each of two days; turbine 2 has a single reading
    readings = [
        (as_timestamp('2022-03-01 00:00:00'), 1, 5.0, 100, 2.5),
        (as_timestamp('2022-03-01 01:00:00'), 1, 6.0, 110, 3.0),
        (as_timestamp('2022-03-01 00:00:00'), 2, 7.0, 120, 4.5),
        (as_timestamp('2022-03-02 00:00:00'), 1, 5.5, 130, 2.7),
        (as_timestamp('2022-03-02 01:00:00'), 1, 6.5, 140, 3.2),
    ]

    # (turbine_id, date, min, max, avg) of power_output per turbine per day
    expected_summaries = [
        (1, as_date('2022-03-01'), 2.5, 3.0, 2.75),
        (1, as_date('2022-03-02'), 2.7, 3.2, 2.95),
        (2, as_date('2022-03-01'), 4.5, 4.5, 4.5),
    ]

    input_df = spark.createDataFrame(readings, input_schema)
    expected_df = spark.createDataFrame(expected_summaries, schema=summary_schema)

    # Run the transform under test and compare against the expectation
    actual_df = generate_summary_statistics_gold_df(input_df)
    assert_dataframes_equal(actual_df, expected_df)

In [30]:
def test_generate_anomalies_gold_df():
    """
    Unit test for generate_anomalies_gold_df.

    Supplies readings for two turbines where turbine 1 has a single unusually
    high power_output, and expects that reading to be the only row returned,
    together with the statistics columns listed in the expected schema.
    """

    def as_timestamp(text):
        # Turn a 'YYYY-MM-DD HH:MM:SS' literal into a datetime
        return datetime.strptime(text, '%Y-%m-%d %H:%M:%S')

    # Columns supplied to the function under test
    input_fields = [
        StructField("timestamp", TimestampType(), True),
        StructField("turbine_id", IntegerType(), True),
        StructField("power_output", DoubleType(), True),
    ]
    input_schema = StructType(input_fields)

    # Columns expected back: the input columns followed by the statistics
    anomaly_schema = StructType([
        StructField("timestamp", TimestampType(), True),
        StructField("turbine_id", IntegerType(), True),
        StructField("power_output", DoubleType(), True),
        StructField("mean_power_output", DoubleType(), True),
        StructField("stddev_power_output", DoubleType(), True),
        StructField("lower_bound", DoubleType(), True),
        StructField("upper_bound", DoubleType(), True),
    ])

    # Turbine 1: six ordinary readings and one spike; turbine 2: two ordinary readings
    readings = [
        (as_timestamp('2022-03-01 00:00:00'), 1, 2.5),
        (as_timestamp('2022-03-01 01:00:00'), 1, 2.7),
        (as_timestamp('2022-03-01 02:00:00'), 1, 2.7),
        (as_timestamp('2022-03-01 03:00:00'), 1, 2.7),
        (as_timestamp('2022-03-01 04:00:00'), 1, 2.7),
        (as_timestamp('2022-03-01 05:00:00'), 1, 2.7),
        (as_timestamp('2022-03-01 06:00:00'), 1, 10.4),  # the spike
        (as_timestamp('2022-03-01 00:00:00'), 2, 3.5),
        (as_timestamp('2022-03-01 01:00:00'), 2, 3.6),
    ]

    # Only the spike is expected back. The expected bounds below equal the
    # expected mean minus/plus twice the expected standard deviation.
    expected_anomalies = [
        (
            as_timestamp('2022-03-01 06:00:00'),
            1,
            10.4,
            3.7714285714285714,   # mean_power_output
            2.9238754452007045,   # stddev_power_output
            -2.0763223189728377,  # lower_bound
            9.61917946182998,     # upper_bound
        ),
    ]

    input_df = spark.createDataFrame(readings, input_schema)
    expected_df = spark.createDataFrame(expected_anomalies, schema=anomaly_schema)

    # Run the transform under test and compare against the expectation
    actual_df = generate_anomalies_gold_df(input_df)
    assert_dataframes_equal(actual_df, expected_df)

In [31]:
# Run the anomalies test; an AssertionError raised here means the test failed
test_generate_anomalies_gold_df()

In [40]:
# Run the summary statistics test; an AssertionError raised here means the test failed
test_generate_summary_statistics_gold_df()

In [None]:
# This message is printed unconditionally; it only signals success when the
# test cells above were run in order and none of them raised
print("Unit tests completed successfully")