# Guidance on data provenance/lineage

A key aspect of D4U is the data lineage between listings (core, basic, etc.). Each record is uniquely identified by a set of columns (the data provenance), which tells us the record and table it originated from, if any.

The system expects not only these columns but also a data lineage that is consistent over time. If either is missing, saving a listing to the Silver layer will fail.

Below we discuss two concrete scenarios where this can happen: non-deterministic code and data aggregation.

### 1) Non-deterministic code

Non-deterministic code is code that does not produce the same output on every run, even when given the same input.

The issue arises with operations such as `dropDuplicates()` applied without a consistent row order: which duplicate is kept can change from one run to the next. To make the result deterministic, sort the DataFrame by all relevant columns, including any unique identifiers, so that the sort order is unambiguous. Adding this sorting step before deduplication keeps the outcome consistent across executions.
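A plain-Python analogy (no Spark involved) of why row order matters: the hypothetical helper `keep_first_per_key` keeps the first record it sees for each name, much like a naive `dropDuplicates(["Name"])`. Given the same rows in a different order, it keeps different records; sorting by all columns first removes that dependency. This is only a sketch of the idea, not how Spark implements deduplication.

```python
# Plain-Python analogy (not Spark): "keep the first row per key" depends on input order.
rows = [("John", 34, 0), ("Bob", 40, 1), ("John", 53, 2), ("Bob", 45, 3)]  # (Name, Age, ID)
reversed_rows = list(reversed(rows))  # same content, different order


def keep_first_per_key(records):
    """Keep the first record seen for each name (hypothetical helper)."""
    kept = {}
    for name, age, rec_id in records:
        kept.setdefault(name, (name, age, rec_id))
    return sorted(kept.values())


def keep_first_sorted(records):
    """Sort by all columns first, so 'first' no longer depends on the input order."""
    return keep_first_per_key(sorted(records))


print(keep_first_per_key(rows) == keep_first_per_key(reversed_rows))  # False
print(keep_first_sorted(rows) == keep_first_sorted(reversed_rows))    # True
```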

If your code is non-deterministic, saving the DataFrame to the Silver layer is likely to fail.

Consider this df:

| SUBJID | PARAM | AGE | REC_ID        |
|--------|-------|-----|---------------|
| 1      | test1 | 28  | 784574124541  |
| 1      | test2 | 28  | 163746845476  |
| 2      | test1 | 35  | 8132687656497 |
| 2      | test2 | 35  | 687423121313  |

Say you want one row per SUBJID, keeping only SUBJID, AGE and REC_ID, and you use `dropDuplicates()` to get it.

To save to Silver, you need to carry the data provenance columns (REC_ID in this fictional example) and make sure their values are consistent over time. Without sorting before `dropDuplicates()`, you cannot guarantee that the same record is kept on every run (for SUBJID 1, either REC_ID 784574124541 or 163746845476 could survive), which prevents saving to Silver.

```python
from pyspark.sql import SparkSession, Window, Row
from pyspark.sql import functions as F

# Reuse the active Spark session (on Databricks, `spark` already exists)
spark = SparkSession.builder.getOrCreate()
```

```python
# To showcase the issue, the next cells create a DataFrame, inflate it, shuffle it and
# run a non-deterministic operation over several iterations.
# The function below computes the MD5 checksum (a kind of fingerprint) of a PySpark
# DataFrame. We use it to compare the outcomes.

import hashlib

def calculate_md5(df):
    '''
    Takes a PySpark DataFrame and returns its MD5 checksum.
    The DataFrame is converted to pandas and its rows are sorted, so the checksum
    reflects which rows are present rather than the order they arrive in.
    The result is then rendered as a string and hashed.
    '''
    pdf = df.toPandas()
    pdf = pdf.sort_values(by=list(pdf.columns))
    df_string = pdf.to_string(index=False)

    return hashlib.md5(df_string.encode()).hexdigest()
```

```python
# Create some 'large' test data: the problem is more likely to show up on a larger dataset

# Example data
data    = [("John", 34), ("Bob", 40), ("John", 53), ("Bob", 45)]
columns = ["Name", "Age"]
df = spark.createDataFrame(data, columns)

# Inflate the dataset x1000 with a cross join on spark.range(),
# which adds an `id` column (values 0 to 999)
inflated_df = df.crossJoin(spark.range(0, 1000))

# Shuffle the rows randomly
inflated_df = inflated_df.orderBy(F.rand())

inflated_df.display()
```

```python
# dropDuplicates() is non-deterministic: which row it keeps for each key depends on
# the row order. Let's demonstrate this.

def perform_operations(df, sort_beforehand: bool):

    if sort_beforehand:
        # Sort by columns that together identify each row uniquely.
        # (Name, id) alone is not enough: each id appears twice per Name (two ages).
        df = df.orderBy("Name", "id", "Age")

    md5_checksums = []

    # Iterate 10 times and check whether we get the same MD5 checksum each time
    for _ in range(10):

        # Keep one row per Name: without sorting, the row kept may change between runs
        df_dedup = df.dropDuplicates(subset=["Name"])

        # Calculate the MD5 checksum of the result
        md5_checksum = calculate_md5(df_dedup)
        md5_checksums.append(md5_checksum)

    # Display the MD5 checksums and how many distinct values we got
    for i, checksum in enumerate(md5_checksums):
        print(f"Iteration {i+1}: MD5 checksum = {checksum}")
    print(f"Distinct checksums: {len(set(md5_checksums))}")
```

```python
# Run the operations without sorting
perform_operations(inflated_df, sort_beforehand=False)

# The checksums differ between iterations: the output changes from run to run. Not good!
```

```python
# Run the operations with sorting
perform_operations(inflated_df, sort_beforehand=True)

# The checksum is the same in every iteration. Much better!
```

**Take-home message**: remember to sort your data by columns that uniquely identify each row before `dropDuplicates()`, `first()`, `last()` or any other operation whose result depends on row order.

### 2) Data provenance when aggregating data 

```python
# When we aggregate data, we can lose the data provenance. Without it, standard Marvel
# listings will fail, specifically at addDataProvenance().
# Say we want to add to the DM domain the average HR per subject, computed from VS,
# and later push the result to the Silver layer.

basic_DM = spark.createDataFrame([
    Row(SUBJID='1', AGE=25, SEX='M', DM_URECID='1657912664161'),
    Row(SUBJID='2', AGE=44, SEX='F', DM_URECID='9873131643217'),
])

basic_VS = spark.createDataFrame([
    Row(SUBJID='1', VISIT=1, HR=80,  VS_URECID='103241324165'),
    Row(SUBJID='1', VISIT=2, HR=85,  VS_URECID='237566541315'),
    Row(SUBJID='2', VISIT=1, HR=120, VS_URECID='787465415615'),
    Row(SUBJID='2', VISIT=2, HR=75,  VS_URECID='023413354787'),
])
```

```python
# Aggregating with groupBy/agg does not keep the data provenance, so we do NOT recommend it here
VS_summary = basic_VS.groupBy("SUBJID").agg(F.mean("HR").alias("Mean_HR"))

# At this point the data provenance (VS_URECID in this example) has been lost:
VS_summary.display()
```

```python
# A window function (Window.partitionBy) is the better option here: every row keeps its
# data provenance and gets the per-subject mean alongside it.

window = Window.partitionBy('SUBJID')
basic_VS = basic_VS.withColumn(
    'HR_mean',
    F.mean("HR").over(window)
)

basic_VS.display()
```

```python
# If we need only one record per subject, we can use dropDuplicates(), but remember to
# sort first to avoid non-deterministic results:
basic_VS.orderBy('VS_URECID').dropDuplicates(['SUBJID']).display()

# We can then join this onto DM by SUBJID and keep provenance from both DM and VS.
```

```python
# In other scenarios (e.g. pivot/unpivot), a window function cannot be used. In that case,
# we need to re-attach an arbitrary (but deterministic) provenance.
# First, filter VS to keep a single record per SUBJID.

# One way: number the rows within each SUBJID and keep the first one.
# Ordering by the unique VS_URECID makes the result deterministic; otherwise saving to
# the Silver layer would fail. Unlike rank(), row_number() never ties, so exactly one row is kept.
VS_unique = basic_VS.withColumn(
    'rn',
    F.row_number().over(Window.partitionBy('SUBJID').orderBy('VS_URECID'))
)
VS_unique = VS_unique.filter(F.col('rn') == 1).drop('rn')

# We have provenance info
VS_unique.display()
```

```python
# Join the summary onto this deduplicated VS:
df = VS_unique.join(VS_summary, "SUBJID", "left")

# We now have a summary with data provenance, albeit arbitrary. We can join it onto DM
# and get data provenance from both domains.
df.display()
```