#### file locations

In [30]:
import sys

# dictionary file path
dictionary_filepath = 'resources/curated_word_list.txt'

# slangification map 
# slangCharMap()
slangification_map_filepath = 'resources/slangification_map.txt'

# input file containing username pair to match
input_filename = 'data/test.csv'

# output file
output_filename = 'output/sim_score.csv'

#### set minimum length of dictionary word to consider as a token (English word/Token)

In [2]:
# configurable
min_length = 3

#### read word list from dictionary
##### all words in the dictionary are already in lowercase

In [45]:
word_list = set();
with open(dictionary_filepath) as file:
    lines = file.readlines()

    for line in lines:
        word_list.add(line.strip())

print('number of words/phrases in the dictionary', len(word_list))

number of words/phrases in the dictionary 46019


#### read slangCharMap

In [47]:
with open(slangification_map_filepath, 'r') as f:
    lines = [list(line.strip()) for line in f.readlines()]

slangification_map = {}
for line in lines:
    
    slangification_map[line[0]] = line[1:]

    
print('slanCharMap', slangification_map)

slanCharMap {'0': ['o', 'd'], '1': ['i', 'l', 't', 'j'], '2': ['z'], '3': ['e', 's'], '4': ['a', 'r'], '5': ['s'], '6': ['g'], '7': ['t', 'l', 'j', 'r'], '8': ['b'], '9': ['g'], '@': ['a'], '!': ['i'], '$': ['s']}


#### create alphabet list

In [48]:
import string

alphabet_lower = list(string.ascii_lowercase)
alphabet_upper = list(string.ascii_uppercase)
digits = list(string.digits)
slang_alphabet = list(slangification_map.keys())

print(alphabet_lower[:5], alphabet_upper[:5], digits[:5], slang_alphabet)

['a', 'b', 'c', 'd', 'e'] ['A', 'B', 'C', 'D', 'E'] ['0', '1', '2', '3', '4'] ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '@', '!', '$']


#### Weighted Interval Scheduling (https://github.com/aladdinpersson/Algorithms-Collection-Python)

In [49]:
import bisect


class WeightedIntervalScheduling(object):
    def __init__(self, I):
        self.I = sorted(I, key=lambda tup: tup[1])  # (key = lambda tup : tup[1])
        self.OPT = []
        self.solution = []

    def previous_intervals(self):
        start = [task[0] for task in self.I]
        finish = [task[1] for task in self.I]
        p = []

        for i in range(len(self.I)):
            # finds idx for which to input start[i] in finish times to still be sorted
            idx = bisect.bisect(finish, start[i]) - 1
            p.append(idx)

        return p

    def find_solution(self, j):
        if j == -1:
            return

        else:
            if (self.I[j][2] + self.compute_opt(self.p[j])) > self.compute_opt(j - 1):
                self.solution.append(self.I[j])
                self.find_solution(self.p[j])

            else:
                self.find_solution(j - 1)

    def compute_opt(self, j):
        if j == -1:
            return 0

        elif (0 <= j) and (j < len(self.OPT)):
            return self.OPT[j]

        else:
            return max(
                self.I[j][2] + self.compute_opt(self.p[j]), self.compute_opt(j - 1)
            )

    def weighted_interval(self):
        if len(self.I) == 0:
            return 0, self.solution

        self.p = self.previous_intervals()

        for j in range(len(self.I)):
            opt_j = self.compute_opt(j)
            self.OPT.append(opt_j)

        self.find_solution(len(self.I) - 1)

        return self.OPT[-1], self.solution[::-1]

#### find meaningful words/name phrases from the dictionary

In [50]:
"""
Input: username
Output: 
    dict_chunk_count: count of dictionary chunks
    coverage: sum of the length of the dictionary chunks only
    chunks: all chunks including dictionary chunks
"""
def get_dict_chunks(username):

    
    n = len(username)
    next_list = [(-1, -1, -1) for _ in range(n)]
    

    i = n-1
    while i >= 0:

        next_list[i] = (i, i + 1, 1)
        

        if i + min_length <=n and username[i] in alphabet_lower:
            
            local_optimal = 0
            
            curr_length = min_length
            for end in range(i + min_length -1, n):
                
                if username[i: i+curr_length] in word_list:
                    

                    next_length = 0
                    if i + curr_length < n:
                        
                        next_end = next_list[i + curr_length][1]
                        next_length = next_end - (i+curr_length)
                    
                    if curr_length + next_length >= local_optimal:
                        local_optimal = curr_length + next_length
                        next_list[i] = (i, i + curr_length, curr_length)
                
                curr_length += 1

        i = i-1

        
    weightedinterval = WeightedIntervalScheduling(next_list)
    _, chunks = weightedinterval.weighted_interval()
    
    
    dict_chunks = [chunk[2] for chunk in chunks if chunk[2] > 1]
    dict_chunk_count = len(dict_chunks)
    coverage = sum(dict_chunks)
    
    return (dict_chunk_count, chunks, coverage)

