In [None]:
!pip install boto3
import argparse
import io
import os
import random
import re
import string
import time
from datetime import datetime

import boto3
import pandas as pd
from tqdm import tqdm

# Creating a batch job

1. Sign up to STRM Privacy
2. Create a schema and corresponding contract or use a publicly available one
3. Create a Sink/Data Connector (we use AWS S3 bucket)
4. Generate (or use real) data
5. Send data to input sink/data connector
6. Run batch job and save to output sink/data connector (possibly same sink as input)
7. Fetch data from sink/data connector and inspect

## 1. Sign up to STRM Privacy
See the [authentication docs](https://docs.strmprivacy.io/docs/latest/quickstart/authentication-cli.html) to sign up to STRM Privacy.

## 2. Create Schema and Contract
See [schemas and contract docs](https://docs.strmprivacy.io/docs/latest/concepts/schemas-and-contracts.html).

## 3. Create Sink/Data Connector

In this tutorial we use an AWS S3 sink. Either use the [console](https://console.strmprivacy.io/sinks) to create a sink or follow the steps in the quickstart [docs](https://docs.strmprivacy.io/docs/latest/quickstart/receiving-s3.html).

The `s3.json` should follow this structure:

```json
{ 
    "AccessKey": {
        "UserName": "your-username",
        "AccessKeyId": "***",
        "Status": "Active",
        "SecretAccessKey": "***",
        "CreateDate": "***"
    }
}
```

## 4. Generate data

Let's generate some random user data.

In [2]:
class DataGenerator:
    """Generate random demo user events and store them in a CSV file.

    Each row contains a session id, a fixed user name, a timestamp, a random
    e-mail address, a few random public/private fields and a privacy plane
    (0, 1 or 2).

    Args:
        nrows: Number of rows to generate.
    """

    def __init__(self, nrows):
        self.nrows = nrows
        self.user_name = "strm_demo_user"
        self.session_id = 0
        self.reset_counter_and_session_size()

    def get_random_value(self, value_type, field_name, iter):
        """Return one random cell value for the given column type.

        Args:
            value_type: One of "STRING", "INT", "FLOAT", "USER_NAME",
                "SESSION_ID", "TIMESTAMP", "EMAIL" or "PLANE".
            field_name: Column name; used as prefix for "STRING" values.
            iter: Zero-based row number; only used to spread "TIMESTAMP"
                values over the generated time window.

        Returns:
            A string for every type except "PLANE", which yields an int.

        Raises:
            ValueError: If ``value_type`` is not one of the supported types.
        """
        if value_type == "STRING":
            return f"{field_name.split(' ')[0]}_" + ''.join(random.choice(string.ascii_letters) for _ in range(8))
        elif value_type == "INT":
            # Integer literal bound: random.randint rejects floats such as 1e15
            # on recent Python versions.
            return f"{random.randint(0, 10 ** 15)}"
        elif value_type == "FLOAT":
            return f"{random.random()}"
        elif value_type == "USER_NAME":
            return self.user_name
        elif value_type == "SESSION_ID":
            # Start a new session once the current one has emitted
            # max_session rows.
            self.counter += 1
            if self.counter > self.max_session:
                self.session_id += 1
                self.reset_counter_and_session_size()
            return f'session_{self.session_id}'
        elif value_type == "TIMESTAMP":
            # Rows are spread linearly over the last 50 hours, with up to
            # +/- 30 seconds of jitter per row.
            stamp = time.time() - (1 - iter / self.nrows) * (3600 * 50) + 60 * (random.random() - 0.5)
            return datetime.fromtimestamp(stamp).astimezone().strftime('%Y-%m-%d %H.%M.%S.%f:%z')
        elif value_type == "EMAIL":
            name = self.rstring(8)
            host = self.rstring(6)
            return f'{name}@{host}.com'
        elif value_type == "PLANE":
            return random.choice([0, 1, 2])
        raise ValueError(f"Unsupported value_type: {value_type!r}")

    def reset_counter_and_session_size(self):
        """Reset the per-session row counter and draw a new session length.

        The length is drawn uniformly between 1% and 10% of ``nrows``, using
        integer bounds so that any row count is accepted.
        """
        self.counter = 0
        total = int(self.nrows)
        shortest = total // 100
        longest = max(shortest, total // 10)
        self.max_session = random.randint(shortest, longest)

    def rstring(self, n):
        """Return ``n`` random lowercase ASCII letters."""
        return ''.join(random.choice(string.ascii_lowercase) for _ in range(n))

    def generate(self, output_path='./databert-demo.csv'):
        """Generate ``nrows`` rows, write them to CSV sorted by timestamp.

        Args:
            output_path: Destination CSV file; defaults to the path used by
                the rest of the notebook.

        Returns:
            The generated rows as a DataFrame sorted by ``Timestamp``, as read
            back from the CSV file.
        """
        col_names = {
            "SessionId"     : "SESSION_ID",
            "UserName"      : "USER_NAME",
            "Timestamp"     : "TIMESTAMP",
            "Email"         : "EMAIL",
            "PublicFieldA"  : "STRING",
            "PublicFieldB"  : "FLOAT",
            "PrivateFieldA" : "STRING",
            "PrivateFieldB" : "INT",
            "PrivacyPlane"  : "PLANE"
        }

        # Build all rows in memory and write the file once instead of
        # re-opening the CSV for every single row.
        records = [
            {name: self.get_random_value(kind, name, i) for name, kind in col_names.items()}
            for i in tqdm(range(int(self.nrows)))
        ]
        pd.DataFrame(records, columns=list(col_names)).to_csv(output_path, index=False)

        # Round-trip through the CSV so the returned frame carries the dtypes
        # pandas infers from the file.
        df = pd.read_csv(output_path).sort_values(by=['Timestamp'])
        df.to_csv(output_path, header=True, index=False)
        return df

In [3]:
# Generate 20,000 random rows; generate() also writes them to
# ./databert-demo.csv. The last expression displays the resulting frame.
user = DataGenerator(20000)
df = user.generate()
df

100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 20000/20000 [00:09<00:00, 2122.78it/s]


Unnamed: 0,SessionId,UserName,Timestamp,Email,PublicFieldA,PublicFieldB,PrivateFieldA,PrivateFieldB,PrivacyPlane
1,session_0,strm_demo_user,2022-02-05 15.25.30.707138:+0100,umecumne@xruncp.com,PublicFieldA_LBCHdWNq,0.904420,PrivateFieldA_DzmXYcPW,642849641896515,1
5,session_0,strm_demo_user,2022-02-05 15.26.00.140558:+0100,mafbntkb@tzjluk.com,PublicFieldA_ONHdiJLT,0.493443,PrivateFieldA_dOLyHVRm,596546711050516,1
0,session_0,strm_demo_user,2022-02-05 15.26.07.138747:+0100,jqvahoke@dmeqkt.com,PublicFieldA_xrjYUSGp,0.691985,PrivateFieldA_XfCKSVpj,145867177206568,0
6,session_0,strm_demo_user,2022-02-05 15.26.19.214910:+0100,qctrngmk@bbxnbl.com,PublicFieldA_yjOvCrei,0.432503,PrivateFieldA_DoykEPuo,299128511071601,0
2,session_0,strm_demo_user,2022-02-05 15.26.20.184704:+0100,jncdqbkq@pokfsv.com,PublicFieldA_FAbuQBew,0.130002,PrivateFieldA_vJUFgsXA,286499902989227,1
...,...,...,...,...,...,...,...,...,...
19992,session_20,strm_demo_user,2022-02-07 17.24.50.719596:+0100,vvrvwgmf@zlqrzv.com,PublicFieldA_BhCAolvO,0.269577,PrivateFieldA_oVKSyDyr,969218083415797,1
19999,session_20,strm_demo_user,2022-02-07 17.25.18.796371:+0100,iyqjlwfx@flxeea.com,PublicFieldA_bnpztBAC,0.301898,PrivateFieldA_NDEaxDdS,180662556115155,0
19997,session_20,strm_demo_user,2022-02-07 17.25.20.306019:+0100,qwirnumb@tpgyhj.com,PublicFieldA_yigVptag,0.674296,PrivateFieldA_SOEnlDBN,134052846775075,0
19996,session_20,strm_demo_user,2022-02-07 17.25.26.461653:+0100,ajodneyy@yvxlrx.com,PublicFieldA_qryqEXId,0.818250,PrivateFieldA_rJUqIkxJ,23556565238408,1


## 5. Send data to input sink

Send the data to the S3 bucket.

In [4]:
class AwsProperties(object):
    """Connection settings for the S3 bucket used as sink/data connector.

    Credentials are taken from the standard ``AWS_ACCESS_KEY_ID`` and
    ``AWS_SECRET_ACCESS_KEY`` environment variables when they are set, so that
    real keys do not have to be pasted into the notebook. The ``'***'``
    placeholder is kept as fallback.
    """
    aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID', '***')
    aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY', '***')
    # Region and name of the bucket the demo files are stored in.
    region = 'eu-central-1'
    bucket = 'databert'

In [5]:
# Create the boto3 S3 resource using the region and credentials
# held in AwsProperties; `s3` is reused by the cells below.
AWS = AwsProperties()
s3 = boto3.resource(
      service_name='s3',
      region_name=AWS.region,
      aws_access_key_id=AWS.aws_access_key_id,
      aws_secret_access_key=AWS.aws_secret_access_key
    )

In [6]:
# Upload the generated CSV to the bucket. The context manager closes the file
# handle once the upload has finished, and the bucket name comes from
# AwsProperties instead of being repeated here.
with open('./databert-demo.csv', 'rb') as csv_file:
    resp = s3.Object(AWS.bucket, 'databert-demo.csv').put(Body=csv_file)

## 6. Run batch job
We'll be running a batch job from the CLI. We need to pass the configuration of the batch job as an argument. An example configuration can be found below.


```json
{
    "ref": {
      "billing_id": "your_billing_id"
    },
    "source_data": {
      "data_connector_ref": {
        "billing_id": "your_billing_id",
        "name": "databert-demo"
      },
      "file_name": "databert-demo.csv",
      "data_type": {
        "csv": {
          "charset": "UTF-8"
        }
      }
    },
    "consent": {
      "default_consent_levels": [
        0
      ],
      "consent_level_extractor": {
        "field": "PrivacyPlane",
        "field_patterns": {
          "1": {
            "consent_levels": [
              1
            ]
          },
          "2": {
            "consent_levels": [
              2
            ]
          }
        }
      }
    },
    "encryption": {
      "timestamp_config": {
        "field": "Timestamp",
        "format": "yyyy-MM-dd HH.mm.ss.nnnnnn:Z",
        "default_time_zone": {
          "id": "UTC"
        }
      },
      "batch_job_group_id": null
    },
    "event_contract_ref": {
      "handle": "databert-handle",
      "name": "batch_job_public",
      "version": "1.0.0"
    },
    "encrypted_data": {
      "target": {
        "data_connector_ref": {
          "billing_id": "your_billing_id",
          "name": "databert-demo"
        },
        "data_type": {
          "csv": {
            "charset": "UTF-8"
          }
        },
        "file_name": "databert-demo-encrypted.csv"
      }
    },
    "encryption_keys_data": {
      "target": {
        "data_connector_ref": {
          "billing_id": "your_billing_id",
          "name": "databert-demo"
        },
        "data_type": {
          "csv": {
            "charset": "UTF-8"
          }
        },
        "file_name": "databert-demo-encryption-keys.csv"
      }
    },
    "derived_data": [
      {
        "target": {
          "data_connector_ref": {
            "billing_id": "your_billing_id",
            "name": "databert-demo"
          },
          "data_type": {
            "csv": {
              "charset": "UTF-8"
            }
          },
          "file_name": "databert-demo-derived.csv"
        },
        "consent_levels": [
          2
        ],
        "consent_level_type": "CUMULATIVE",
        "masked_fields": {
          "field_patterns": {
            "databert-handle/batch_job_public/1.0.0": {
              "field_patterns": [
                "Email",
                "UserName"
              ]
            }
          }
        }
      }
    ]
  }
```

Save this configuration as `batch-job.json` and create the batch job: `strm create batch-job -F batch-job.json`

Wait for the job to finish. Status can be checked via `strm list batch-jobs`


## 7. Fetch and explore Data

Now we fetch the encrypted and derived data from the sink/data connector.
We expect from our data contract that the PII fields `Email`, `PrivateFieldA` and `PrivateFieldB` are encrypted in the encrypted file.
We expect that the fields `Email` and `UserName` are masked with a hash in the derived file. Let's investigate:

In [7]:
# Get objects from bucket
def fetch_csv_from_bucket(key):
    """Download the object `key` from the demo bucket and parse it as CSV.

    Returns:
        A tuple of the raw S3 `get()` response and the parsed DataFrame.
    """
    response = s3.Object(AWS.bucket, key).get()
    return response, pd.read_csv(io.BytesIO(response['Body'].read()))


# The file names match the targets configured in the batch job definition.
encrypted, df_encrypted = fetch_csv_from_bucket('databert-demo-encrypted.csv')
encryption_keys, df_encryption_keys = fetch_csv_from_bucket('databert-demo-encryption-keys.csv')
derived, df_derived = fetch_csv_from_bucket('databert-demo-derived.csv')

In [8]:
# Display the frame loaded from databert-demo-encrypted.csv.
df_encrypted

Unnamed: 0,SessionId,UserName,Email,PublicFieldA,PublicFieldB,PrivateFieldA,PrivateFieldB,PrivacyPlane,strmMeta.eventContractRef,strmMeta.nonce,strmMeta.timestamp,strmMeta.keyLink,strmMeta.billingId,strmMeta.consentLevels
0,session_0,strm_demo_user,AXZkUZVRdD1B5tBGOibsLddz6BTt2pEzq+3YJbYXwTckhR...,PublicFieldA_LBCHdWNq,0.904420,AXZkUZVdsbPWmmk6t1wSV1z8ESyRNbZdNiq5AddUaqzj6c...,AXZkUZUqnu1C/LIn4egdfJEH8v3byx4Z2nO6lwTg6YHCL0Ak,1,databert-handle/batch_job_public/1.0.0,0,1644071130000,ed6fe44f-cff8-43e1-afbe-b1a2e5769bbc,databert986673817,1
1,session_0,strm_demo_user,AXZkUZVjmG+sJLqfFsstvjKJkpnceTIcMKtvEtUzOO6WwL...,PublicFieldA_ONHdiJLT,0.493443,AXZkUZXl3SKeySAsqN+QQim28qexIQPqV/Y6a6t9wi5ONZ...,AXZkUZXqnL2NEbPhhSbJ+H3bUF04KDDOEWzRhU8PFo3gSJ7k,1,databert-handle/batch_job_public/1.0.0,0,1644071160000,ed6fe44f-cff8-43e1-afbe-b1a2e5769bbc,databert986673817,1
2,session_0,strm_demo_user,AXZkUZWER9guG7wuiwXwwdS2kQ0Ln1EPFpd/v5+sB5I2C5...,PublicFieldA_xrjYUSGp,0.691985,AXZkUZWWfkN/R8gIW1h8id1JHvn9J0sMJ6Sn2EX7ecOtjD...,AXZkUZVfqVRELB1AyZtAnJE7xzYzqtmJe/GIEg1WIrqEXJVx,0,databert-handle/batch_job_public/1.0.0,0,1644071167000,ed6fe44f-cff8-43e1-afbe-b1a2e5769bbc,databert986673817,0
3,session_0,strm_demo_user,AXZkUZWe5UpLOlzta90VHrtm5w4UwQ7R3O2UPZlRCS+0Qo...,PublicFieldA_yjOvCrei,0.432503,AXZkUZWqpUEtGzGAoubq0kria6jzGoxxe0g66EykHRZJ3j...,AXZkUZUY4ziKGYo9bzqUiQoDLaCBf/4YBYfZv/MTwMnfYRrF,0,databert-handle/batch_job_public/1.0.0,0,1644071179000,ed6fe44f-cff8-43e1-afbe-b1a2e5769bbc,databert986673817,0
4,session_0,strm_demo_user,AXZkUZVnQ7DPTKk73n40vr5/529ERcVbobsprcqaVxBqiB...,PublicFieldA_FAbuQBew,0.130002,AXZkUZUVol4WuJ7OkkImRw5G4qJ4rnQzZrvZdr6snvxjJ7...,AXZkUZVsjnXBblNdXySciso5de1spDEHpjYZqBCjXZ6Z/MaH,1,databert-handle/batch_job_public/1.0.0,0,1644071180000,ed6fe44f-cff8-43e1-afbe-b1a2e5769bbc,databert986673817,1
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
19995,session_20,strm_demo_user,AVbl+lWADWcTi4xo9y6NRU33noeMx/6yMKo+48qJKt+BjE...,PublicFieldA_BhCAolvO,0.269577,AVbl+lVXUFIQNbw+Smc4yd65UzHqCmjtb7ibT6bM3cKcVb...,AVbl+lX6W6reQjSeugQWHSisWtHqktc3ZO6Gv26qYlUyc7gG,1,databert-handle/batch_job_public/1.0.0,0,1644251090000,5e90479e-438f-43ce-81ea-0867ef09f736,databert986673817,1
19996,session_20,strm_demo_user,AVbl+lWIgx3kR66BG+wdjeQSFV30W2K9exl8UAvTA84L1+...,PublicFieldA_bnpztBAC,0.301898,AVbl+lUnM+ypg7VM69/Y6jyIrbLSAbAg40EdKDjtir1PMu...,AVbl+lVuvQybA2/YyaP+XSTriInk3KFTb+DCuUEhkFueKYyL,0,databert-handle/batch_job_public/1.0.0,0,1644251118000,5e90479e-438f-43ce-81ea-0867ef09f736,databert986673817,0
19997,session_20,strm_demo_user,AVbl+lVDvMnXbSJ97ETnMRqHPh8yrXoir0NsEpLin7LF/3...,PublicFieldA_yigVptag,0.674296,AVbl+lUPQdiep+c56Uxuy1GI46hwrt3UmE+iM6ZowtfomL...,AVbl+lUNLyb/hJmZgvuyVH65iz+JDC1wZjWXyaamPdtI+SGN,0,databert-handle/batch_job_public/1.0.0,0,1644251120000,5e90479e-438f-43ce-81ea-0867ef09f736,databert986673817,0
19998,session_20,strm_demo_user,AVbl+lVAkNhn5x+qOBL3GWi8f5vp+rXZmL1RvYkD0NwiuT...,PublicFieldA_qryqEXId,0.818250,AVbl+lXGS4cDiqGBw55u4rTncAmrf0Xjwn7AyObaE7QwqH...,AVbl+lVubhjK58R4D8GecQvvyI0DQg4iKdpq6CyIbxRXA0g=,1,databert-handle/batch_job_public/1.0.0,0,1644251126000,5e90479e-438f-43ce-81ea-0867ef09f736,databert986673817,1


In [9]:
# Display the frame loaded from databert-demo-derived.csv.
df_derived

Unnamed: 0,SessionId,UserName,Email,PublicFieldA,PublicFieldB,PrivateFieldA,PrivateFieldB,PrivacyPlane,strmMeta.eventContractRef,strmMeta.nonce,strmMeta.timestamp,strmMeta.keyLink,strmMeta.billingId,strmMeta.consentLevels
0,session_0,788d082a29fd07f61e1df95bbe98ef9f,6187cdf5e122eeb219802b8b999e6d3d,PublicFieldA_GfFLTQTn,0.708183,PrivateFieldA_GumYiEZL,36006380727449,2,databert-handle/batch_job_public/1.0.0,0,1644071180000,ed6fe44f-cff8-43e1-afbe-b1a2e5769bbc,databert986673817,2
1,session_0,788d082a29fd07f61e1df95bbe98ef9f,3aeb49d07ecfbf76fb42b9012c7bed7a,PublicFieldA_UcbALibM,0.351033,PrivateFieldA_RnRnFvui,806260380447121,2,databert-handle/batch_job_public/1.0.0,0,1644071186000,ed6fe44f-cff8-43e1-afbe-b1a2e5769bbc,databert986673817,2
2,session_0,788d082a29fd07f61e1df95bbe98ef9f,a344987f926060b6d9018cf9d88555a1,PublicFieldA_DYYePDNW,0.414442,PrivateFieldA_XuYYXzCT,805952205429810,2,databert-handle/batch_job_public/1.0.0,0,1644071225000,ed6fe44f-cff8-43e1-afbe-b1a2e5769bbc,databert986673817,2
3,session_0,788d082a29fd07f61e1df95bbe98ef9f,c9a4c7a93235ed649db92fb19fea2280,PublicFieldA_nTYggzqO,0.280909,PrivateFieldA_viErNbOE,525141177645540,2,databert-handle/batch_job_public/1.0.0,0,1644071229000,ed6fe44f-cff8-43e1-afbe-b1a2e5769bbc,databert986673817,2
4,session_0,788d082a29fd07f61e1df95bbe98ef9f,c0c7659009c0192ab819a2b98e7ed52a,PublicFieldA_GfSaKQiu,0.577391,PrivateFieldA_YePnfdoP,597045162768452,2,databert-handle/batch_job_public/1.0.0,0,1644071244000,ed6fe44f-cff8-43e1-afbe-b1a2e5769bbc,databert986673817,2
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
6603,session_20,788d082a29fd07f61e1df95bbe98ef9f,4cd9423c26fb4e85f42702f857583a13,PublicFieldA_xFelfIRI,0.382091,PrivateFieldA_lUzRtTFj,765948682410455,2,databert-handle/batch_job_public/1.0.0,0,1644250928000,5e90479e-438f-43ce-81ea-0867ef09f736,databert986673817,2
6604,session_20,788d082a29fd07f61e1df95bbe98ef9f,222dbf5a35fba4f08e09f37ab5b9b533,PublicFieldA_mKHmURHr,0.507715,PrivateFieldA_ZYwxyFyq,904128564471099,2,databert-handle/batch_job_public/1.0.0,0,1644250969000,5e90479e-438f-43ce-81ea-0867ef09f736,databert986673817,2
6605,session_20,788d082a29fd07f61e1df95bbe98ef9f,3e7589107ca9cf1b5d19a8771e550a64,PublicFieldA_gATktIgt,0.477453,PrivateFieldA_IfLXvFuv,14637484857404,2,databert-handle/batch_job_public/1.0.0,0,1644251052000,5e90479e-438f-43ce-81ea-0867ef09f736,databert986673817,2
6606,session_20,788d082a29fd07f61e1df95bbe98ef9f,8d85dce9cfb9066a4e51cee4e9f5294a,PublicFieldA_dJDaTHVd,0.999233,PrivateFieldA_ptAsTXqC,162010592284370,2,databert-handle/batch_job_public/1.0.0,0,1644251055000,5e90479e-438f-43ce-81ea-0867ef09f736,databert986673817,2


In [10]:
# Display the frame loaded from databert-demo-encryption-keys.csv.
df_encryption_keys

Unnamed: 0,keyLink,encryptionKey
0,ed6fe44f-cff8-43e1-afbe-b1a2e5769bbc,"{""primaryKeyId"":1986285973,""key"":[{""keyData"":{..."
1,6a04fec3-b7b1-4c45-b5a8-fcb8a963e01f,"{""primaryKeyId"":691779183,""key"":[{""keyData"":{""..."
2,5e90479e-438f-43ce-81ea-0867ef09f736,"{""primaryKeyId"":1457912405,""key"":[{""keyData"":{..."
