## 0. Setup

In [None]:
import urllib3
# Switch off urllib3's warnings for the whole session — presumably to keep the
# notebook output free of HTTPS/certificate warnings when talking to the
# ingestion endpoint; NOTE(review): confirm this is acceptable outside a demo.
urllib3.disable_warnings()
import os
import datetime

## 1. Kensu initialisation

To use a library with Kensu, import it with the `kensu` prefix added to the name of the library you want to monkeypatch:

$$\text{pandas} \rightarrow  \text{kensu.pandas}$$

Once imported, you can initialize the client with the `KensuProvider` object. Several parameters are available; the full list is in the user documentation.

The `Context` of the application is defined by its `process_name` (an identifier for your application), `project_name` (where the application is running), and `environment`.

`execution_timestamp` lets you set the execution timestamp of the notebook run; it is set explicitly here for demo purposes.

In [None]:

from kensu.utils.kensu_provider import KensuProvider

# --- Settings shared by every run in this notebook --------------------------
# Two logical applications are reported: the join step and the total step.
application1 = 'Join Data'
application2 = 'Compute Total'
project = 'DODD'
env = 'Lab'
user = os.getenv('USER')  # None when the USER environment variable is not set

# Empty placeholders here — presumably to be filled in with real values
# before running the notebook against a Kensu instance.
ingestion_url = ""
ingestion_token = ""
api_token = ""

# Initialise the Kensu client for the first application ('Join Data').
# The execution timestamp is pinned to 2021-10-25 00:00 (epoch milliseconds).
# The datetime is naive, so the exact value depends on the local timezone.
k = KensuProvider().initKensu(
    kensu_ingestion_url=ingestion_url,
    kensu_ingestion_token=ingestion_token,
    process_name=application1,
    user_name=user,
    code_location='https://gitlab.example.com',
    project_name=project,
    environment=env,
    pandas_support=True,
    sklearn_support=False,
    tensorflow_support=False,
    bigquery_support=False,
    pyspark_support=False,
    logical_data_source_naming_strategy='File',
    allow_reinit=True,
    execution_timestamp=round(datetime.datetime(2021, 10, 25).timestamp()) * 1000,
)

# Kensu-prefixed pandas, imported after the client has been initialised
# (same order as the original cell).
import kensu.pandas as pd

In [2]:
import datetime
# Epoch timestamp in milliseconds for 2021-10-25 00:00 — the same expression
# that is passed as `execution_timestamp` to initKensu above.
# The datetime is naive, so `.timestamp()` interprets it in the machine's local
# timezone: the recorded output 1635112800000 is 2021-10-24 22:00 UTC, i.e. it
# was produced on a UTC+2 machine; other timezones give a different value.
round(datetime.datetime(2021,10,25).timestamp())*1000

1635112800000

## 2. Execution of the pipeline - 25/10/2021

In [None]:
# Week 1: load the customer information file through the Kensu-wrapped pandas.
df_customer = pd.read_csv('./demo/week1/custinfo.csv')

In [None]:
# Week 1: load the transactions file; the `date` column is parsed as dates.
df_transaction = pd.read_csv('./demo/week1/transactions.csv',parse_dates = ['date'])

In [None]:
# Join customers with their transactions on the shared `id` column.
# No `how` is given, so the join type is the library default (inner join in
# plain pandas — assumes kensu.pandas keeps that default; TODO confirm).
df = df_customer.merge(df_transaction,on='id')

In [None]:
# Persist the joined customer/transaction data, without the index column.
df.to_csv('./demo/week1/joined.csv', index=False)

# Re-initialise the Kensu client for the second application (`application2`),
# keeping the 2021-10-25 execution timestamp of this run.
k = KensuProvider().initKensu(
    kensu_ingestion_url=ingestion_url,
    kensu_ingestion_token=ingestion_token,
    process_name=application2,
    user_name=user,
    code_location='https://gitlab.example.com',
    project_name=project,
    environment=env,
    pandas_support=True,
    sklearn_support=False,
    tensorflow_support=False,
    bigquery_support=False,
    pyspark_support=False,
    logical_data_source_naming_strategy='File',
    allow_reinit=True,
    execution_timestamp=round(datetime.datetime(2021, 10, 25).timestamp()) * 1000,
)

# Second step of the pipeline: reload the joined file, derive
# Total = price * quantity, and write the enriched data set.
df = pd.read_csv('./demo/week1/joined.csv', parse_dates=['date'])
df['Total'] = df['price'] * df['quantity']
df.to_csv('./demo/week1/data_with_total.csv', index=False)

## 3. Execution of the pipeline - 30/10/2021

**Note:** we reinitialise the client in order to take the new timestamp into account 

In [None]:
# Re-initialise the Kensu client for the first application (`application1`)
# so that the new execution timestamp, 2021-10-30 00:00, is taken into account.
# The timestamp is epoch milliseconds, written as round(seconds) * 1000 like
# every other initKensu call in this notebook. For a whole-second timestamp
# this is the same value as the previous round(seconds * 1000).
# The datetime is naive, so the exact value depends on the local timezone.
k = KensuProvider().initKensu(
    kensu_ingestion_url=ingestion_url,
    kensu_ingestion_token=ingestion_token,
    process_name=application1,
    user_name=user,
    code_location='https://gitlab.example.com',
    project_name=project,
    environment=env,
    pandas_support=True,
    sklearn_support=False,
    tensorflow_support=False,
    bigquery_support=False,
    pyspark_support=False,
    logical_data_source_naming_strategy='File',
    allow_reinit=True,
    execution_timestamp=round(datetime.datetime(2021, 10, 30).timestamp()) * 1000,
)