#### generate all possible transformation using slangCharMap()

In [51]:
def slangify(username):
    
    slang_chars_found = []
    for char in slangification_map:
        
        if char in username:
            slang_chars_found.append(char)
    
    transformed_usernames = []
    if len(slang_chars_found):
        
        transformed_usernames.append(username)
        for slang_char in slang_chars_found:
            
            new_transformed_usernames = []
            replace_chars = slangification_map[slang_char]
            for replace_char in replace_chars:
                
                for transformed_username in transformed_usernames:
                    new_transformed_usernames.append(\
                        transformed_username.replace(slang_char, \
                            replace_char))
            transformed_usernames = new_transformed_usernames
            
    return set(transformed_usernames)

#### TokenBasedChunkification

In [52]:
"""
preprocess: we convert username in lowercase, discard unrelated characters
               unrelated means outside set(alphabet_lower + slang_alphabet)

Input: username
Output: chunkification

def: get_dict_chunks(username): (dict_chunk_count, chunks, coverage)
"""

def get_dict_token_based_chunkification(username):
    
    if username.isnumeric():
        return []
    
    preprocess = lambda uname : ''.join([char for char in list(uname.lower()) if char in set(alphabet_lower + slang_alphabet)])
    username = preprocess(username)
    
    chunkification_no_slang = get_dict_chunks(username)
    token_based_chunkification = [username[chunk[0]: chunk[1]] \
             for chunk in chunkification_no_slang[1] ]
    
    slangified_usernames = slangify(username)
    opt_chunkification_slang = chunkification_no_slang 
    opt_slangified_username = None
    if len(slangified_usernames):
        
        for slangified_username in slangified_usernames:
            
            chunkification_slang = get_dict_chunks(slangified_username)
            
            optimal_cond = \
                (chunkification_slang[2] > opt_chunkification_slang[2]) or \
                (chunkification_slang[2] == opt_chunkification_slang[2] and \
                    chunkification_slang[0] < opt_chunkification_slang[0]) 
            
            if  optimal_cond:
                opt_chunkification_slang = chunkification_slang
                opt_slangified_username = slangified_username
        
        if opt_slangified_username:
            token_based_chunkification = \
            [opt_slangified_username[chunk[0]: chunk[1]] \
                 if chunk[2] > 1 else username[chunk[0]: chunk[1]] \
                     for chunk in opt_chunkification_slang[1] ]
    
    return [token for token in token_based_chunkification if len(token) >= min_length]

#### DigitBasedChunkification

In [53]:

import re

def get_digit_based_chunkification(username):
    
    username = username.lower()
    letter_groups = re.findall("([^0-9]+)", username)
    digit_groups = re.findall("([0-9]+)", username)
    
    letter_groups = [ chunk.strip() for chunk in letter_groups ]
    digit_groups = [ chunk.strip() for chunk in digit_groups ]
    
    return (letter_groups, digit_groups)

#### SymbolBasedChunkification

In [54]:
import re

def get_symbol_based_chunkification(username):
    
    username = username.lower()
    chunks = re.split('[^a-zA-Z0-9]+', username) 
    
    return [ chunk for chunk in chunks if len(chunk) > 0]

#### CapitalizationBasedChunkification

In [55]:
def get_capital_letter_based_chunkification(username):
    
    if username.islower():
        return []
    
    start = 0
    chunks = []
    for i in range(1, len(username)):
        
        if username[i] in alphabet_upper:
            chunks.append(username[start:i].lower().strip())
            start = i
    
    chunks.append(username[start:len(username)].lower())
    
    return chunks

#### findUsernameWithoutSymbolDigit

In [56]:
import re

def get_username_without_symbol_digit(username):
    
    username = username.lower()
    chunks = re.split('[^a-z]+', username)
    
    return ''.join(chunks)

#### getChunkListLength (|L|)

In [58]:
def get_chunkification_length(chunks):
    
    return sum([len(chunk) for chunk in chunks])

#### function to apply for creating column of list attributes

In [59]:
def create_column_value_for_list_attrs(list_vals):
    
    return ','.join(list_vals)

#### new columns

