In [4]:
# NOTE: you CAN change this cell
# If you want to use your own database, download it here
# !gdown ...

!noglob gdown --fuzzy https://drive.google.com/file/d/1vC5tuPSR769P3n5iHkVTKL6yclJW0l5X/view?usp=sharing -O data-vn.csv

Downloading...
From: https://drive.google.com/uc?id=1vC5tuPSR769P3n5iHkVTKL6yclJW0l5X
To: /home/phuocho/Projects/advance_algorithm/data-vn.csv
100%|██████████████████████████████████████| 5.82M/5.82M [00:00<00:00, 75.7MB/s]


In [None]:
# NOTE: you CAN change this cell
# Add more to your needs
# you must place ALL pip install here
# !pip install pandas
!pip install Unidecode

In [3]:
# NOTE: you CAN change this cell
# import your library here
import time
import re
import heapq
import pandas as pd
from concurrent.futures import TimeoutError
from unidecode import unidecode


In [4]:
# NOTE: you MUST change this cell
# New methods / functions must be written under class Solution.

class Solution:
    
    def __init__(self):
        # list provice, district, ward for private test, do not change for any reason

        self.province_path = 'list_province.txt'
        self.district_path = 'list_district.txt'
        self.ward_path = 'list_ward.txt'

        # write your preprocess here, add more method if needed
        self.trie = TrieNode()

        cities_df = pd.read_csv('data-vn.csv', sep=',', header=0)
        cities_df.fillna('', inplace=True)
        data = cities_df[['combination', 'ward', 'district', 'province']].values.tolist()
        self.trie.build(data)

    def process(self, s: str):
      
        # try:
        return self.find_ans(s)
        # except OSError:
        #   return {"province": "", "district": "", "ward": ""}
        # except TimeoutError:
        #   return {"province": "", "district": "", "ward": ""}
        # except Exception as e:
        #   return {"province": "", "district": "", "ward": ""}

    def find_ans(self, s: str):
      # write your process string here
      raw_input = Input(s)
      processed_input = raw_input.clean_noise_input()

      t_start_search = time.perf_counter_ns()

      edit_distance = EditDistance(processed_input, t_start_search)
      areas_results = edit_distance.search(self.trie, 4)
      while not areas_results and processed_input:
        processed_input = remove_word_from_left(processed_input)
        edit_distance = EditDistance(processed_input, t_start_search)
        areas_results = edit_distance.search(self.trie, 4)
        

      if areas_results:
        province, district, ward = areas_results[0][1]
        return {
          "province": province,
          "district": district,
          "ward": ward,
        }
      return {"province": "", "district": "", "ward": ""}

def remove_word_from_left(input_string):
    return ' '.join(input_string.split()[1:])

class TrieNode:
    node_count = 0

    def __init__(self):
        self.word = None
        self.province = None
        self.district = None
        self.ward = None
        self.children = {}
        TrieNode.node_count += 1

    def insert(self, word_set):
        node = self
        word, ward, district, province = word_set
        reversed_word = unidecode(word[::-1].lower().replace('.', ''))

        for letter in reversed_word:
            if letter not in node.children:
                node.node_count += 1
                node.children[letter] = TrieNode()
            node = node.children[letter]

        node.word = word
        node.province = province
        node.district = district
        node.ward = ward

    def print(self, level=0):
        if self.word:
            print(f"{' ' * level}Word: {self.word}, Province: {self.province}, District: {self.district}, Ward: {self.ward}")

        for letter, child_node in self.children.items():
            print(f"{' ' * level}Letter: {letter}")
            child_node.print(level + 2)

    def build(self, word_sets):
        words_count = 0
        build_start_time = time.time()

        for word_set in word_sets:
            self.insert(word_set)
            words_count += 1

        print(f"Build {words_count} input into {TrieNode.node_count} nodes in {time.time() - build_start_time} seconds")


