In [1]:
# Display the value of every top-level expression in a cell (not only the last one),
# so results show up without wrapping them in print()
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

# core libraries
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# matplotlib config: render figures inline in the notebook and use the 'ggplot' style
%matplotlib inline
plt.style.use('ggplot')

# Pandas display setup.
# Fully-qualified option names are used on purpose: pandas resolves abbreviated
# names by pattern matching, which can become ambiguous (and raise) if a later
# pandas release adds another option with a similar name.
pd.set_option('display.max_colwidth', 200)       # max characters shown per cell
pd.set_option('display.max_rows', 500)           # rows shown before truncation
pd.set_option('display.max_columns', 99)         # columns shown before truncation
pd.set_option('display.expand_frame_repr', True)  # allow wide frames to wrap across lines

---------------------------------

----------------------
## Environment Variables

In [2]:
import os

In [3]:
# clean out any old environments
# Take a snapshot of the matching keys before deleting: removing entries from
# os.environ while iterating over it directly depends on the mapping's iterator
# tolerating mutation, which is not something to rely on.
for key in [name for name in os.environ if name.startswith('HADRON')]:
    del os.environ[key]

In [4]:
# Hadron PM path/type and the default data path, set in a single update
os.environ.update({
    'HADRON_PM_PATH': './hadron/contracts',
    'HADRON_PM_TYPE': 'json',
    'HADRON_DEFAULT_PATH': './hadron/data',
})

In [5]:
# Source/header locations for the two cleaner tasks and the key used by the
# difference task, set in a single update
os.environ.update({
    # Origin
    'HADRON_CLEANER_ORIGIN_SOURCE_URI': '../services/source/hadron_synth_origin.pq',
    'HADRON_CLEANER_ORIGIN_HEADER_URI': '../services/source/mapping.csv',
    # Other
    'HADRON_CLEANER_OTHER_SOURCE_URI': '../services/source/hadron_synth_other.pq',
    'HADRON_CLEANER_OTHER_HEADER_URI': '../services/source/mapping.csv',
    # difference on key
    'HADRON_DIFFERENCE_ON_KEY': 'identifier',
})

---------------------------------

----------------------------
## Create the Cleaner Other Component Instance

In [6]:
from ds_discovery import Transition

In [7]:
# create the Transition instance for the 'align_other_task' task
# NOTE(review): has_contract=False presumably allows creation when no saved contract exists yet — confirm
tr = Transition.from_env('align_other_task', has_contract=False)

In [8]:
# setup using the environment variable
tr.setup_bootstrap('Telecoms', description='Aligns the headers and reinstate nulls')
# the source URI is passed as a '${...}' placeholder naming the env var set earlier
# NOTE(review): presumably the library substitutes the env value at run time — confirm
tr.set_source_uri('${HADRON_CLEANER_OTHER_SOURCE_URI}')
# persist the result to a parquet file pattern named 'cleaned'
file = tr.pm.file_pattern(name='cleaned', file_type='parquet')
tr.set_persist(file)

### add the URI to be mapped

In [9]:
# register the header mapping file as the 'header_map' connector,
# using the env var placeholder for its URI
tr.add_connector_uri('header_map', '${HADRON_CLEANER_OTHER_HEADER_URI}')

In [10]:
# load the source data configured above
# NOTE(review): the returned canonical is presumably a pandas DataFrame — confirm
df = tr.load_source_canonical()

### create component intent actions

In [11]:
# apply the cleaning intents in order: align headers using the 'header_map'
# connector, reinstate nulls, then convert date columns
# NOTE(review): exact behaviour of each auto_* method is taken from its name — confirm in ds_discovery docs
df = tr.tools.auto_clean_header(df, rename_map='header_map')
df = tr.tools.auto_reinstate_nulls(df)
df = tr.tools.auto_to_date(df)

In [12]:
# display the frame produced by the cleaning calls for a visual check
df

Unnamed: 0,identifier,date,bool,float,poly,cat,int,str
0,67,2023-03-16,1,5.409,30.3049,ACTIVE,485,Lelah
1,219,2022-12-09,1,3.913,21.311569,ACTIVE,-14,Foxholes
2,246,2023-02-20,1,4.819,29.222761,ACTIVE,96,Swaffham
3,252,2023-01-26,1,4.703,28.118209,ACTIVE,806,Tam
4,46,2023-03-18,1,1.004,7.008016,ACTIVE,92,Melksham
5,294,2023-02-19,1,4.816,29.193856,ACTIVE,-865,Dormansland
6,19,2022-12-06,1,2.214,10.901796,ACTIVE,2229,Norton Malreward
7,52,2023-02-19,1,4.254,24.096516,ACTIVE,768,Painted Valley
8,267,2023-03-19,1,3.258,16.614564,INACTIVE,145,Emerald Ridge
9,58,2023-03-21,1,0.011,30.571849,INACTIVE,-929,Havana


### run the task

In [13]:
# run the component task for 'align_other_task'
# NOTE(review): presumably replays the recorded intents and writes the 'cleaned' persist file — confirm
tr.run_component_pipeline()

----------------------------
## Create the Cleaner Origin Component Instance

In [14]:
# create the Transition instance for the 'align_origin_task' task (rebinds `tr`)
# NOTE(review): has_contract=False presumably allows creation when no saved contract exists yet — confirm
tr = Transition.from_env('align_origin_task', has_contract=False)

In [15]:
# setup using the environment variable
tr.setup_bootstrap('Telecoms', description='Aligns the headers and reinstate nulls')
# the source URI is passed as a '${...}' placeholder naming the env var set earlier
# NOTE(review): presumably the library substitutes the env value at run time — confirm
tr.set_source_uri('${HADRON_CLEANER_ORIGIN_SOURCE_URI}')
# persist the result to a parquet file pattern named 'cleaned'
file = tr.pm.file_pattern(name='cleaned', file_type='parquet')
tr.set_persist(file)

