# End to End Dedupe
## Pruning pipeline using Elastic search

## 1. Loading & transforming the data

In [35]:
from tutorial.main.stepbystep.stepbysteputils.pgconnector import create_engine_ready
from suricate.data.companies import getsource, gettarget
import pandas as pd
import  numpy as np

engine = create_engine_ready()

# filefolder = '~/'
# leftpath = 'left.csv'
# rightpath = 'right.csv'
# df_source = pd.read_csv(filefolder + leftpath, index_col=0, sep='|', encoding='utf-8')
# df_target = pd.read_csv(filefolder + rightpath, index_col=0, sep='|', encoding='utf-8')
df_source_raw = getsource(nrows=1000)
df_target_raw = gettarget(nrows=None)

def rebuild_ytrue(ix):
    y_true_saved = pd.read_sql(sql="SELECT * FROM y_true WHERE y_true.y_true = 1", con=engine).set_index(
        ['ix_source', 'ix_target'],
        drop=True)['y_true']
    y = pd.Series(index=ix, data = np.zeros(shape=len(ix)), name='y_true')
    ix_common = y_true_saved.index.intersection(ix)
    y.loc[ix_common] = y_true_saved.loc[ix_common]
    return y



def prepare_source(df):
    """

    Args:
        df:

    Returns:
        pd.DataFrame
    """
    df2 = df
    return df2


def prepare_target(df):
    """

    Args:
        df:

    Returns:
        pd.DataFrame
    """
    df2 = df
    return df2


df_source = prepare_source(df_source_raw)
df_target = prepare_target(df_target_raw)
assert df_source.columns.equals(df_target.columns)
print(pd.datetime.now(),' | ', 'number of rows on left:{}'.format(df_source.shape[0]))
print(pd.datetime.now(),' | ', 'number of rows on right:{}'.format(df_target.shape[0]))

2020-04-13 21:41:25.607139  |  number of rows on left:1000
2020-04-13 21:41:25.607667  |  number of rows on right:3177


## 2. Pushing the data from the target (right) dataframe to Elastic Search

In [2]:
## Push the data to ES
import elasticsearch
import pandas as pd
from suricate.dbconnectors.esconnector import index_with_es
import time


## Put the data to ES, drop the index first and then re create
esclient = elasticsearch.Elasticsearch()
es_indice = 'df_target'

print(pd.datetime.now(),' | ', 'Start pushing to ES')

if True:
    try:
        esclient.indices.delete(index=es_indice)
    except:
        pass
    request_body = {
        "settings": {
            "number_of_shards": 5,
            "number_of_replicas": 5
        },

        "mappings": {
            "_doc": {
                "properties": {
                    "ix": {"type": "keyword"},
                    "name": {"type": "text"},
                    "street": {"type": "text"},
                    "city": {"type": "text"},
                    "postalcode": {"type": "text"},
                    "countrycode": {"type": "keyword"}
                }
            }
        }
    }
    esclient.indices.create(index=es_indice, body=request_body)
    index_with_es(client=esclient, df=df_target, index=es_indice, ixname="ix", reset_index=True, doc_type='_doc')
    time.sleep(5)
pass
catcount = esclient.count(index=es_indice)['count']
assert catcount == df_target.shape[0]
print(pd.datetime.now(),' | ', 'pushed to es_sql indice {}'.format(es_indice))
print(pd.datetime.now(),' | ', 'number of docs: {}'.format(catcount))

2020-04-13 21:35:17.697749  |  Start pushing to ES
2020-04-13 21:35:40.865960  |  pushed to es_sql indice df_target
2020-04-13 21:35:40.866152  |  number of docs: 3177


## 3. Connecting the data
* For each row in the left dataframe, launch a search request on ES df_target indice
* Output:
    * Xst: Similarity Matrix
    * Xsbs: Side-by-side view of the data


In [36]:
## Connect the data
from tutorial.main.stepbystep.stepbysteputils.esconnector import getesconnector

print(pd.datetime.now(),' | ', 'Starting connection')
escon = getesconnector()