class EditDistance:
    def __init__(self, word, t_start_search):
        self.word = word
        self.word_length = len(word)
        self.word_reversed = word[::-1]
        self.current_row = range(self.word_length + 1)
        self.t_start_search = t_start_search

    def search(self, node, max_cost):
        results = []
        for letter in node.children:
            if self.search_recursive(node.children[letter], letter, self.word_reversed, self.current_row, results, max_cost) == 0:
              return results

        return results

    def _should_timeout(self):
        t = (time.perf_counter_ns() - self.t_start_search)  / 1_000_000_000
        return t > 0.0969

    def search_recursive(self, node, letter, word, previous_row, results, max_cost):
        if self._should_timeout():
            raise TimeoutError
        columns = len(word) + 1
        current_row = [previous_row[0] + 1]
        for column in range(1, columns):
            insert_cost = current_row[column - 1] + 1
            delete_cost = previous_row[column] + 1
            if word[column - 1] != letter:
                replace_cost = previous_row[column - 1] + 1
            else:
                replace_cost = previous_row[column - 1]
            current_row.append(min(insert_cost, delete_cost, replace_cost))

        # if current_row[-1] <= max_cost and node.word is not None:
        #   if not results or current_row[-1] < results[0][0]:
        #       results.clear()
        #       results.append((current_row[-1], (node.province, node.district, node.ward)))
        if current_row[-1] == 0 and node.word is not None:
            heapq.heappush(results, (current_row[-1], (node.province, node.district, node.ward)))
            return 0

        if current_row[-1] <= max_cost and node.word is not None:
            heapq.heappush(results, (current_row[-1], (node.province, node.district, node.ward)))

        if min(current_row) <= max_cost:
            for letter in node.children:
                if self.search_recursive(node.children[letter], letter, word, current_row, results, max_cost) == 0:
                  return 0

        # return results
        return min(current_row)


class Input:
    def __init__(self, input_str):
        self.input_str = input_str
        self.special_chars = [';', ':', '!', "*", "(", ")", "[", "]", "{", "}", "\\", "|", "/",
                              "<", ">", "?", "@", "#", "$", "%", "^", "&", "_", "=", "+", "~", "`", "-", ","]

        self.unit_strings = ['Thành Phố', 'Thành Fhố', 'Thành Phô', 'T Phố', 'T. Phố', 'T. Phw', 'Tỉnh', 'Tỉnwh',
                             'Quận', 'Huyện', 'Huyen', 'Huyên', 'Hyện', 'HZuyện',
                             'Thị Xã', 'Thị Trấn', 'Phường', 'Xã', 'T. P ', 'TP.', 'TP ', 'TT ', 'TX ', 'Tt.', 'T. X.',
                             'TTr. ', 'T. X. ', 'TX.',
                             'Tp ', 'Tp. ', 'Tt ', 'Tx ', 'T.', 'Q.', 'H.', 'P.', ' P ', 'X.', 'G.', ' H ', ' X ',
                             ' T ', ' Q ', 'F']

        self.cut_strings = ['Số Nhà', 'Số ', 'Ngõ ', 'Ngách ', 'Thôn ', 'Ấp ',
                            'Tổ ', 'Khu Phố', 'Khu ', 'TDP', 'DTM', 'Cụm', 'ĐTM']
        self.raw_input = []

    def add_space_before_uppercase(self):
        result = ''
        for i, char in enumerate(self.input_str):
            if i != 0 and char.isupper() and self.input_str[i - 1] != ' ' and not self.input_str[i - 1].isupper():
                result += ' ' + char
            else:
                result += char
        self.input_str = result

    def capitalize_first_letters(self):
        words = self.input_str.split(' ')
        capitalized_words = [word[:1].upper() + word[1:] for word in words]
        self.input_str = ' '.join(capitalized_words)

    def remove_inner_numbers(self):
        words = self.input_str.split()
        result = []
        for word in words:
            cleaned_word = re.sub(r'(?<=\D)\d+(?=\D)', '', word)
            result.append(cleaned_word)
        self.input_str = ' '.join(result)
        if re.search(r'\d+', self.input_str):
            if int(re.search(r'\d+', self.input_str).group(0)) > 21:
                digit = re.search(r'\d+', self.input_str).group(0)
                self.input_str = self.input_str[self.input_str.find(digit) + len(digit) + 1:]
                self.input_str = self.input_str[self.input_str.find(' ') + 1:]

    def clean_noise_input(self):
        words = [word.strip() for word in self.input_str.split(',')]
        self.input_str = ', '.join([word for word in words[-3:]])

        for char in self.special_chars:
            self.input_str = self.input_str.replace(char, '')
        # phước sửa
        # self.input_str = ''.join(char for char in self.input_str if char not in self.special_chars)

        self.remove_inner_numbers()
        self.capitalize_first_letters()
        self.add_space_before_uppercase()

        pattern = re.compile(r'.*?((?<!\w)P\.|Phường .+|Thị Trấn .+|TT .+|TT\.|(?<!Thị )Xã.+|(?<![T,. ])X\.+)')

        # Search for any of the combined patterns
        match = pattern.search(self.input_str)
        if match:
            self.input_str = self.input_str[match.start(1):]

        for string in self.unit_strings:
            self.input_str = self.input_str.replace(string, ' ')

        for string in self.cut_strings:
            if string in self.input_str:
                self.input_str = self.input_str[self.input_str.find(string) + len(string) + 1:]
                self.input_str = self.input_str[self.input_str.find(' ') + 1:]

        while '  ' in self.input_str:
            self.input_str = self.input_str.replace('  ', ' ')

        self.input_str = self.input_str.replace('. ', '.')

        self.input_str = self.input_str.strip().strip('.')

        # return self.input_str
        return  unidecode(self.input_str.lower())