In [60]:
lower_1, lower_2 = 'lower_1', 'lower_2'
w_o_sym_dig_1, w_o_sym_dig_2 = 'w_o_sym_dig_1', 'w_o_sym_dig_2'
cap_ltr_1, cap_ltr_2 = 'cap_ltr_1', 'cap_ltr_2'
sym_1, sym_2 = 'sym_1', 'sym_2'
dig_1, dig_2 = 'dig_1', 'dig_2'
dict_1, dict_2 = 'dict_1', 'dict_2'

sim_score = 'sim_score'
username_1, username_2 = 'username_1', 'username_2'

#### calculate similarity

In [61]:
from py_stringmatching.similarity_measure.levenshtein import Levenshtein
from py_stringmatching.similarity_measure.monge_elkan import MongeElkan
from py_stringmatching.similarity_measure.jaccard import Jaccard

def get_similarity_score(pair_row):
    
    scores = []
    monge_elkan_method = MongeElkan(sim_func=Levenshtein().get_sim_score)
    levenshtein_method = Levenshtein()
    
    ########### username_lower
    src_lower_len, target_lower_len = len(pair_row[lower_1]), len(pair_row[lower_2])
    if src_lower_len == 0 or target_lower_len == 0:
        score_on_username_lower = 0.0
    
    else:
        score_on_username_lower = levenshtein_method.get_sim_score(pair_row[lower_1], 
                                                              pair_row[lower_2])
    scores.append(score_on_username_lower)
    
    ########### without_sym_dig
    src_feature_len, target_feature_len = len(pair_row[w_o_sym_dig_1]), len(pair_row[w_o_sym_dig_2])
    if src_feature_len == 0 or target_feature_len == 0:
        score_on_without_sym_dig = 0.0
    
    else:
        score_on_without_sym_dig = levenshtein_method.get_sim_score(pair_row[w_o_sym_dig_1], 
                                                              pair_row[w_o_sym_dig_2])
        
#         weight = (src_feature_len + target_feature_len)/(src_lower_len + target_lower_len)
#         score_on_without_sym_dig = score_on_without_sym_dig * weight
        
    scores.append(score_on_without_sym_dig)
    
    ########### cap_chunks
    chunk_list_1, chunk_list_2 = pair_row[cap_ltr_1].split(','), pair_row[cap_ltr_2].split(',')
    src_feature_len, target_feature_len = get_chunkification_length(chunk_list_1), \
                                            get_chunkification_length(chunk_list_2)
    
    if src_feature_len == 0 or target_feature_len == 0:
        
        score_on_cap_chunks = 0.0
    else:
        if len(chunk_list_1) < len(chunk_list_2):
            chunk_list_1, chunk_list_2 = chunk_list_2, chunk_list_1
        score_on_cap_chunks = Jaccard().\
                                    get_raw_score(chunk_list_1, chunk_list_2)
        
#         weight = (src_feature_len + target_feature_len)/(src_lower_len + target_lower_len)
#         score_on_cap_chunks = score_on_cap_chunks * weight
        
    scores.append(score_on_cap_chunks)
    
    ########### sym_chunks
    chunk_list_1, chunk_list_2 = pair_row[sym_1].split(','), pair_row[sym_2].split(',')
    src_feature_len, target_feature_len = get_chunkification_length(chunk_list_1), \
                                            get_chunkification_length(chunk_list_2)
        
    if src_feature_len == 0 or target_feature_len == 0:
        
        score_on_sym_chunks = 0.0
    else:
        if len(chunk_list_1) < len(chunk_list_2):
            chunk_list_1, chunk_list_2 = chunk_list_2, chunk_list_1
        score_on_sym_chunks = monge_elkan_method.get_raw_score(chunk_list_1, chunk_list_2)
        
#         weight = (src_feature_len + target_feature_len)/(src_lower_len + target_lower_len)
#         score_on_sym_chunks = score_on_sym_chunks * weight
        
    scores.append(score_on_sym_chunks)
    
    ########### dig_chunks
    chunk_list_1, chunk_list_2 = pair_row[dig_1].split(','), pair_row[dig_2].split(',')
    src_feature_len, target_feature_len = get_chunkification_length(chunk_list_1), \
                                            get_chunkification_length(chunk_list_2)
        
    if src_feature_len == 0 or target_feature_len == 0:
        
        score_on_dig_chunks = 0.0
    else:
        if len(chunk_list_1) < len(chunk_list_2):
            chunk_list_1, chunk_list_2 = chunk_list_2, chunk_list_1
        score_on_dig_chunks = monge_elkan_method.get_raw_score(chunk_list_1, chunk_list_2)
        
