## Importing shared computation logic from a library

Testability is a core concept in software engineering: the ability to write and run tests against business logic.  
Notebooks on their own are not well suited to this, because they mix data access, data processing, data ingestion, and data export in the same code unit, which makes isolated tests much harder to write.
One way to write testable PySpark code in the Python ecosystem is to move business logic into a separate Python library file. That file can then be run and tested independently of the notebook.  

This example shows how to separate a block of logic into a library file, import it into the notebook, and keep it testable.
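
As a rough sketch of the pattern, the block below shows what the imported function might look like and how it could be tested without a cluster. The body of `inner_join_dataframes` is an assumption (the real library file is not shown in this notebook), and `RecordingFrame` is a hypothetical stand-in object invented for the test; it only mimics the `join` method of a Spark DataFrame.

```python
# Hypothetical contents of library/class_business_logic.py.
# The real module is not shown in this notebook; this body is an assumption.
def inner_join_dataframes(left, right, on):
    """Return the inner join of two Spark DataFrames on the column `on`."""
    return left.join(right, on=on, how="inner")


# Hypothetical test double: records the arguments passed to join()
# so the test can run without a SparkSession.
class RecordingFrame:
    def __init__(self, name):
        self.name = name
        self.join_calls = []

    def join(self, other, on=None, how=None):
        self.join_calls.append((other, on, how))
        return f"{self.name} joined with {other.name}"


def test_inner_join_dataframes():
    left, right = RecordingFrame("class"), RecordingFrame("score")
    result = inner_join_dataframes(left, right, "class_id")
    # The function should delegate to join() once, as an inner join on the key.
    assert left.join_calls == [(right, "class_id", "inner")]
    assert result == "class joined with score"


test_inner_join_dataframes()
```

A stand-in keeps the test fast and dependency-free, but it only checks how `join` is called. Testing against small real DataFrames in a local SparkSession would additionally check the joined rows.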

In [0]:
import os

# Resolve the workspace path of this notebook and its parent directory
notebook_path = dbutils.notebook.entry_point.getDbutils().notebook().getContext().notebookPath().get()
print(notebook_path)
notebook_abs_dir = os.path.dirname(notebook_path)
print(notebook_abs_dir)

In [0]:
%reload_ext autoreload
%autoreload 2

## Load Datasets from external Data Source

These datasets are currently stored locally and packaged with this repo. However, they are representative of data obtained through external interfaces, such as an API (e.g. HTTPS GET) or Delta Lake (e.g. spark.table).

In [0]:
# Pretend we are loading data from an API to get the Class Dataset
import os
class_example_data_path = os.path.join(notebook_abs_dir, "data/Class_Dataset.csv")
class_example_data_path = "file:/Workspace" + class_example_data_path
print(class_example_data_path)
df_class = spark.read.csv(class_example_data_path, header=True, inferSchema=True)
display(df_class)

In [0]:
score_example_data_path = os.path.join(notebook_abs_dir, "data/Score_Dataset.csv")
score_example_data_path = "file:/Workspace" + score_example_data_path
print(score_example_data_path)

df_score = spark.read.csv(score_example_data_path, header=True, inferSchema=True)
display(df_score)
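
The two loading cells above build their paths the same way, so that step could also live in the library and be tested on its own. A minimal sketch, assuming a hypothetical helper named `workspace_file_uri` (not part of the original library) and a made-up notebook directory:

```python
import posixpath

# Prefix used in the loading cells above for files stored in the workspace.
WORKSPACE_FILE_PREFIX = "file:/Workspace"


def workspace_file_uri(notebook_dir, relative_path):
    """Build a file: URI for a file stored relative to the notebook directory."""
    return WORKSPACE_FILE_PREFIX + posixpath.join(notebook_dir, relative_path)


# "/Repos/demo/project" is a made-up directory used only for illustration.
print(workspace_file_uri("/Repos/demo/project", "data/Class_Dataset.csv"))
# prints: file:/Workspace/Repos/demo/project/data/Class_Dataset.csv
```

`posixpath` is used instead of `os.path` so the result does not depend on the operating system running the test.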

## Data Processing - Joining Class and Score datasets


In [0]:
from library.class_business_logic import inner_join_dataframes

In [0]:
df_joined = inner_join_dataframes(df_class, df_score, "class_id")
display(df_joined)

In [0]:
# Load the 5000 most recent trips from the sample NYC taxi dataset
sql_command = """
SELECT * FROM samples.nyctaxi.trips
ORDER BY tpep_pickup_datetime DESC
LIMIT 5000
"""
df_sample_data = spark.sql(sql_command)
display(df_sample_data)

In [0]:
# calculate_statistics is applied per pickup date in the aggregation cell
from library.business_logic import calculate_statistics


In [0]:
from pyspark.sql.functions import to_date


# Both statistics columns share the same struct layout
stats_struct = "struct<mean:double,median:double,variance:double>"
result_schema = (
    "pickup_date date, "
    f"trip_distance_stats {stats_struct}, "
    f"fare_amount_stats {stats_struct}"
)

result = (
    df_sample_data.withColumn("pickup_date", to_date("tpep_pickup_datetime"))
    .groupBy("pickup_date")
    .applyInPandas(
        lambda pdf: calculate_statistics(pdf, columns=["trip_distance", "fare_amount"]),
        schema=result_schema,
    )
)

display(result)
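
The function passed to `applyInPandas` is ordinary Python, so its numeric core can be tested without Spark. The implementation of `calculate_statistics` is not shown in this notebook. The block below is only a sketch of how the per-column summary might be computed, using a hypothetical helper `summarize` built on the standard library rather than pandas. Its keys mirror the `mean`, `median`, and `variance` fields of the struct columns in the schema above.

```python
import statistics


def summarize(values):
    """Return mean, median, and sample variance for a sequence of numbers.

    Hypothetical helper for illustration; an empty input is not handled
    and raises statistics.StatisticsError.
    """
    values = list(values)
    return {
        "mean": statistics.mean(values),
        "median": statistics.median(values),
        # Sample variance (n - 1 denominator) is undefined for a single value.
        "variance": statistics.variance(values) if len(values) > 1 else None,
    }


def test_summarize():
    stats = summarize([1.0, 2.0, 3.0, 4.0])
    assert stats["mean"] == 2.5
    assert stats["median"] == 2.5
    assert abs(stats["variance"] - 5.0 / 3.0) < 1e-12
    # A single observation has no sample variance.
    assert summarize([7.0])["variance"] is None


test_summarize()
```

The sample variance is chosen here because pandas' `Series.var` also divides by n - 1 by default. Whether the real library uses that convention is an assumption.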