In [22]:
import pandas as pd
from elasticsearch import Elasticsearch, helpers
from elasticsearch_dsl import Search, Q, A
import time
import logging
import sys

## All information is randomly generated, contains no PII whatsoever

# Import CSV to df
df_clients_with_duplicates = pd.read_csv('csv/clients_with_duplicates.csv')

# Replace all NaN values with an empty string
df_clients_with_duplicates.fillna('', inplace=True)

# Reduce number of entries
df_clients_with_duplicates = df_clients_with_duplicates.head(3000)

def setup_elasticsearch_index(df, index_name):
    # Initialize Elasticsearch client
    es = Elasticsearch(
        'https://elastic:9200',
        basic_auth=('elastic', 'password'),
        verify_certs=False,
        ssl_show_warn=False
    )
    # Delete the index if it exists
    if es.indices.exists(index=index_name):
        es.indices.delete(index=index_name)
        print(f"Deleted existing index {index_name}")

    # Create the index with optimized settings for client fields analysis
    settings = {
        "settings": {
            "analysis": {
                "analyzer": {
                    "name_folding_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",  # separate words by a space character
                        "filter": ["lowercase", "asciifolding"]
                    },
                    "legal_folding_analyzer": {
                        "type": "custom",
                        "tokenizer": "standard",
                        "filter": ["lowercase", "asciifolding"],
                        "char_filter": ["hyphen_removal"]
                    }
                },
                "normalizer": {
                    "name_folding_normalizer": {
                        "type": "custom",
                        "filter": ["lowercase", "asciifolding"]
                    },
                    "legal_folding_normalizer": {
                        "type": "custom",
                        "filter": ["lowercase", "asciifolding"],
                        "char_filter": ["hyphen_removal"]
                    }
                },
                "char_filter": {
                    "hyphen_removal": {
                        "type": "pattern_replace",
                        "pattern": "[\\-\\s]",  # regex targets both hyphens and whitespace characters
                        "replacement": ""
                    }
                }                
            }
        },
        "mappings": {
            "properties": {
                "name": {
                    "type": "text",
                    "analyzer": "name_folding_analyzer",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        },
                        "folded": {
                            "type": "keyword",
                            "normalizer": "name_folding_normalizer"
                        }
                    }
                },
                "legal": {
                    "type": "text",
                    "analyzer": "legal_folding_analyzer",
                    "fields": {
                        "keyword": {
                            "type": "keyword",
                            "ignore_above": 256
                        },
                        "folded": {
                            "type": "keyword",
                            "normalizer": "legal_folding_normalizer"
                        }
                    }
                },
                "email1": {
                    "type": "keyword",
                    "ignore_above": 256
                },
                "email2": {
                    "type": "keyword",
                    "ignore_above": 256
                }
            }
        }
    }
    es.indices.create(index=index_name, body=settings)
    print(f"Created index {index_name} with custom settings and mappings")

    # Create an iterable of actions to be executed in bulk
    actions = [
        {
            "_index": index_name,
            "_id": str(record['id']),
            "_source": record,
        }
        for record in df.to_dict(orient='records')
    ]
    
    # Perform bulk insert
    helpers.bulk(es, actions)
    # Refresh the index to make the changes searchable
    es.indices.refresh(index=index_name)
    print(f"Data uploaded to index {index_name}")

setup_elasticsearch_index(df_clients_with_duplicates, 'es_clients_with_duplicates')

Deleted existing index es_clients_with_duplicates
Created index es_clients_with_duplicates with custom settings and mappings
Data uploaded to index es_clients_with_duplicates


In [23]:
# Perform a cross-comparison among all documents in the index for any given field

# Clear previous logging handlers if any
for handler in logging.root.handlers[:]:
    logging.root.removeHandler(handler)

# Configure logging to show the time, level, and message without the logger's name
logging.basicConfig(level=logging.INFO,
                    format='%(asctime)s - %(levelname)s - %(message)s')

