# Batch jobs and the Data Subjects Service
This blog post walks through a batch job on StrmPrivacy and shows the kind of data that ends up in the 'Data Subjects Service'. The example uses the well-known [UCI online retail dataset](https://archive.ics.uci.edu/ml/datasets/online+retail), which contains online shopping transactions.

In [128]:
!pip install tabulate

Collecting tabulate
  Downloading tabulate-0.8.10-py3-none-any.whl (29 kB)
Installing collected packages: tabulate
Successfully installed tabulate-0.8.10


In [95]:
import pandas as pd

In [96]:
df = pd.read_excel("https://archive.ics.uci.edu/ml/machine-learning-databases/00352/Online%20Retail.xlsx",
                  dtype=str)
df['InvoiceDate'] = pd.to_datetime(df['InvoiceDate'])
df['UnitPrice'] = df['UnitPrice'].astype(float)

df.head()

| | InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Country |
|---|---|---|---|---|---|---|---|---|
| 0 | 536365 | 85123A | WHITE HANGING HEART T-LIGHT HOLDER | 6 | 2010-12-01 08:26:00 | 2.55 | 17850 | United Kingdom |
| 1 | 536365 | 71053 | WHITE METAL LANTERN | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom |
| 2 | 536365 | 84406B | CREAM CUPID HEARTS COAT HANGER | 8 | 2010-12-01 08:26:00 | 2.75 | 17850 | United Kingdom |
| 3 | 536365 | 84029G | KNITTED UNION FLAG HOT WATER BOTTLE | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom |
| 4 | 536365 | 84029E | RED WOOLLY HOTTIE WHITE HEART. | 6 | 2010-12-01 08:26:00 | 3.39 | 17850 | United Kingdom |


## Preparation
Before you can run the rest of the notebook, you must
1. prepare a [data-connector](https://docs.strmprivacy.io/docs/latest/quickstart/batch/data-connectors/) that allows access to a cloud bucket on S3, gcloud or Azure
2. make sure you have access to the files created by the batch job on that bucket. In this example I use
   a _mounted_ s3 bucket with [`s3fs`](https://github.com/s3fs-fuse/s3fs-fuse). Similar tools exist for [gcloud](https://github.com/GoogleCloudPlatform/gcsfuse) and [Azure](https://github.com/Azure/azure-storage-fuse)
   
We have prepared a data-connector named `s3-batch-demo` that points to an AWS S3 bucket:
```
strm get data-connector s3-batch-demo
 NAME            TYPE               TARGET NAME

 s3-batch-demo   Amazon S3 Bucket   strm-batch-demo
```
You could of course use any of the supported data-connector types. See [the data-connector documentation](https://docs.strmprivacy.io/docs/latest/quickstart/batch/data-connectors/) for how to create one.

I have mounted my s3 bucket in a subdirectory named `s3`

    s3fs strm-batch-demo s3

Since Strm batch processing currently only supports csv files, we convert the dataframe to csv and store it in the S3 bucket.

In [97]:
df.to_csv("s3/uci_online_retail.csv", index=False)

As a sanity check we read back the top of the csv file. It looks okay.

In [114]:
! head -5 s3/uci_online_retail.csv

InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country
536365,85123A,WHITE HANGING HEART T-LIGHT HOLDER,6,2010-12-01 08:26:00,2.55,17850,United Kingdom
536365,71053,WHITE METAL LANTERN,6,2010-12-01 08:26:00,3.39,17850,United Kingdom
536365,84406B,CREAM CUPID HEARTS COAT HANGER,8,2010-12-01 08:26:00,2.75,17850,United Kingdom
536365,84029G,KNITTED UNION FLAG HOT WATER BOTTLE,6,2010-12-01 08:26:00,3.39,17850,United Kingdom


# The data contract
To be processed, the rows in the csv file need to conform to a _data contract_. This is a combination of a _schema_ and an _event contract_.

The schema defines the _shape_ of the rows; in this case it closely matches the csv column headers. The event contract defines the rules that govern the event, such as which fields contain PII. Together these two form the _data contract_.

**These steps were done by us, so you don't have to do them**
```
strm create schema strmprivacy/online-retail/1.0.0 --definition=online-retail.json --public
strm activate schema strmprivacy/online-retail/1.0.0
strm create event-contract strmprivacy/online-retail/1.0.0 --public \
    -F online-retail-contract.json -S strmprivacy/online-retail/1.0.0
strm activate event-contract strmprivacy/online-retail/1.0.0
```
With schema

```online-retail.json
{
  "name": "UCI Online Retail",
  "nodes": [
    { "type": "STRING", "name": "InvoiceNo" },
    { "type": "STRING", "name": "StockCode" },
    { "type": "STRING", "name": "Description" },
    { "type": "INTEGER", "name": "Quantity" },
    { "type": "STRING", "name": "InvoiceDate" },
    { "type": "STRING", "name": "UnitPrice" },
    { "type": "STRING", "name": "CustomerID" },
    { "type": "STRING", "name": "Country" }
  ]
}
```

and event contract

```online-retail-contract.json
{
  "keyField": "CustomerID",
  "piiFields": { "CustomerID": 1 },
  "dataSubjectField": "CustomerID"
}
```


## Configuring the batch job.
In order to define a batch job, we need to

1. figure out the bucket locations and access. This is a matter of setting up the correct [data-connector](https://docs.strmprivacy.io/docs/latest/quickstart/batch/data-connectors/).
2. figure out the timestamp format. This uses the
   [Java time format](https://docs.oracle.com/javase/8/docs/api/java/time/format/DateTimeFormatter.html)
3. determine the consent that was given, per batch job or even per record
4. write the _batch job configuration file_. This file has a json-schema defined format; if your text editor knows about json schema, it can use `strmprivacy-batch-job-configuration` from [schemastore.org](https://schemastore.org)
   
   
### the time format
The way we converted the Excel file to csv *changed the timestamp format*. Make sure you look at the timestamp in the csv file.
A sample timestamp in the UCI *csv* file is `2010-12-01 08:26:00`. This suggests the following format pattern: `yyyy-MM-dd HH:mm:ss`. Since this timestamp has no timezone information, it's necessary to add a default timezone. Look [here](https://en.wikipedia.org/wiki/List_of_tz_database_time_zones#List) for a list.

If you get the format *wrong*, you'll only notice after the batch job has started:

```
strm list batch-jobs
 BATCH JOB ID                           TIMESTAMP                           STATE   DETAILS                                           
 b2cead50-f85b-42cd-9198-53d7caa998e0   2022-09-13 08:24:03.226 +0000 UTC   ERROR   Invalid timestamp [Text '2010-12-01 08:26:00' could not be parsed at index 4] in row #1
```
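A quick local check can catch a wrong pattern before the job is submitted. The sketch below is only an approximation: it uses a hand-translated Python `strptime` equivalent of the Java pattern `yyyy-MM-dd HH:mm:ss`, and the helper name `matches_format` is made up for this post. Java and Python patterns are not interchangeable, so the batch job remains the final judge.

```python
from datetime import datetime

# Hand-translated equivalent of the Java pattern "yyyy-MM-dd HH:mm:ss".
# This mapping is an assumption made for this sketch; it is not provided by StrmPrivacy.
PYTHON_FORMAT = "%Y-%m-%d %H:%M:%S"


def matches_format(value: str, fmt: str = PYTHON_FORMAT) -> bool:
    """Return True when `value` parses completely with `fmt`."""
    try:
        datetime.strptime(value, fmt)
        return True
    except ValueError:
        return False


# The first sample is taken from the csv file, the second is a deliberately different layout.
for sample in ["2010-12-01 08:26:00", "12/1/2010 8:26"]:
    print(sample, "->", matches_format(sample))
```

Running such a check on the first few rows of the csv file is much cheaper than waiting for the job to reach the `ERROR` state.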

### Consent
Batch jobs _can_ extract consent per record using pattern matching, but we will leave this for another blog post. In the sample dataset such information is absent, so we apply one bulk consent to all the records in the file. In the sample below we've defined

        "consent": { "default_consent_levels": [ 2 ] },

whatever the `2` means in your organization. See [our documentation](https://docs.strmprivacy.io/docs/latest/concepts/batch-jobs/#consent) for more information about consent.

# The final configuration file


```batch-job-config.json
{
    "source_data": {
      "data_connector_ref": { "name": "s3-batch-demo"},
      "file_name": "uci_online_retail.csv",
      "data_type": { "csv": { "charset": "UTF-8" } }
    },
    "consent": { "default_consent_levels": [ 2 ] },
    "encryption": {
      "batch_job_group_id": "7824e975-20e1-4995-b129-2f9582728ca5",
      "timestamp_config": {
        "field": "InvoiceDate",
        "format": "yyyy-MM-dd HH:mm:ss",
        "default_time_zone": { "id": "Europe/Amsterdam" }
      }
    },
    "event_contract_ref": {
      "handle": "strmprivacy",
      "name": "online-retail",
      "version": "1.0.0"
    },
    "encrypted_data": {
      "target": {
        "data_connector_ref": { "name": "s3-batch-demo"},
        "data_type": { "csv": { "charset": "UTF-8" } },
        "file_name": "uci_online_retail/encrypted.csv"
      }
    },
    "encryption_keys_data": {
      "target": {
        "data_connector_ref": { "name": "s3-batch-demo"},
        "data_type": { "csv": { "charset": "UTF-8" } },
        "file_name": "uci_online_retail/keys.csv"
      }
    },
    "derived_data": [      {
        "target": {
          "data_connector_ref": { "name": "s3-batch-demo"},
          "data_type": { "csv": { "charset": "UTF-8" } },
          "file_name": "uci_online_retail/decrypted-0.csv"
        },
        "consent_levels": [ 2 ],
        "consent_level_type": "CUMULATIVE"
      }

    ]
}
  ```
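Because mistakes in this file only surface once the job runs, I find it useful to sanity-check it locally first. The sketch below is my own illustration and not part of the `strm` tooling: the function `check_config` and the rules it applies are assumptions, and it only looks at the keys that appear in the configuration above. It also assumes that source and targets live in the same bucket, as they do here.

```python
def check_config(config: dict, csv_header: list) -> list:
    """Return a list of human-readable problems; an empty list means none were found."""
    problems = []

    # The timestamp field must be one of the csv columns.
    timestamp_field = config["encryption"]["timestamp_config"]["field"]
    if timestamp_field not in csv_header:
        problems.append(f"timestamp field {timestamp_field!r} is not a csv column")

    # Collect every output file name mentioned in the configuration.
    source = config["source_data"]["file_name"]
    targets = [
        config["encrypted_data"]["target"]["file_name"],
        config["encryption_keys_data"]["target"]["file_name"],
    ] + [derived["target"]["file_name"] for derived in config.get("derived_data", [])]

    if source in targets:
        problems.append("an output file would overwrite the source file")
    if len(set(targets)) != len(targets):
        problems.append("two outputs share the same file name")
    return problems


# Abbreviated copy of the configuration above, reduced to the keys this sketch inspects.
config = {
    "source_data": {"file_name": "uci_online_retail.csv"},
    "encryption": {"timestamp_config": {"field": "InvoiceDate"}},
    "encrypted_data": {"target": {"file_name": "uci_online_retail/encrypted.csv"}},
    "encryption_keys_data": {"target": {"file_name": "uci_online_retail/keys.csv"}},
    "derived_data": [{"target": {"file_name": "uci_online_retail/decrypted-0.csv"}}],
}
header = "InvoiceNo,StockCode,Description,Quantity,InvoiceDate,UnitPrice,CustomerID,Country".split(",")
print(check_config(config, header))
```

In a real notebook you would load the full file with `json.load` and read the header from the first line of the csv file.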

# Executing the batch job

```
strm create batch-job -F batch-job-config.json
# shows a uuid that you use to access its progress.
strm get batch-job <uuid>
```
I like to do the following to see how the batch job is progressing
```
watch "strm get batch-job <uuid> -o json  | jq '.batchJob.states  | last'"
```
which gives you somewhat realtime information on how the job is progressing. When finished, you should see something like

```
{
  "stateTime": "2022-09-13T10:06:47.113Z",
  "state": "FINISHED",
  "message": "Processed 541909 records in 194 s. which is 2790 records/s."
}
```
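The same polling can be done from Python, for instance inside the notebook. This is a minimal sketch under a few assumptions: it shells out to the `strm get batch-job <uuid> -o json` command shown above, it reads the same `.batchJob.states | last` path as the `jq` expression, and it treats `FINISHED` and `ERROR` (the only two states that appear in this post) as terminal. The function names are mine.

```python
import json
import subprocess
import time


def last_state(job: dict) -> dict:
    """Python equivalent of the jq expression '.batchJob.states | last'."""
    return job["batchJob"]["states"][-1]


def fetch_job(uuid: str) -> dict:
    """Run the CLI command from the text and parse its JSON output."""
    completed = subprocess.run(
        ["strm", "get", "batch-job", uuid, "-o", "json"],
        check=True, capture_output=True, text=True,
    )
    return json.loads(completed.stdout)


def wait_for_job(uuid, fetch=fetch_job, poll_seconds=5.0, terminal=("FINISHED", "ERROR")):
    """Poll until the latest state is terminal, then return that state."""
    # Assumption: FINISHED and ERROR are the states after which nothing changes anymore.
    while True:
        state = last_state(fetch(uuid))
        if state["state"] in terminal:
            return state
        time.sleep(poll_seconds)
```

Passing the fetch function as a parameter keeps the loop testable without a running job, for example `wait_for_job("<uuid>")["message"]` in the notebook and a stub in a unit test.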
Once the job has finished, you'll see output files in your bucket

In [116]:
!ls -lRh s3

s3:
total 46M
drwxr-x--- 1 bvdeenen bvdeenen   0 Jan  1  1970 uci_online_retail
-rw-r--r-- 1 bvdeenen bvdeenen 46M Sep 13 15:36 uci_online_retail.csv

s3/uci_online_retail:
total 234M
-rw-r----- 1 bvdeenen bvdeenen 106M Sep 13 15:17 decrypted-0.csv
-rw-r----- 1 bvdeenen bvdeenen 122M Sep 13 15:17 encrypted.csv
-rw-r----- 1 bvdeenen bvdeenen 7.0M Sep 13 15:17 keys.csv


We see that the encrypted and decrypted files are substantially bigger than the original file. This is because the encrypted string is much longer than the original five-character `CustomerID`, and because of the added `strmMeta` columns. The good news is that csv is a very inefficient format, and this kind of data is typically stored in a compressed columnar format. A regular `zip` compression of the 106MB decrypted file already brings it down to 11MB.
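To get a feeling for why compression helps so much, here is a small sketch using `zlib` (DEFLATE, the algorithm `zip` commonly uses) on synthetic rows. The rows are made up and far more repetitive than the real file, so the printed ratio says nothing about the real dataset; the point is only that long, repeated ciphertexts and key links compress well.

```python
import zlib

# Synthetic csv rows, loosely modelled on the encrypted file; NOT the real dataset.
header = "InvoiceNo,StockCode,Description,CustomerID,strmMeta.keyLink\n"
rows = [
    f"{536365 + i},85123A,WHITE HANGING HEART T-LIGHT HOLDER,"
    "ASqc1Q0QalDEN+LHeyZSfGHE+s9Lqu4o+jM=,0fd20015-40e4-484d-ab1f-182acff382ac\n"
    for i in range(10_000)
]
raw = (header + "".join(rows)).encode("utf-8")

# Compress with the default-ish level 6 and compare sizes.
compressed = zlib.compress(raw, 6)
ratio = len(raw) / len(compressed)
print(f"raw: {len(raw)} bytes, compressed: {len(compressed)} bytes, ratio: {ratio:.1f}x")
```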

# Parsing the encrypted dataset

In [118]:
import json
import pandas as pd
import numpy as np
import time
import pytz
import datetime

# read the batch job configuration, closing the file afterwards
with open("batch-job-config.json", "r") as config_file:
    config = json.load(config_file)
config_timezone = config["encryption"]["timestamp_config"]["default_time_zone"]["id"]

# The batch job configuration specified that the local time InvoiceDate fields should be interpreted
# in the timezone Europe/Amsterdam
config_timezone = pytz.timezone(config_timezone)
print("loaded timezone", config_timezone, "from config file")

# The date parser converts the milliseconds-since-the-epoch timestamp in `strmMeta.timestamp`
# to a datetime instance. This is not strictly necessary but nice to have
def date_parser(string_list):
    # Java ms epoch to timezone-aware datetime
    return [datetime.datetime.fromtimestamp(float(x) * 0.001, tz=config_timezone) for x in string_list]


loaded timezone Europe/Amsterdam from config file
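As a standalone illustration of the conversion, the sketch below turns a millisecond epoch value into a timezone-aware datetime using only the standard library. The function name `ms_epoch_to_datetime` is mine, the sample value is computed by hand for 2010-12-01 07:26:00 UTC, and the fixed `+01:00` offset merely stands in for Europe/Amsterdam winter time; in the notebook you would pass the `pytz` timezone loaded from the config file instead.

```python
from datetime import datetime, timedelta, timezone


def ms_epoch_to_datetime(ms, tz=timezone.utc):
    """Convert milliseconds since the Unix epoch to a timezone-aware datetime."""
    return datetime.fromtimestamp(float(ms) / 1000.0, tz=tz)


# 2010-12-01 07:26:00 UTC expressed in milliseconds since the epoch (hand-computed sample)
sample_ms = "1291188360000"
print(ms_epoch_to_datetime(sample_ms))

# A fixed +01:00 offset stands in for Europe/Amsterdam winter time in this sketch
print(ms_epoch_to_datetime(sample_ms, tz=timezone(timedelta(hours=1))))
```

Passing the timezone directly to `fromtimestamp` keeps the result independent of the timezone of the machine running the notebook.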


In [100]:
encrypted = pd.read_csv("s3/uci_online_retail/encrypted.csv", dtype=str,
                        keep_default_na=False, parse_dates=['strmMeta.timestamp'], date_parser=date_parser)
encrypted['InvoiceDate'] = pd.to_datetime(encrypted['InvoiceDate']).dt.tz_localize(config_timezone)
encrypted.head(2)

| | InvoiceNo | StockCode | Description | Quantity | InvoiceDate | UnitPrice | CustomerID | Country | strmMeta.eventContractRef | strmMeta.nonce | strmMeta.timestamp | strmMeta.keyLink | strmMeta.billingId | strmMeta.consentLevels |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 536365 | 85123A | WHITE HANGING HEART T-LIGHT HOLDER | 6 | 2010-12-01 08:26:00+01:00 | 2.55 | ASqc1Q0QalDEN+LHeyZSfGHE+s9Lqu4o+jM= | United Kingdom | strmprivacy/online-retail/1.0.0 | 0 | 2010-12-01 08:26:00+01:00 | 0fd20015-40e4-484d-ab1f-182acff382ac | | [2] |
| 1 | 536365 | 71053 | WHITE METAL LANTERN | 6 | 2010-12-01 08:26:00+01:00 | 3.39 | ASqc1Q0QalDEN+LHeyZSfGHE+s9Lqu4o+jM= | United Kingdom | strmprivacy/online-retail/1.0.0 | 0 | 2010-12-01 08:26:00+01:00 | 0fd20015-40e4-484d-ab1f-182acff382ac | | [2] |


Note the encrypted `CustomerID` and the `strmMeta...` columns:
* `strmMeta.billingId` is no longer used, and not filled in.
* `strmMeta.consentLevels` is the consent for this particular record.
  In this case all the records have the same consent but this is not necessarily the case.
* `strmMeta.keyLink` is the uuid that allows us to find the encryption key that was used to encrypt the
  PII columns in this event.

# Encryption keys
The encryption keys are exported to the keys file in the bucket. This file contains all the keys that were created or re-used while processing this job.

In [101]:
keys = pd.read_csv("s3/uci_online_retail/keys.csv")
keys.head(2)

| | keyLink | encryptionKey |
|---|---|---|
| 0 | 0fd20015-40e4-484d-ab1f-182acff382ac | `{"primaryKeyId":714921229,"key":[{"keyData":{"...` |
| 1 | 1de9b1fc-9ad5-43a2-b501-af6933624f67 | `{"primaryKeyId":1369926497,"key":[{"keyData":{...` |


The `encryptionKey` column contains an encryption key in [Google Tink](https://developers.google.com/tink) format, serialized as JSON.

In [120]:
json.loads(keys.iloc[0]['encryptionKey'])

{'primaryKeyId': 714921229,
 'key': [{'keyData': {'typeUrl': 'type.googleapis.com/google.crypto.tink.AesSivKey',
    'value': 'EkBKGzBuy9C3UUmWaOzpe7NBEg6QK21FRhZ9MjuD5hpa0+hPJy0kn1HngA9QUT5aGbTNQQyow0V6qJCFoFRQNNTH',
    'keyMaterialType': 'SYMMETRIC'},
   'status': 'ENABLED',
   'keyId': 714921229,
   'outputPrefixType': 'TINK'}]}
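If you only want a summary of such a key without printing the key material, something like the following works with the standard library alone. It is a sketch based purely on the JSON structure shown above; `describe_keyset` is a made-up helper, and the sample string uses a placeholder instead of real key material.

```python
import json


def describe_keyset(keyset_json: str) -> dict:
    """Summarize a JSON keyset without exposing the key material."""
    keyset = json.loads(keyset_json)
    primary_id = keyset["primaryKeyId"]
    # The primary key is the entry whose keyId equals primaryKeyId.
    primary = next(k for k in keyset["key"] if k["keyId"] == primary_id)
    return {
        "primaryKeyId": primary_id,
        # Keep only the last part of the type URL, e.g. 'AesSivKey'
        "type": primary["keyData"]["typeUrl"].rsplit(".", 1)[-1],
        "status": primary["status"],
        "keyCount": len(keyset["key"]),
    }


# Same structure as the key above, with the key material replaced by a placeholder.
sample = json.dumps({
    "primaryKeyId": 714921229,
    "key": [{
        "keyData": {
            "typeUrl": "type.googleapis.com/google.crypto.tink.AesSivKey",
            "value": "<key material omitted>",
            "keyMaterialType": "SYMMETRIC",
        },
        "status": "ENABLED",
        "keyId": 714921229,
        "outputPrefixType": "TINK",
    }],
})
print(describe_keyset(sample))
```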

# Decrypting records
In this part of the notebook I'll decrypt the encrypted dataset within this notebook.

In [102]:
!pip install tink
import tink
from tink import daead, cleartext_keyset_handle
daead.register()



In [103]:
import base64

class Decrypter:
    def __init__(self, encryptionKey):
        # create a deterministic AEAD primitive from a json cleartext encryption key
        reader = tink.JsonKeysetReader(encryptionKey)
        self.prim = cleartext_keyset_handle.read(reader).primitive(daead.DeterministicAead)

    def decrypt(self, cipher_text):
        # the cipher text is base64 encoded; the associated data is empty (b'')
        return self.prim.decrypt_deterministically(base64.b64decode(cipher_text), b'').decode('utf-8')


In [104]:
# create the decrypters table
decrypters=keys.set_index('keyLink')
decrypters['decrypter'] = decrypters['encryptionKey'].apply(lambda e: Decrypter(e))
decrypters.head(2)

| keyLink | encryptionKey | decrypter |
|---|---|---|
| 0fd20015-40e4-484d-ab1f-182acff382ac | `{"primaryKeyId":714921229,"key":[{"keyData":{"...` | `<__main__.Decrypter object at 0x7f64018164f0>` |
| 1de9b1fc-9ad5-43a2-b501-af6933624f67 | `{"primaryKeyId":1369926497,"key":[{"keyData":{...` | `<__main__.Decrypter object at 0x7f6401816ee0>` |


In [105]:
# Test a decrypter instance
customerId = df.iloc[0].CustomerID
decrypter = decrypters.loc[encrypted.iloc[0]['strmMeta.keyLink']].decrypter
print("original:", customerId, "decrypted:", decrypter.decrypt(encrypted.iloc[0].CustomerID))


original: 17850 decrypted: 17850


In [121]:
# create a merged table of the encrypted values and the encryption keys
encrypted_and_keys = encrypted.merge(keys, how='left', left_on = 'strmMeta.keyLink', right_on='keyLink')

In [123]:
# decrypt every 'CustomerID' to 'decryptedCustomerId'
def decrypt_row(row):
    return decrypters.loc[row['strmMeta.keyLink']]['decrypter'].decrypt(row['CustomerID'])
encrypted_and_keys['decryptedCustomerId'] = encrypted_and_keys.apply( decrypt_row, axis=1)

In [125]:
# sanity check
encrypted_and_keys['decryptedCustomerId'].head()

0    17850
1    17850
2    17850
3    17850
4    17850
Name: decryptedCustomerId, dtype: object

# The Data Subjects Service

The Data Subjects Service (DSS) provides links to all the different encryption keys that were generated for a certain _data subject_ (like a customer id). Encryption keys last 24 hours, so if a certain customer has shopped on multiple days in the dataset, we should find multiple key links.
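A rough way to predict how many key links to expect per data subject is to count the distinct days on which that subject appears. The sketch below assumes, purely for illustration, that the 24-hour key windows coincide with calendar days; the source does not state where the windows start. The customer ids and timestamps are made up.

```python
from collections import defaultdict
from datetime import datetime


def active_days(rows):
    """Count distinct calendar days per customer.

    `rows` is an iterable of (customer_id, 'yyyy-MM-dd HH:mm:ss' timestamp) pairs.
    """
    days = defaultdict(set)
    for customer_id, timestamp in rows:
        days[customer_id].add(datetime.strptime(timestamp, "%Y-%m-%d %H:%M:%S").date())
    return {customer_id: len(dates) for customer_id, dates in days.items()}


# Hypothetical rows: customer "A" shops on two days, customer "B" on one.
rows = [
    ("A", "2010-12-01 08:26:00"),
    ("A", "2010-12-01 09:01:00"),
    ("A", "2010-12-02 10:00:00"),
    ("B", "2010-12-01 08:34:00"),
]
print(active_days(rows))  # {'A': 2, 'B': 1}
```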

In [106]:
!strm list data-subjects --page-size 3 

CgUxMjM0OA==

12346
12348


The first line is the `next-page-token` that you can use to request the next _page_ of data subjects. There can be _many_ data subjects, for instance as many as you have customers. When this token is empty, you've reached the last page of data subjects.
Also interesting is the empty line, which represents all the _unknown_ customers: the rows that had an empty string as `CustomerID`. All of these were seen as the _same entity_ and encrypted with the same key.

In [107]:
!strm list data-subjects --page-token CgUxMjM0Nw== --page-size 3

CgUxMjM1MA==
12348
12349
12350
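The paging logic itself is a simple loop: keep requesting pages until the returned token is empty. The sketch below shows that loop against an in-memory stand-in. Both `iter_data_subjects` and `fake_fetch_page` are hypothetical names; in practice the fetch function would wrap the `strm list data-subjects --page-token ... --page-size ...` call, and the real tokens are opaque strings rather than the list offsets used here.

```python
def iter_data_subjects(fetch_page, page_size=3):
    """Yield all data subjects, following next-page tokens until one is empty."""
    token = ""
    while True:
        next_token, subjects = fetch_page(token, page_size)
        yield from subjects
        if not next_token:  # an empty token marks the last page
            return
        token = next_token


# In-memory stand-in for the service; note the empty-string data subject.
ALL_SUBJECTS = ["", "12346", "12348", "12349", "12350"]


def fake_fetch_page(token, page_size):
    """Hypothetical fetch function: the token is simply an offset into the list."""
    start = int(token) if token else 0
    end = start + page_size
    next_token = str(end) if end < len(ALL_SUBJECTS) else ""
    return next_token, ALL_SUBJECTS[start:end]


print(list(iter_data_subjects(fake_fetch_page)))
```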


To find out which key links were ever generated for a certain data subject, execute a `list data-subject-keylinks` request for one or more data subjects:

In [111]:
!strm list data-subject-keylinks 12346 12348

 DATASUBJECT   KEYLINK                                EXPIRY                          
                                                                                
 12346         863cd2bf-60f7-4047-84b6-4d9f08e9868c   2011-01-19T01:00:00.000000+0100 
 12348         925a0397-628d-4495-9e60-c9d772677417   2010-12-17T01:00:00.000000+0100 
 12348         3ef801c7-8aec-49a5-89c4-f2a1543a9953   2011-01-26T01:00:00.000000+0100 
 12348         d96de0e7-36d0-4530-996a-a49ee553e5da   2011-04-06T02:00:00.000000+0200 
 12348         0039a177-37a9-4447-81da-dc3d1d36fcdc   2011-09-26T02:00:00.000000+0200 


So we see that data subject 12346 has been active on one day, and 12348 on four different days in this dataset. In case of a Right To Be Forgotten (RTBF) request, the organization would have to
1. call the DSS to get all the key links associated with this data subject
2. remove the corresponding encryption keys from the organization's keys database. Hopefully, the keys export file has long ago
   been deleted! If the organization has never stored any derived datasets with decrypted data, it has then complied
   with the RTBF request. This mechanism is called _crypto-shredding_.
3. preferably also delete the data subject from the DSS
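Step 2 of this list can be sketched as follows. The key store here is just a Python dictionary from key link to key, which is an assumption for illustration; a real organization would use a proper keys database, and `crypto_shred` is a made-up helper. The key links are the ones shown in the output above, and the key values are placeholders.

```python
def crypto_shred(key_store: dict, keylinks) -> int:
    """Remove the keys for the given key links; return how many were actually removed."""
    removed = 0
    for keylink in keylinks:
        if key_store.pop(keylink, None) is not None:
            removed += 1
    return removed


# Hypothetical key store: key link -> encryption key (placeholders instead of real keys)
key_store = {
    "863cd2bf-60f7-4047-84b6-4d9f08e9868c": "<key of 12346>",
    "925a0397-628d-4495-9e60-c9d772677417": "<key of 12348>",
    "3ef801c7-8aec-49a5-89c4-f2a1543a9953": "<key of 12348>",
    "d96de0e7-36d0-4530-996a-a49ee553e5da": "<key of 12348>",
    "0039a177-37a9-4447-81da-dc3d1d36fcdc": "<key of 12348>",
}

# Key links returned by the DSS for data subject 12348 (step 1)
keylinks_12348 = [
    "925a0397-628d-4495-9e60-c9d772677417",
    "3ef801c7-8aec-49a5-89c4-f2a1543a9953",
    "d96de0e7-36d0-4530-996a-a49ee553e5da",
    "0039a177-37a9-4447-81da-dc3d1d36fcdc",
]
print(crypto_shred(key_store, keylinks_12348), "keys removed,", len(key_store), "left")
```

Once the keys are gone, the encrypted `CustomerID` values that reference those key links can no longer be decrypted, which is the whole point of crypto-shredding.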

In [112]:
!strm delete data-subjects 12348

4


In [113]:
!strm list data-subject-keylinks 12346 12348

 DATASUBJECT   KEYLINK                                EXPIRY                          
                                                                                
 12346         863cd2bf-60f7-4047-84b6-4d9f08e9868c   2011-01-19T01:00:00.000000+0100 


Note that deleting a data subject from the DSS _does not delete it from your organization_. How could it? The DSS doesn't even have access to where you store the encryption keys.