In [3]:
import re
import zlib
from collections import defaultdict
from typing import List, Set, Tuple

import numpy as np
import pandas as pd

def remove_exact_duplicates(df, column_name):
    """Return a copy of ``df`` without rows that repeat a value in ``column_name``.

    The first row carrying each distinct value is kept; the input frame is
    left untouched and the surviving rows keep their original index labels.
    """
    return df.drop_duplicates(subset=[column_name])

class MinHashLSH:
    """Near-duplicate text detector using MinHash signatures and banded LSH.

    Each document is reduced to a set of character shingles, summarised by a
    fixed-length MinHash signature, and indexed by bands of that signature.
    Documents that share at least one identical band become candidate pairs,
    which are then verified by comparing their full signatures.
    """

    # Smallest prime above 2**32; modulus of the (a * x + b) mod p hash family.
    _PRIME = 4294967311

    def __init__(self, num_perm: int = 100, num_bands: int = 20, seed: int = 1):
        """
        Initialize LSH with MinHash for text similarity detection

        Args:
            num_perm: Number of permutations (signature length) for MinHash
            num_bands: Number of bands for LSH (affects performance/accuracy
                tradeoff). When ``num_perm`` is not a multiple of
                ``num_bands`` the trailing signature rows are not used for
                banding but still contribute to the similarity estimate.
            seed: Seed for the hash-function coefficients, so signatures are
                reproducible across processes and runs.

        Raises:
            ValueError: If ``num_perm`` is not positive or ``num_bands`` is
                outside ``1..num_perm``.
        """
        if num_perm < 1:
            raise ValueError("num_perm must be a positive integer")
        if not 1 <= num_bands <= num_perm:
            raise ValueError("num_bands must be between 1 and num_perm")

        self.num_perm = num_perm
        self.num_bands = num_bands
        self.rows_per_band = self.num_perm // self.num_bands
        self.hash_tables = [defaultdict(list) for _ in range(self.num_bands)]
        self.documents = {}

        # Coefficients stay below 2**31 so a * x + b (x < 2**32) fits in
        # uint64 without wrapping.
        rng = np.random.RandomState(seed)
        self._coeff_a = rng.randint(
            1, 2**31, size=num_perm, dtype=np.int64
        ).astype(np.uint64)
        self._coeff_b = rng.randint(
            0, 2**31, size=num_perm, dtype=np.int64
        ).astype(np.uint64)

    def _preprocess_text(self, text: str) -> str:
        """Lower-case the text and strip everything but word chars and whitespace."""
        text = text.lower()
        text = re.sub(r'[^\w\s]', '', text)
        return text

    def _get_shingles(self, text: str, k: int = 3) -> Set[str]:
        """Convert text to the set of its character k-shingles.

        A non-empty text shorter than ``k`` is returned as a single shingle so
        that unrelated short texts do not all collapse to the same empty set.
        """
        text = self._preprocess_text(text)
        if len(text) < k:
            return {text} if text else set()
        return {text[i:i + k] for i in range(len(text) - k + 1)}

    def _minhash_signature(self, shingles: Set[str]) -> np.ndarray:
        """Generate the MinHash signature (uint64 array of length ``num_perm``).

        Every shingle is mapped to a 32-bit CRC, which is deterministic across
        processes (unlike the salted built-in ``hash``), and all ``num_perm``
        hash functions are evaluated in one vectorized step.
        """
        if not shingles:
            # Sentinel above any real hash value: empty documents only match
            # other empty documents.
            return np.full(
                self.num_perm, np.iinfo(np.uint64).max, dtype=np.uint64
            )

        base = np.fromiter(
            (zlib.crc32(s.encode("utf-8", "surrogatepass")) for s in shingles),
            dtype=np.uint64,
            count=len(shingles),
        )
        # Shape (num_perm, n_shingles): row i holds hash function i applied
        # to every shingle.
        hashed = (
            self._coeff_a[:, None] * base[None, :] + self._coeff_b[:, None]
        ) % np.uint64(self._PRIME)
        return hashed.min(axis=1)

    def _get_bands(self, signature: np.ndarray) -> List[Tuple]:
        """Split signature into exactly ``num_bands`` hashable bands."""
        rows = self.rows_per_band
        return [
            tuple(signature[start:start + rows].tolist())
            for start in range(0, rows * self.num_bands, rows)
        ]

    def _jaccard_similarity(self, sig1: np.ndarray, sig2: np.ndarray) -> float:
        """Estimate Jaccard similarity as the fraction of matching signature rows."""
        return float(np.mean(sig1 == sig2))

    def add_document(self, doc_id: str, text: str):
        """Add a document to the LSH index, replacing any earlier entry for ``doc_id``."""
        stale = self.documents.get(doc_id)
        if stale is not None:
            # Drop the old band entries so a re-added id is not listed twice.
            for table, band in zip(self.hash_tables, self._get_bands(stale)):
                bucket = table.get(band, [])
                if doc_id in bucket:
                    bucket.remove(doc_id)

        shingles = self._get_shingles(text)
        signature = self._minhash_signature(shingles)
        self.documents[doc_id] = signature

        for band_idx, band in enumerate(self._get_bands(signature)):
            self.hash_tables[band_idx][band].append(doc_id)

    def find_duplicates(self, threshold: float) -> List[Tuple[str, str, float]]:
        """Find all pairs of documents with similarity above threshold.

        Returns:
            ``(doc1_id, doc2_id, similarity)`` tuples with ``doc1_id < doc2_id``
            (string comparison), sorted by id pair so the output order is
            stable between runs.
        """
        candidate_pairs = set()

        for hash_table in self.hash_tables:
            for bucket in hash_table.values():
                # set() guards against an id listed twice pairing with itself.
                members = sorted(set(bucket))
                for pos, first in enumerate(members):
                    for second in members[pos + 1:]:
                        candidate_pairs.add((first, second))

        duplicates = []
        for doc1_id, doc2_id in sorted(candidate_pairs):
            similarity = self._jaccard_similarity(
                self.documents[doc1_id],
                self.documents[doc2_id]
            )
            if similarity >= threshold:
                duplicates.append((doc1_id, doc2_id, similarity))

        return duplicates

    def deduplicate(self, texts: List[str], threshold: float) -> List[str]:
        """Remove similar texts based on a given similarity threshold.

        The index is rebuilt from scratch on every call. For each pair judged
        similar, the text that appears later in ``texts`` is dropped, so the
        earliest occurrence survives. The relative order of kept texts is
        preserved.
        """
        self.hash_tables = [defaultdict(list) for _ in range(self.num_bands)]
        self.documents = {}

        for i, text in enumerate(texts):
            self.add_document(str(i), text)

        to_remove = set()
        for doc1_id, doc2_id, _ in self.find_duplicates(threshold):
            # Ids are stringified positions; compare numerically because
            # "10" sorts before "9" as a string.
            to_remove.add(max(int(doc1_id), int(doc2_id)))

        return [text for i, text in enumerate(texts) if i not in to_remove]