Xst = escon.fit_transform(X=df_source)
print(pd.datetime.now(),' | ', 'Finished connection')
print(pd.datetime.now(),' | ', 'number of pairs {}'.format(Xst.shape[0]))
print(pd.datetime.now(),' | ', 'Connection scores sample:')
print(Xst.sample(5))

ix_con_multi = Xst.index
print(pd.datetime.now(),' | ', 'Starting side-by-side build')
Xsbs = escon.getsbs(X=df_source, on_ix=ix_con_multi)
print(pd.datetime.now(),' | ', 'Finished side-by-side build')
print(pd.datetime.now(),' | ', 'Side by side pairs sample:')
print(Xsbs.sample(5))

2020-04-13 21:41:42.884479  |  Starting connection
2020-04-13 21:42:14.615772  |  Finished connection
2020-04-13 21:42:14.616046  |  number of pairs 10000
2020-04-13 21:42:14.616155  |  Connection scores sample:
                    es_score es_rank
ix_source  ix_target                   
6cf59033 0be2de31  32.697815       0
6d738788 6d738788  34.087070       0
054aa318 738643f2  27.739553       2
1bd2d98a 97cd92c7  32.699120       0
e8b86147 90dffaf1  12.542312       2
2020-04-13 21:42:14.620979  |  Starting side-by-side build
2020-04-13 21:42:30.623783  |  Finished side-by-side build
2020-04-13 21:42:30.624259  |  Side by side pairs sample:
                                   name_source                      name_target  \
ix_source  ix_target                                                              
4b61a6e1 5db25854  c stiefelmayer gmbh co kg  keck energieservice gmbh co kg   
3e254c40 e5cfe1a5    hahnkolb werkzeuge gmbh        hahn kolb werkzeuge gmbh   
bd2cf1f0 89f97a50   ing 

## 4. Exploring the data
### Clustering part
* Cluster scores to find pairs with same similarity
* Output:
    * X_cluster: Score matrix
### 4.1.1. Fit the cluster to non-supervized data

In [38]:
from suricate.explore import Explorer
n_questions = 200
print(pd.datetime.now(),' | ', 'Starting questions with n_questions = {}'.format(n_questions))
print(pd.datetime.now(),' | ', 'Starting cluster fit with unsupervized data')
exp = Explorer(n_simple=n_questions, n_hard=n_questions)
exp.fit_cluster(X=Xst[['es_score']])
y_cluster = pd.Series(data=exp.pred_cluster(X=Xst), index=Xst.index, name='y_cluster')
X_cluster = pd.DataFrame(y_cluster)
print(pd.datetime.now(),' | ', 'Done')

2020-04-13 21:43:01.580884  |  Starting questions with n_questions = 200
2020-04-13 21:43:01.581328  |  Starting cluster fit with unsupervized data
2020-04-13 21:43:01.604762  |  Done


### 4.1.2. Visualizing the clusters and the average score

#### 4.2. Loading the cheatsheet of labelled pairs
* In real life, you would manually label each of those pairs.
* For this demo, I already created a y_true vector

In [39]:
y_true = rebuild_ytrue(ix= ix_con_multi)
print(pd.datetime.now(),' | ', 'Content of y_true:\n{} non-matching pairs\n{} matching pairs'.format(y_true.value_counts().loc[0], y_true.value_counts().loc[1]))

2020-04-13 21:43:11.084201  |  Content of y_true:
7869 non-matching pairs
2131 matching pairs


### 4.3. Asking Questions to label the data
#### 4.3.1 Asking Simple Questions
* Simple questions are sample of pairs from each cluster in the similarity matrix
* They are used as representative sample of each cluster and thus of the general population
* We are using non-supervized techniques to generate the clusters
* These samples can then be manually labelled, giving way for supervized methods

In [40]:
ix_simple = exp.ask_simple(X=Xst)
Sbs_simple = Xsbs.loc[ix_simple]
y_simple = y_true.loc[ix_simple]
print(pd.datetime.now(),' | ', 'Sample of simple questions:')
print(Sbs_simple.sample(10))

