<a href="https://colab.research.google.com/github/jmasonlee/efficiently_testing_etl_pipelines/blob/main/fixing_a_big_test.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

## STEP 0.A: Setup Notebook

In [43]:
!rm -rf efficiently_testing_etl_pipelines
!git clone https://github.com/jmasonlee/efficiently_testing_etl_pipelines.git
!cp -r /content/efficiently_testing_etl_pipelines/src/ .
!cp -r /content/efficiently_testing_etl_pipelines/tests/ .
!rm -rf efficiently_testing_etl_pipelines
!rm -rf tests/diamond_pricing_test*
!rm -rf tests/test_helpers/*verification_helpers.py
!rm -rf tests/conftest.py
!rm -rf sample_data


Cloning into 'efficiently_testing_etl_pipelines'...
remote: Enumerating objects: 587, done.[K
remote: Counting objects: 100% (228/228), done.[K
remote: Compressing objects: 100% (151/151), done.[K
remote: Total 587 (delta 146), reused 111 (delta 70), pack-reused 359[K
Receiving objects: 100% (587/587), 258.76 KiB | 3.32 MiB/s, done.
Resolving deltas: 100% (352/352), done.


In [44]:
import pyspark
def build_indep_vars(df, independent_vars, categorical_vars=None, keep_intermediate=False, summarizer=True):
    check_input(categorical_vars, df, independent_vars)

    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    idx = 'index'
    vec = 'vector'
    if categorical_vars:
        string_indexer = [StringIndexer(inputCol=x,
                                        outputCol=f"{x}_{idx}")
                          for x in categorical_vars]

        encoder        = [OneHotEncoder(dropLast=True,
                                        inputCol =f'{x}_{idx}',
                                        outputCol=f'{x}_{vec}')
                          for x in categorical_vars]

        independent_vars = ['{}_vector'.format(x) if x in categorical_vars else x for x in independent_vars]
    else:
        string_indexer, encoder = [], []

    assembler = VectorAssembler(inputCols=independent_vars,
                                outputCol='indep_vars')
    pipeline  = Pipeline(stages=string_indexer+encoder+[assembler])
    model = pipeline.fit(df)
    df = model.transform(df)

    if not keep_intermediate:
        fcols = [c for c in df.columns if f'_{idx}' not in c[-3:] and f'_{vec}' not in c[-7:]]
        df = df[fcols]

    return df


def check_input(categorical_vars, df, independent_vars):
    assert (type(
        df) is pyspark.sql.dataframe.DataFrame), 'pypark_glm: A pySpark dataframe is required as the first argument.'
    assert (type(
        independent_vars) is list), 'pyspark_glm: List of independent variable column names must be the third argument.'
    for iv in independent_vars:
        assert (type(iv) is str), 'pyspark_glm: Independent variables must be column name strings.'
        assert (iv in df.columns), 'pyspark_glm: Independent variable name is not a dataframe column.'
    if categorical_vars:
        for cv in categorical_vars:
            assert (type(cv) is str), 'pyspark_glm: Categorical variables must be column name strings.'
            assert (cv in df.columns), 'pyspark_glm: Categorical variable name is not a dataframe column.'
            assert (cv in independent_vars), 'pyspark_glm: Categorical variables must be independent variables.'


In [45]:
from pyspark.sql import DataFrame, Window, Column
from pyspark.sql.functions import log, when, mean, col

def replace_null(orig: Column, average: Column):
    return when(orig.isNull(), average).otherwise(orig)

def transform(df: DataFrame) -> DataFrame:

    df = df.withColumn('lprice', log('price'))
    window = Window.partitionBy('cut', 'clarity').orderBy('price').rowsBetween(-3, 3)
    moving_avg = mean(df['price']).over(window)
    df = df.withColumn('moving_avg', moving_avg)

    df_new = df.withColumn('price', when(df.price.isNull(), moving_avg).otherwise(df.price))
    df = df[['id', 'carat', 'clarity', 'color', 'price']]
    df = build_indep_vars(df, ['carat', 'clarity', 'color'],
                                      categorical_vars=['clarity', 'color'],
                                      keep_intermediate=False,
                                      summarizer=True)
    return df

# STEP 0.B: Setup Tests

### Install Dependencies

For the exercise, we will need some special dependencies to allow us to run lots of tests in a notebook.

`ipytest` lets us run our tests in a notebook.



In [46]:
!pip install ipytest



ipytest is what allows us to run our tests in a notebook. This next cell is not needed if you are writing tests in a separate pytest file.

In [47]:
import ipytest
ipytest.autoconfig()

We are installing `pyspark` because it doesn't come with the base colab environment

In [48]:
!pip install pyspark



In [49]:
!pip install chispa



## Create a local SparkSession

Normally spark runs on a bunch of executors in the cloud. Since we want our tests to be able to run on a single dev machine, we make a fixture that gives us a local spark context.

In [50]:
import pytest
from _pytest.fixtures import FixtureRequest
from pyspark import SparkConf
from pyspark.sql import SparkSession

@pytest.fixture(scope="session")
def spark(request: FixtureRequest):
    conf = (SparkConf()
        .setMaster("local")
        .setAppName("sample_pyspark_testing_starter"))

    spark = SparkSession \
        .builder \
        .config(conf=conf) \
        .getOrCreate()

    request.addfinalizer(lambda: spark.stop())
    return spark

## Create Helpers

This is a helper function that retrieves our test output from the expected.json file

In [51]:
import json
from typing import List, Dict

from pyspark.pandas import DataFrame


def create_df_from_json(json_file, spark):
    return spark.read.option("multiline", "true").json(json_file)


def data_frame_to_json(df: DataFrame) -> List:
    output = [json.loads(item) for item in df.toJSON().collect()]
    output.sort(key=lambda item: item["id"])
    print(output)
    return output

def expected_json(name: str) -> Dict:
    with open(f"tests/fixtures/{name}") as f:
        return json.loads(f.read())

# STEP 0.C: Run The Test

In [52]:
%%ipytest -qq
from pyspark.sql import SparkSession

def test_will_do_the_right_thing(spark: SparkSession):
    diamonds_df = create_df_from_json("tests/fixtures/diamonds.json", spark)

    actual_df = transform(diamonds_df)
    assert data_frame_to_json(actual_df) == expected_json("expected.json")

[32m.[0m[32m                                                                                            [100%][0m


# Step 1: Setup For the Saff Squeeze

- [ ] Update the test so that it actually fails for the right reasons. Change `"expected.json"` to `"expected_correct.json"`
- [ ] Run the test. It should fail.
- [ ] Duplicate the test you want to change. Copy and paste it in the cell below, so that you have two copies of the same test.
- [ ] Name the new test something useful. Since we want to make sure we are correctly calculating the new price, I am calling my new test `test_imputes_average_price_based_on_color_cut_and_clarity`

Let's get ready to improve the test.

**our bug**: Diamonds of the same cut and clarity are influencing the calculated price of diamonds with a different color. Only diamonds with the same cut, clarity _**and color**_ should be influencing the calculated price for diamonds with a null price.

**expected behaviour**:
An unpriced diamond with cut=Good, color=D and clarity=VVS1 in a dataset with other diamonds of the same cut, clarity and color all priced at 2690.0, will have it's price set to match the average of all the prices for diamonds of the same cut, clarity and color - or 2690.0.



- [ ] Change the test to check for the behaviour we want.
There is a second json file (`expected_correct.json`) where the expected price for the unpriced diamond has been updated to the correct value. Use that file name as the argument passed to `expected_json`
- [ ] Run the test. It should fail.
- [ ] Duplicate the test. Now you should have two copies of the same test.
One copy will stay the same, so we can make sure that nothing is broken. The second copy is what we will change in the next steps.
- [ ] Rename the test.
Pick a name that tells you what behaviour you are verifying with the test you are using for the Saff Squeeze. I chose `test_null_price_is_replaced_based_on_cut_clarity_and_color`, but names are hard. You might be able to think of a better one :-)
- [ ] Run the tests. They should both fail because our diamond is being returned with a price of 2460.0, when we expect 2690.0

In [None]:
%%ipytest -qq
from pyspark.sql import SparkSession

def test_will_do_the_right_thing(spark: SparkSession):
    diamonds_df = create_df_from_json("tests/fixtures/diamonds.json", spark)

    actual_df = transform(diamonds_df)
    assert data_frame_to_json(actual_df) == expected_json("expected.json")

[32m.[0m[32m                                                                                            [100%][0m


# Step 2: Make The Assert Specific

Right now, our test compares everything in the output dataframe to everything in a large json file. That's a lot of rows to compare and the assert is wrong anyways!

Let's make this test assert on the thing we actually care about - the output price of the diamond!

## Let's make our assert specific!
### We can do the next step in one of 3 ways:
#### With Chispa
- [ ] Add these imports to the top of the cell, below the `%%ipytest -qq` line:  
`from chispa import assert_column_equality`  
`from pySpark.sql.functions import lit`
- [ ] Filter the dataframe for the unique id of the diamond we care about:  
`actual_df=actual_df.filter(actual_df.id == 'DI-26-null-price')`
- [ ] Create a new column in our dataframe that contains our expected price:  
`actual_df=actual_df.withColumn('expected_price', lit(2690.0))`
- [ ] Assert the value in the price column matches the value we want:  
`assert_column_equality(actual_df, 'price', 'expected_price')`


In [10]:
%%ipytest -qq
from pyspark.sql import SparkSession


def test_null_price_is_replaced_based_on_cut_clarity_and_color(spark: SparkSession):
    diamonds_df = create_df_from_json("tests/fixtures/diamonds.json", spark)

    actual_df = transform(diamonds_df)


[32m.[0m[32m                                                                                            [100%][0m


### With Pandas
- [ ] import pandas:  
`import pandas as pd`
- [ ] Filter the dataframe for the unique id of the diamond we care about:  
  `actual_df=actual_df.filter(actual_df.id == 'DI-26-null-price')`
- [ ] Create your expected dataframe using Pandas:  
 `expected = pd.DataFrame(({'id': ["DI-26-null-price"], 'price':[2690.0] }))`
- [ ] Select the column you care about:  
  `actual_df=actual_df.select(['id', 'price'])
- [ ] Assert for dataframe equality using pandas:  
  `pd.testing.assert_frame_equal(actual_df, expected)`

In [11]:
%%ipytest -qq
from pyspark.sql import SparkSession


def test_null_price_is_replaced_based_on_cut_clarity_and_color(spark: SparkSession):
    diamonds_df = create_df_from_json("tests/fixtures/diamonds.json", spark)

    actual_df = transform(diamonds_df)


[32m.[0m[32m                                                                                            [100%][0m


### Assert on properties

- [ ] Filter the dataframe for the unique id of the diamond we care about:  
  `actual_df=actual_df.filter(actual_df.id == 'DI-26-null-price')`
- [ ] Convert your dataframe to JSON:  
`actual_df_json = data_frame_to_json(actual_df)`
- [ ] Assert the price property of the first object matches your expected price:  
`assert actual_df_json[0]['price'] == 2690.0`

In [12]:
%%ipytest -qq
from pyspark.sql import SparkSession


def test_null_price_is_replaced_based_on_cut_clarity_and_color(spark: SparkSession):
    diamonds_df = create_df_from_json("tests/fixtures/diamonds.json", spark)

    actual_df = transform(diamonds_df)



[32m.[0m[32m                                                                                            [100%][0m


# Step 3: Reduce Duplicate Coverage and Fix the Bug

Right now, our test is running the entire transform function. Because there are multiple tests in `diamonds.json`, each test is running the same large block of code over and over again.

## STEP 3.A: Prep

### Instructions
Let's get ready to reduce the duplicate coverage.

- [ ] Run the test. It should be failing.
- [ ] Replace the line that calls the transform function with the body of the transform function.  
- [ ] Rename `df` to `actual_df`, except the first place it's used. (Find and Replace is `ctrl-m-h`).  
This line:  
`df = df.withColumn('lprice', log('price'))`  
should become  
`actual_df = diamonds_df.withColumn('lprice', log('price'))`
- [ ] Change your assert code so that it is testing _for_ the bug.
```
    actual_df=actual_df.withColumn('expected_price', lit(2690.0))
```
becomes
```
  actual_df=actual_df.withColumn('expected_price', lit(2460.0))
```
- [ ] Run the test. It should still be failing for the same reasons.
- [ ] Extract your assert code into a one-line helper function:
```
def assert_diamond_has_expected_price(actual_df):
    actual_df=actual_df.filter(actual_df.id == 'DI-26-null-price')
    actual_df=actual_df.withColumn('expected_price', lit(2460.0))
    assert_column_equality(actual_df, 'price', 'expected_price')
```
- [ ] Run the test. It should still be failing for the same reasons.

### Exercise

#### The Code

Execute both of these cells so that they are available in our test cell


#### The `transform` Function

In [16]:
from pyspark.sql import DataFrame, Window, Column
from pyspark.sql.functions import log, when, mean, col

def replace_null(orig: Column, average: Column):
    return when(orig.isNull(), average).otherwise(orig)

def transform(df: DataFrame) -> DataFrame:

    df = df.withColumn('lprice', log('price'))
    window = Window.partitionBy('cut', 'clarity').orderBy('price').rowsBetween(-3, 3)
    moving_avg = mean(df['price']).over(window)
    df = df.withColumn('moving_avg', moving_avg)

    df_new = df.withColumn('price', when(df.price.isNull(), moving_avg).otherwise(df.price))
    df = df[['id', 'carat', 'clarity', 'color', 'price']]
    df = build_indep_vars(df, ['carat', 'clarity', 'color'],
                                      categorical_vars=['clarity', 'color'],
                                      keep_intermediate=False,
                                      summarizer=True)
    return df

#### The Test

In [14]:
%%ipytest -qq
from pyspark.sql import SparkSession

def test_prep_for_linear_regression(spark: SparkSession):
    diamonds_df = create_df_from_json("tests/fixtures/diamonds.json", spark)

    actual_df = transform(diamonds_df)

    actual_df=actual_df.filter(actual_df.id == 'DI-26-null-price')
    actual_df=actual_df.withColumn('expected_price', lit(2690.0))
    assert_column_equality(actual_df, 'price', 'expected_price')

[31mF[0m[31m                                                                                            [100%][0m
[31m[1m_________________________________ test_prep_for_linear_regression __________________________________[0m

spark = <pyspark.sql.session.SparkSession object at 0x7fd38587cb50>

    [94mdef[39;49;00m [92mtest_prep_for_linear_regression[39;49;00m(spark: SparkSession):[90m[39;49;00m
        diamonds_df = create_df_from_json([33m"[39;49;00m[33mtests/fixtures/diamonds.json[39;49;00m[33m"[39;49;00m, spark)[90m[39;49;00m
    [90m[39;49;00m
        actual_df = transform(diamonds_df)[90m[39;49;00m
    [90m[39;49;00m
        actual_df=actual_df.filter(actual_df.id == [33m'[39;49;00m[33mDI-26-null-price[39;49;00m[33m'[39;49;00m)[90m[39;49;00m
        actual_df=actual_df.withColumn([33m'[39;49;00m[33mexpected_price[39;49;00m[33m'[39;49;00m, lit([94m2960.0[39;49;00m))[90m[39;49;00m
>       assert_column_equality(actual_df, [33m'[39;49;0

#### The original test



In [None]:
%%ipytest -qq
from pyspark.sql import SparkSession

def test_will_do_the_right_thing(spark: SparkSession):
    diamonds_df = create_df_from_json("tests/fixtures/diamonds.json", spark)

    actual_df = transform(diamonds_df)
    assert data_frame_to_json(actual_df) == expected_json("expected_correct.json")

## Step 3.B Squeeze the bottom!



### Instructions
**our bug**: Diamonds of the same cut and clarity are influencing the calculated price of diamonds with a different color. Only diamonds with the same cut, clarity and color should be influencing the calculated price for diamonds with a null price.

**Squeeze the bottom until you find the bug**
- [ ] Move your assert up one line at a time.  
- [ ] After each move, run your tests.  
- [ ] If it fails, figure out why it's failing.(You may need to rename columns in the assert)
- [ ]If the test passes, the line wasn't important for the bug you wanted to catch. Delete it.
- [ ] Continue until you find the source of the bug

### Exercise

#### The Code

##### The `transform` Function

In [None]:
from pyspark.sql import DataFrame, Window, Column
from pyspark.sql.functions import log, when, mean, col

def replace_null(orig: Column, average: Column):
    return when(orig.isNull(), average).otherwise(orig)

def transform(df: DataFrame) -> DataFrame:

    df = df.withColumn('lprice', log('price'))
    window = Window.partitionBy('cut', 'clarity').orderBy('price').rowsBetween(-3, 3)
    moving_avg = mean(df['price']).over(window)
    df = df.withColumn('moving_avg', moving_avg)

    df_new = df.withColumn('price', when(df.price.isNull(), moving_avg).otherwise(df.price))
    df = df[['id', 'carat', 'clarity', 'color', 'price']]
    df = build_indep_vars(df, ['carat', 'clarity', 'color'],
                                      categorical_vars=['clarity', 'color'],
                                      keep_intermediate=False,
                                      summarizer=True)
    return df

#### The Test

In [15]:
%%ipytest -qq
from pyspark.sql import SparkSession, DataFrame, Window, Column
from pyspark.sql.functions import lit, log, when, mean, col
from chispa import assert_column_equality

def assert_diamond_has_expected_price(actual_df):
    actual_df=actual_df.filter(actual_df.id == 'DI-26-null-price')
    actual_df=actual_df.withColumn('expected_price', lit(2460.0))
    assert_column_equality(actual_df, 'price', 'expected_price')

def test_prep_for_linear_regression(spark: SparkSession):
    diamonds_df = create_df_from_json("tests/fixtures/diamonds.json", spark)

    actual_df = diamonds_df.withColumn('lprice', log('price'))
    window = Window.partitionBy('cut', 'clarity').orderBy('price').rowsBetween(-3, 3)
    moving_avg = mean(actual_df['price']).over(window)
    actual_df = actual_df.withColumn('moving_avg', moving_avg)

    actual_df = actual_df.withColumn('price', replace_null(col('price'), col('moving_avg')))
    actual_df = actual_df[['id', 'carat', 'clarity', 'color', 'price']]
    actual_df = build_indep_vars(actual_df, ['carat', 'clarity', 'color'],
                                      categorical_vars=['clarity', 'color'],
                                      keep_intermediate=False,
                                      summarizer=True)

    assert_diamond_has_expected_price(actual_df)





[32m.[0m[32m                                                                                            [100%][0m


#### The Original Test

In [None]:
%%ipytest -qq
from pyspark.sql import SparkSession

def test_will_do_the_right_thing(spark: SparkSession):
    diamonds_df = create_df_from_json("tests/fixtures/diamonds.json", spark)

    actual_df = transform(diamonds_df)
    assert data_frame_to_json(actual_df) == expected_json("expected_correct.json")

## Step 3.C Let's fix the bug!


### Instructions

**Our Bug**: Diamonds of the same cut and clarity are influencing the calculated price of diamonds with a different color. Only diamonds with the same cut, clarity and color should be influencing the calculated price for diamonds with a null price.

**Our Desired Behaviour**:  
The input diamond with these properties:
- id: `"DI-26-null-price"`
- cut: `"Good"`
- color: `"D"`
- clarity: `"VVS1"`
- price: `null`

Should be output with a price of `2690.0` - the average price of the other diamonds with `cut="Good"`, `clarity="VVS1"` and `color="D"`

**Fix the bug**
- [ ] Update your test so that it checks for the good behaviour.  
Replace the expected price on this line with `2690.0`:  
```
actual_df=actual_df.withColumn('expected_price', lit(2460.0))
```
- [ ] Run your test. It should fail with a `columnsNotEqualError`:  
```
E           chispa.column_comparer.ColumnsNotEqualError:
E           +------------+----------------+
E           | moving_avg | expected_price |
E           +------------+----------------+
E           |   2460.0   |     2690.0     |
E           +------------+----------------+
```
- [ ] Fix the code _in your test_ so that the bug is gone.
- [ ] Run your test. It should pass.
- [ ] The behaviour belongs to a group of lines working together. Extract them into a method.  
These lines can't be separated:  
```
    window = Window.partitionBy('cut', 'clarity', 'color').orderBy('price').rowsBetween(-3, 3)
    moving_avg = mean(actual_df['price']).over(window)
    actual_df = actual_df.withColumn('moving_avg', moving_avg)
```  
Use them to make a method:
```
def calculate_avg_price_for_similar_diamonds(diamonds_df: DataFrame) -> DataFrame:
      window = Window.partitionBy('cut', 'clarity', 'color').orderBy('price').rowsBetween(-3, 3)
      moving_avg = mean(diamonds_df['price']).over(window)
      diamonds_df = diamonds_df.withColumn('moving_avg', moving_avg)
      return diamonds_df
```
Replace those lines with a call to the new method in your test:  
```
actual_df = calculate_avg_price_for_similar_diamonds(actual_df)
```
- [ ] Move the new method out of your test and into the transform code.
- [ ] Replace the 3 lines in your transform code with the new method call.
- [ ] Run your test. It should pass.
- [ ] Run the copy of your original large test. It should also pass.

### Exercise

#### The Code

##### The `transform` Function

In [None]:
from pyspark.sql import DataFrame, Window, Column
from pyspark.sql.functions import log, when, mean, col

def replace_null(orig: Column, average: Column):
    return when(orig.isNull(), average).otherwise(orig)

def transform(df: DataFrame) -> DataFrame:

    df = df.withColumn('lprice', log('price'))
    window = Window.partitionBy('cut', 'clarity').orderBy('price').rowsBetween(-3, 3)
    moving_avg = mean(df['price']).over(window)
    df = df.withColumn('moving_avg', moving_avg)

    df_new = df.withColumn('price', when(df.price.isNull(), moving_avg).otherwise(df.price))
    df = df[['id', 'carat', 'clarity', 'color', 'price']]
    df = build_indep_vars(df, ['carat', 'clarity', 'color'],
                                      categorical_vars=['clarity', 'color'],
                                      keep_intermediate=False,
                                      summarizer=True)
    return df

#### The Test

In [68]:
%%ipytest -qq
from pyspark.sql import SparkSession, DataFrame, Window, Column
from pyspark.sql.functions import lit, log, when, mean, col
from chispa import assert_column_equality

def assert_diamond_has_expected_price(actual_df):
    actual_df=actual_df.filter(actual_df.id == 'DI-26-null-price')
    actual_df=actual_df.withColumn('expected_price', lit(2460.0))
    assert_column_equality(actual_df, 'moving_avg', 'expected_price')

def test_prep_for_linear_regression(spark: SparkSession):
    diamonds_df = create_df_from_json("tests/fixtures/diamonds.json", spark)

    actual_df = diamonds_df.withColumn('lprice', log('price'))
    window = Window.partitionBy('cut', 'clarity').orderBy('price').rowsBetween(-3, 3)
    moving_avg = mean(actual_df['price']).over(window)
    actual_df = actual_df.withColumn('moving_avg', moving_avg)

    assert_diamond_has_expected_price(actual_df)





[32m.[0m[32m                                                                                            [100%][0m