In [None]:
# Week 2: load the customer information file through the Kensu-wrapped pandas.
df_customer = pd.read_csv('./demo/week2/custinfo.csv')

In [None]:
# Week 2: load the transactions file; the `date` column is parsed as dates.
df_transaction = pd.read_csv('./demo/week2/transactions.csv',parse_dates = ['date'])

In [None]:
# Join customers with their transactions on the shared `id` column.
# No `how` is given, so the join type is the library default (inner join in
# plain pandas — assumes kensu.pandas keeps that default; TODO confirm).
df = df_customer.merge(df_transaction,on='id')

In [None]:
# Persist the joined customer/transaction data, without the index column.
df.to_csv('./demo/week2/joined.csv', index=False)

# Re-initialise the Kensu client for the second application (`application2`),
# keeping the 2021-10-30 execution timestamp of this run.
k = KensuProvider().initKensu(
    kensu_ingestion_url=ingestion_url,
    kensu_ingestion_token=ingestion_token,
    process_name=application2,
    user_name=user,
    code_location='https://gitlab.example.com',
    project_name=project,
    environment=env,
    pandas_support=True,
    sklearn_support=False,
    tensorflow_support=False,
    bigquery_support=False,
    pyspark_support=False,
    logical_data_source_naming_strategy='File',
    allow_reinit=True,
    execution_timestamp=round(datetime.datetime(2021, 10, 30).timestamp()) * 1000,
)

# Second step of the pipeline: reload the joined file, derive
# Total = price * quantity, and write the enriched data set.
df = pd.read_csv('./demo/week2/joined.csv', parse_dates=['date'])
df['Total'] = df['price'] * df['quantity']
df.to_csv('./demo/week2/data_with_total.csv', index=False)

## 4. Execution of the pipeline - 06/11/2021

**Note:** we reinitialise the client in order to take the new timestamp into account 

In [None]:
# Re-initialise the Kensu client for the first application (`application1`)
# with the 2021-11-06 execution timestamp (epoch milliseconds, local timezone).
# Unlike the earlier runs, this call also passes `kensu_api_token` —
# presumably needed by the rule registration done later in this run; TODO confirm.
k = KensuProvider().initKensu(
    kensu_ingestion_url=ingestion_url,
    kensu_ingestion_token=ingestion_token,
    process_name=application1,
    user_name=user,
    code_location='https://gitlab.example.com',
    project_name=project,
    environment=env,
    pandas_support=True,
    sklearn_support=False,
    tensorflow_support=False,
    bigquery_support=False,
    pyspark_support=False,
    logical_data_source_naming_strategy='File',
    allow_reinit=True,
    execution_timestamp=round(datetime.datetime(2021, 11, 6).timestamp()) * 1000,
    kensu_api_token=api_token,
)

# Kensu-prefixed pandas, imported again after the re-initialisation.
import kensu.pandas as pd


In [None]:
# Week 3: load the customer information file through the Kensu-wrapped pandas.
df_customer = pd.read_csv('./demo/week3/custinfo.csv')

In [None]:
from kensu.utils.rule_engine import add_missing_value_rules
# Register missing-value rules using the week-3 customer data.
# NOTE(review): presumably 'custinfo.csv' names the data source the rules are
# attached to and the columns are taken from `df_customer` — confirm against
# the kensu rule_engine documentation.
add_missing_value_rules('custinfo.csv',df_customer)

In [None]:
# Week 3: load the transactions file; the `date` column is parsed as dates.
df_transaction = pd.read_csv('./demo/week3/transactions.csv',parse_dates = ['date'])

In [None]:
# Join customers with their transactions on the shared `id` column.
# No `how` is given, so the join type is the library default (inner join in
# plain pandas — assumes kensu.pandas keeps that default; TODO confirm).
df = df_customer.merge(df_transaction, on ='id')

In [None]:
# Persist the joined customer/transaction data, without the index column.
df.to_csv('./demo/week3/joined.csv', index=False)

# Re-initialise the Kensu client for the second application (`application2`),
# keeping the 2021-11-06 execution timestamp of this run.
k = KensuProvider().initKensu(
    kensu_ingestion_url=ingestion_url,
    kensu_ingestion_token=ingestion_token,
    process_name=application2,
    user_name=user,
    code_location='https://gitlab.example.com',
    project_name=project,
    environment=env,
    pandas_support=True,
    sklearn_support=False,
    tensorflow_support=False,
    bigquery_support=False,
    pyspark_support=False,
    logical_data_source_naming_strategy='File',
    allow_reinit=True,
    execution_timestamp=round(datetime.datetime(2021, 11, 6).timestamp()) * 1000,
)

# Second step of the pipeline: reload the joined file, derive
# Total = price * quantity, and write the enriched data set.
df = pd.read_csv('./demo/week3/joined.csv', parse_dates=['date'])
df['Total'] = df['price'] * df['quantity']
df.to_csv('./demo/week3/data_with_total.csv', index=False)

