In [1]:
from pyspark.sql import SparkSession

def _spark_context():
    '''
    Build the notebook's Spark entry point, or reuse the one that already exists.

    Returns the `SparkSession` produced by `getOrCreate()` for a `local`
    master with the application name `syllabus`.
    '''
    builder = (
        SparkSession.builder
        .master('local')
        .appName('syllabus')
    )
    return builder.getOrCreate()

# Module-level session; DFLoader.from_file reads JSON files through it.
SPARK = _spark_context()
# Last expression of the cell: displays the session object.
SPARK

22/06/21 09:30:11 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [76]:
import json

from pyspark.sql import DataFrame
from pyspark.sql import functions as F

from pygments import highlight
from pygments.lexers import JsonLexer
from pygments.formatters import TerminalTrueColorFormatter

def ppj(j, indent=2):
    '''
    Pretty-print a JSON document with terminal colours.

    j:      JSON text; it is parsed and re-serialised, so invalid JSON raises.
    indent: indentation forwarded to `json.dumps` (None gives a single line).

    Prints to stdout and returns None.
    '''
    reformatted = json.dumps(json.loads(j), indent=indent)
    coloured = highlight(
        code=reformatted,
        lexer=JsonLexer(),
        formatter=TerminalTrueColorFormatter(),
    )
    # highlight() output is stripped so no surrounding blank lines are printed.
    print(coloured.strip())
   
def ppd(d, indent=2):
    '''
    Pretty-print a JSON-serialisable Python object (dict, list, ...).

    The object is serialised with `json.dumps` and handed to `ppj`, which
    does the colouring and printing. Returns None.
    '''
    serialised = json.dumps(d)
    ppj(serialised, indent=indent)

def count_nulls(df: DataFrame) -> int:
    '''
    Count the null cells of `df` across all of its columns.

    One aggregate per column counts the rows where that column is null; the
    per-column aggregates are added together into a single expression so the
    total comes back as the only cell of a one-row result.
    '''
    per_column = [
        F.count(F.when(F.col(name).isNull(), name)).alias(name)
        for name in df.columns
    ]
    # Built-in sum() over Column objects yields one combined Column expression.
    total = sum(per_column)
    first_row = df.select(total).collect()[0]
    return first_row[0]

def count_cells(df: DataFrame) -> int:
    '''Return the number of cells in `df`: row count times column count.'''
    n_rows = df.count()
    n_columns = len(df.columns)
    return n_rows * n_columns

class DFLoader:
    '''Helpers that turn in-memory records into Spark DataFrames.'''

    @staticmethod
    def from_file(records: list, fpath: str) -> DataFrame:
        '''
        Write `records` to `fpath` as newline-delimited JSON and load it with Spark.

        records: JSON-serialisable objects, written one per line.
        fpath:   destination path; an existing file is overwritten.

        Besides returning the DataFrame, this prints a summary of it: the
        table itself, the cell/null counts, and the inferred schema.
        '''
        with open(fpath, 'w') as ostream:
            for record in records:
                # print() already terminates each line with '\n'.
                print(json.dumps(record), file=ostream)
        df = SPARK.read.json(fpath)
        df.show()
        print('cells', count_cells(df), '/', 'nulls', count_nulls(df))
        # ppj prints by itself and returns None; wrapping it in print() would
        # emit a stray "None" line after the schema.
        ppj(df.schema.json())
        return df

In [69]:
# Experiment: column "a" holds strings in the first three records and an
# integer in the last one; "c" and "d" each appear in a single record.
records = [
    { "a": "a", "c": "d" },
    { "a": "b" },
    { "a": "c" },
    { "a": 1, "d": "z" }
]
df = DFLoader.from_file(records, 'f.ndjson')
# Last expression of the cell: displays the total number of null cells.
count_nulls(df)

+---+----+----+
|  a|   c|   d|
+---+----+----+
|  a|   d|null|
|  b|null|null|
|  c|null|null|
|  1|null|   z|
+---+----+----+