In [5]:
# NOTE: DO NOT change this cell
# This cell is for downloading private test
# !rm -rf test.json
# this link is public test
# !gdown --fuzzy https://drive.google.com/file/d/1PBt3U9I3EH885CDhcXspebyKI5Vw6uLB/view?usp=sharing -O test.json

In [6]:
# NOTE: DO NOT change this cell
# This cell is for scoring

TEAM_NAME = 'Group4'  # This should be your team name
EXCEL_FILE = f'{TEAM_NAME}.xlsx'

import json
import time
with open('test.json') as f:
    data = json.load(f)

summary_only = True
df = []
solution = Solution()
timer = []
correct = 0
for test_idx, data_point in enumerate(data):
    address = data_point["text"]

    ok = 0
    try:
        start = time.perf_counter_ns()
        result = solution.process(address)
        answer = data_point["result"]
        finish = time.perf_counter_ns()
        timer.append(finish - start)
        ok += int(answer["province"] == result["province"])
        ok += int(answer["district"] == result["district"])
        ok += int(answer["ward"] == result["ward"])
        df.append([
            test_idx,
            address,
            answer["province"],
            result["province"],
            int(answer["province"] == result["province"]),
            answer["district"],
            result["district"],
            int(answer["district"] == result["district"]),
            answer["ward"],
            result["ward"],
            int(answer["ward"] == result["ward"]),
            ok,
            timer[-1] / 1_000_000_000,
        ])
    except Exception as e:
        df.append([
            test_idx,
            address,
            answer["province"],
            "EXCEPTION",
            0,
            answer["district"],
            "EXCEPTION",
            0,
            answer["ward"],
            "EXCEPTION",
            0,
            0,
            0,
        ])
        # any failure count as a zero correct
        pass
    correct += ok


    if not summary_only:
        # responsive stuff
        print(f"Test {test_idx:5d}/{len(data):5d}")
        print(f"Correct: {ok}/3")
        print(f"Time Executed: {timer[-1] / 1_000_000_000:.4f}")


print(f"-"*30)
total = len(data) * 3
score_scale_10 = round(correct / total * 10, 2)
if len(timer) == 0:
    timer = [0]
max_time_sec = round(max(timer) / 1_000_000_000, 4)
avg_time_sec = round((sum(timer) / len(timer)) / 1_000_000_000, 4)

import pandas as pd

df2 = pd.DataFrame(
    [[correct, total, score_scale_10, max_time_sec, avg_time_sec]],
    columns=['correct', 'total', 'score / 10', 'max_time_sec', 'avg_time_sec',],
)

columns = [
    'ID',
    'text',
    'province',
    'province_student',
    'province_correct',
    'district',
    'district_student',
    'district_correct',
    'ward',
    'ward_student',
    'ward_correct',
    'total_correct',
    'time_sec',
]

df = pd.DataFrame(df)
df.columns = columns

print(f'{TEAM_NAME = }')
print(f'{EXCEL_FILE = }')
print(df2)

!pip install xlsxwriter
writer = pd.ExcelWriter(EXCEL_FILE, engine='xlsxwriter')
df2.to_excel(writer, index=False, sheet_name='summary')
df.to_excel(writer, index=False, sheet_name='details')
writer.close()


Build 79662 input into 635801 nodes in 3.695081949234009 seconds
------------------------------
TEAM_NAME = 'Group4'
EXCEL_FILE = 'Group4.xlsx'
   correct  total  score / 10  max_time_sec  avg_time_sec
0     1151   1350        8.53        0.0947        0.0271