def find_and_display_duplicates():
    start_time = time.time()  # Start timing the analysis
    
    es = Elasticsearch(
        'https://elastic:9200',
        basic_auth=('elastic', 'password'),
        verify_certs=False,
        ssl_show_warn=False
    )

    # Count index documents for logs
    total_docs = es.count(index='es_clients_with_duplicates')['count']
    
    # Setup aggregation queries for names and legal fields
    s = Search(using=es, index='es_clients_with_duplicates').source(False).params(size=0)
    s.aggs.bucket('by_name', 'terms', field='name.folded', min_doc_count=2, size=10000)
    s.aggs.bucket('by_legal', 'terms', field='legal.folded', min_doc_count=2, size=10000)
    response = s.execute()

    # Extract keys from buckets
    folded_names = {bucket.key for bucket in response.aggregations.by_name.buckets}
    folded_legals = {bucket.key for bucket in response.aggregations.by_legal.buckets}

    data = []
    current_matched_set = 1
    processed_doc_ids = set()

    # Function to fetch and assign matched_set for documents
    def fetch_and_assign(query, matched_set):
        search = Search(using=es, index='es_clients_with_duplicates').query(query)
        search = search.source(['name', 'date', 'legal', 'email1', 'email2'])  # Added email fields
        results = search.execute()

        assigned_data = []
        for hit in results:
            doc_id = hit.meta.id
            if doc_id not in processed_doc_ids:
                processed_doc_ids.add(doc_id)
                assigned_data.append({
                    "id": doc_id,
                    "name": hit.name,
                    "date": hit.date,
                    "email1": hit.email1,
                    "email2": hit.email2,
                    "legal": hit.legal,
                    "matched_set": matched_set
                })

        return assigned_data

    # Process and assign matched_set for name and legal
    for field, values in [('name.folded', folded_names), ('legal.folded', folded_legals)]:
        for value in values:
            if value:  # Check for empty values to avoid processing them
                query = Q('term', **{field: value})
                results = fetch_and_assign(query, current_matched_set)
                if results:
                    data.extend(results)
                    current_matched_set += 1

    # Collect and process emails for duplicate detection
    s = Search(using=es, index='es_clients_with_duplicates').source(['email1', 'email2'])
    email_counts = {}
    for hit in s.scan():
        if hit.email1:
            email_counts[hit.email1] = email_counts.get(hit.email1, 0) + 1
        if hit.email2 and hit.email2 != hit.email1:
            email_counts[hit.email2] = email_counts.get(hit.email2, 0) + 1

    common_emails = {email for email, count in email_counts.items() if count > 1}
    for email in common_emails:
        query = Q('bool', should=[Q('term', email1=email), Q('term', email2=email)])
        results = fetch_and_assign(query, current_matched_set)
        if results:
            data.extend(results)
            current_matched_set += 1

    # Create and return the DataFrame
    df = pd.DataFrame(data)
    if not df.empty:
        df = df.sort_values(by=['matched_set'])

    logging.info(f"Analyzed {total_docs} client profiles in {time.time() - start_time:.2f} seconds.")
    logging.info(f"Found {len(df)-1} Client IDs with potential duplicates.")
    
    return df

# Display results
duplicates_df = find_and_display_duplicates()
duplicates_df

2024-10-20 14:27:27,147 - INFO - Analyzed 3000 client profiles in 0.70 seconds.
2024-10-20 14:27:27,147 - INFO - Found 1041 Client IDs with potential duplicates.


Unnamed: 0,id,name,date,email1,email2,legal,matched_set
0,45301,Nina Fuller,24/10/24,nin_ful@yahoo.123,nin_ful@yahoo.123,Y1079501,1
1,50301,Nìna Fullér,24/10/19,nin_ful@gmail.xyz,,Y-1079501,1
2,45173,Grace Banks,01/05/22,gracebanks77@hotmail.abc,gbanks@gmail.xyz,2065463418,2
3,50173,Gracé Banks,01/05/20,gra_ban@gmx.bbb,,0578890674,2
4,49836,Lèónel Keller,16/05/18,leo_kel@hotmail.abc,lkeller@gmx.bbb,X-6102347,3
...,...,...,...,...,...,...,...
1037,47782,Ira Blanchard,11/12/21,ira_bla@yahoo.123,iblanchard@hotmail.abc,X5056099,521
1038,46734,Joey Harrison,15/10/24,joe_har@hotmail.abc,jharrison@yahoo.123,Y8571854,522
1039,47449,Gwendolyn Jackson,27/05/22,gjackson@gmail.xyz,jharrison@yahoo.123,Z7981051,522
1040,43540,Magnus Garner,26/04/22,magnusgarner45@outlook.456,kmclean@gmx.bbb,X7274840,523