2020-04-13 21:43:29.663065  |  Sample of simple questions:
                                                           name_source  \
ix_source  ix_target                                                      
d0fec07a 3ac3d036                       honeywell international sarl   
46a13500 eabdefc6                                        ge aviation   
32571bf3 e7cee213                                lapp gmbh und co kg   
ccd0b6df 2279f732                                           luces sl   
c906f2e3 e2a2da69                              botschaft afghanistan   
4af3a76c 253ce464  hamilton sundstrand hamilton sundstrand hamilt...   
b52c7c8e d48eca66                                       selex es spa   
816d262e fc8bf3d0                             ge measurement control   
73a56e4d a9b7c40b                 selex sensors and airborne systems   
dd2d6cef dc52465a  snap on industrial germany snap on industrial ...   

                                           name_target  \
ix_source  ix

#### 4.3.2. How many positive matches per cluster

In [41]:
from suricate.explore import cluster_stats
X_pivot = cluster_stats(X=Xst[['es_score']], y_cluster=y_cluster, y_true=y_true)
print(pd.datetime.now(),' | ', 'Content of cluster after simple questions:')
print(X_pivot)

2020-04-13 21:43:52.265994  |  Content of cluster after simple questions:
           avg_score  pct_match
y_cluster                      
7.0        68.981955   1.000000
8.0        78.712065   1.000000
9.0        87.478827   1.000000
4.0        41.725311   0.945946
5.0        50.130134   0.934426
6.0        59.285728   0.820513
3.0        31.462654   0.668026
2.0        19.752465   0.172759
1.0         9.933944   0.003017
0.0         6.678583   0.000000


#### 4.3.2. Fit the cluster classifier with supervized data
* For each cluster, we measure:
    * If all pairs from the sample are a match
    * If some pairs are a match, some aren't
    * If there is no pairs in the sample that match
* These results can be used for pruning

In [42]:
print(pd.datetime.now(),' | ', 'Start fitting the cluster classifier with supervized data:')
exp.fit(X=Xst, y=y_simple, fit_cluster=False)
print(pd.datetime.now(),' | ', 'Done')

2020-04-13 21:44:26.710956  |  Start fitting the cluster classifier with supervized data:
2020-04-13 21:44:26.811834  |  Done


In [43]:
### Ask hard (pointed) questions
ix_hard = exp.ask_hard(X=Xst, y=y_simple)
Sbs_hard = Xsbs.loc[ix_hard]
y_hard = y_true.loc[ix_hard]
print(pd.datetime.now(),' | ', 'Result of hard questions:')
print(Sbs_hard.sample(10))

### Obtain the results of the labels
y_questions = y_true.loc[ix_hard.union(ix_simple)]
X_questions = Xsbs.loc[y_questions.index].copy()

2020-04-13 21:44:35.397453  |  Result of hard questions:
                                               name_source  \
ix_source  ix_target                                          
5fae6160 5fae6160         ads analyse detection securite   
501ad89f 8c071814                ge aviation systems ltd   
35b7cba1 be329903                 clinter traducciones e   
51f411d4 51f411d4               knuerr gmbh sales office   
28c9bf9a e6f50811  performance plastics products 3p gmbh   
f7da5ee1 484866fb             e a elektro automatik gmbh   
28f0feb9 e29b4c7f                         blumenhof frey   
7d49ff77 7d49ff77          bundesanstalt f arbeitsschutz   
ba852050 536d4a08    ge aviation ge aviation ge aviation   
f6d4c1fe 3afab667            honeywell international inc   

                                                          name_target  \
ix_source  ix_target                                                      
5fae6160 5fae6160                     ads analyse detection securite 

### Make the pruning step


In [44]:
print(pd.datetime.now(),' | ', 'Start Pruning')
pruning_clusters = X_pivot.loc[X_pivot['pct_match'] > 0.0].index
ix_further = y_cluster.loc[y_cluster.isin(pruning_clusters)].index
Xst_further = Xst.loc[ix_further]
Xsbs_further = Xsbs.loc[ix_further]
y_true_further = y_true.loc[ix_further]
print(pd.datetime.now(),' | ', 'Pruning ratio: {}'.format(1-len(ix_further)/Xst.shape[0]))


