In [1]:
# Import libraries
from typing import Any
import pyspark
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from whylogs.api.pyspark.experimental import collect_column_profile_views
from whylogs.api.pyspark.experimental import collect_dataset_profile_view
from whylogs.core.metrics.condition_count_metric import Condition
from whylogs.core.relations import Predicate
from whylogs.core.schema import DeclarativeSchema
from whylogs.core.resolvers import STANDARD_RESOLVER
from whylogs.core.specialized_resolvers import ConditionCountMetricSpec
from whylogs.core.constraints.factories import condition_meets
from whylogs.core.constraints import ConstraintsBuilder
from whylogs.core.constraints.factories import no_missing_values
from whylogs.core.constraints.factories import greater_than_number
from whylogs.viz import NotebookProfileVisualizer
import pandas as pd
import datetime

In [2]:
# Start (or reuse) the Spark session used throughout this notebook
spark = (
    SparkSession.builder
    .appName('whylogs')
    .getOrCreate()
)
# Enable the Arrow setting for PySpark on the session's runtime configuration
spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "true")

In [5]:
# Load the patient CSV: the first row is the header and column types are inferred
df = spark.read.csv("/home/patient_data.csv", header=True, inferSchema=True)
# Print the inferred schema
df.printSchema()

root
 |-- patient_id: integer (nullable = true)
 |-- patient_name: string (nullable = true)
 |-- height: integer (nullable = true)
 |-- weight: integer (nullable = true)
 |-- visit_date: string (nullable = true)



In [6]:
# Display only the first record, one field per line
df.show(1, vertical=True)

-RECORD 0------------------
 patient_id   | 8286975    
 patient_name | Jane Davis 
 height       | 170        
 weight       | 97         
 visit_date   | 2023-04-19 
only showing top 1 row



### Data profile with whylogs

In [9]:
# Profile the DataFrame with whylogs; the result maps each column name to its profile view
df_profile = collect_column_profile_views(input_df=df)
print(df_profile)

{'height': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x7f66958a32d0>, 'patient_id': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x7f673d937a90>, 'patient_name': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x7f6695925450>, 'visit_date': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x7f673d9351d0>, 'weight': <whylogs.core.view.column_profile_view.ColumnProfileView object at 0x7f673d936b90>}


In [10]:
# Let's look at the mean of the height column, read from the profile's "distribution" metric
# whylogs will look at every data point and statistically decide whether or not that data point is relevant to the final calculation.
(
    df_profile["height"]
    .get_metric("distribution")
    .mean
    .value
)

174.98855

In [12]:
# Cross-check: compute the same mean directly on the Spark DataFrame
df.select(F.mean("height")).show()

+-----------+
|avg(height)|
+-----------+
|  174.98855|
+-----------+



In [18]:
# Putting everything together: profile the whole DataFrame in a single call,
# then show the first rows of the profile as a pandas DataFrame (one row per column)
df_profile_view = collect_dataset_profile_view(input_df=df)
df_profile_view.to_pandas().head()

Unnamed: 0_level_0,cardinality/est,cardinality/lower_1,cardinality/upper_1,counts/inf,counts/n,counts/nan,counts/null,distribution/max,distribution/mean,distribution/median,...,frequent_items/frequent_strings,ints/max,ints/min,type,types/boolean,types/fractional,types/integral,types/object,types/string,types/tensor
column,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1,Unnamed: 21_level_1
height,51.000006,51.0,51.002553,0,100000,0,0,200.0,174.9886,175.0,...,"[FrequentItem(value='174', est=2203, upper=220...",200.0,150.0,SummaryType.COLUMN,0,0,100000,0,0,0
patient_id,9624.479972,9471.462071,9782.831357,0,100000,0,0,9998201.0,5441664.0,5439154.0,...,[],9998201.0,1000595.0,SummaryType.COLUMN,0,0,100000,0,0,0
patient_name,100.000025,100.0,100.005018,0,100000,0,0,,0.0,,...,"[FrequentItem(value='Robert Williams', est=125...",,,SummaryType.COLUMN,0,0,0,0,100000,0
visit_date,2230.288911,2194.829943,2266.983812,0,100000,0,0,,0.0,,...,[],,,SummaryType.COLUMN,0,0,0,0,100000,0
weight,56.000008,56.0,56.002804,0,100000,0,0,102.0,73.60038,75.0,...,"[FrequentItem(value='55', est=2107, upper=2107...",102.0,0.0,SummaryType.COLUMN,0,0,100000,0,0,0