### add the URI to be mapped

In [16]:
# register the header mapping file as the 'header_map' connector,
# using the env var placeholder for its URI
tr.add_connector_uri('header_map', '${HADRON_CLEANER_ORIGIN_HEADER_URI}')

In [17]:
# load the source data configured above
# NOTE(review): the returned canonical is presumably a pandas DataFrame — confirm
df = tr.load_source_canonical()

### create component intent actions

In [18]:
# apply the cleaning intents in order: align headers using the 'header_map'
# connector, reinstate nulls, then convert date columns
# NOTE(review): exact behaviour of each auto_* method is taken from its name — confirm in ds_discovery docs
df = tr.tools.auto_clean_header(df, rename_map='header_map')
df = tr.tools.auto_reinstate_nulls(df)
df = tr.tools.auto_to_date(df)

### run the task

In [19]:
# run the component task for 'align_origin_task'
# NOTE(review): presumably replays the recorded intents and writes the 'cleaned' persist file — confirm
tr.run_component_pipeline()

---------------------------------

--------------------
## Create the Difference Component Instance

In [20]:
from ds_discovery import Wrangle

In [21]:
# create the Wrangle instance for the 'difference_task' task
# NOTE(review): has_contract=False presumably allows creation when no saved contract exists yet — confirm
wr = Wrangle.from_env('difference_task', has_contract=False)

In [22]:
# bootstrap the difference component
wr.setup_bootstrap('Telecoms', description='Compare two sources and identify if there are differences')
# the source is the persist contract of the 'align_origin_task' Transition,
# i.e. whatever that task was configured to write
wr.set_source_contract(Transition.from_env('align_origin_task').get_persist_contract(), template_aligned=True)

# the primary output is a CSV file pattern named 'flagged'
# NOTE(review): stamped='hours' presumably adds an hour-resolution timestamp to the file name — confirm
out = wr.pm.file_pattern(name='flagged', prefix='hadron_difference_', file_type='csv', stamped='hours')
wr.set_persist(uri_file=out)

### add the URI to be compared with the source

In [23]:
# register the persist contract of the 'align_other_task' Transition as the
# 'align_other' connector, the dataset the source is compared against
wr.add_connector_contract('align_other', Transition.from_env('align_other_task').get_persist_contract(), template_aligned=True)

In [24]:
# add the summary, detail and unmatched report connectors; each one persists to
# its own CSV file pattern that shares the 'hadron_difference_' prefix
for connector_name in ('summary', 'detail', 'unmatched'):
    out = wr.pm.file_pattern(name=connector_name, prefix='hadron_difference_', file_type='csv', stamped='hours')
    wr.add_connector_persist(connector_name, out)

In [25]:
# load the files to see if there are observable differences:
# `df` is the component source (the origin output) and `other` is the
# 'align_other' connector registered earlier
df = wr.load_source_canonical()
other = wr.load_canonical('align_other')

### create component intent actions

In [26]:
# calculate the differences between the source frame and the 'align_other'
# connector, matching on the key named by HADRON_DIFFERENCE_ON_KEY; the summary,
# unmatched and detail reports are directed to their named connectors
df = wr.tools.model_difference(
    df,
    other='align_other',
    on_key='${HADRON_DIFFERENCE_ON_KEY}',
    drop_zero_sum=True,
    column_name='difference',
    summary_connector='summary',
    unmatched_connector='unmatched',
    detail_connector='detail',
)

### run the task

In [27]:
# run the component task for 'difference_task'
# NOTE(review): presumably replays the recorded intent and writes the persist and connector files — confirm
wr.run_component_pipeline()

---------------------------------

## Show The Result

In [28]:
# display the component's persisted output (the 'flagged' file pattern set as persist)
wr.load_persist_canonical()

Unnamed: 0,identifier,float,int
0,13,1,0
1,19,0,1
2,52,0,1
3,58,1,0
4,64,1,0
5,67,1,0
6,94,1,0


In [29]:
# display the contents of the 'detail' connector passed to model_difference
wr.load_canonical('detail')

Unnamed: 0,identifier,float_x,float_y,int_x,int_y
0,13,4.833,6.072,-,-
1,19,-,-,61,2229
2,52,-,-,187,768
3,58,4.957,0.011,-,-
4,64,4.57,7.777,-,-
5,67,4.93,5.409,-,-
6,94,3.646,6.824,-,-


In [30]:
# display the contents of the 'summary' connector passed to model_difference
wr.load_canonical('summary')

Unnamed: 0,Attribute,Summary
0,matching,10
1,right_only,5
2,left_only,2
3,float,5
4,int,2


In [31]:
# display the contents of the 'unmatched' connector passed to model_difference
wr.load_canonical('unmatched')

Unnamed: 0,found_in,identifier,bool,bernoulli,str,gumbel,float,date,int,normal,cat,poly
0,left_only,146,1,0.0,Swaffham,-0.085,4.476,2022-12-09,806,-0.395,ACTIVE,
1,left_only,194,1,1.0,Dormansland,0.266,4.946,2023-03-19,96,0.264,ACTIVE,
2,right_only,219,1,,Foxholes,,3.913,2022-12-09,-14,,ACTIVE,21.311569
3,right_only,246,1,,Swaffham,,4.819,2023-02-20,96,,ACTIVE,29.222761
4,right_only,252,1,,Tam,,4.703,2023-01-26,806,,ACTIVE,28.118209
5,right_only,294,1,,Dormansland,,4.816,2023-02-19,-865,,ACTIVE,29.193856
6,right_only,267,1,,Emerald Ridge,,3.258,2023-03-19,145,,INACTIVE,16.614564
