# whylogs + PySpark

First, you'll need to point to an existing Spark installation. Make sure that the Spark version matches the pyspark version in the environment.

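You can download a Spark distribution and extract it to a local path: https://www.apache.org/dyn/closer.lua/spark/spark-3.1.1/spark-3.1.1-bin-hadoop2.7.tgz

Then point `SPARK_HOME` at the extracted directory. The path below refers to a `hadoop3.2` build, while the link above is the `hadoop2.7` build; use the directory name of the distribution you actually extracted.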

In [1]:
env SPARK_HOME=/Volumes/Workspace/spark/spark-3.1.1-bin-hadoop3.2

env: SPARK_HOME=/Volumes/Workspace/spark/spark-3.1.1-bin-hadoop3.2
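Since the Spark and pyspark versions have to agree, a quick sanity check can catch a mismatch before a session is created. The following is a minimal sketch, assuming the distribution directory keeps its default `spark-<version>-bin-...` name; the helper names `spark_version_from_home` and `versions_match` are hypothetical and not part of Spark or whylogs.

```python
import os
import re
from typing import Optional


def spark_version_from_home(spark_home: str) -> Optional[str]:
    """Parse the version out of a directory name like 'spark-3.1.1-bin-hadoop3.2'.

    Returns None when the directory name does not follow that pattern.
    """
    name = os.path.basename(spark_home.rstrip("/"))
    match = re.match(r"spark-(\d+\.\d+\.\d+)", name)
    return match.group(1) if match else None


def versions_match(spark_home: str, pyspark_version: str) -> bool:
    """True when the version in the SPARK_HOME directory name equals pyspark's."""
    return spark_version_from_home(spark_home) == pyspark_version


spark_home = "/Volumes/Workspace/spark/spark-3.1.1-bin-hadoop3.2"
print(versions_match(spark_home, "3.1.1"))  # True
```

In a notebook, the second argument would typically be `pyspark.__version__`.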


## whylogs Spark jar

You'll need to load a fat jar for whylogs. This jar contains all the required dependencies (some are shaded) for running whylogs in Spark.

Spark will then inject the `whyspark` module into the path.

You can download the jar bundle here: https://drive.google.com/file/d/1IHlXJZU6HxTeIuWlMCKh3v7N6H9EeJxI/view?usp=sharing

In [2]:
whylogs_jar = "/Users/andy/Workspace/tmp/pyspark/whylogs-spark_3.1.1.jar"
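Because a jar is a zip archive, it is possible to confirm that the downloaded file really bundles the `whyspark` Python package before handing it to Spark. This is a rough sketch using only the standard library; `jar_has_python_package` is a hypothetical helper, and it assumes the package sits at the top level of the jar as `whyspark/__init__.py`.

```python
import zipfile


def jar_has_python_package(jar_path: str, package: str) -> bool:
    """Return True if the jar contains '<package>/__init__.py' at its top level."""
    with zipfile.ZipFile(jar_path) as jar:
        return f"{package}/__init__.py" in jar.namelist()


# Example call (commented out because the path is machine-specific):
# jar_has_python_package(whylogs_jar, "whyspark")
```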

You'll also need `pyspark` installed in your Jupyter kernel.

In [3]:
pip install pyspark==3.1.1

Note: you may need to restart the kernel to use updated packages.


In [4]:
import pyspark
pyspark.__version__

'3.1.1'

In [5]:
import sys
import os

In [6]:
%env PYSPARK_PYTHON={sys.executable}
%env PYSPARK_DRIVER_PYTHON={sys.executable}

env: PYSPARK_PYTHON=/Users/andy/miniconda3/envs/whylogs-spark/bin/python
env: PYSPARK_DRIVER_PYTHON=/Users/andy/miniconda3/envs/whylogs-spark/bin/python
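The `%env` magics only work inside IPython. If the same setup has to run as a plain Python script, the variables can be set through `os.environ` instead. This is a small sketch of that alternative; it should run before the Spark session is created so that the driver and workers pick up the interpreter.

```python
import os
import sys

# Point both the driver and the workers at the interpreter running this code,
# mirroring the two %env lines above.
for var in ("PYSPARK_PYTHON", "PYSPARK_DRIVER_PYTHON"):
    os.environ[var] = sys.executable
```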


In [7]:
spark = pyspark.sql.SparkSession.builder \
                .appName("whylogs") \
                .config("spark.pyspark.driver.python", sys.executable) \
                .config("spark.pyspark.python", sys.executable) \
                .config("spark.executor.userClassPathFirst", "true") \
                .config("spark.submit.pyFiles", whylogs_jar) \
                .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
                .config("spark.jars", whylogs_jar) \
                .getOrCreate() 
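The same configuration can be kept in a dictionary, which makes it easier to inspect or reuse across notebooks. The sketch below copies the keys and values from the builder chain above; `whylogs_spark_conf` and `build_session` are hypothetical helper names, and `pyspark` is imported lazily so the dictionary can be built even where Spark is not installed.

```python
import sys
from typing import Dict


def whylogs_spark_conf(jar_path: str, python: str = sys.executable) -> Dict[str, str]:
    """Collect the Spark settings used in this notebook into one dictionary."""
    return {
        "spark.pyspark.driver.python": python,
        "spark.pyspark.python": python,
        "spark.executor.userClassPathFirst": "true",
        # The jar is registered twice: as Python files and as a JVM dependency.
        "spark.submit.pyFiles": jar_path,
        "spark.sql.execution.arrow.pyspark.enabled": "true",
        "spark.jars": jar_path,
    }


def build_session(conf: Dict[str, str], app_name: str = "whylogs"):
    """Create (or reuse) a SparkSession with every entry of `conf` applied."""
    from pyspark.sql import SparkSession  # lazy import: only needed here

    builder = SparkSession.builder.appName(app_name)
    for key, value in conf.items():
        builder = builder.config(key, value)
    return builder.getOrCreate()
```

With this in place, `spark = build_session(whylogs_spark_conf(whylogs_jar))` would be the equivalent of the cell above.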

## Using Spark bridge

`whyspark` is the module that is bundled in the above jar. It's a thin bridge into whylogs Spark.

In [8]:
import whyspark
whyspark

<module 'whyspark' from '/private/var/folders/58/g0k4klhn3915_fs0s1gc7h7r0000gn/T/spark-396a5059-b82e-42eb-8884-5d6a779ef0ac/userFiles-dcf9d009-749c-4b64-a61c-7f43efc61932/whylogs-spark_3.1.1.jar/whyspark/__init__.py'>

## Read a Parquet dataset

The example dataset can be downloaded from here: https://drive.google.com/file/d/11kNZ-rxoIZL6dyLOOYjFUWiMUOc0hzhA/view?usp=sharing

In [12]:
df = spark.read.parquet("demo.parquet")

In [13]:
df.printSchema()

root
 |-- feature0: double (nullable = true)
 |-- feature1: double (nullable = true)
 |-- feature2: double (nullable = true)
 |-- feature3: double (nullable = true)
 |-- feature4: double (nullable = true)
 |-- feature5: double (nullable = true)
 |-- feature6: double (nullable = true)
 |-- feature7: double (nullable = true)
 |-- feature8: double (nullable = true)
 |-- feature9: double (nullable = true)
 |-- feature10: double (nullable = true)
 |-- feature11: double (nullable = true)
 |-- feature12: double (nullable = true)
 |-- feature13: double (nullable = true)
 |-- feature15: double (nullable = true)
 |-- feature16: double (nullable = true)
 |-- feature17: double (nullable = true)
 |-- feature18: double (nullable = true)
 |-- feature19: double (nullable = true)
 |-- targets: double (nullable = true)
 |-- predictions: double (nullable = true)
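The schema above runs from `feature0` to `feature19` but contains no `feature14`; the notebook does not say whether that is intentional. A quick way to spot such gaps is sketched below. `missing_feature_indices` is a hypothetical helper that works on a plain list of column names, so with a real DataFrame you would pass `df.columns`.

```python
import re
from typing import Iterable, List


def missing_feature_indices(columns: Iterable[str], prefix: str = "feature") -> List[int]:
    """List the indices absent from the run prefix0 .. prefix<max> in `columns`."""
    pattern = re.compile(rf"^{re.escape(prefix)}(\d+)$")
    present = {int(m.group(1)) for m in map(pattern.match, columns) if m}
    if not present:
        return []
    return [i for i in range(max(present) + 1) if i not in present]


# Column names as printed by df.printSchema() above.
columns = [f"feature{i}" for i in range(20) if i != 14] + ["targets", "predictions"]
print(missing_feature_indices(columns))  # [14]
```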



In [15]:
df.limit(10).toPandas()

Unnamed: 0,feature0,feature1,feature2,feature3,feature4,feature5,feature6,feature7,feature8,feature9,...,feature11,feature12,feature13,feature15,feature16,feature17,feature18,feature19,targets,predictions
0,-0.770211,-1.150007,1.267444,1.543401,6.845826,-0.80908,0.516353,-1.01332,0.0,0.0,...,1.189272,1.357314,1.097288,0.895369,2.022678,0.836942,-0.364999,-0.448945,249.869482,475.699682
1,-0.209804,0.420597,-0.611037,-1.239387,-3.405289,1.959404,0.28243,-1.115981,0.0,0.0,...,0.612708,-2.584602,-1.500597,0.537443,-1.538254,0.183368,-0.961058,-1.037713,-390.560276,-518.176652
2,2.668646,2.169074,0.744704,0.873234,-1.525391,1.180888,-0.366199,1.173369,0.0,0.0,...,1.701718,0.398231,1.450776,0.162088,-1.856014,-1.007003,-0.301078,-0.122349,220.721564,144.361448
3,-0.299021,1.528501,0.76289,0.080424,0.950978,-0.630125,-0.1347,-0.633652,0.0,0.0,...,-0.630305,-1.872968,0.616833,0.073704,-1.081827,0.103161,-0.180834,0.862916,65.126969,68.374668
4,0.834264,0.315182,-0.15883,-0.570665,-2.190307,0.478157,-1.323692,-0.831484,0.0,0.0,...,2.509004,-0.203884,-1.197428,-0.464271,-1.940522,0.322344,0.720388,-0.299949,-180.414795,-291.098377
5,-0.410635,-0.72997,-0.379872,0.335286,0.076386,0.257638,-0.41855,1.146481,0.0,0.0,...,1.663541,-1.028033,-1.354744,-0.243626,-0.428517,-1.519307,0.41944,0.408956,128.801929,117.521255
6,-0.408978,0.463264,0.32873,0.735546,-0.545499,-0.972379,0.018453,0.440376,0.0,0.0,...,1.109912,0.182653,0.125567,2.101821,-0.648903,1.223861,-0.390747,-0.441613,-169.732409,-111.987417
7,-0.989288,0.458408,1.392145,-0.792931,-0.148603,0.606489,-0.071866,-1.048558,0.0,0.0,...,-0.094266,0.377068,-0.634952,1.704654,0.506391,0.862971,1.756921,0.674557,-76.037622,-139.440168
8,0.563311,1.521921,-0.155797,1.194707,-1.831027,-2.199984,0.504092,-0.158731,0.0,0.0,...,0.893653,0.599627,1.403464,0.10502,1.777538,-0.81422,0.119774,0.106125,63.950866,-78.570837
9,0.329955,-1.749117,1.332187,-0.242201,-0.164931,-1.836846,-0.008868,1.469891,0.0,0.0,...,-0.462468,0.945933,0.043585,-1.331824,-0.832666,-1.056759,0.470669,-1.61479,175.050885,142.696762


## Tracking the datasets

To track a new dataset, you have to create a whylogs profiling session.

In [17]:
import whyspark

In [18]:
whyspark.new_profiling_session?

Signature:
whyspark.new_profiling_session(
    df: pyspark.sql.dataframe.DataFrame,
    name: str,
    time_column: Union[str, NoneType] = None,
)
Docstring: <no docstring>
File:      /private/var/folders/58/g0k4klhn3915_fs0s1gc7h7r0000gn/T/spark-396a5059-b82e-42eb-8884-5d6a779ef0ac/userFiles-dcf9d009-749c-4b64-a61c-7f43efc61932/whylogs-spark_3.1.1.jar/whyspark/udt/profile.py
Type:      function


In [19]:
session = whyspark.new_profiling_session(df, "my-model-name")

In [20]:
session.log?

Signature:
session.log(
    dt: Union[datetime.datetime, NoneType] = None,
    org_id: str = None,
    model_id: str = None,
    api_key: str = None,
)
Docstring:
Run profiling and send results to WhyLabs using the WhyProfileSession's configurations.

Users must specify the organization ID, the model ID and the API key.

You can specify via WHYLABS_ORG_ID, WHYLABS_MODEL_ID and WHYLABS_API_KEY environment variables as well.
:param dt: the datetime of the dataset. Default to the cur

## Run the profiling

The session above was created without a `time_column`, and the resulting DataFrame contains a single entry, as the count below shows.

In [21]:
session.aggProfiles?

Signature:
session.aggProfiles(
    datetime_ts: Union[datetime.datetime, NoneType] = None,
    timestamp_ms: int = None,
) -> pyspark.sql.dataframe.DataFrame
Docstring: <no docstring>
File:      /private/var/folders/58/g0k4klhn3915_fs0s1gc7h7r0000gn/T/spark-396a5059-b82e-42eb-8884-5d6a779ef0ac/userFiles-dcf9d009-749c-4b64-a61c-7f43efc61932/whylogs-spark_3.1.1.jar/whyspark/udt/profile.py
Type:      method


In [22]:
%%time
profile_df = session.aggProfiles().cache()
profile_df.count()

CPU times: user 1.22 ms, sys: 933 µs, total: 2.15 ms
Wall time: 709 ms


1
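The `aggProfiles` signature accepts either a `datetime_ts` or an integer `timestamp_ms`. The method has no docstring, so the following is only a sketch under the assumption that `timestamp_ms` means milliseconds since the Unix epoch; `to_timestamp_ms` is a hypothetical helper, and treating naive datetimes as UTC is a choice made here, not documented whyspark behavior.

```python
from datetime import datetime, timedelta, timezone

EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def to_timestamp_ms(dt: datetime) -> int:
    """Convert a datetime to whole milliseconds since the Unix epoch.

    Naive datetimes are interpreted as UTC (an assumption of this sketch).
    """
    if dt.tzinfo is None:
        dt = dt.replace(tzinfo=timezone.utc)
    # Integer division of timedeltas avoids floating-point rounding.
    return (dt - EPOCH) // timedelta(milliseconds=1)


print(to_timestamp_ms(datetime(2021, 3, 18)))  # 1616025600000
```

The resulting integer could then be passed as `session.aggProfiles(timestamp_ms=...)`, if the epoch-milliseconds assumption holds.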


## Publish to WhyLabs service

You'll need the following information:
* Organization ID (`org_id`)
* Model ID (`model_id`)
* API Key specific to the organization

Please reach out to the WhyLabs team if you don't have the information.

You can pass these either as method parameters or as environment variables.

In [23]:
session.log?

Signature:
session.log(
    dt: Union[datetime.datetime, NoneType] = None,
    org_id: str = None,
    model_id: str = None,
    api_key: str = None,
)
Docstring:
Run profiling and send results to WhyLabs using the WhyProfileSession's configurations.

Users must specify the organization ID, the model ID and the API key.

You can specify via WHYLABS_ORG_ID, WHYLABS_MODEL_ID and WHYLABS_API_KEY environment variables as well.
:param dt: the datetime of the dataset. Default to the cur

In [25]:
# Here we pass the credentials via environment variables

%env WHYLABS_API_KEY=<your-api-key>
%env WHYLABS_ORG_ID=<your-organization-id>
%env WHYLABS_MODEL_ID=<your-model-id>

env: WHYLABS_API_KEY=<your-api-key>
env: WHYLABS_ORG_ID=<your-organization-id>
env: WHYLABS_MODEL_ID=<your-model-id>
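Since all three values are required, it can help to check them locally before starting a profiling job. The sketch below is a pre-flight check only and is not how `session.log` resolves its settings internally; `resolve_whylabs_settings` is a hypothetical helper, and letting explicit arguments take precedence over the environment is an assumption.

```python
import os
from typing import Dict, Optional

# Environment variable names taken from the session.log docstring.
ENV_VARS = {
    "org_id": "WHYLABS_ORG_ID",
    "model_id": "WHYLABS_MODEL_ID",
    "api_key": "WHYLABS_API_KEY",
}


def resolve_whylabs_settings(
    org_id: Optional[str] = None,
    model_id: Optional[str] = None,
    api_key: Optional[str] = None,
) -> Dict[str, str]:
    """Return the three settings, falling back to environment variables.

    Raises ValueError naming every variable that is still missing.
    """
    explicit = {"org_id": org_id, "model_id": model_id, "api_key": api_key}
    resolved = {
        name: value or os.environ.get(ENV_VARS[name])
        for name, value in explicit.items()
    }
    missing = [ENV_VARS[name] for name, value in resolved.items() if not value]
    if missing:
        raise ValueError("Missing WhyLabs settings: " + ", ".join(missing))
    return resolved
```

The returned dictionary has the same keys as the keyword arguments of `session.log`, so it could be unpacked as `session.log(dt, **settings)`.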


In [None]:
%%time
from datetime import datetime
session.log(datetime(2021, 3, 18, 0, 0))