In [21]:
# /home/jovyan comes from the Jupyter Docker Stacks images: `jovyan` is the default
# non-root user those images create to run Jupyter applications, and /home/jovyan is
# the default working directory for Jupyter notebooks.
# Place files there if they should be accessible via the Jupyter notebook interface.

# Persist the profile as a CSV file (the index is turned into a regular column first)
# NOTE(review): the file name is spelled "patint_profile.csv" - confirm whether "patient" was intended
(
    df_profile_view
    .to_pandas()
    .reset_index()
    .to_csv("/home/jovyan/patint_profile.csv", header=True, index=False)
)

### Data Validation with whylogs
Perform the following checks:
- `patient_id`: should not contain any missing values
- `weight`: should be greater than 0
- `visit_date`: should be in the format YYYY-MM-DD

In [23]:
# We can use collect_dataset_profile_view from the whylogs PySpark module to create a profile with standard metrics
# But for constraints that must be checked against individual values, we need to create condition count metrics

def check_date_format(date_value: Any) -> bool:
    """Return True when ``date_value`` is a string that parses as ``%Y-%m-%d``.

    Args:
        date_value: The individual value to check. It may be of any type;
            anything that is not a string is reported as not matching.

    Returns:
        True if ``datetime.datetime.strptime`` accepts the value with the
        ``'%Y-%m-%d'`` format, False otherwise (including for ``None`` and
        other non-string values).

    Note:
        ``strptime`` treats the leading zero of ``%m`` and ``%d`` as optional,
        so a value such as ``'2023-4-9'`` is also accepted.
    """
    date_format = '%Y-%m-%d'
    try:
        datetime.datetime.strptime(date_value, date_format)
    except (TypeError, ValueError):
        # ValueError: a string that does not match the format.
        # TypeError: a non-string value (e.g. None), which strptime rejects outright.
        return False
    return True

# Named condition: "is_date_format" holds when check_date_format returns True for a value
visit_date_condition = {
    "is_date_format": Condition(Predicate().is_(check_date_format)),
}

In [27]:
# Start from the standard resolver, then attach the condition count metric to visit_date
schema = DeclarativeSchema(STANDARD_RESOLVER)
schema.add_resolver_spec(
    column_name="visit_date",
    metrics=[ConditionCountMetricSpec(visit_date_condition)],
)

In [28]:
# Pass the schema to collect_dataset_profile_view
# The resulting profile has the standard metrics as well as the condition count metrics
df_profile_view_v2 = collect_dataset_profile_view(
    input_df=df,
    schema=schema,
)

In [31]:
# Create the metric constraints and evaluate them against the profile
# Find list of all constraints here: https://nbviewer.org/github/whylabs/whylogs/blob/mainline/python/examples/basic/Constraints_Suite.ipynb
builder = ConstraintsBuilder(dataset_profile_view=df_profile_view_v2)

# Constraints are added in this order, which is also the order they appear in the report
for constraint in (
    no_missing_values(column_name="patient_id"),
    condition_meets(column_name="visit_date", condition_name="is_date_format"),
    greater_than_number(column_name="weight", number=0),
):
    builder.add_constraint(constraint)

constraints = builder.build()
constraints.generate_constraints_report()

[ReportResult(name='patient_id has no missing values', passed=1, failed=0, summary=None),
 ReportResult(name='visit_date meets condition is_date_format', passed=0, failed=1, summary=None),
 ReportResult(name='weight greater than number 0', passed=0, failed=1, summary=None)]

In [34]:
# Visualize the constraints report with the whylogs Notebook Profile Visualizer
visualization = NotebookProfileVisualizer()
# Render the report for the constraints built above, with the given cell height
visualization.constraints_report(constraints, cell_height=300)

In [43]:
# Validate visit_date column directly in Spark:
# parse it with the yyyy-MM-dd pattern, label each row by whether the parsed value is null, and count per label
(
    df
    .withColumn("check_visit_date", F.to_date(F.col("visit_date"), "yyyy-MM-dd"))
    .withColumn(
        "null_check",
        F.when(F.col("check_visit_date").isNull(), "null").otherwise("not_null"),
    )
    .groupBy("null_check")
    .count()
    .show(truncate=False)
)

+----------+-----+
|null_check|count|
+----------+-----+
|not_null  |98977|
|null      |1023 |
+----------+-----+



In [39]:
# Validate weight column directly in Spark:
# count rows per weight value and show the smallest weight with its row count
(
    df
    .select("weight")
    .groupBy("weight")
    .count()
    .orderBy(F.col("weight"))
    .limit(1)
    .show(truncate=False)
)

+------+-----+
|weight|count|
+------+-----+
|0     |2039 |
+------+-----+