2020-04-13 21:44:41.704916  |  Start Pruning
2020-04-13 21:44:41.808677  |  Pruning ratio: 0.07099999999999995


## 5. Further scoring
* Use the selected subset of data after the pruning steps
* Do more detailed similarity comparison
* Prepare the final similarity matrix for the machine-learning classifier
* Use the FuncSbsComparator

In [45]:
from suricate.sbsdftransformers import FuncSbsComparator
from sklearn.pipeline import FeatureUnion

print(pd.datetime.now(),' | ', 'Starting further scoring')
_sbs_score_list = [
    ('name_fuzzy', FuncSbsComparator(on='name', comparator='fuzzy')),
    ('street_fuzzy', FuncSbsComparator(on='street', comparator='fuzzy')),
    ('name_token', FuncSbsComparator(on='name', comparator='token')),
    ('street_token', FuncSbsComparator(on='street', comparator='token')),
    ('city_fuzzy', FuncSbsComparator(on='city', comparator='fuzzy')),
    ('postalcode_fuzzy', FuncSbsComparator(on='postalcode', comparator='fuzzy')),
    ('postalcode_contains', FuncSbsComparator(on='postalcode', comparator='contains'))
]

scorer_sbs = FeatureUnion(transformer_list=_sbs_score_list)
scores_further = scorer_sbs.fit_transform(X=Xsbs_further)
scores_further = pd.DataFrame(data=scores_further, index=ix_further, columns=[c[0] for c in _sbs_score_list])
print(pd.datetime.now(),' | ', 'Done')
print(pd.datetime.now(),' | ', 'Scores:')
print(scores_further.sample(10))

2020-04-13 21:45:22.222607  |  Starting further scoring
2020-04-13 21:45:41.641162  |  Done
2020-04-13 21:45:41.641410  |  Scores:
                   name_fuzzy  street_fuzzy  name_token  street_token  \
ix_source  ix_target                                                       
9511612e 9511612e        1.00          1.00        1.00          1.00   
a8ec99a6 378f4784        0.21          1.00        0.21          1.00   
8946d1cc 891d9409        0.62          0.40        0.62          0.33   
f807543d e5e397cc        0.34          0.35        0.28          0.35   
a83fa5a8 48784f03        0.47          0.17        0.35          0.17   
d694d372 580ca91d        0.47          0.37        0.53          0.37   
b2bd34fb fe983f7b        0.25          0.57        0.12          0.57   
d1940797 d1940797        1.00          1.00        1.00          1.00   
f67ba778 65d77185        0.52          0.43        0.56          0.43   
251a3154 b04a972a        0.22          0.91        0.22        

### Add the scores from the previous step

In [46]:
scores_further = pd.concat([Xst_further, scores_further], axis=1, ignore_index=False)

## 6. Machine Learning Classifier

In [13]:
from suricate.pipeline import PartialClf
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import Normalizer
from sklearn.decomposition import PCA
from sklearn.ensemble import GradientBoostingClassifier

### 6.1 Make the pipeline

In [14]:
pipe = Pipeline(steps=[
    ('Impute', SimpleImputer(strategy='constant', fill_value=0)),
    ('Scaler', Normalizer()),
    ('PCA', PCA(n_components=4)),
    ('Predictor', GradientBoostingClassifier(n_estimators=500))
])

### 6.2. Launch the Pipeline on the reduced subset

In [47]:
print(pd.datetime.now(),' | ', 'Launch prediction pipeline fit')
pred = PartialClf(classifier=pipe)
pred.fit(X=scores_further, y=y_true_further)
print(pd.datetime.now(),' | ', 'Done')
y_pred_further = pred.predict(X=scores_further)

2020-04-13 21:46:13.982590  |  Launch prediction pipeline fit
2020-04-13 21:46:17.928956  |  Done


Complete the prediction vector by assigning a score of 0 for the pairs not considered
in the pruning step

