In [1]:
# In a PySpark kernel `spark` is normally already set; create it explicitly here
# (with the Spark UI switched off) so the notebook also runs stand-alone.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .config("spark.ui.enabled", False)
    .getOrCreate()
)

In [2]:
# Initialise the test environment: imports, the active IPython shell and the mock switch
import json

import requests
from IPython import get_ipython

from dapla.magics import DaplaLineageMagics

# Handle to the running IPython shell; the lineage magics are registered on it further down
ipython = get_ipython()

# True:  the lineage template is read from the attached JSON file.
# False: the lineage template is requested from a local endpoint.
use_mock_response = True

def lineage_template_provider(output_schema, input_schema_map):
    """Return a lineage template for one output dataset and its inputs.

    The request payload for the lineage template endpoint is always built from
    the arguments. When the module-level ``use_mock_response`` flag is true the
    payload is not sent and the template is read from ``lineage-template.json``
    in the current working directory. Otherwise the payload is POSTed to the
    local lineage service and the decoded JSON reply is returned.

    Args:
        output_schema: Mapping with ``'schema'`` and ``'timestamp'`` entries
            describing the output dataset.
        input_schema_map: Mapping whose values each carry ``'schema'`` and
            ``'timestamp'`` entries. The keys are presumably dataset paths --
            TODO confirm against DaplaLineageMagics.

    Returns:
        The parsed JSON document (mock file content or service response).

    Raises:
        KeyError: If a dict argument lacks ``'schema'`` or ``'timestamp'``.
        OSError: In mock mode, if ``lineage-template.json`` cannot be opened.
        requests.exceptions.RequestException: In live mode, on connection
            failure, timeout, or a 4xx/5xx response status.
    """
    # Seconds to wait for the local service before giving up; without a
    # timeout a stalled endpoint would block the kernel indefinitely.
    request_timeout = 30

    dependencies = {
        key: {
            "schema": entry['schema'],
            "schemaType": "SPARK",
            "timestamp": entry['timestamp'],
        }
        for key, entry in input_schema_map.items()
    }

    # Built before the mock/live branch, so malformed input fails the same way
    # in both modes.
    request = {
        "schema": output_schema['schema'],
        "timestamp": output_schema['timestamp'],
        "schemaType": "SPARK",
        "simpleLineage": False,
        # NOTE(review): a single dict wrapped in a list, kept exactly as before;
        # confirm this is the shape the endpoint expects.
        "dependencies": [dependencies],
    }

    if use_mock_response:
        # JSON is UTF-8 by specification; do not rely on the locale default.
        with open('lineage-template.json', 'r', encoding='utf-8') as f:
            return json.load(f)

    response = requests.post(
        'http://localhost:10190/lineage/template',
        json=request,
        headers={},
        allow_redirects=False,
        timeout=request_timeout,
    )
    # Surface HTTP errors instead of parsing an error body as a template.
    response.raise_for_status()
    return response.json()
    
# Register the dapla lineage magics manually: build the magics object around the
# active IPython shell and the lineage_template_provider callback, then attach it
# to the shell so the magics used in the cells below become available.
magics = DaplaLineageMagics(ipython, lineage_template_provider)
ipython.register_magics(magics)

In [3]:
from pyspark.sql.types import *

# Build three small input DataFrames: person, unrelated and konto
person_type = StructType([
    StructField('personidentifikator', StringType()),
    StructField('kontonummer', StringType()),
])
person_data = [('1234', '12345555'), ('1235', '12347777')]
person = spark.createDataFrame(person_data, schema=person_type)

# Empty dataset that does not take part in the join below
unrelated_type = StructType([
    StructField('weird', StringType()),
    StructField('stuff', StringType()),
])
unrelated = spark.createDataFrame([], schema=unrelated_type)

konto_type = StructType([
    StructField('kontonummer', StringType()),
    StructField('innskudd', IntegerType()),
])
konto_data = [('12345555', 25000), ('12347777', 120000)]
konto = spark.createDataFrame(konto_data, schema=konto_type)

# Derive the output DataFrame by inner-joining person and konto on kontonummer
innskudd = person.join(konto, on='kontonummer', how='inner')

In [4]:
%%input
/skatt/person
/skatt/konto
/skatt/unrelated

In [5]:
%%output
/skatt/innskudd

In [6]:
# Normally issued automatically by spark.read.path; called by hand here for each input dataset

%on_input_load /skatt/person 1111 {person.schema.json()}
%on_input_load /skatt/konto 1111 {konto.schema.json()}
%on_input_load /skatt/unrelated 1111 {unrelated.schema.json()}

In [7]:
# Normally issued automatically by spark.write.path; called by hand here for the output dataset

%on_output_save /skatt/innskudd 1111 {innskudd.schema.json()}

In [8]:
# This is for debug purposes

%lineage_tree

Input datasets:
 |-- /skatt/person (loaded)
 |-- /skatt/konto (loaded)
 |-- /skatt/unrelated (loaded)
Output datasets:
 |-- /skatt/innskudd


In [9]:
# Show GUI for mapping lineage fields

%lineage innskudd

VBox(children=(Accordion(children=(VBox(children=(HTML(value='<style>.widget-checkbox-label-bold > label > spa…

In [10]:
# This will be the lineage output from GUI

innskudd.lineage

{'lineage': {'name': 'spark_schema',
  'type': 'structure',
  'fields': [{'name': 'kontonummer',
    'type': 'inherited',
    'confidence': 0.9,
    'sources': [{'field': 'kontonummer',
      'path': '/skatt/person',
      'version': 1600430243658},
     {'field': 'kontonummer',
      'path': '/skatt/konto',
      'version': 1600430243658}]},
   {'name': 'personidentifikator',
    'type': 'inherited',
    'confidence': 0.9,
    'sources': [{'field': 'personidentifikator',
      'path': '/skatt/person',
      'version': 1600430243658}]},
   {'name': 'innskudd',
    'type': 'inherited',
    'confidence': 0.9,
    'sources': [{'field': 'innskudd',
      'path': '/skatt/konto',
      'version': 1600430243658}]}],
  'sources': [{'path': '/skatt/person', 'version': 1600430243658},
   {'path': '/skatt/konto', 'version': 1600430243658},
   {'path': '/skatt/unrelated', 'version': 1600430243658}]}}

In [11]:
# Return simple lineage template by path

%lineage_json --path /skatt/innskudd

{'lineage': {'name': 'spark_schema',
  'type': 'structure',
  'fields': [{'name': 'kontonummer',
    'type': 'inherited',
    'confidence': 0.9,
    'sources': [{'field': 'kontonummer',
      'path': '/skatt/person',
      'version': 1600430243658},
     {'field': 'kontonummer',
      'path': '/skatt/konto',
      'version': 1600430243658}]},
   {'name': 'personidentifikator',
    'type': 'inherited',
    'confidence': 0.9,
    'sources': [{'field': 'personidentifikator',
      'path': '/skatt/person',
      'version': 1600430243658}]},
   {'name': 'innskudd',
    'type': 'inherited',
    'confidence': 0.9,
    'sources': [{'field': 'innskudd',
      'path': '/skatt/konto',
      'version': 1600430243658}]}],
  'sources': [{'path': '/skatt/person', 'version': 1600430243658},
   {'path': '/skatt/konto', 'version': 1600430243658},
   {'path': '/skatt/unrelated', 'version': 1600430243658}]}}