In [None]:
# CSV that is read here and overwritten with the de-duplicated result below.
FILE_PATH = "../data.csv"
# Name of the free-text column that is de-duplicated.
COMPLAIN_COLUMN = "crimeaditionalinfo"
df = pd.read_csv(FILE_PATH)[[COMPLAIN_COLUMN]]

# Remove exact duplicates
print("Length of df before removing exact duplicates: ", len(df))
df = remove_exact_duplicates(df, COMPLAIN_COLUMN)
print("Length of df after removing exact duplicates: ", len(df))

# Empty CSV cells are read as NaN (a float), which the text preprocessing
# cannot lower-case; drop them and make sure every remaining value is a str.
df = df.dropna(subset=[COMPLAIN_COLUMN])
texts = df[COMPLAIN_COLUMN].astype(str).tolist()

# Remove similar texts using MinHashLSH
minhash_lsh = MinHashLSH(num_perm=100, num_bands=20)
threshold = 0.9
unique_texts = minhash_lsh.deduplicate(texts, threshold)
df = pd.DataFrame(unique_texts, columns=[COMPLAIN_COLUMN])
print("Length of df after removing similar texts: ", len(df))
# NOTE(review): this overwrites the input file and keeps only COMPLAIN_COLUMN;
# confirm that discarding the other columns of the source CSV is intended.
df.to_csv(FILE_PATH, index=False)