In this Notebook we will be doing the functional equivalence testing with the test data that contains the first 3 days data from the uber dataset

In [33]:
import import_ipynb
#import required libraries
from snowflake.snowpark import Session
from snowflake.snowpark.functions import *
from snowflake.snowpark.types import *
from snowflake.snowpark import functions as F
from snowflake.snowpark import Window
import pandas as pd
import numpy as np

Running the snowpark notebook that contains all the functions for the usecases
- NOTE: Change the path of the data file accordingly



In [34]:
from quickstart_snowpark_v7 import *

Function to compre the actual and expected dataframe:
- compares the Number of row count
- compares the Schema
- Row by Row comparison

In [35]:
def compare_row_count(actual_df, expected_df):
#     Check if the number of rows match
    if actual_df.count() != expected_df.count():
        print("Number of rows do not match: actual={}, expected={}".format(
            actual_df.count(), expected_df.count()))
        return False
    return True

In [36]:
def compare_schema(actual_df, expected_df):
 # Check if the column names and types match
    if actual_df.schema != expected_df.schema:
        print("Schema does not match: actual={}, expected={}".format(
            actual_df.schema, expected_df.schema))
        return False
    return True

In [37]:
def compare_dataframes(actual_df, expected_df):
  
    if isinstance(expected_df, int):
        if actual_df == expected_df:
            return True
        else:
            #print("Rows of the data frames do not match")
            return False
    else:
        # Comparing the two DataFrames row by row
        actual_df_collect = actual_df.collect()
        expected_df_collect = expected_df.collect()

        if actual_df_collect == expected_df_collect:

            return True
        else:
            #print("Rows of the data frames do not match")
            return False

The below code does the following :
- The test data is migrated from databricks to test_data table in snowflake.
- Next the test data is read into the dataframe from the table for the further testing.
- Function Data cleaning is present in the snowpark file that does the preliminary data cleaning such as data type changes and column renaming.

In [38]:
#Snowflake connection info is saved in config.py
from config import SNOWFLAKE_CONN_PROFILE
session = Session.builder.configs(SNOWFLAKE_CONN_PROFILE).create()
session.sql("use role Accountadmin").collect()
session.sql("create database if not exists  {}".format(SNOWFLAKE_CONN_PROFILE['database'])).collect()
session.sql("use database {}".format(SNOWFLAKE_CONN_PROFILE['database'])).collect()
session.sql("use schema {}".format(SNOWFLAKE_CONN_PROFILE['schema'])).collect()
session.sql("use warehouse {}".format(SNOWFLAKE_CONN_PROFILE['warehouse']))
print(session.sql('select current_warehouse(), current_database(), current_schema()').collect())

[Row(CURRENT_WAREHOUSE()='COMPUTE_WH', CURRENT_DATABASE()='SNOWPARK', CURRENT_SCHEMA()='DEV')]


We have migrated the testing_data.csv from databricks to test_data table in snowflake, 
we will read the data from test_data table into the dataframe 

In [39]:
test_df = session.sql('select * from test_data')
test_df.schema

StructType([StructField('DATE', StringType(), nullable=True), StructField('"Time (Local)"', LongType(), nullable=True), StructField('EYEBALLS', LongType(), nullable=True), StructField('ZEROES', LongType(), nullable=True), StructField('"Completed Trips"', LongType(), nullable=True), StructField('REQUESTS', LongType(), nullable=True), StructField('"Unique Drivers"', LongType(), nullable=True)])

In [40]:
test_df.show()

----------------------------------------------------------------------------------------------------------
|"DATE"     |"Time (Local)"  |"EYEBALLS"  |"ZEROES"  |"Completed Trips"  |"REQUESTS"  |"Unique Drivers"  |
----------------------------------------------------------------------------------------------------------
|22-Sep-12  |0               |62          |4         |19                 |27          |27                |
|22-Sep-12  |1               |39          |5         |17                 |24          |24                |
|22-Sep-12  |2               |30          |0         |9                  |18          |12                |
|22-Sep-12  |3               |11          |3         |3                  |3           |8                 |
|22-Sep-12  |4               |6           |4         |2                  |3           |1                 |
|22-Sep-12  |5               |5           |4         |1                  |2           |0                 |
|22-Sep-12  |6               |5      

