# Great Expectations on PySpark

Implementing unit tests on PySpark DataFrames using Great Expectations

In [1]:
from pyspark.sql import SparkSession
import pandas as pd

In [2]:
spark = SparkSession.builder.master("local[2]").appName("Great Expectations with Pandas DataFrame").getOrCreate()



## Load Raw DF

##### Load data from file and create a PySpark DataFrame view named CAMPAIGNS

In [28]:
raw_df = spark.read.option("header", True).option("inferSchema",True).csv("../data/Kickstarter_projects_Feb19.csv")
raw_df.createOrReplaceTempView("CAMPAIGNS")

                                                                                

In [4]:
# raw_df is a spark dataframe
type(raw_df)

pyspark.sql.dataframe.DataFrame

In [30]:
raw_df.printSchema()

root
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- currency: string (nullable = true)
 |-- main_category: string (nullable = true)
 |-- sub_category: string (nullable = true)
 |-- launched_at: string (nullable = true)
 |-- deadline: string (nullable = true)
 |-- duration: string (nullable = true)
 |-- goal_usd: string (nullable = true)
 |-- city: string (nullable = true)
 |-- state: string (nullable = true)
 |-- country: string (nullable = true)
 |-- blurb_length: string (nullable = true)
 |-- name_length: string (nullable = true)
 |-- status: string (nullable = true)
 |-- start_month: string (nullable = true)
 |-- end_month: string (nullable = true)
 |-- start_Q: string (nullable = true)
 |-- end_Q: string (nullable = true)
 |-- usd_pledged: string (nullable = true)



In [31]:
# we can convert the spark dataframe to pandas dataframe
raw_pdf=raw_df.toPandas()


In [32]:
print(raw_pdf.shape)

(192548, 20)


In [33]:
raw_pdf.head(5)

Unnamed: 0,id,name,currency,main_category,sub_category,launched_at,deadline,duration,goal_usd,city,state,country,blurb_length,name_length,status,start_month,end_month,start_Q,end_Q,usd_pledged
0,1687733153,Socks of Speed and Socks of Elvenkind,USD,games,Tabletop Games,2018-10-30 20:00:02,2018-11-15 17:59:00,16.0,2000.0,Menasha,WI,US,14,7,successful,10,11,Q4,Q4,6061.0
1,227936657,Power Punch Boot Camp: An All-Ages Graphic Novel,GBP,comics,Comic Books,2018-08-06 10:00:43,2018-09-05 10:00:43,30.0,3870.99771,Shepperton,England,GB,24,8,successful,8,9,Q3,Q3,3914.505120400001
2,454186436,"""Live Printing with SX8: """"Squeegee Pulp Up""""""",USD,fashion,Apparel,2017-06-09 15:41:03,2017-07-09 15:41:03,30.0,1100.0,Manhattan,NY,US,21,7,successful,6,7,Q2,Q3,1110.0
3,629469071,Lost Dog Street Band's Next Album,USD,music,Country & Folk,2014-09-25 18:46:01,2014-11-10 06:00:00,45.0,3500.0,Nashville,TN,US,15,6,successful,9,11,Q3,Q4,4807.0
4,183973060,"Qto-X, a Tiny Lantern",USD,technology,Gadgets,2016-11-28 16:35:11,2017-01-27 16:35:11,60.0,30000.0,Troy,MI,US,15,4,successful,11,1,Q4,Q1,40368.0


## 1. Step 1: Unit Tests on Raw Data

To use Great Expectations **expectations/validators** on a DataFrame, we first need to convert the Spark DataFrame into a Great Expectations `SparkDFDataset`. The full list of available `expectations` is on this [page](https://greatexpectations.io/expectations/).

In [9]:
from great_expectations.dataset import SparkDFDataset

In [34]:
# convert spark dataframe to ge sparkdf
print(f"before conversion {type(raw_df)}")
raw_test_df = SparkDFDataset(raw_df)
print(f"after conversion {type(raw_test_df)}")

before conversion <class 'pyspark.sql.dataframe.DataFrame'>
after conversion <class 'great_expectations.dataset.sparkdf_dataset.SparkDFDataset'>


### 1.1 Test 1: Check if mandatory columns exist

Here we use an `expectation/validation` function called `expect_column_to_exist`. It takes a column name and returns a validation result with several attributes, printed below in JSON form. Below is an example.

In [11]:
# test with a valid column name
result=raw_test_df.expect_column_to_exist("id")
print(f"result for : {result}")

result for : {
  "expectation_config": {
    "meta": {},
    "expectation_type": "expect_column_to_exist",
    "kwargs": {
      "column": "id",
      "result_format": "BASIC"
    }
  },
  "result": {},
  "meta": {},
  "success": true,
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}


In [12]:
# test with an invalid column name
result=raw_test_df.expect_column_to_exist("toto")
print(f"result for : {result}")

result for : {
  "expectation_config": {
    "meta": {},
    "expectation_type": "expect_column_to_exist",
    "kwargs": {
      "column": "toto",
      "result_format": "BASIC"
    }
  },
  "result": {},
  "meta": {},
  "success": false,
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}


The first part of the result (`expectation_config`) describes the expectation (the validation rule) and its arguments. The second part (`result`) holds the details of the outcome; here it is empty because the rule is simple enough that no extra information is needed to understand the outcome. The most important attribute is **success** (bool), which tells us whether the expectation passed: it is `true` when the expectation passes and `false` otherwise. For more information, see this [page](https://docs.greatexpectations.io/docs/terms/validation_result/).
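
As a rough illustration of how such a result can be consumed, the sketch below reads the fields discussed above from a plain dictionary shaped like the printed output. The helper name `summarise_result` is hypothetical, and the dictionary is a hand-copied stand-in rather than an object returned by Great Expectations.

```python
from typing import Any, Dict


def summarise_result(result: Dict[str, Any]) -> str:
    """Build a one-line summary from a dict shaped like the printed result."""
    config = result["expectation_config"]
    # "column" is present for column-level expectations such as this one
    column = config["kwargs"].get("column", "<table>")
    status = "PASSED" if result["success"] else "FAILED"
    return f"{config['expectation_type']}({column}): {status}"


# Hand-copied stand-in for the second result printed above (not a real GE object)
sample = {
    "expectation_config": {
        "meta": {},
        "expectation_type": "expect_column_to_exist",
        "kwargs": {"column": "toto", "result_format": "BASIC"},
    },
    "result": {},
    "meta": {},
    "success": False,
}

print(summarise_result(sample))
```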

We can use this `expectation` to test whether a DataFrame contains a list of mandatory columns.


In [15]:
MANDATORY_COLUMNS = [
  "id",
  "currency",
  "main_category",
  "launched_at",
  "deadline",
  "country",
  "status",
  "usd_pledged"  
]

In [16]:
for column in MANDATORY_COLUMNS:
    try:
        assert raw_test_df.expect_column_to_exist(column).success, f"Uh oh! Mandatory column {column} does not exist: FAILED"
        print(f"Column {column} exists : PASSED")
    except AssertionError as e:
        print(e)

Column id exists : PASSED
Column currency exists : PASSED
Column main_category exists : PASSED
Column launched_at exists : PASSED
Column deadline exists : PASSED
Column country exists : PASSED
Column status exists : PASSED
Column usd_pledged exists : PASSED


To make this check reusable, we can define a function.


In [19]:
from typing import List, Tuple


def expect_df_to_contain_columns(df: SparkDFDataset, col_list: List[str]) -> Tuple[bool, List[str]]:
    """Return (True, []) if every column exists, else (False, missing columns)."""
    # collect every column for which the expectation fails
    missing = [column for column in col_list if not df.expect_column_to_exist(column).success]
    return len(missing) == 0, missing



In [20]:
status,output=expect_df_to_contain_columns(raw_test_df,
                             MANDATORY_COLUMNS)

print(f"Expectation status: {status}, output: {output}")

Expectation status: True, output: []


In [21]:
status,output=expect_df_to_contain_columns(raw_test_df,["toto","titi","tata"])

print(f"Expectation status: {status}, output: {output}")

Expectation status: False, output: ['toto', 'titi', 'tata']


### 1.2 Test 2: Check if column values have a specific type

Here, two functions are available:
- expect_column_values_to_be_of_type(colName, type)
- expect_column_values_to_be_in_type_list

For more information, please visit this [page](https://greatexpectations.io/expectations/expect_column_values_to_be_of_type)

In [35]:
from pyspark.sql.types import IntegerType
# test with a valid type
result=raw_test_df.expect_column_values_to_be_of_type("id","IntegerType")
print(f"result for : {result}")

result for : {
  "expectation_config": {
    "meta": {},
    "expectation_type": "expect_column_values_to_be_of_type",
    "kwargs": {
      "column": "id",
      "type_": "IntegerType",
      "result_format": "BASIC"
    }
  },
  "result": {
    "observed_value": "IntegerType"
  },
  "meta": {},
  "success": true,
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}


In [36]:
# test with an invalid type
result=raw_test_df.expect_column_values_to_be_of_type("id","StringType")
print(f"result for : {result}")

result for : {
  "expectation_config": {
    "meta": {},
    "expectation_type": "expect_column_values_to_be_of_type",
    "kwargs": {
      "column": "id",
      "type_": "StringType",
      "result_format": "BASIC"
    }
  },
  "result": {
    "observed_value": "IntegerType"
  },
  "meta": {},
  "success": false,
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}


The example above checks whether a DataFrame column has a given type. Note that if the data source is semi-structured, the column types are probably inferred by Spark, so you may need to provide an explicit schema when you create the DataFrame.

This time the result has an attribute called **observed_value**, which comes from the DataFrame's current schema.
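
One way to avoid relying on inference is to declare the column types up front. The sketch below only builds a DDL-style schema string in plain Python; the helper name `to_ddl`, the column subset, and the chosen types are all assumptions made for illustration. `DataFrameReader.schema` accepts such a string, but for a CSV file the schema has to list every column in file order, so the subset here is not usable as is.

```python
# Assumed types for a few columns only; a real schema must cover
# every column of the file, in the same order as the file.
EXPECTED_TYPES = {
    "id": "INT",
    "name": "STRING",
    "launched_at": "TIMESTAMP",
    "deadline": "TIMESTAMP",
    "goal_usd": "DOUBLE",
    "usd_pledged": "DOUBLE",
}


def to_ddl(column_types: dict) -> str:
    """Join column/type pairs into a DDL-formatted schema string."""
    return ", ".join(f"{name} {dtype}" for name, dtype in column_types.items())


schema_ddl = to_ddl(EXPECTED_TYPES)
print(schema_ddl)

# With a complete, ordered column list, the string could be passed as
# spark.read.option("header", True).schema(schema_ddl).csv(path)  (not executed here)
```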

### 1.3 Test 3: Check if mandatory columns contain null values

We can use **expect_column_values_to_not_be_null**

In [37]:
# test with column id
result=raw_test_df.expect_column_values_to_not_be_null("id")
print(f"result content :\n {result}")

                                                                                

result content :
 {
  "expectation_config": {
    "meta": {},
    "expectation_type": "expect_column_values_to_not_be_null",
    "kwargs": {
      "column": "id",
      "result_format": "BASIC"
    }
  },
  "result": {
    "element_count": 192548,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "partial_unexpected_list": []
  },
  "meta": {},
  "success": true,
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}


In [38]:
# test with column main_category
result=raw_test_df.expect_column_values_to_not_be_null("main_category")
print(f"result content :\n {result}")

result content :
 {
  "expectation_config": {
    "meta": {},
    "expectation_type": "expect_column_values_to_not_be_null",
    "kwargs": {
      "column": "main_category",
      "result_format": "BASIC"
    }
  },
  "result": {
    "element_count": 192548,
    "unexpected_count": 1,
    "unexpected_percent": 0.0005193510189666992,
    "unexpected_percent_total": 0.0005193510189666992,
    "partial_unexpected_list": [
      null
    ]
  },
  "meta": {},
  "success": false,
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}


The result contains a lot of useful information:

```text
"result": {
    "element_count": 192548, # total number of rows
    "unexpected_count": 1, # number of null rows
    "unexpected_percent": 0.0005193510189666992,
    "unexpected_percent_total": 0.0005193510189666992,
    "partial_unexpected_list": [
      null
    ]
  }

```


In [39]:
from pyspark.sql.utils import AnalysisException

for column in MANDATORY_COLUMNS:
    try:
        test_result = raw_test_df.expect_column_values_to_not_be_null(column)
        assert test_result.success, \
            f"Uh oh! {test_result.result['unexpected_count']} of {test_result.result['element_count']} items in column {column} are null: FAILED"
        print(f"All items in column {column} are not null: PASSED")
    except AssertionError as e:
        print(e) 
    except AnalysisException as e:
        print(e)

All items in column id are not null: PASSED
All items in column currency are not null: PASSED
Uh oh! 1 of 192548 items in column main_category are null: FAILED
Uh oh! 1 of 192548 items in column launched_at are null: FAILED
Uh oh! 1 of 192548 items in column deadline are null: FAILED
Uh oh! 1 of 192548 items in column country are null: FAILED
Uh oh! 1 of 192548 items in column status are null: FAILED
Uh oh! 1 of 192548 items in column usd_pledged are null: FAILED


### 1.4 Test 4: Check if the launched_at column has a valid datetime format

In [41]:
# test with column launched_at
result=raw_test_df.expect_column_values_to_match_strftime_format('launched_at','%Y-%m-%d %H:%M:%S')
print(f"result content :\n {result}")


result content :
 {
  "expectation_config": {
    "meta": {},
    "expectation_type": "expect_column_values_to_match_strftime_format",
    "kwargs": {
      "column": "launched_at",
      "strftime_format": "%Y-%m-%d %H:%M:%S",
      "result_format": "BASIC"
    }
  },
  "result": {
    "element_count": 192548,
    "missing_count": 1,
    "missing_percent": 0.0005193510189666992,
    "unexpected_count": 562,
    "unexpected_percent": 0.2918767885243603,
    "unexpected_percent_total": 0.2918752726592849,
    "unexpected_percent_nonmissing": 0.2918767885243603,
    "partial_unexpected_list": [
      "Hip-Hop",
      "Rock",
      "Webseries",
      "Musical",
      "Kids",
      "film & video",
      "Painting",
      "Webseries",
      "Comedy",
      "World Music",
      "Children's Books",
      "Indie Rock",
      "Documentary",
      "Cookbooks",
      "Country & Folk",
      "Hip-Hop",
      "Jazz",
      "Shorts",
      "Classical Music",
      "Art Books"
    ]
  },
  "meta": {}

                                                                                

In [47]:
from pyspark.sql.functions import col

raw_df.groupby(col("launched_at")).count().orderBy(col("count")).filter(col("count")>2).show(20)

+-------------------+-----+
|        launched_at|count|
+-------------------+-----+
|2018-10-10 14:58:06|    3|
|             People|    3|
|2018-06-12 16:04:59|    3|
|2017-11-15 23:29:31|    3|
|               Kids|    3|
|2018-11-26 17:30:30|    3|
|2017-03-15 00:15:53|    3|
|        Young Adult|    3|
|2018-01-31 18:00:33|    3|
|              Music|    3|
|           Textiles|    3|
|2014-11-21 18:01:56|    3|
|              games|    3|
|2018-10-02 11:05:11|    3|
|2016-09-06 19:49:38|    3|
|2018-03-27 15:01:06|    3|
|2018-02-01 22:34:51|    3|
|               food|    3|
|2017-07-01 20:27:21|    3|
|           Academic|    3|
+-------------------+-----+
only showing top 20 rows



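The example above shows that the column contains corrupted values: entries such as "Hip-Hop" and "Rock" look like category names rather than timestamps, which suggests that some rows are misaligned. The result section is very useful here: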

```text
"result": {
    "element_count": 192548, # all rows
    "missing_count": 1,  # null rows
    "missing_percent": 0.0005193510189666992,
    "unexpected_count": 562, # rows that do not match the format
    "unexpected_percent": 0.2918767885243603,
    "unexpected_percent_total": 0.2918752726592849,
    "unexpected_percent_nonmissing": 0.2918767885243603,
    "partial_unexpected_list": [  # sample of values that do not match
      "Hip-Hop",
      "Rock",
      "Webseries",
      "Musical",
      "Kids",
      "film & video",
      ...]
```
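
The percentage fields in this result can be reproduced from the three counts. The sketch below is arithmetic reconstructed from the numbers shown above, not taken from the library source; the function name `unexpected_percentages` is hypothetical.

```python
def unexpected_percentages(element_count: int, missing_count: int, unexpected_count: int) -> dict:
    """Recompute the percentage fields from the raw counts."""
    non_missing = element_count - missing_count
    return {
        # share of null rows among all rows
        "missing_percent": 100.0 * missing_count / element_count,
        # share of non-matching rows among all rows
        "unexpected_percent_total": 100.0 * unexpected_count / element_count,
        # share of non-matching rows among non-null rows
        "unexpected_percent_nonmissing": 100.0 * unexpected_count / non_missing,
    }


stats = unexpected_percentages(element_count=192548, missing_count=1, unexpected_count=562)
for name, value in stats.items():
    print(f"{name}: {value:.6f}")
```

In this result, `unexpected_percent` carries the same value as `unexpected_percent_nonmissing`.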

In [40]:
test_result =  raw_test_df.expect_column_values_to_match_strftime_format('launched_at','%Y-%m-%d %H:%M:%S')
f"""{round(test_result.result['unexpected_percent'], 2)}% is not a valid date time format"""

                                                                                

'0.29% is not a valid date time format'
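
To make the meaning of the check concrete, here is a plain-Python sketch of the same idea using `datetime.strptime`. It illustrates the rule on a handful of values only and is not how Great Expectations evaluates it on Spark; the function name `matches_strftime_format` is hypothetical.

```python
from datetime import datetime


def matches_strftime_format(value: str, fmt: str) -> bool:
    """Return True if value parses with fmt, False otherwise."""
    try:
        datetime.strptime(value, fmt)
        return True
    except ValueError:
        return False


FMT = "%Y-%m-%d %H:%M:%S"
# One valid timestamp and two of the unexpected values reported above
samples = ["2018-10-30 20:00:02", "Hip-Hop", "film & video"]
bad = [value for value in samples if not matches_strftime_format(value, FMT)]
print(bad)  # ['Hip-Hop', 'film & video']
```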

### 1.5 Test 5: Check if deadline is a valid datetime format

The same check applies to the deadline column.

In [48]:
test_result =  raw_test_df.expect_column_values_to_match_strftime_format('deadline','%Y-%m-%d %H:%M:%S')
f"""{round(test_result.result['unexpected_percent'], 2)}% is not a valid date time format"""

                                                                                

'0.04% is not a valid date time format'

### 1.6 Test 6: Check if id is unique

Some column values must be unique. We can use the function **expect_column_values_to_be_unique**

In [49]:
# test with column id
result=raw_test_df.expect_column_values_to_be_unique('id')
print(f"result content :\n {result}")

                                                                                

result content :
 {
  "expectation_config": {
    "meta": {},
    "expectation_type": "expect_column_values_to_be_unique",
    "kwargs": {
      "column": "id",
      "result_format": "BASIC"
    }
  },
  "result": {
    "element_count": 192548,
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_count": 48162,
    "unexpected_percent": 25.01298377547417,
    "unexpected_percent_total": 25.01298377547417,
    "unexpected_percent_nonmissing": 25.01298377547417,
    "partial_unexpected_list": [
      39036,
      39036,
      39235,
      39235,
      50419,
      50419,
      188790,
      188790,
      342881,
      342881,
      358771,
      358771,
      377517,
      377517,
      390870,
      390870,
      442565,
      442565,
      538372,
      538372
    ]
  },
  "meta": {},
  "success": false,
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}


The result above shows that `id` is not unique: 48162 of the 192548 rows (about 25%) carry an id that occurs more than once. `partial_unexpected_list` shows a sample of the duplicated values:

```text
'element_count': 192548,
'missing_count': 0,
'missing_percent': 0.0,
'unexpected_count': 48162,
'unexpected_percent': 25.01298377547417,
'unexpected_percent_total': 25.01298377547417,
'unexpected_percent_nonmissing': 25.01298377547417,
'partial_unexpected_list': [39036, 39036, 39235, 39235, 50419, 50419, 188790, 188790, 342881, 342881, 358771, 358771, 377517, 377517, 390870, 390870, 442565, 442565, 538372, 538372]
```
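
Each id in the sample list appears twice, which suggests that every occurrence of a repeated value is counted as unexpected, not just the extra copies. A plain-Python sketch of that counting rule, under that assumption, could look like this (`non_unique_values` is a hypothetical helper and `12345` is a made-up unique id):

```python
from collections import Counter
from typing import List


def non_unique_values(values: List[int]) -> List[int]:
    """Return every occurrence of a value that appears more than once."""
    counts = Counter(values)
    return [value for value in values if counts[value] > 1]


ids = [39036, 39036, 39235, 39235, 12345]
unexpected = non_unique_values(ids)
print(len(unexpected), unexpected)  # 4 [39036, 39036, 39235, 39235]
```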

In [51]:
print(result.result)

{'element_count': 192548, 'missing_count': 0, 'missing_percent': 0.0, 'unexpected_count': 48162, 'unexpected_percent': 25.01298377547417, 'unexpected_percent_total': 25.01298377547417, 'unexpected_percent_nonmissing': 25.01298377547417, 'partial_unexpected_list': [39036, 39036, 39235, 39235, 50419, 50419, 188790, 188790, 342881, 342881, 358771, 358771, 377517, 377517, 390870, 390870, 442565, 442565, 538372, 538372]}


In [14]:
test_result = raw_test_df.expect_column_values_to_be_unique("id")
failed_msg = " ".join([f"""Uh oh!""",
              f"""{test_result.result['unexpected_count']} of {test_result.result['element_count']} items""",
              f"""or {round(test_result.result['unexpected_percent'],2)}% are not unique: FAILED"""])
print(f"""{'Column id is unique: PASSED' if test_result.success else failed_msg}""")

                                                                                

Uh oh! 48162 of 192548 items or 25.01% are not unique: FAILED


## 2. Step 2: Filter Data

### Business Rules

1. Should filter campaigns in country "US" only
2. Should filter campaigns that were active in 2017 and 2018 only based on "launched_at" and "deadline".
3. Include only the below categories:
    - art
    - publishing
    - film & video
    - technology
    - journalism
    - food
    - dance
    - photography
    - games
    - crafts
    - music
    - comics
    - theater
    - design
4. Should include successful campaigns only
5. Should include USD currency only
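
Rules 1, 3, 4 and 5 are simple per-row conditions. As a sketch, they can be written as a single predicate over a row represented as a dictionary; the function name `in_scope` is hypothetical, and rule 2 is left out because it depends on the assessment periods.

```python
ALLOWED_CATEGORIES = {
    "art", "publishing", "film & video", "technology", "journalism",
    "food", "dance", "photography", "games", "crafts", "music",
    "comics", "theater", "design",
}


def in_scope(row: dict) -> bool:
    """Apply business rules 1, 3, 4 and 5 to a single campaign row."""
    return (
        row["country"] == "US"
        and row["main_category"] in ALLOWED_CATEGORIES
        and row["status"] == "successful"
        and row["currency"] == "USD"
    )


# Two rows taken from the head of the raw data, reduced to the relevant columns
rows = [
    {"id": 1687733153, "currency": "USD", "main_category": "games", "country": "US", "status": "successful"},
    {"id": 227936657, "currency": "GBP", "main_category": "comics", "country": "GB", "status": "successful"},
]
print([row["id"] for row in rows if in_scope(row)])  # [1687733153]
```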

### Transforming Data

In [53]:
MAIN_CATEGORIES = [
    'art',
    'publishing',
    'film & video',
    'technology',
    'journalism',
    'food',
    'dance',
    'photography',
    'games',
    'crafts',
    'music',
    'comics',
    'theater',
    'design'    
]
ASSESSMENT_YEAR = ['2017','2018']
COUNTRY = 'US'
CURRENCY = 'USD'

#### Generate Reference Data for Assessment Year

In [54]:
assessment_year_reference = {
    'assessment_year': ['2017', '2018'], 
    'period_start_dt': ['2016-07-01', '2017-07-01'],
    'period_end_dt': ['2017-06-30', '2018-06-30'],
}
ay_df = pd.DataFrame(data=assessment_year_reference)
ay_df

Unnamed: 0,assessment_year,period_start_dt,period_end_dt
0,2017,2016-07-01,2017-06-30
1,2018,2017-07-01,2018-06-30


#### Convert to a Pyspark DataFrame to be able to join to CAMPAIGNS

In [55]:
spark_ay_df = spark.createDataFrame(ay_df) 
spark_ay_df.createOrReplaceTempView("assessment_year_ref")
type(spark_ay_df)


pyspark.sql.dataframe.DataFrame

#### Apply Transformation and create a view named FILTERED_CAMPAIGNS

In [56]:
filtered_df = spark.sql(f"""
    SELECT id,
           name,
           currency,
           main_category,
           launched_at,
           deadline,
           goal_usd,
           country,
           usd_pledged,
           status,
           assessment_year
    FROM (SELECT t.*,
               ay.assessment_year,
               row_number() OVER (
                   PARTITION BY t.id
                   ORDER BY t.launched_at, 
                            ay.assessment_year DESC) row_no
          FROM CAMPAIGNS t
          INNER JOIN assessment_year_ref ay
              ON TO_DATE(t.launched_at) <= ay.period_end_dt 
              AND t.deadline > ay.period_start_dt
          WHERE country = '{COUNTRY}'
          AND status = 'successful'
          AND main_category IN ('{"','".join(MAIN_CATEGORIES)}')
          AND ay.assessment_year IN ('{"','".join(ASSESSMENT_YEAR)}')
          AND currency = '{CURRENCY}'
   ) WHERE row_no = 1 
    """)

In [57]:
filtered_df.createOrReplaceTempView("FILTERED_CAMPAIGNS")
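
The join condition in the query assigns a campaign to every assessment period it overlaps, and the `row_number()` ordering then keeps one row per id. The plain-Python sketch below approximates the overlap rule for a single campaign and prefers the latest overlapping year, mirroring `ay.assessment_year DESC`. It is an illustration only: `assign_assessment_year` is a hypothetical helper, it compares parsed dates rather than strings, and edge cases exactly at midnight may differ from the SQL.

```python
from datetime import date, datetime
from typing import Optional

# Same periods as the assessment_year_ref reference data
ASSESSMENT_PERIODS = {
    "2017": (date(2016, 7, 1), date(2017, 6, 30)),
    "2018": (date(2017, 7, 1), date(2018, 6, 30)),
}
FMT = "%Y-%m-%d %H:%M:%S"


def assign_assessment_year(launched_at: str, deadline: str) -> Optional[str]:
    """Return the latest assessment year whose period overlaps the campaign."""
    launched = datetime.strptime(launched_at, FMT)
    ended = datetime.strptime(deadline, FMT)
    matches = [
        year
        for year, (start, end) in ASSESSMENT_PERIODS.items()
        # overlap test: launched on or before period end, deadline after period start
        if launched.date() <= end and ended > datetime.combine(start, datetime.min.time())
    ]
    return max(matches) if matches else None


print(assign_assessment_year("2017-05-08 07:04:46", "2017-06-07 07:04:46"))  # 2017
print(assign_assessment_year("2017-08-30 18:10:39", "2017-10-29 18:10:39"))  # 2018
```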

In [58]:
filtered_df.toPandas()


Unnamed: 0,id,name,currency,main_category,launched_at,deadline,goal_usd,country,usd_pledged,status,assessment_year
0,50419,Graphic design tools for award-winning designs,USD,design,2018-05-02 18:22:31,2018-06-01 18:22:31,1250.0,US,2985.0,successful,2018
1,303187,Paleocene #1 (Comic Book),USD,comics,2017-05-08 07:04:46,2017-06-07 07:04:46,400.0,US,1661.52,successful,2017
2,821031,T21 Blaster : A Star Wars project,USD,art,2017-08-30 18:10:39,2017-10-29 18:10:39,300.0,US,301.0,successful,2018
3,1430657,Hobnobbin' with Slim Man,USD,journalism,2017-05-11 00:02:44,2017-06-10 00:02:44,3000.0,US,6813.0,successful,2017
4,1694298,Vampire Scarlett Playing Cards Deck-NSFW,USD,games,2018-06-05 15:15:18,2018-06-12 15:15:18,99.0,US,653.0,successful,2018
...,...,...,...,...,...,...,...,...,...,...,...
17851,2146763433,SAYER - Season 4 of the Narrative Science Fict...,USD,publishing,2017-01-10 21:56:15,2017-02-09 21:56:15,4000.0,US,17048.0,successful,2017
17852,2146764201,Row,USD,film & video,2016-07-11 19:52:31,2016-08-10 19:52:31,5750.0,US,5862.42,successful,2017
17853,2146831280,KicoBox- A Robot like Rubik’s Cube to Tea...,USD,technology,2018-03-01 15:12:02,2018-03-27 14:12:02,10000.0,US,12914.0,successful,2018
17854,2147144291,Jubilee Espresso Rub,USD,food,2017-06-15 18:19:18,2017-06-30 18:19:18,1000.0,US,1642.0,successful,2017


### Validate Filtered Data

#### Create a SparkDFDataset instance of filtered_df

In [59]:
filtered_test_df = SparkDFDataset(filtered_df)

#### 2.1 Test 1: Check if filtered_df main_category is within scope

Here we use `expect_column_values_to_be_in_set(colName, expected_value_list)`.

The result lists the values that are not in the set (none in this case):

```text
"result": {
    "element_count": 17856,
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0,
    "partial_unexpected_list": []
  },
```

In [60]:
result = filtered_test_df.expect_column_values_to_be_in_set("main_category", MAIN_CATEGORIES)
print(f"result content :\n {result}")

                                                                                

result content :
 {
  "expectation_config": {
    "meta": {},
    "expectation_type": "expect_column_values_to_be_in_set",
    "kwargs": {
      "column": "main_category",
      "value_set": [
        "art",
        "publishing",
        "film & video",
        "technology",
        "journalism",
        "food",
        "dance",
        "photography",
        "games",
        "crafts",
        "music",
        "comics",
        "theater",
        "design"
      ],
      "result_format": "BASIC"
    }
  },
  "result": {
    "element_count": 17856,
    "missing_count": 0,
    "missing_percent": 0.0,
    "unexpected_count": 0,
    "unexpected_percent": 0.0,
    "unexpected_percent_total": 0.0,
    "unexpected_percent_nonmissing": 0.0,
    "partial_unexpected_list": []
  },
  "meta": {},
  "success": true,
  "exception_info": {
    "raised_exception": false,
    "exception_traceback": null,
    "exception_message": null
  }
}


In [22]:
result = filtered_test_df.expect_column_values_to_be_in_set("main_category", MAIN_CATEGORIES)
print(f"""Categories are within scope: {'PASSED' if result.success else 'FAILED'}""")

                                                                                

Categories are within scope: PASSED


#### 2.2 Test 2: Check if country is equal to "US"

In [23]:
test_result = filtered_test_df.expect_column_values_to_be_in_set("country", ["US"])
print(f"""All campaigns are done in the country of USA: {'PASSED' if test_result.success else 'FAILED'}""")



All campaigns are done in the country of USA: PASSED


                                                                                

#### 2.3 Test 3: Check if status = 'successful'

In [24]:
test_result = filtered_test_df.expect_column_values_to_be_in_set("status", ["successful"])
print(f"""All campaigns are successful: {'PASSED' if test_result.success else 'FAILED'}""")

                                                                                

All campaigns are successful: PASSED


#### 2.4 Test 4: Check if currency = 'USD'

In [25]:
test_result = filtered_test_df.expect_column_values_to_be_in_set("currency", ["USD"])
print(f"""All campaigns use USD currency: {'PASSED' if test_result.success else 'FAILED'}""")

All campaigns use USD currency: PASSED


                                                                                

#### 2.5 Test 5: Check if mandatory columns are not null

In [26]:
for column in MANDATORY_COLUMNS:
    try:
        test_result = filtered_test_df.expect_column_values_to_not_be_null(column)
        assert test_result.success, f"Uh oh! {test_result.result['unexpected_count']} of {test_result.result['element_count']} items in column {column} are null: FAILED"
        print(f"All items in column {column} are not null: PASSED")
    except AssertionError as e:
        print(e)

                                                                                

All items in column id are not null: PASSED
All items in column currency are not null: PASSED
All items in column main_category are not null: PASSED
All items in column launched_at are not null: PASSED
All items in column deadline are not null: PASSED
All items in column country are not null: PASSED
All items in column status are not null: PASSED
All items in column usd_pledged are not null: PASSED

#### 2.6 Test 6: Check if id is unique in each assessment year

The **expect_compound_columns_to_be_unique** expectation takes a list of columns and checks whether each combination of values across those columns is unique. We can use it to detect duplicate rows.

In [27]:
test_result = filtered_test_df.expect_compound_columns_to_be_unique(["id", "assessment_year"])
print(f"""id column is unique for each assessment year: {'PASSED' if test_result.success else 'FAILED'}""")



id column is unique for each assessment year: PASSED


                                                                                

#### 2.7 Test 7: Check if launched_at is a valid datetime format

In [28]:
test_result =  filtered_test_df.expect_column_values_to_match_strftime_format('launched_at','%Y-%m-%d %H:%M:%S')
f"""launched_at column values are compliant to datetime format: {'PASSED' if test_result.success else 'FAILED'}"""

                                                                                

'launched_at column values are compliant to datetime format: PASSED'

#### 2.8 Test 8: Check if deadline is a valid datetime format

In [29]:
test_result =  filtered_test_df.expect_column_values_to_match_strftime_format('deadline','%Y-%m-%d %H:%M:%S')
f"""deadline column values are compliant to datetime format: {'PASSED' if test_result.success else 'FAILED'}"""

                                                                                

'deadline column values are compliant to datetime format: PASSED'
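
The tests in this step all follow the same pattern: run an expectation, then print PASSED or FAILED from its `success` attribute. A small runner can collect them in one place. The sketch below is self-contained and therefore uses a stand-in result class; `FakeResult` and `run_checks` are hypothetical names, and the failing check is made up to show the reporting.

```python
from dataclasses import dataclass
from typing import Callable, Dict, List


@dataclass
class FakeResult:
    """Stand-in for a validation result; only `.success` is modelled."""
    success: bool


def run_checks(checks: Dict[str, Callable[[], FakeResult]]) -> List[str]:
    """Run each named check, print its outcome, and return the failed names."""
    failed = []
    for name, check in checks.items():
        outcome = "PASSED" if check().success else "FAILED"
        print(f"{name}: {outcome}")
        if outcome == "FAILED":
            failed.append(name)
    return failed


failed = run_checks({
    "country is US": lambda: FakeResult(True),
    "currency is USD": lambda: FakeResult(True),
    "made-up failing check": lambda: FakeResult(False),
})
```

With the real dataset, each lambda would wrap an expectation call instead, for example `lambda: filtered_test_df.expect_column_values_to_be_in_set("country", ["US"])`.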

## 3. Standardise Data

### Business Rules
- Reduce metric categories to 6 final categories
- Define pledge categories based on total pledge

In [66]:
METRIC_CATEGORIES = [
    'art, crafts, photography & design',
    'dance & theater',
    'music, film & video',
    'comics, publishing & journalism',
    'games & technology',
    'food'  
]

In [67]:
PLEDGE_CATEGORIES = [
    '100 Thousand and under',
    'Between 100 Thousand and 500 Thousand',
    'Between 500 Thousand and 1 Million',
    'Between 1 Million and 5 Million',
    '5 Million and over'
]

### Transforming Data

##### Apply Transformation and create a view named STANDARDISED_CAMPAIGNS

In [61]:
standardised_df = spark.sql(f"""
    SELECT t.*,
         CASE
             WHEN main_category IN ('art','crafts','photography','design') THEN 'art, crafts, photography & design'
             WHEN main_category IN ('dance','theater') THEN 'dance & theater'
             WHEN main_category IN ('music','film & video') THEN 'music, film & video'
             WHEN main_category IN ('comics','publishing','journalism') THEN 'comics, publishing & journalism'
             WHEN main_category IN ('games','technology') THEN 'games & technology'
             WHEN main_category IN ('food') THEN 'food'
         END metric_category,
         -- upper bounds are inclusive so that exact boundary values do not fall through to the ELSE branch
         CASE
             WHEN usd_pledged <= 100000 THEN '100 Thousand and under'
             WHEN usd_pledged <= 500000 THEN 'Between 100 Thousand and 500 Thousand'
             WHEN usd_pledged <= 1000000 THEN 'Between 500 Thousand and 1 Million'
             WHEN usd_pledged < 5000000 THEN 'Between 1 Million and 5 Million'
             ELSE '5 Million and over'
         END pledge_category
    FROM FILTERED_CAMPAIGNS t       
""")

In [62]:
standardised_df.createOrReplaceTempView("STANDARDISED_CAMPAIGNS")
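
The two `CASE` expressions amount to a lookup table and a bucketing function. A plain-Python sketch of the same mapping is shown below for reference. The names `CATEGORY_TO_METRIC` and `pledge_category` are hypothetical, and the handling of exact boundary values (each upper bound inclusive, 5 million and above in the last bucket) is an assumption based on the category labels.

```python
CATEGORY_TO_METRIC = {
    "art": "art, crafts, photography & design",
    "crafts": "art, crafts, photography & design",
    "photography": "art, crafts, photography & design",
    "design": "art, crafts, photography & design",
    "dance": "dance & theater",
    "theater": "dance & theater",
    "music": "music, film & video",
    "film & video": "music, film & video",
    "comics": "comics, publishing & journalism",
    "publishing": "comics, publishing & journalism",
    "journalism": "comics, publishing & journalism",
    "games": "games & technology",
    "technology": "games & technology",
    "food": "food",
}


def pledge_category(usd_pledged: float) -> str:
    """Bucket a pledged amount; boundary handling is an assumption."""
    if usd_pledged <= 100_000:
        return "100 Thousand and under"
    if usd_pledged <= 500_000:
        return "Between 100 Thousand and 500 Thousand"
    if usd_pledged <= 1_000_000:
        return "Between 500 Thousand and 1 Million"
    if usd_pledged < 5_000_000:
        return "Between 1 Million and 5 Million"
    return "5 Million and over"


print(CATEGORY_TO_METRIC["games"], "|", pledge_category(6061.0))
```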

### Validate Standardised Data

In [64]:
# Create a SparkDFDataset instance of standardised_df
standardised_test_df = SparkDFDataset(standardised_df)

#### 3.1 Test 1: Check if metric_category is within scope

In [68]:
test_result = standardised_test_df.expect_column_values_to_be_in_set("metric_category", METRIC_CATEGORIES)

print(f"""Categories are within scope: {'PASSED' if test_result.success else 'FAILED'}""")



Categories are within scope: PASSED


                                                                                

#### 3.2 Test 2: Check if pledge_category is within scope

In [69]:
test_result = standardised_test_df.expect_column_values_to_be_in_set("pledge_category", PLEDGE_CATEGORIES)

print(f"""Categories are within scope: {'PASSED' if test_result.success else 'FAILED'}""")



Categories are within scope: PASSED

#### 3.3 Test 3: Check if campaign population is equal to the previous

In [38]:
filtered_total_rows = filtered_test_df.get_row_count()
test_result = standardised_test_df.expect_table_row_count_to_equal(filtered_total_rows)

print(f"""Total row count of standardised_df ({test_result.result['observed_value']}) \
is equal to total row count of filtered_df ({filtered_total_rows}): \
{'PASSED' if test_result.result['observed_value'] == filtered_total_rows else 'FAILED'}""")

Total row count of standardised_df (17856) is equal to total row count of filtered_df (17856): PASSED


## 4. Generate Metrics

### Business Rules

- Metric #1: Count Number of successful campaigns for each metric category and pledge category per assessment year

### Generate SQL to produce Metrics

In [70]:
successful_campaigns_df = spark.sql(f"""
    SELECT assessment_year,
           metric_category,
           pledge_category,
           count(id) total_successful_campaigns
    FROM STANDARDISED_CAMPAIGNS
    GROUP BY assessment_year,
            metric_category,
            pledge_category
    ORDER BY assessment_year,
            metric_category,
            pledge_category
    """)

In [71]:
successful_campaigns_df.createOrReplaceTempView("SUCCESSFUL_CAMPAIGNS")

## Unit Tests on Metrics

In [72]:
# Create a SparkDFDataset instance of successful_campaigns_df
successful_campaigns_df_test_df = SparkDFDataset(successful_campaigns_df)

#### 4.1 Test 1: Check if metric_category and pledge_category pair is unique for each assessment year

In [73]:
test_result = successful_campaigns_df_test_df.expect_compound_columns_to_be_unique(["assessment_year","metric_category","pledge_category"])

print(f"""assessment_year, metric_category and pledge_category combination is unique: {'PASSED' if test_result.success else 'FAILED'}""")

assessment_year, metric_category and pledge_category combination is unique: PASSED


#### 4.2 Test 2: Check if sum total of campaigns in metrics dataset is equal to total campaigns in standardised dataset

In [74]:
standardised_total_rows = standardised_test_df.get_row_count()
test_result = successful_campaigns_df_test_df.expect_column_sum_to_be_between('total_successful_campaigns', 
                                                                              standardised_total_rows, standardised_total_rows)

print(f"""Total sum of campaigns in metrics dataset ({test_result.result['observed_value']}) \
is equal to total rows in standardised dataset ({standardised_total_rows}): \
{'PASSED' if test_result.result['observed_value'] ==  standardised_total_rows else 'FAILED'}""")



Total sum of campaigns in metrics dataset (17856) is equal to total rows in standardised dataset (17856): PASSED
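
Both metric checks follow from how `GROUP BY` with a count behaves: each key combination appears exactly once, and the counts add up to the number of input rows as long as `id` is never NULL (`count(id)` skips NULL values). A minimal sketch with made-up rows, using `collections.Counter` as a stand-in for the SQL aggregation; the data and variable names are hypothetical:

```python
from collections import Counter

# Hypothetical rows: (assessment_year, metric_category, pledge_category)
rows = [
    ("2017", "food", "100 Thousand and under"),
    ("2017", "food", "100 Thousand and under"),
    ("2017", "dance & theater", "100 Thousand and under"),
    ("2018", "food", "100 Thousand and under"),
]

# Stand-in for GROUP BY ... count(id): one output row per distinct key.
grouped = Counter(rows)
metrics = [(*key, count) for key, count in sorted(grouped.items())]

# Check 1: the grouping key is unique in the metrics output.
keys = [m[:3] for m in metrics]
keys_unique = len(keys) == len(set(keys))

# Check 2: the counts sum back to the number of input rows.
total = sum(m[3] for m in metrics)

print(keys_unique, total == len(rows))
```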


                                                                                

## Integration Tests

When we call an expectation function on a Great Expectations Spark dataset (`SparkDFDataset`), the expectation is registered on that dataset. Calling `validate()` on the dataset then evaluates all registered expectations and returns an overall result.

See the example below.

In [75]:
raw_test_df_validation = raw_test_df.validate()
print(f"""1. Raw dataset validations: {raw_test_df_validation} """)

1. Raw dataset validations: {
  "meta": {
    "great_expectations_version": "0.15.24",
    "expectation_suite_name": "default",
    "run_id": {
      "run_time": "2023-01-12T11:14:36.408979+00:00",
      "run_name": null
    },
    "batch_kwargs": {
      "ge_batch_id": "a06ca0fe-925e-11ed-a5da-f5d72802541e"
    },
    "batch_markers": {},
    "batch_parameters": {},
    "validation_time": "20230112T111436.408886Z",
    "expectation_suite_meta": {
      "great_expectations_version": "0.15.24"
    }
  },
  "success": false,
  "statistics": {
    "evaluated_expectations": 12,
    "successful_expectations": 2,
    "unsuccessful_expectations": 10,
    "success_percent": 16.666666666666664
  },
  "results": [
    {
      "expectation_config": {
        "meta": {},
        "expectation_type": "expect_column_values_to_be_of_type",
        "kwargs": {
          "column": "id",
          "type_": "StringType",
          "result_format": "BASIC"
        }
      },
      "result": {
        "obse

In [45]:
raw_test_df_validation = raw_test_df.validate()
print(f"""1. Raw dataset validations: {raw_test_df_validation.success}; {raw_test_df_validation.statistics['success_percent']} successful""")
filtered_test_df_validation = filtered_test_df.validate()
print(f"""2. Filtered dataset validations: {filtered_test_df_validation.success}; {filtered_test_df_validation.statistics['success_percent']} successful""")
standardised_test_df_validation = standardised_test_df.validate()
print(f"""3. Standardised dataset validations: {standardised_test_df_validation.success}; {standardised_test_df_validation.statistics['success_percent']} successful""")
successful_campaigns_df_test_df_validation = successful_campaigns_df_test_df.validate()
print(f"""4. Metrics dataset validations: {successful_campaigns_df_test_df_validation.success}; {successful_campaigns_df_test_df_validation.statistics['success_percent']} successful""")


1. Raw dataset validations: False; 52.63157894736842 successful
2. Filtered dataset validations: True; 100.0 successful
3. Standardised dataset validations: True; 100.0 successful
4. Metrics dataset validations: True; 100.0 successful
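
To make the register-then-validate behaviour concrete, here is a toy model in plain Python. It is not Great Expectations code: `ToyDataset`, `expect` and the check names are hypothetical, and only the shape of the summary (the `success` flag and the `statistics` keys) is borrowed from the validation output shown earlier.

```python
class ToyDataset:
    """Toy stand-in for a Great Expectations dataset (illustration only)."""

    def __init__(self, rows):
        self.rows = rows
        self._registered = []  # (name, check) pairs, in call order

    def expect(self, name, check):
        """Evaluate a check immediately and remember it for validate()."""
        self._registered.append((name, check))
        return check(self.rows)

    def validate(self):
        """Re-run every registered check and summarise the outcome."""
        results = [(name, check(self.rows)) for name, check in self._registered]
        passed = sum(1 for _, ok in results if ok)
        total = len(results)
        return {
            "success": passed == total,
            "statistics": {
                "evaluated_expectations": total,
                "successful_expectations": passed,
                "unsuccessful_expectations": total - passed,
                "success_percent": 100.0 * passed / total if total else None,
            },
        }


ds = ToyDataset([{"id": 1}, {"id": 2}, {"id": 2}])
ds.expect("row_count_is_3", lambda rows: len(rows) == 3)
ds.expect("id_is_unique", lambda rows: len({r["id"] for r in rows}) == len(rows))

summary = ds.validate()
print(summary["success"], summary["statistics"]["success_percent"])
```

One failed check is enough to make the overall `success` flag false, which is why the raw dataset above reports `False` while still showing a non-zero success percentage.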


## 5. Custom expectation functions

If you can't find an expectation function that fits your needs, you can define your own. GE provides several methods for building and deploying custom expectations.

In this tutorial, we only focus on the `expectation decorators`. For more information, you can read this [article](https://docs.greatexpectations.io/docs/guides/expectations/creating_custom_expectations/overview).

There are two major decorators:
- **column_map_expectation**: applies its condition to each value in a column, independently of the other values
- **column_aggregate_expectation**: applies its condition to an aggregate value (or values) computed from the column

Main steps for implementing a custom expectation function:

1. Create a `subclass` of the dataset class of your choice

2. Define custom functions containing your business logic

3. Use the `column_map_expectation` and `column_aggregate_expectation` decorators to turn them into full Expectations. Note that each dataset class implements its own versions of @column_map_expectation and @column_aggregate_expectation, so you should consult the documentation of each class to ensure you are returning the correct information to the decorator.

> You should follow the expectation naming convention: function names start with `expect_`.


### Pandas example

```python
from great_expectations.dataset import PandasDataset, MetaPandasDataset

class CustomPandasDataset(PandasDataset):

    _data_asset_type = "CustomPandasDataset"

    @MetaPandasDataset.column_map_expectation
    def expect_column_values_to_equal_2(self, column):
        return column.map(lambda x: x==2)

    @MetaPandasDataset.column_aggregate_expectation
    def expect_column_mode_to_equal_0(self, column):
        mode = self[column].mode()[0]  # Series.mode() is a method; take the first mode
        return {
            "success" : mode == 0,
            "result": {
                "observed_value": mode,
            }
        }

```
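
The difference between the two decorators in the Pandas example above comes down to what the wrapped function returns: a map expectation returns one boolean per value and leaves the summarising to the decorator, while an aggregate expectation returns the finished `success`/`result` dictionary itself. The toy decorators below sketch that contract in plain Python. They are hypothetical stand-ins, not the Great Expectations implementation, and the `unexpected_count` field is only illustrative.

```python
import functools


def toy_column_map_expectation(func):
    """Toy decorator: func returns one flag per value; the wrapper summarises."""
    @functools.wraps(func)
    def wrapper(values):
        flags = list(func(values))
        unexpected = [v for v, ok in zip(values, flags) if not ok]
        return {"success": not unexpected,
                "result": {"unexpected_count": len(unexpected)}}
    return wrapper


def toy_column_aggregate_expectation(func):
    """Toy decorator: func sees the whole column and returns the verdict."""
    @functools.wraps(func)
    def wrapper(values):
        outcome = func(values)
        # The wrapped function must supply both keys itself.
        return {"success": outcome["success"], "result": outcome["result"]}
    return wrapper


@toy_column_map_expectation
def expect_values_to_equal_2(values):
    return [v == 2 for v in values]


@toy_column_aggregate_expectation
def expect_max_to_be_at_most_2(values):
    observed = max(values)
    return {"success": observed <= 2, "result": {"observed_value": observed}}


column = [2, 2, 3]
map_result = expect_values_to_equal_2(column)
agg_result = expect_max_to_be_at_most_2(column)
print(map_result)
print(agg_result)
```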

### SqlAlchemy example

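The example below defines a custom expectation on a `SqlAlchemyDataset` that runs a two-sample Kolmogorov-Smirnov test on two histogram columns.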
```python
import warnings
from math import gcd  # in the standard library since Python 3.5

import numpy as np
import scipy.special as special
import scipy.stats as stats
import sqlalchemy as sa
from great_expectations.data_asset import DataAsset
from great_expectations.dataset import SqlAlchemyDataset

class CustomSqlAlchemyDataset(SqlAlchemyDataset):

    _data_asset_type = "CustomSqlAlchemyDataset"

    @DataAsset.expectation(["column_A", "column_B", "p_value", "alternative", "mode"])
    def expect_column_pair_histogram_ks_2samp_test_p_value_to_be_greater_than(
            self,
            column_A,
            column_B,
            p_value=0.05,
            alternative='two-sided',  # assumed default, mirroring scipy.stats.ks_2samp
            mode='auto'
    ):
        """Execute the two sample KS test on two columns of data that are expected to be **histograms** with
        aligned values/points on the CDF."""
        LARGE_N = 10000  # 'auto' will attempt to be exact if n1,n2 <= LARGE_N

        # We will assume that these are already HISTOGRAMS created as a check_dataset
        # either of binned values or of (ordered) value counts
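        # A Select object cannot fetch rows itself; run it through the dataset's engine.
        rows = self.engine.execute(
            sa.select([
                sa.column(column_A).label("col_A_counts"),
                sa.column(column_B).label("col_B_counts")
            ]).select_from(self._table)
        ).fetchall()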

        cols = [col for col in zip(*rows)]
        cdf1 = np.array(cols[0])
        cdf2 = np.array(cols[1])
        n1 = cdf1.sum()
        n2 = cdf2.sum()
        cdf1 = cdf1 / n1
        cdf2 = cdf2 / n2

        # This code is taken verbatim from scipy implementation,
        # skipping the searchsorted (using sqlalchemy check asset as a view)
        # https://github.com/scipy/scipy/blob/v1.3.1/scipy/stats/stats.py#L5385-L5573
        cddiffs = cdf1 - cdf2
        minS = -np.min(cddiffs)
        maxS = np.max(cddiffs)
        alt2Dvalue = {'less': minS, 'greater': maxS, 'two-sided': max(minS, maxS)}
        d = alt2Dvalue[alternative]
        g = gcd(n1, n2)
        n1g = n1 // g
        n2g = n2 // g
        prob = -np.inf
        original_mode = mode
        if mode == 'auto':
            if max(n1, n2) <= LARGE_N:
                mode = 'exact'
            else:
                mode = 'asymp'
        elif mode == 'exact':
            # If lcm(n1, n2) is too big, switch from exact to asymp
            if n1g >= np.iinfo(np.int_).max / n2g:  # np.int was removed in NumPy 1.24
                mode = 'asymp'
                warnings.warn(
                    "Exact ks_2samp calculation not possible with samples sizes "
                    "%d and %d. Switching to 'asymp' " % (n1, n2), RuntimeWarning)

        saw_fp_error = False
        if mode == 'exact':
            lcm = (n1 // g) * n2
            h = int(np.round(d * lcm))
            d = h * 1.0 / lcm
            if h == 0:
                prob = 1.0
            else:
                try:
                    if alternative == 'two-sided':
                        if n1 == n2:
                            prob = stats._compute_prob_outside_square(n1, h)
                        else:
                            prob = 1 - stats._compute_prob_inside_method(n1, n2, g, h)
                    else:
                        if n1 == n2:
                            # prob = binom(2n, n-h) / binom(2n, n)
                            # Evaluating in that form incurs roundoff errors
                            # from special.binom. Instead calculate directly
                            prob = 1.0
                            for j in range(h):
                                prob = (n1 - j) * prob / (n1 + j + 1)
                        else:
                            num_paths = stats._count_paths_outside_method(n1, n2, g, h)
                            bin = special.binom(n1 + n2, n1)
                            if not np.isfinite(bin) or not np.isfinite(num_paths) or num_paths > bin:
                                raise FloatingPointError()
                            prob = num_paths / bin

                except FloatingPointError:
                    # Switch mode
                    mode = 'asymp'
                    saw_fp_error = True
                    # Can't raise warning here, inside the try
                finally:
                    if saw_fp_error:
                        if original_mode == 'exact':
                            warnings.warn(
                                "ks_2samp: Exact calculation overflowed. "
                                "Switching to mode=%s" % mode, RuntimeWarning)
                    else:
                        if prob > 1 or prob < 0:
                            mode = 'asymp'
                            if original_mode == 'exact':
                                warnings.warn(
                                    "ks_2samp: Exact calculation incurred large"
                                    " rounding error. Switching to mode=%s" % mode,
                                    RuntimeWarning)

        if mode == 'asymp':
            # The product n1*n2 is large.  Use Smirnov's asymptotic formula.
            if alternative == 'two-sided':
                en = np.sqrt(n1 * n2 / (n1 + n2))
                # Switch to using kstwo.sf() when it becomes available.
                # prob = distributions.kstwo.sf(d, int(np.round(en)))
                prob = stats.distributions.kstwobign.sf(en * d)
            else:
                m, n = max(n1, n2), min(n1, n2)
                z = np.sqrt(m*n/(m+n)) * d
                # Use Hodges' suggested approximation Eqn 5.3
                expt = -2 * z**2 - 2 * z * (m + 2*n)/np.sqrt(m*n*(m+n))/3.0
                prob = np.exp(expt)

        prob = (0 if prob < 0 else (1 if prob > 1 else prob))

        return {
            "success": prob > p_value,
            "result": {
                "observed_value": prob,
                "details": {
                    "ks_2samp_statistic": d
                }
            }
        }

```

### Spark dataset example

In the example below, we create a `CustomSparkDFDataset` that extends `SparkDFDataset` and add two custom expectation functions, each decorated with `@MetaSparkDFDataset.column_aggregate_expectation`.

In [45]:
from great_expectations.dataset import MetaSparkDFDataset

from datetime import datetime, timedelta
from dateutil.parser import parse


In [46]:
class CustomSparkDFDataset(SparkDFDataset):
    _data_asset_type = "CustomSparkDFDataset"
    
    @MetaSparkDFDataset.column_aggregate_expectation
    def expect_column_max_to_be_less_than(
        self,
        column,
        value,
        strict=False,
        parse_strings_as_datetimes=False,
        output_strftime_format=None,
        result_format=None,
        include_config=True,
        catch_exceptions=None,
        meta=None,
    ):
        if parse_strings_as_datetimes:
            if value:
                value = parse(value)

        column_max = self.get_column_max(column, parse_strings_as_datetimes)       
        if isinstance(column_max, datetime):
            try:
                value = parse(value)
            except (ValueError, TypeError) as e:
                pass

        success = column_max < value if strict else column_max <= value
        
        if parse_strings_as_datetimes:
            if output_strftime_format:
                column_max = datetime.strftime(column_max, output_strftime_format)
            else:
                column_max = str(column_max)

        return {"success": success, "result": {"observed_value": column_max}}  
    
    @MetaSparkDFDataset.column_aggregate_expectation
    def expect_column_min_to_be_more_than(
        self,
        column,
        value,
        strict=False,
        parse_strings_as_datetimes=False,
        output_strftime_format=None,
        result_format=None,
        include_config=True,
        catch_exceptions=None,
        meta=None,
    ):
        if parse_strings_as_datetimes:
            if value:
                value = parse(value)

        column_min = self.get_column_min(column, parse_strings_as_datetimes)       
        if isinstance(column_min, datetime):
            try:
                value = parse(value)
            except (ValueError, TypeError) as e:
                pass
        
        success = column_min > value if strict else column_min >= value
        
        if parse_strings_as_datetimes:
            if output_strftime_format:
                column_min = datetime.strftime(column_min, output_strftime_format)
            else:
                column_min = str(column_min)

        return {"success": success, "result": {"observed_value": column_min}}      

#### Filtered DF Custom Test 1: Check if latest launched_at date is not later than the period_end_dt

In [47]:
ay_df

Unnamed: 0,assessment_year,period_start_dt,period_end_dt
0,2017,2016-07-01,2017-06-30
1,2018,2017-07-01,2018-06-30


## Use the custom expectation
To use a custom expectation, first create an instance of the custom dataset class, then call the custom expectation function on that instance. Below is an example.

In [48]:
# Create one CustomSparkDFDataset instance per assessment year
ge_dataset_by_year = {}
for yr in ASSESSMENT_YEAR:
    ge_dataset_by_year[yr] = CustomSparkDFDataset(filtered_df.where(f"assessment_year = {yr}"))
ge_dataset_by_year    

{'2017': <__main__.CustomSparkDFDataset at 0x7f29b858c520>,
 '2018': <__main__.CustomSparkDFDataset at 0x7f29b858c5e0>}

In [49]:
def get_period_end_dt(yr):
    year_filter = ay_df["assessment_year"]==yr
    end_dt = ay_df.loc[year_filter].period_end_dt.item()
    return datetime.strftime(datetime.strptime(end_dt, "%Y-%m-%d") + timedelta(days=1), "%Y-%m-%d")
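
The helper above shifts `period_end_dt` forward by one day. The reason is that a date parsed without a time component means midnight at the start of that day, so a campaign launched later on the final day of the period would fail a plain `<=` comparison against it. A small standard-library sketch of the difference; the timestamp is an illustrative value, not taken from a specific row:

```python
from datetime import datetime, timedelta

period_end_dt = "2018-06-30"
# Illustrative launch time late on the final day of the period.
latest_launch = datetime(2018, 6, 30, 23, 1, 4)

# Parsing a bare date yields midnight at the START of that day.
end_midnight = datetime.strptime(period_end_dt, "%Y-%m-%d")

# Shifting by one day gives an exclusive upper bound for the period.
exclusive_bound = end_midnight + timedelta(days=1)

naive_check = latest_launch <= end_midnight       # wrongly rejects the launch
shifted_check = latest_launch < exclusive_bound   # accepts any time on the last day

print(naive_check, shifted_check)
```

This is why the test that follows passes `strict=True`: the shifted date is an exclusive bound, so the comparison has to be `<` rather than `<=`.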

In [50]:
for yr in ASSESSMENT_YEAR:
    period_end_dt = get_period_end_dt(yr)   
    test_result = ge_dataset_by_year[yr]\
                    .expect_column_max_to_be_less_than("launched_at", 
                                                       period_end_dt, 
                                                       parse_strings_as_datetimes=True, 
                                                       strict=True)
    print(f"""AY {yr} latest launched_at {test_result.result['observed_value']}""", 
          f"""< period_end_dt {period_end_dt}: {'PASSED' if test_result.success else 'FAILED'}""")

AY 2017 latest launched_at 2017-06-21 20:23:32 < period_end_dt 2017-07-01: PASSED
AY 2018 latest launched_at 2018-06-30 23:01:04 < period_end_dt 2018-07-01: PASSED

#### Filtered DF Custom Test 2: Check if earliest campaign deadline is on or after the period_start_dt

In [52]:
def get_period_start_dt(yr):
    year_filter = ay_df["assessment_year"]==yr
    return ay_df.loc[year_filter].period_start_dt.item()

In [53]:
for yr in ASSESSMENT_YEAR:
    period_start_dt = get_period_start_dt(yr)    
    test_result = ge_dataset_by_year[yr]\
                    .expect_column_min_to_be_more_than("deadline", 
                                                       period_start_dt, 
                                                       parse_strings_as_datetimes=True)
    print(f"""AY {yr} earliest deadline {test_result.result['observed_value']}""", 
          f""">= period_start_dt {period_start_dt}: {'PASSED' if test_result.success else 'FAILED'}""")   

AY 2017 earliest deadline 2016-07-01 00:00:00 >= period_start_dt 2016-07-01: PASSED
AY 2018 earliest deadline 2017-07-01 00:00:00 >= period_start_dt 2017-07-01: PASSED


### Integration Test

In [54]:
for yr, yearly_df in ge_dataset_by_year.items():
    yearly_df_validation = yearly_df.validate()
    print(f"""Filtered dataset for year {yr} validations: {yearly_df_validation.success}; {yearly_df_validation.statistics['success_percent']} successful""")

Filtered dataset for year 2017 validations: True; 100.0 successful
Filtered dataset for year 2018 validations: True; 100.0 successful