#         weight = (src_feature_len + target_feature_len)/(src_lower_len + target_lower_len)
#         score_on_dig_chunks = score_on_dig_chunks * weight
        
    scores.append(score_on_dig_chunks)
    
    
    ########### dict_token_chunks
    chunk_list_1, chunk_list_2 = pair_row[dict_1].split(','), pair_row[dict_2].split(',')
    src_feature_len, target_feature_len = get_chunkification_length(chunk_list_1), \
                                            get_chunkification_length(chunk_list_2)
        
    if src_feature_len == 0 or target_feature_len == 0:
        
        score_on_dict_token_chunks = 0.0
    else:
        if len(chunk_list_1) < len(chunk_list_2):
            chunk_list_1, chunk_list_2 = chunk_list_2, chunk_list_1
        score_on_dict_token_chunks = monge_elkan_method.get_raw_score(chunk_list_1, chunk_list_2)
        
        weight = (src_feature_len + target_feature_len)/(src_lower_len + target_lower_len)
        score_on_dict_token_chunks = score_on_dict_token_chunks * weight
        
    scores.append(score_on_dict_token_chunks)
    
#     print(scores)
    
    return max(scores)

#### creating features

In [62]:
import glob
import pandas as pd

df = pd.read_csv(input_filename, names=[username_1, username_2])
print('total number of usernames', df.shape)

df[username_1] = df[username_1].astype(str)
df[username_2] = df[username_2].astype(str)

print('a few of the username pairs')
print(df.head(5))
print()

#### create features

# findLower
df[lower_1] = df.apply(lambda row: row.username_1.lower(), axis = 1)
df[lower_2] = df.apply(lambda row: row.username_2.lower(), axis = 1)

# findUsernameWithoutSymbolDigit
df[w_o_sym_dig_1] = df.apply(lambda row: get_username_without_symbol_digit(row.username_1), axis = 1)
df[w_o_sym_dig_2] = df.apply(lambda row: get_username_without_symbol_digit(row.username_2), axis = 1)

# findCapitalLetterBasedChunkification
df[cap_ltr_1] = df.apply(lambda row: create_column_value_for_list_attrs(\
                                        get_capital_letter_based_chunkification(row.username_1)), axis = 1)
df[cap_ltr_2] = df.apply(lambda row: create_column_value_for_list_attrs(\
                                        get_capital_letter_based_chunkification(row.username_2)), axis = 1)


# findSymbolBasedChunkification        
df[sym_1] = df.apply(lambda row: create_column_value_for_list_attrs(\
                                        get_symbol_based_chunkification(row.username_1)), axis = 1)
df[sym_2] = df.apply(lambda row: create_column_value_for_list_attrs(\
                                        get_symbol_based_chunkification(row.username_2)), axis = 1)

# findDigitBasedChunkification
df[dig_1] = df.apply(lambda row: create_column_value_for_list_attrs(\
                                        get_digit_based_chunkification(row.username_1)[0]), axis = 1)
df[dig_2] = df.apply(lambda row: create_column_value_for_list_attrs(\
                                        get_digit_based_chunkification(row.username_2)[0]), axis = 1)

# findDictionaryTokenBasedChunkification        
df[dict_1] = df.apply(lambda row: create_column_value_for_list_attrs(\
                                        get_dict_token_based_chunkification(row.username_1)), axis = 1)
df[dict_2] = df.apply(lambda row: create_column_value_for_list_attrs(\
                                        get_dict_token_based_chunkification(row.username_2)), axis = 1)

# find similarity
df[sim_score] = df.apply(lambda row: get_similarity_score(row), axis = 1)
df[sim_score] = df[sim_score].round(3)

print("matching is successful")

total number of usernames (48, 2)
a few of the username pairs
    username_1    username_2
0  0bscurec0de  ObscureCoder
1     0v3rride      override
2     0verl0ad      0verload
3    10ca1h0st     localh0st
4      3ndG4me       EndGam3

matching is successful


#### filtering columns to write in file

In [63]:
df = df[[username_1, username_2, sim_score]]

In [64]:
df.values.tolist()[:5]

[['0bscurec0de', 'ObscureCoder', 0.9],
 ['0v3rride', 'override', 1.0],
 ['0verl0ad', '0verload', 1.0],
 ['10ca1h0st', 'localh0st', 1.0],
 ['3ndG4me', 'EndGam3', 1.0]]

In [65]:
df.to_csv(output_filename, index = False, header = True)