In [41]:
cleaned_df=data_cleaning(test_df)
cleaned_df

Unnamed: 0,DATE,TIME,EYEBALLS,ZEROES,COMPLETED_TRIPS,REQUESTS,UNIQUE_DRIVERS,DATE_TIME
0,2012-09-22,0,62,4,19,27,27,2012-09-22 00:00:00
1,2012-09-22,1,39,5,17,24,24,2012-09-22 01:00:00
2,2012-09-22,2,30,0,9,18,12,2012-09-22 02:00:00
3,2012-09-22,3,11,3,3,3,8,2012-09-22 03:00:00
4,2012-09-22,4,6,4,2,3,1,2012-09-22 04:00:00
5,2012-09-22,5,5,4,1,2,0,2012-09-22 05:00:00
6,2012-09-22,6,5,5,0,1,0,2012-09-22 06:00:00
7,2012-09-22,7,8,3,0,0,1,2012-09-22 07:00:00
8,2012-09-22,8,15,5,3,5,3,2012-09-22 08:00:00
9,2012-09-22,9,15,5,1,1,6,2012-09-22 09:00:00


In [42]:
cleaned_df['DATE']=cleaned_df['DATE'].dt.tz_localize('UTC')
cleaned_df['DATE_TIME']=cleaned_df['DATE_TIME'].dt.tz_localize('UTC')
uber_df=session.create_dataframe(cleaned_df)
uber_df.show()

----------------------------------------------------------------------------------------------------------------------------------
|"DATE"               |"TIME"  |"EYEBALLS"  |"ZEROES"  |"COMPLETED_TRIPS"  |"REQUESTS"  |"UNIQUE_DRIVERS"  |"DATE_TIME"          |
----------------------------------------------------------------------------------------------------------------------------------
|2012-09-22 00:00:00  |0       |62          |4         |19                 |27          |27                |2012-09-22 00:00:00  |
|2012-09-22 00:00:00  |1       |39          |5         |17                 |24          |24                |2012-09-22 01:00:00  |
|2012-09-22 00:00:00  |2       |30          |0         |9                  |18          |12                |2012-09-22 02:00:00  |
|2012-09-22 00:00:00  |3       |11          |3         |3                  |3           |8                 |2012-09-22 03:00:00  |
|2012-09-22 00:00:00  |4       |6           |4         |2                  |3      

In [43]:
cleaned_uber=uber_df.select(to_date(col('DATE')).alias('DATE'),"TIME","EYEBALLS","ZEROES","COMPLETED_TRIPS","REQUESTS","UNIQUE_DRIVERS","DATE_TIME")
cleaned_uber.show()

-------------------------------------------------------------------------------------------------------------------------
|"DATE"      |"TIME"  |"EYEBALLS"  |"ZEROES"  |"COMPLETED_TRIPS"  |"REQUESTS"  |"UNIQUE_DRIVERS"  |"DATE_TIME"          |
-------------------------------------------------------------------------------------------------------------------------
|2012-09-22  |0       |62          |4         |19                 |27          |27                |2012-09-22 00:00:00  |
|2012-09-22  |1       |39          |5         |17                 |24          |24                |2012-09-22 01:00:00  |
|2012-09-22  |2       |30          |0         |9                  |18          |12                |2012-09-22 02:00:00  |
|2012-09-22  |3       |11          |3         |3                  |3           |8                 |2012-09-22 03:00:00  |
|2012-09-22  |4       |6           |4         |2                  |3           |1                 |2012-09-22 04:00:00  |
|2012-09-22  |5       |5