In [48]:
ix_pruned = y_cluster.loc[~(y_cluster.isin(pruning_clusters))].index
y_pred_pruned = pd.Series(index=ix_pruned, data=np.zeros(shape=len(ix_pruned)), name='y_pred')
y_pred = pd.concat([y_pred_pruned, y_pred_further], ignore_index=False)

### 6.3. Calculate the scores on the training data

In [49]:
y_train_true = y_true.loc[y_pred.index]
from sklearn.metrics import precision_score, recall_score, accuracy_score
print(pd.datetime.now(),' | ', 'Scores on training data')
print(pd.datetime.now(),' | ', 'accuracy: {}'.format(accuracy_score(y_true=y_train_true, y_pred=y_pred)))
print(pd.datetime.now(),' | ', 'precision: {}'.format(precision_score(y_true=y_train_true, y_pred=y_pred)))
print(pd.datetime.now(),' | ', 'recall: {}'.format(recall_score(y_true=y_train_true, y_pred=y_pred)))

2020-04-13 21:46:47.246647  |  Scores on training data
2020-04-13 21:46:47.247086  |  accuracy: 0.9814
2020-04-13 21:46:47.256749  |  precision: 0.9691268692715871
2020-04-13 21:46:47.271430  |  recall: 0.9427498826841858


## 7. Use with test data

### 7.1 Data Preparation

In [50]:
leftrawnew = getsource(nrows=None)
leftrawnew= leftrawnew.loc[leftrawnew.index.difference(df_source_raw)]
leftrawnew= prepare_source(leftrawnew)

### 7.2. Connection

In [51]:
Xst_test = escon.transform(X=leftrawnew)

In [52]:
Xsbs_test = escon.getsbs(X=leftrawnew, on_ix=Xst_test.index)

### 7.3. Cluster

In [53]:
y_cluster_test = pd.Series(data=exp.pred_cluster(X=Xst_test), index=Xst_test.index, name='y_cluster')

### 7.4. Pruning

In [54]:
ix_further_test = y_cluster_test.loc[y_cluster_test.isin(pruning_clusters)].index

In [55]:
Xst_further_test = Xst_test.loc[ix_further_test]
Xsbs_further_test = Xsbs_test.loc[ix_further_test]

### 7.5. Scoring

In [56]:
scores_further_test = scorer_sbs.fit_transform(X=Xsbs_further_test)
scores_further_test = pd.DataFrame(data=scores_further_test, index=ix_further_test, columns=[c[0] for c in _sbs_score_list])
scores_further_test = pd.concat([Xst_further_test, scores_further_test], axis=1, ignore_index=False)

### 7.6. Prediction

In [57]:
y_pred_further_test = pred.predict(X=scores_further_test)
ix_pruned_test = y_cluster_test.loc[~(y_cluster_test.isin(pruning_clusters))].index
y_pred_pruned_test = pd.Series(index=ix_pruned_test, data=np.zeros(shape=len(ix_pruned_test)), name='y_pred')
y_pred_test = pd.concat([y_pred_pruned_test, y_pred_further_test], ignore_index=False)

#### 7.7. Scoring on test data

In [58]:
y_true_test = rebuild_ytrue(ix=Xst_test.index)
print(pd.datetime.now(),' | ', 'Scores on test data')
print(pd.datetime.now(),' | ', 'accuracy: {}'.format(accuracy_score(y_true=y_true_test, y_pred=y_pred_test)))
print(pd.datetime.now(),' | ', 'precision: {}'.format(precision_score(y_true=y_true_test, y_pred=y_pred_test)))
print(pd.datetime.now(),' | ', 'recall: {}'.format(recall_score(y_true=y_true_test, y_pred=y_pred_test)))






2020-04-13 21:50:21.914017  |  Scores on training data
2020-04-13 21:50:21.914226  |  accuracy: 0.6954986149584488
2020-04-13 21:50:21.917839  |  precision: 0.22249911000355999
2020-04-13 21:50:21.926997  |  recall: 0.22022551092318535