cells 12 / nulls 6
{
[38;2;187;187;187m  [39m[38;2;0;128;0;01m"fields"[39;00m:[38;2;187;187;187m [39m[
[38;2;187;187;187m    [39m{
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"metadata"[39;00m:[38;2;187;187;187m [39m{},
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"name"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"a"[39m,
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"nullable"[39;00m:[38;2;187;187;187m [39m[38;2;0;128;0;01mtrue[39;00m,
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"type"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"string"[39m
[38;2;187;187;187m    [39m},
[38;2;187;187;187m    [39m{
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"metadata"[39;00m:[38;2;187;187;187m [39m{},
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"name"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"c"[39m,
[38;2

6

In [70]:
# Experiment: same layout as before, but the integer value for "a" now comes
# first and the last record carries the numeric-looking string "123".
records = [
    { "a": 1, "c": "d" },
    { "a": "b" },
    { "a": "c" },
    { "a": "123", "d": "z" }
]
df = DFLoader.from_file(records, 'f.ndjson')

+---+----+----+
|  a|   c|   d|
+---+----+----+
|  1|   d|null|
|  b|null|null|
|  c|null|null|
|123|null|   z|
+---+----+----+

cells 12 / nulls 6
{
[38;2;187;187;187m  [39m[38;2;0;128;0;01m"fields"[39;00m:[38;2;187;187;187m [39m[
[38;2;187;187;187m    [39m{
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"metadata"[39;00m:[38;2;187;187;187m [39m{},
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"name"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"a"[39m,
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"nullable"[39;00m:[38;2;187;187;187m [39m[38;2;0;128;0;01mtrue[39;00m,
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"type"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"string"[39m
[38;2;187;187;187m    [39m},
[38;2;187;187;187m    [39m{
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"metadata"[39;00m:[38;2;187;187;187m [39m{},
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"name"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"c"[39m,
[38;2

In [160]:
# Experiment: every record gets an integer "id"; "a" is still a mix of
# integers and strings, and "c" / "d" remain sparse.
records = [
    { "id": 123, "a": 1, "c": "d" },
    { "id": 122, "a": "b" },
    { "id": 111, "a": "c" },
    { "id": 234, "a": 1, "d": "z" }
]
df = DFLoader.from_file(records, 'f.ndjson')

+---+----+----+---+
|  a|   c|   d| id|
+---+----+----+---+
|  1|   d|null|123|
|  b|null|null|122|
|  c|null|null|111|
|  1|null|   z|234|
+---+----+----+---+



[Stage 76:>                                                         (0 + 1) / 1]

cells 16 / nulls 6
{
[38;2;187;187;187m  [39m[38;2;0;128;0;01m"fields"[39;00m:[38;2;187;187;187m [39m[
[38;2;187;187;187m    [39m{
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"metadata"[39;00m:[38;2;187;187;187m [39m{},
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"name"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"a"[39m,
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"nullable"[39;00m:[38;2;187;187;187m [39m[38;2;0;128;0;01mtrue[39;00m,
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"type"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"string"[39m
[38;2;187;187;187m    [39m},
[38;2;187;187;187m    [39m{
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"metadata"[39;00m:[38;2;187;187;187m [39m{},
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"name"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"c"[39m,
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"nullable"[39;00m:[38;2;187;187;187m [39m[38;2;0;128;0;01mtrue[39;00m,
[38;2;187;

                                                                                

In [180]:
# Experiment: the same data reshaped to one (id, key, value) row per
# property, so no record has a missing field.
records = [
    { "id": 123, "key": "a", "value": 1},
    { "id": 123, "key": "c", "value": "d" },
    { "id": 122, "key": "a", "value": "b" },
    { "id": 111, "key": "a", "value": "c" },
    { "id": 234, "key": "a", "value": 1 },
    { "id": 234, "key": "d", "value": "z" },
]
df = DFLoader.from_file(records, 'f.ndjson')

+---+---+-----+
| id|key|value|
+---+---+-----+
|123|  a|    1|
|123|  c|    d|
|122|  a|    b|
|111|  a|    c|
|234|  a|    1|
|234|  d|    z|
+---+---+-----+



[Stage 88:>                                                         (0 + 1) / 1]

cells 18 / nulls 0
{
[38;2;187;187;187m  [39m[38;2;0;128;0;01m"fields"[39;00m:[38;2;187;187;187m [39m[
[38;2;187;187;187m    [39m{
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"metadata"[39;00m:[38;2;187;187;187m [39m{},
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"name"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"id"[39m,
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"nullable"[39;00m:[38;2;187;187;187m [39m[38;2;0;128;0;01mtrue[39;00m,
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"type"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"long"[39m
[38;2;187;187;187m    [39m},
[38;2;187;187;187m    [39m{
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"metadata"[39;00m:[38;2;187;187;187m [39m{},
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"name"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"key"[39m,
[38;2;187;187;187m      [39m[38;2;0;128;0;01m"nullable"[39;00m:[38;2;187;187;187m [39m[38;2;0;128;0;01mtrue[39;00m,
[38;2;187

                                                                                

In [193]:
# Sample input: every record has "id" and "a"; the first record also has
# "c" and the last one also has "d".
records = [
    { "id": 123, "a": "1", "c": "d" },
    { "id": 122, "a": "b" },
    { "id": 111, "a": "c" },
    { "id": 234, "a": "z", "d": "z" }
]

# JSON Schema declaring only "id" and "a"; additionalProperties is False, so
# any other property (such as "c" or "d" above) is a validation error.
schema = {
    "type" : "object",
    "properties" : {
        "id" : {"type" : "integer"},
        "a": {"type": "string"},
    },
    "required": ["id"],
    "additionalProperties": False
}

# Show both structures so the validation results can be read against them.
print("--- records")
ppd(records)
print("--- schema")
ppd(schema)

--- records
[
[38;2;187;187;187m  [39m{
[38;2;187;187;187m    [39m[38;2;0;128;0;01m"id"[39;00m:[38;2;187;187;187m [39m[38;2;102;102;102m123[39m,
[38;2;187;187;187m    [39m[38;2;0;128;0;01m"a"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"1"[39m,
[38;2;187;187;187m    [39m[38;2;0;128;0;01m"c"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"d"[39m
[38;2;187;187;187m  [39m},
[38;2;187;187;187m  [39m{
[38;2;187;187;187m    [39m[38;2;0;128;0;01m"id"[39;00m:[38;2;187;187;187m [39m[38;2;102;102;102m122[39m,
[38;2;187;187;187m    [39m[38;2;0;128;0;01m"a"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"b"[39m
[38;2;187;187;187m  [39m},
[38;2;187;187;187m  [39m{
[38;2;187;187;187m    [39m[38;2;0;128;0;01m"id"[39;00m:[38;2;187;187;187m [39m[38;2;102;102;102m111[39m,
[38;2;187;187;187m    [39m[38;2;0;128;0;01m"a"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"c"[39m
[38;2;187;187;187m  [39m},
[38;2;187;187;187m  [39m{
[38;2;187;

In [194]:
from typing import Dict, List
from jsonschema.exceptions import ValidationError
from jsonschema.validators import Draft3Validator

def translate(df: dict) -> dict:
    '''
    Validate each record in `df` against the module-level `schema`.

    This is a generator: for every validation error it yields a
    `(record, error)` pair. The errors of one record are yielded in the
    order of their string representation; records without errors yield
    nothing.
    '''
    checker = Draft3Validator(schema)
    for item in df:
        failures = list(checker.iter_errors(item))
        failures.sort(key=str)
        for failure in failures:
            yield item, failure

def parse_unexpected_keys(instance: dict, e: "ValidationError") -> List[str]:
    '''
    Return the property names rejected by an `additionalProperties` error.

    "Additional properties are not allowed ('a' was unexpected)" -> ['a']
    "Additional properties are not allowed ('a', 'c' were unexpected)" -> ['a', 'c']

    instance: kept for backward compatibility; the failing object is read
              from `e.instance`.
    e:        a jsonschema validation error.

    Errors raised by any other schema keyword return an empty list.

    The keys are recomputed from the error's `instance` and `schema` rather
    than parsed out of the message text: `str.lstrip` / `str.rstrip` remove
    *character sets*, not prefixes/suffixes, and splitting on ', ' or
    stripping quotes mangles keys that contain those characters.
    '''
    import re  # only needed when the schema declares patternProperties

    if e.validator != 'additionalProperties' or not isinstance(e.instance, dict):
        return []

    declared = e.schema.get('properties', {})
    # A property matching any patternProperties regex is not "additional".
    patterns = '|'.join(e.schema.get('patternProperties', {}))
    return [
        key
        for key in e.instance
        if key not in declared
        and not (patterns and re.search(patterns, key))
    ]
            
# For every validation error, show the offending record, the error message
# and the property names extracted from that error.
for record, error in translate(records):
    ppd({
        'instance': record,
        'error': error.message,
        'keys': parse_unexpected_keys(record, error)
    })

{
[38;2;187;187;187m  [39m[38;2;0;128;0;01m"instance"[39;00m:[38;2;187;187;187m [39m{
[38;2;187;187;187m    [39m[38;2;0;128;0;01m"id"[39;00m:[38;2;187;187;187m [39m[38;2;102;102;102m123[39m,
[38;2;187;187;187m    [39m[38;2;0;128;0;01m"a"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"1"[39m,
[38;2;187;187;187m    [39m[38;2;0;128;0;01m"c"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"d"[39m
[38;2;187;187;187m  [39m},
[38;2;187;187;187m  [39m[38;2;0;128;0;01m"error"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"Additional properties are not allowed ('c' was unexpected)"[39m,
[38;2;187;187;187m  [39m[38;2;0;128;0;01m"keys"[39;00m:[38;2;187;187;187m [39m[
[38;2;187;187;187m    [39m[38;2;186;33;33m"c"[39m
[38;2;187;187;187m  [39m]
}
{
[38;2;187;187;187m  [39m[38;2;0;128;0;01m"instance"[39;00m:[38;2;187;187;187m [39m{
[38;2;187;187;187m    [39m[38;2;0;128;0;01m"id"[39;00m:[38;2;187;187;187m [39m[38;2;102;102;102m234[39m,
[3

In [195]:
from dataclasses import dataclass, field

@dataclass
class UnexpectedKeys:
    '''
    Validates instances against `schema` and accumulates every property name
    that the schema's `additionalProperties` rule rejected.

    schema:      the JSON Schema to validate against.
    unique_keys: set of rejected property names seen so far; it grows as the
                 generators returned by `check` are consumed.

    NOTE(review): validation uses Draft3Validator; Draft 3 presumably does not
    interpret an array-form "required" keyword - confirm this is intended.
    '''
    schema: dict
    unique_keys: set = field(default_factory=set)

    def __post_init__(self):
        # Build the validator once so every check() call reuses it.
        self.validator = Draft3Validator(self.schema)

    @property
    def keys(self):
        '''The rejected property names collected so far, as a sorted list.'''
        return sorted(self.unique_keys)

    @staticmethod
    def parse_unexpected_keys(e: "ValidationError") -> List[str]:
        '''
        Return the property names rejected by an `additionalProperties` error.

        "Additional properties are not allowed ('a' was unexpected)" -> ['a']
        "Additional properties are not allowed ('a', 'c' were unexpected)" -> ['a', 'c']

        Errors raised by any other schema keyword return an empty list, so
        fragments of unrelated messages never end up in `unique_keys`.

        The keys are recomputed from the error's `instance` and `schema`
        rather than parsed out of the message text: `str.lstrip` /
        `str.rstrip` remove *character sets*, not prefixes/suffixes, and
        splitting on ', ' or stripping quotes mangles keys that contain
        those characters.
        '''
        import re  # only needed when the schema declares patternProperties

        if e.validator != 'additionalProperties' or not isinstance(e.instance, dict):
            return []

        declared = e.schema.get('properties', {})
        # A property matching any patternProperties regex is not "additional".
        patterns = '|'.join(e.schema.get('patternProperties', {}))
        return [
            key
            for key in e.instance
            if key not in declared
            and not (patterns and re.search(patterns, key))
        ]

    def check(self, instance: dict):
        '''
        Validate `instance`, yielding each validation error.

        Errors are yielded in the order of their string representation. This
        is a generator: `unique_keys` is only updated for the errors that
        have actually been consumed by the caller.
        '''
        for error in sorted(self.validator.iter_errors(instance), key=str):
            self.unique_keys.update(UnexpectedKeys.parse_unexpected_keys(error))
            yield error

In [196]:
# Validate every sample record; consuming check() also fills k.unique_keys.
k = UnexpectedKeys(schema)
for record in records:
    for error in k.check(record):
        print(record, error)

{'id': 123, 'a': '1', 'c': 'd'} Additional properties are not allowed ('c' was unexpected)

Failed validating 'additionalProperties' in schema:
    {'additionalProperties': False,
     'properties': {'a': {'type': 'string'}, 'id': {'type': 'integer'}},
     'required': ['id'],
     'type': 'object'}

On instance:
    {'a': '1', 'c': 'd', 'id': 123}
{'id': 234, 'a': 'z', 'd': 'z'} Additional properties are not allowed ('d' was unexpected)

Failed validating 'additionalProperties' in schema:
    {'additionalProperties': False,
     'properties': {'a': {'type': 'string'}, 'id': {'type': 'integer'}},
     'required': ['id'],
     'type': 'object'}

On instance:
    {'a': 'z', 'd': 'z', 'id': 234}


In [197]:
# Display the set of property names collected by the check() calls so far.
k.unique_keys

{'c', 'd'}

In [220]:
# NOTE(review): this is not the last expression of the cell, so it displays
# nothing; it looks like a leftover from the previous cell.
k.unique_keys

def split_dict(d: dict, keys: list):
    '''
    Partition `d` into two new dicts according to `keys`.

    Returns `(rest, selected)`: `selected` holds the items of `d` whose key
    is in `keys`, `rest` holds all the others. Both keep the item order of
    `d`; entries of `keys` that are not in `d` are ignored.
    '''
    selected = {name: value for name, value in d.items() if name in keys}
    rest = {name: value for name, value in d.items() if name not in keys}
    return rest, selected

def translate(df: list) -> list:
    '''
    Move properties the module-level `schema` does not allow into "custom".

    df: iterable of record dicts.

    This is a generator yielding exactly one translated record per input
    record: the allowed properties are kept as they are and every property
    rejected by `additionalProperties` becomes a `{"key": ..., "value": ...}`
    entry of the "custom" list (empty when nothing was rejected).
    '''
    uk = UnexpectedKeys(schema)
    # Iterate the argument, not the global `records`, so the caller's data is
    # what actually gets translated.
    for record in df:
        # Look at every error, not only the first one in sort order; only
        # additionalProperties errors name properties that must be moved.
        unexpected = [
            key
            for error in uk.check(record)
            if error.validator == 'additionalProperties'
            for key in UnexpectedKeys.parse_unexpected_keys(error)
        ]
        kept, extras = split_dict(record, unexpected)
        # All extras go into one "custom" list so a record with several
        # unexpected properties is emitted once, with none of them dropped.
        yield {
            **kept,
            "custom": [
                {"key": key, "value": value}
                for key, value in extras.items()
            ],
        }

In [221]:
# Print the original records, then the translated ones, one per line
# (indent=None keeps each JSON document on a single line).
print('--- records')
for record in records:
    ppd(record, indent=None)
print('--- translated')
for record in translate(records):
    ppd(record, indent=None)

--- records
{[38;2;0;128;0;01m"id"[39;00m:[38;2;187;187;187m [39m[38;2;102;102;102m123[39m,[38;2;187;187;187m [39m[38;2;0;128;0;01m"a"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"1"[39m,[38;2;187;187;187m [39m[38;2;0;128;0;01m"c"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"d"[39m}
{[38;2;0;128;0;01m"id"[39;00m:[38;2;187;187;187m [39m[38;2;102;102;102m122[39m,[38;2;187;187;187m [39m[38;2;0;128;0;01m"a"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"b"[39m}
{[38;2;0;128;0;01m"id"[39;00m:[38;2;187;187;187m [39m[38;2;102;102;102m111[39m,[38;2;187;187;187m [39m[38;2;0;128;0;01m"a"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"c"[39m}
{[38;2;0;128;0;01m"id"[39;00m:[38;2;187;187;187m [39m[38;2;102;102;102m234[39m,[38;2;187;187;187m [39m[38;2;0;128;0;01m"a"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"z"[39m,[38;2;187;187;187m [39m[38;2;0;128;0;01m"d"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"z"[39m}
--- tran

In [222]:
# Extended schema: besides "id" and "a" it declares "custom", an array of
# {"key": string, "value": string} objects, and lists it as required.
# Rebinding the module-level `schema` changes what translate() validates
# against from this cell onwards.
# NOTE(review): the array form of "required" is Draft 4+ syntax; the
# Draft3Validator used by the helpers presumably does not enforce it - confirm.
schema = {
    "type" : "object",
    "properties" : {
        "id" : {"type" : "integer"},
        "a": {"type": "string"},
        "custom": {
            "type": "array",
            "items": {
                "type": "object",
                "properties": {
                    "key": {"type": "string"},
                    "value": {"type": "string"},
                }
            }
        }
    },
    "required": ["id", "custom"],
    "additionalProperties": False
}

In [223]:
# `validate` is not imported by any of the cells shown in this notebook, so a
# fresh kernel would raise NameError here; it is assumed to be
# jsonschema.validate and is imported in this cell to keep it runnable.
from jsonschema import validate

# Every translated record must conform to the extended schema; validate()
# raises on the first record that does not.
for record in translate(records):
    ppd(record, indent=None)
    validate(record, schema)

{[38;2;0;128;0;01m"id"[39;00m:[38;2;187;187;187m [39m[38;2;102;102;102m123[39m,[38;2;187;187;187m [39m[38;2;0;128;0;01m"a"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"1"[39m,[38;2;187;187;187m [39m[38;2;0;128;0;01m"custom"[39;00m:[38;2;187;187;187m [39m[{[38;2;0;128;0;01m"key"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"c"[39m,[38;2;187;187;187m [39m[38;2;0;128;0;01m"value"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"d"[39m}]}
{[38;2;0;128;0;01m"id"[39;00m:[38;2;187;187;187m [39m[38;2;102;102;102m122[39m,[38;2;187;187;187m [39m[38;2;0;128;0;01m"a"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"b"[39m,[38;2;187;187;187m [39m[38;2;0;128;0;01m"custom"[39;00m:[38;2;187;187;187m [39m[]}
{[38;2;0;128;0;01m"id"[39;00m:[38;2;187;187;187m [39m[38;2;102;102;102m111[39m,[38;2;187;187;187m [39m[38;2;0;128;0;01m"a"[39;00m:[38;2;187;187;187m [39m[38;2;186;33;33m"c"[39m,[38;2;187;187;187m [39m[38;2;0;128;0;01m"custom"[39