The below function test the max_trips_date function with sample data file(test data ) and the expected output is present in the output1 table in snowflake . The function calls the function defined above for validation and compares if the actual and expected output match

In [44]:
output1_df = session.sql('select * from output1')
output1_df.schema

StructType([StructField('DATE', DateType(), nullable=True)])

In [45]:
def test_max_trips_date():
    output_df = max_trips_date(cleaned_uber)
    output_df.show()
    # print(output_df.schema)
    # Define expected output data
    expected_data = output1_df
    expected_data.show()
    if compare_row_count(output_df, expected_data):
        print("Row count of the actual_df and expected_df match")
    else:
        print("Row count of the actual_df and expected_df do not match")
        return False
    if compare_schema(output_df, expected_data):
        print("Schema of the actual_df and expected_df match")
    else:
        print("Schema of the actual_df and expected_df do not match")
        return False
    if compare_dataframes(output_df, expected_data):
        print("Dataframes are equivalent!")
    else:
        print("Dataframes are not equivalent")
test_max_trips_date()    

--------------
|"DATE"      |
--------------
|2012-09-22  |
--------------

--------------
|"DATE"      |
--------------
|2012-09-22  |
--------------

Row count of the actual_df and expected_df match
Schema of the actual_df and expected_df match
Dataframes are equivalent!


The below function test the highest_completed_trip function with sample data file(test data ). The function calls the compare_dataframe function defined above and compares if the actual and expected output match

In [46]:
def test_highest_completed_trips():
    output_df = highest_completed_trips(cleaned_uber)
    print("output value",output_df)
    expected_data = 248
    if compare_dataframes(output_df, expected_data):
        print("Dataframes are equivalent!")
    else:
        print("Dataframes are not equivalent")
test_highest_completed_trips()    

output value 248
Dataframes are equivalent!


The below function test the most_requests_per_hour function with sample data file(test data ). The function calls the compare_dataframe function defined above and compares if the actual and expected output match

In [47]:
def test_most_requests_per_hour():
    output_df = most_requests_per_hour(cleaned_uber)
    print("output value",output_df)

    expected_data = 0
#     print(type(expected_data))
#     # display(expected_data)
    if compare_dataframes(output_df, expected_data):
        print("Dataframes are equivalent!")
    else:
        print("Dataframes are not equivalent")
test_most_requests_per_hour()   

output value 0
Dataframes are equivalent!


In [48]:
# def test_percentage_zeroes():
#     output_df = percentage_zeroes(cleaned_df)
#     print("output value",output_df)
# #     expected_data = 19
# #     print(type(expected_data))
# #     # display(expected_data)
# #     if compare_dataframes(output_df, expected_data):
# #         print("Dataframes are equivalent!")
# #     else:
# #         print("Dataframes are not equivalent")
# test_percentage_zeroes()   

The below function test the weighted_avg_ratio function with sample data file(test data ) and the expected output is present in the output2 table in snowflake . The function calls the function defined above for validation and compares if the actual and expected output match

In [49]:
output2_df = session.sql('select * from output2')
output2_df.schema

StructType([StructField('WEIGHTED_AVERAGE', DoubleType(), nullable=True)])

In [50]:
def test_weighted_avg_ratio():
    output = weighted_avg_ratio(cleaned_uber)
#     print(output.schema)
    output2=output.select(trunc(col("Weighted_average"), lit(2)).alias('Weighted_average'))
#     output2.show()
#     print(output2.schema)
    output_df = output2.withColumn("Weighted_average",output2.Weighted_average.cast(DoubleType()))
    output_df.show()
    expected_data=output2_df.select(trunc(col("Weighted_average"), lit(2)).alias('Weighted_average'))
    expected_data.show() 
#     print(expected_data.schema)
    if compare_schema(output_df, expected_data):
        print("Schema of the actual_df and expected_df match")
    else:
        print("Schema of the actual_df and expected_df do not match")
        return False
    if compare_dataframes(output_df, expected_data):
        print("Dataframes are equivalent!")
    else:
        print("Dataframes are not equivalent")
