# 1. Prepare

## 1.1. Calculate number of cores

In [1]:
! nproc --all

16


## 1.2. Download schema

In [2]:
# Download opentargets.json from the `master` branch of opentargets/json_schema (so the schema
# version is not pinned) and save it as ./opentargets.json (-q: quiet, -O<file>: output file).
! wget -qOopentargets.json https://raw.githubusercontent.com/opentargets/json_schema/master/opentargets.json

## 1.3. Prepare test data

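The test data is the first 40,000 rows of the latest genetics portal evidence. This number was chosen because it is 1/(4\*60) = 1/240 of the total number of EPMC evidence records: the factor of 4 is the ratio between a 64 core and a 16 core machine, and the factor of 60 converts seconds to minutes.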

Hence, assuming that throughput scales linearly with the number of cores, the number of **seconds** required to run this sample on a 16 core machine is equivalent to the number of **minutes** required to run the full EPMC evidence on a 64 core machine.

In [3]:
# Stream the gzipped evidence file from the bucket, decompress it, keep the first 40000 lines,
# and recompress them at maximum compression (-9) into test.json.gz.
# stderr of the decompressing gzip is discarded — presumably to hide the error it reports
# once `head` has read enough lines and closes the pipe early.
! gsutil cat gs://otar000-evidence_input/Genetics_portal/json/genetics-portal-evidence-2022-04-12.json.gz | gzip 2>/dev/null -d | head -n 40000 | gzip -c -9 >test.json.gz

# 2. Benchmark the existing approach: opentargets_validator

In [4]:
%%time
%%bash
# Run the existing command-line validator on the sample against the local schema file;
# everything it writes (stdout and stderr) is redirected to ./result.
python3 -m opentargets_validator.cli --schema file://opentargets.json test.json.gz &>result
# Print the validator's exit status.
echo $?

0
CPU times: user 4.49 ms, sys: 7.35 ms, total: 11.8 ms
Wall time: 37.9 s


# 3. Validate directly inside Spark with `asDict` and a helper function

## 3.1. Install fastjsonschema library

In [5]:
%%bash
pip3 -q install --upgrade fastjsonschema

In [6]:
from importlib.metadata import version
import fastjsonschema
version('fastjsonschema')

'2.16.1'

## 3.2. Set up Spark session

In [7]:
from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import pyspark.sql.types as t

# Spark settings for this benchmark, passed to the configuration as (key, value) pairs.
sparkConf = SparkConf().setAll([
    ('spark.driver.memory', '60g'),
    ('spark.executor.memory', '60g'),
    ('spark.driver.maxResultSize', '0'),
    ('spark.debug.maxToStringFields', '2000'),
    ('spark.sql.execution.arrow.maxRecordsPerBatch', '500000'),
])

# Create (or reuse) a session that runs Spark locally inside this machine (`local[*]`).
spark = (
    SparkSession.builder
    .config(conf=sparkConf)
    .master('local[*]')
    .appName("Cerberus test")
    .getOrCreate()
)

## 3.3. Load data into Spark
Normally this data would not be loaded from JSON but generated directly by the pipeline. That makes no difference here, because the resulting dataframe (which is what we want to validate) is the same.

In [8]:
# Read the gzipped sample into a DataFrame. No schema is passed, so Spark infers one from the
# records themselves.
df = spark.read.json('test.json.gz')

## 3.4. Load schema and compile the validator

In [9]:
import json
# Read the downloaded schema. The context manager closes the file handle as soon as parsing is
# done, and the explicit encoding avoids depending on the platform's default locale.
with open('opentargets.json', encoding='utf-8') as schema_file:
    schema = json.load(schema_file)

# Compile the schema once; the resulting function is reused for every record.
fast_validate = fastjsonschema.compile(schema)

## 3.5. Prepare the validation UDF

In [10]:
from pyspark.sql.functions import udf, struct
from pyspark.sql.types import IntegerType


def remove_none_recursively(d):
    """Return a copy of ``d`` without the keys whose value is ``None``, at any nesting depth.

    Only ``None`` is treated as "missing". Other falsy values (``0``, ``0.0``, ``False``,
    ``""``, empty lists and empty dicts) are real data and are kept, so that the validator
    sees them. Dictionaries nested inside lists are cleaned as well.

    Args:
        d: A (possibly nested) dictionary, e.g. the result of ``Row.asDict(recursive=True)``.

    Returns:
        A new dictionary; ``d`` itself is not modified.
    """
    def _clean(value):
        if isinstance(value, dict):
            # Drop a key only when its value is exactly None; recurse into everything else.
            return {k: _clean(v) for k, v in value.items() if v is not None}
        if isinstance(value, list):
            # List elements are kept as they are, but dictionaries among them are cleaned too.
            return [_clean(item) for item in value]
        return value

    return _clean(d)


def validate_row(row):
    """Validate one record against the compiled schema.

    Args:
        row: A Spark ``Row`` holding all fields of the record.

    Returns:
        1 if the record passes schema validation, 0 if the validator rejects it.

    Only the validator's own exception is treated as "invalid". Any other error (for example
    a bug in the conversion helper) propagates instead of being silently counted as an
    invalid record.
    """
    record = remove_none_recursively(row.asDict(recursive=True))
    try:
        fast_validate(record)
    except fastjsonschema.JsonSchemaException:
        return 0
    return 1


# Wrap the function as a Spark UDF returning an integer flag, so the flags can be summed.
validate_row_udf = udf(validate_row, IntegerType())

## 3.6. Validate and benchmark

In [11]:
%%time
# Calculate number of valid evidences
new_df = df.withColumn("is_valid", validate_row_udf(struct([df[x] for x in df.columns])))
new_df.select('is_valid').groupby().sum().collect()

CPU times: user 874 ms, sys: 231 µs, total: 874 ms
Wall time: 12 s


[Row(sum(is_valid)=40000)]

# 4. Alternative PySpark approach with `to_json` → `json.loads()`

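This is somewhat redundant (the data is serialised and then immediately deserialised), but it still works, with approximately the same performance. No helper for stripping `None` values is needed here, since `to_json` omits null fields by default.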

In [12]:
def validate_row_2(j):
    """Validate one JSON-serialised record against the compiled schema.

    Args:
        j: The record as a JSON string (as produced by ``to_json``), or ``None``.

    Returns:
        1 if the record passes schema validation; 0 if it is missing, is not parseable JSON,
        or is rejected by the validator.

    Any other error propagates instead of being silently counted as an invalid record.
    """
    if j is None:
        # A null JSON value cannot be a valid record.
        return 0
    try:
        fast_validate(json.loads(j))
    except (json.JSONDecodeError, fastjsonschema.JsonSchemaException):
        return 0
    return 1


# Wrap the function as a Spark UDF returning an integer flag, so the flags can be summed.
validate_row_udf_2 = udf(validate_row_2, IntegerType())

In [13]:
%%time
df2 = df.withColumn("JSON",f.to_json(f.struct([df[x] for x in df.columns])))
df2 = df2.withColumn('is_valid', validate_row_udf_2(df2.JSON))
df2.select('is_valid').groupby().sum().collect()

CPU times: user 413 ms, sys: 2.28 ms, total: 415 ms
Wall time: 9.5 s


[Row(sum(is_valid)=40000)]