test_weighted_avg_ratio()

----------------------
|"WEIGHTED_AVERAGE"  |
----------------------
|0.92                |
----------------------

----------------------
|"WEIGHTED_AVERAGE"  |
----------------------
|0.92                |
----------------------

Schema of the actual_df and expected_df match
Dataframes are equivalent!


The below function test the busiest_hours function with sample data file(test data ) and the expected output expected output is present in the output3 table in snowflake . The function calls the function defined above for validation and compares if the actual and expected output match

In [51]:
output3_df = session.sql('select * from output3')
output3_df.show()


-------------------------------------------
|"TIME"  |"TOTAL_REQUESTS"  |"SUM_8_HRS"  |
-------------------------------------------
|1       |3                 |21           |
-------------------------------------------



In [52]:
def test_busiest_hours():
    output_df = busiest_hours(cleaned_uber)
    
    output_df.show()
    print(output_df.schema)
    # Define expected output data
    expected_data=output3_df
#         .select(trunc(col("Weighted_average"), lit(2)).alias('Weighted_average'))
    expected_data.show() 
    print(expected_data.schema)
    if compare_row_count(output_df, expected_data):
        print("Row count of the actual_df and expected_df match")
    else:
        print("Row count of the actual_df and expected_df do not match")
        return False
    if compare_dataframes(output_df, expected_data):
        print("Dataframes are equivalent!")
    else:
        print("Dataframes are not equivalent")
test_busiest_hours() 

-------------------------------------------
|"TIME"  |"TOTAL_REQUESTS"  |"SUM_8_HRS"  |
-------------------------------------------
|1       |3                 |21           |
-------------------------------------------

StructType([StructField('TIME', LongType(), nullable=True), StructField('TOTAL_REQUESTS', LongType(), nullable=False), StructField('SUM_8_HRS', LongType(), nullable=True)])
-------------------------------------------
|"TIME"  |"TOTAL_REQUESTS"  |"SUM_8_HRS"  |
-------------------------------------------
|1       |3                 |21           |
-------------------------------------------

StructType([StructField('TIME', LongType(), nullable=True), StructField('TOTAL_REQUESTS', LongType(), nullable=True), StructField('SUM_8_HRS', LongType(), nullable=True)])
Row count of the actual_df and expected_df match
Dataframes are equivalent!


The below function test the zeroes_to_eyeballs_ratio function with sample data file(test data ) and the expected output is in the Output4.csv file that is present in the filestore . The function calls the compare_dataframe function defined above and compares if the actual and expected output match

In [53]:
output4_df = session.sql('select * from output4')
output4_df.show()

-----------------------------------------------
|"PERIOD"  |"ZEROES"  |"EYEBALLS"  |"RATIO"   |
-----------------------------------------------
|0         |353       |1520        |0.232237  |
-----------------------------------------------



In [54]:
def test_zeroes_to_eyeballs_ratio():
    output4 = zeroes_to_eyeballs_ratio(cleaned_uber)
#     output4.show()
    output_df = output4.withColumn("RATIO",output4.RATIO.cast(DoubleType()))
    output_df.show()
    expected_data =output4_df
    expected_data.show()
    if compare_row_count(output_df, expected_data):
        print("Row count of the actual_df and expected_df match")
    else:
        print("Row count of the actual_df and expected_df do not match")
        return False
    if compare_schema(output_df, expected_data):
        print("Schema of the actual_df and expected_df match")
    else:
        print("Schema of the actual_df and expected_df do not match")
        return False
    if compare_dataframes(output_df, expected_data):
        print("Dataframes are equivalent!")
    else:
        print("Dataframes are not equivalent")
test_zeroes_to_eyeballs_ratio() 

-----------------------------------------------
|"PERIOD"  |"ZEROES"  |"EYEBALLS"  |"RATIO"   |
-----------------------------------------------
|0         |353       |1520        |0.232237  |
-----------------------------------------------

-----------------------------------------------
|"PERIOD"  |"ZEROES"  |"EYEBALLS"  |"RATIO"   |
-----------------------------------------------
|0         |353       |1520        |0.232237  |
-----------------------------------------------

Row count of the actual_df and expected_df match
Schema of the actual_df and expected_df match
Dataframes are equivalent!


The below function test the requests_per_driver function with sample data file(test data ) and the expected output is in the Output5.csv file that is present in the filestore . The function calls the compare_dataframe function defined above and compares if the actual and expected output match

In [55]:
output5_df = session.sql('select * from output5')
output5_df.show()

----------------------------------
|"TIME"  |"REQUESTS_PER_DRIVER"  |
----------------------------------
|23      |23.5                   |
----------------------------------



In [56]:
def test_requests_per_driver():
    output5 = requests_per_driver(cleaned_uber)
#     output5.show()
    output_df = output5.withColumn("REQUESTS_PER_DRIVER",output5.REQUESTS_PER_DRIVER.cast(DoubleType()))
    output_df.show()
    # Define expected output data
    
    expected_data = output5_df
    if compare_row_count(output_df, expected_data):
        print("Row count of the actual_df and expected_df match")
    else:
        print("Row count of the actual_df and expected_df do not match")
        return False
    if compare_schema(output_df, expected_data):
        print("Schema of the actual_df and expected_df match")
    else:
        print("Schema of the actual_df and expected_df do not match")
        return False
    if compare_dataframes(output_df, expected_data):
        print("Dataframes are equivalent!")
    else:
        print("Dataframes are not equivalent")
test_requests_per_driver() 

----------------------------------
|"TIME"  |"REQUESTS_PER_DRIVER"  |
----------------------------------
|23      |23.5                   |
----------------------------------

Row count of the actual_df and expected_df match
Schema of the actual_df and expected_df match
Dataframes are equivalent!


The below function test the true_end_day function with sample data file(test data ) and the expected output is in the Output6.csv file that is present in the filestore . The function calls the compare_dataframe function defined above and compares if the actual and expected output match

In [57]:
output6_df = session.sql('select * from output6')
output6_df.show()

--------------------------------------------------
|"TIME"  |"AVG_REQUESTS"  |"AVG_UNIQUE_DRIVERS"  |
--------------------------------------------------
|9       |0.5             |6                     |
--------------------------------------------------



In [58]:
def test_true_end_day():
    output6 = true_end_day(cleaned_uber)
    output_df = output6.withColumn("AVG_REQUESTS",output6.AVG_REQUESTS.cast(DoubleType()))
    output_df = output_df.withColumn("AVG_UNIQUE_DRIVERS", output6_df.AVG_UNIQUE_DRIVERS.cast(LongType()))
    output_df.show()
    # Define expected output data

    expected_data = output6_df
    expected_data.show()
    if compare_row_count(output_df, expected_data):
        print("Row count of the actual_df and expected_df match")
    else:
        print("Row count of the actual_df and expected_df do not match")
        return False
    if compare_schema(output_df, expected_data):
        print("Schema of the actual_df and expected_df match")
    else:
        print("Schema of the actual_df and expected_df do not match")
        return False
    if compare_dataframes(output_df, expected_data):
        print("Dataframes are equivalent!")
    else:
        print("Dataframes are not equivalent")
test_true_end_day() 

--------------------------------------------------
|"TIME"  |"AVG_REQUESTS"  |"AVG_UNIQUE_DRIVERS"  |
--------------------------------------------------
|9       |0.5             |6                     |
--------------------------------------------------

--------------------------------------------------
|"TIME"  |"AVG_REQUESTS"  |"AVG_UNIQUE_DRIVERS"  |
--------------------------------------------------
|9       |0.5             |6                     |
--------------------------------------------------

Row count of the actual_df and expected_df match
Schema of the actual_df and expected_df match
Dataframes are equivalent!
