In [1]:
# NOTE: you CAN change this cell
# If you want to use your own database, download it here
# !gdown ...
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
# NOTE: you CAN change this cell
# Add more to your needs
# you must place ALL pip install here
!pip install rapidfuzz

Collecting rapidfuzz
  Downloading rapidfuzz-3.14.1-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (12 kB)
Downloading rapidfuzz-3.14.1-cp312-cp312-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (3.2 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m3.2/3.2 MB[0m [31m34.8 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: rapidfuzz
Successfully installed rapidfuzz-3.14.1


In [3]:
# NOTE: you CAN change this cell
# import your library here
import time
import re
import json
import unicodedata
from collections import defaultdict
from rapidfuzz import fuzz, process

# Algorithm

In [6]:
class Solution:
    def __init__(self):
        base_path = '/content/drive/MyDrive/Test/'
        self.province_path = f'{base_path}/list_province.txt'
        self.district_path = f'{base_path}/list_district.txt'
        self.ward_path = f'{base_path}/list_ward.txt'
        self.master_json_path = f'{base_path}/dvhcvn.json'

        self.threshold = 88

        print("Initializing Solution...")
        start_time = time.time()
        self.provinces = self._load_valid_list(self.province_path)
        self.districts = self._load_valid_list(self.district_path)
        self.wards = self._load_valid_list(self.ward_path)
        self._build_address_map_and_lookup()
        print(f"Solution initialized in {time.time() - start_time:.2f}s.")

    def _clean_name_prefix(self, name: str) -> str:
        if not name: return ""
        prefixes = ["Thành phố", "Tỉnh", "Thị xã", "Huyện", "Quận", "Phường", "Xã", "Thị trấn", "TP"]
        prefixes.sort(key=len, reverse=True)
        name_lower = name.lower()
        for prefix in prefixes:
            if name_lower.startswith(prefix.lower() + ' '): return name[len(prefix):].strip()
        return name.strip()

    def _normalize(self, s: str) -> str:
        if not s: return ""
        s = s.lower()
        s = re.sub(r'd\.', 'd', s)
        s = re.sub(r'\b(tp|thanh pho|tinh)\b', ' Tỉnh ', s)
        s = re.sub(r'\b(q|quan|h|huyen|tx|thi xa)\b', ' Huyện ', s)
        s = re.sub(r'\b(p|phuong|x|xa|tt|thi tran)\b', ' Xã ', s)
        s = unicodedata.normalize('NFD', s)
        s = ''.join(ch for ch in s if unicodedata.category(ch) != 'Mn')
        s = unicodedata.normalize('NFC', s)
        s = re.sub(r'[^a-z0-9\s]', '', s)
        s = re.sub(r'\s+', ' ', s).strip()
        return s

    def _load_valid_list(self, file_path):
        try:
            with open(file_path, 'r', encoding='utf-8') as f:
                lines = [self._clean_name_prefix(line.strip()) for line in f if line.strip()]
                return {self._normalize(line): line for line in lines}
        except FileNotFoundError: return {}

    def _build_address_map_and_lookup(self):
        with open(self.master_json_path, 'r', encoding='utf-8') as f:
            full_data = json.load(f)['data']
        self.address_map = {}
        self.district_to_province = {}
        self.ward_to_district = {}
        for p_data in full_data:
            p_norm = self._normalize(self._clean_name_prefix(p_data['name']))
            if p_norm not in self.address_map: self.address_map[p_norm] = {}
            for d_data in p_data.get('level2s', []):
                d_norm = self._normalize(self._clean_name_prefix(d_data['name']))
                self.district_to_province[d_norm] = p_norm
                if d_norm not in self.address_map[p_norm]: self.address_map[p_norm][d_norm] = set()
                for w_data in d_data.get('level3s', []):
                    w_norm = self._normalize(self._clean_name_prefix(w_data['name']))
                    self.address_map[p_norm][d_norm].add(w_norm)
                    if w_norm not in self.ward_to_district: self.ward_to_district[w_norm] = w_norm

    def _urban_handler(self, normalized_ocr, p_cand):
        d_match = re.search(r'huyen\s+(\d+)', normalized_ocr)
        w_match = re.search(r'xa\s+(\d+)', normalized_ocr)
        d_num = d_match.group(1) if d_match else None
        w_num = w_match.group(1) if w_match else None

        if p_cand and d_num and d_num in self.address_map.get(p_cand, {}):
            w_result = w_num if w_num and w_num in self.address_map.get(p_cand, {}).get(d_num, set()) else None
            return {'p': p_cand, 'd': d_num, 'w': w_result, 'score': 300}
        return None

    def process(self, s: str):
        if not s.strip(): return {"province": "", "district": "", "ward": ""}
        normalized_ocr = self._normalize(s)
        if not normalized_ocr: return {"province": "", "district": "", "ward": ""}

        hypotheses = []
        p_candidates = process.extract(normalized_ocr, self.provinces.keys(), scorer=fuzz.WRatio, limit=3, score_cutoff=self.threshold - 5)

        # Tinh > Huyen > Xa
        for p_cand, p_score, _ in p_candidates:
            urban_res = self._urban_handler(normalized_ocr, p_cand)
            if urban_res: hypotheses.append(urban_res)

            hypotheses.append({'p': p_cand, 'd': None, 'w': None, 'score': p_score})

            ocr_after_p = normalized_ocr.replace(p_cand, ' ', 1)
            valid_districts = list(self.address_map.get(p_cand, {}).keys())
            if not valid_districts: continue

            d_candidates = process.extract(ocr_after_p, valid_districts, scorer=fuzz.WRatio, limit=2, score_cutoff=self.threshold)
            for d_cand, d_score, _ in d_candidates:
                hypotheses.append({'p': p_cand, 'd': d_cand, 'w': None, 'score': p_score + d_score})

                ocr_after_d = ocr_after_p.replace(d_cand, ' ', 1)
                valid_wards = list(self.address_map.get(p_cand, {}).get(d_cand, set()))
                if not valid_wards: continue

                w_match = process.extractOne(ocr_after_d, valid_wards, scorer=fuzz.partial_ratio, score_cutoff=90)
                if w_match:
                    w_cand, w_score = w_match[0], w_match[1]
                    hypotheses.append({'p': p_cand, 'd': d_cand, 'w': w_cand, 'score': (p_score + d_score + w_score) * 1.2})

        # Huyen > Xa
        if not hypotheses or max(h.get('score', 0) for h in hypotheses) < 180:
            d_candidates = process.extract(normalized_ocr, self.districts.keys(), scorer=fuzz.WRatio, limit=2, score_cutoff=self.threshold)
            for d_cand, d_score, _ in d_candidates:
                p_inferred = self.district_to_province.get(d_cand)
                p_score_inferred = fuzz.partial_ratio(normalized_ocr, p_inferred)
                if p_score_inferred > 70:
                    hypotheses.append({'p': p_inferred, 'd': d_cand, 'w': None, 'score': d_score + p_score_inferred})

        if not hypotheses: return {"province": "", "district": "", "ward": ""}

        # Chon kq
        best_hypo = max(hypotheses, key=lambda x: x.get('score', 0))

        return {
            "province": self.provinces.get(best_hypo.get('p'), ""),
            "district": self.districts.get(best_hypo.get('d'), ""),
            "ward": self.wards.get(best_hypo.get('w'), "")
        }

# Run

In [7]:
# CORRECT TESTS
groups_province = {}
groups_district = {'hòa bình': ['Hoà Bình', 'Hòa Bình'], 'kbang': ['Kbang', 'KBang'], 'quy nhơn': ['Qui Nhơn', 'Quy Nhơn']}
groups_ward = {'ái nghĩa': ['ái Nghĩa', 'Ái Nghĩa'], 'ái quốc': ['ái Quốc', 'Ái Quốc'], 'ái thượng': ['ái Thượng', 'Ái Thượng'], 'ái tử': ['ái Tử', 'Ái Tử'], 'ấm hạ': ['ấm Hạ', 'Ấm Hạ'], 'an ấp': ['An ấp', 'An Ấp'], 'ẳng cang': ['ẳng Cang', 'Ẳng Cang'], 'ẳng nưa': ['ẳng Nưa', 'Ẳng Nưa'], 'ẳng tở': ['ẳng Tở', 'Ẳng Tở'], 'an hòa': ['An Hoà', 'An Hòa'], 'ayun': ['Ayun', 'AYun'], 'bắc ái': ['Bắc ái', 'Bắc Ái'], 'bảo ái': ['Bảo ái', 'Bảo Ái'], 'bình hòa': ['Bình Hoà', 'Bình Hòa'], 'châu ổ': ['Châu ổ', 'Châu Ổ'], 'chư á': ['Chư á', 'Chư Á'], 'chư rcăm': ['Chư Rcăm', 'Chư RCăm'], 'cộng hòa': ['Cộng Hoà', 'Cộng Hòa'], 'cò nòi': ['Cò  Nòi', 'Cò Nòi'], 'đại ân 2': ['Đại Ân  2', 'Đại Ân 2'], 'đak ơ': ['Đak ơ', 'Đak Ơ'], "đạ m'ri": ["Đạ M'ri", "Đạ M'Ri"], 'đông hòa': ['Đông Hoà', 'Đông Hòa'], 'đồng ích': ['Đồng ích', 'Đồng Ích'], 'hải châu i': ['Hải Châu  I', 'Hải Châu I'], 'hải hòa': ['Hải Hoà', 'Hải Hòa'], 'hành tín đông': ['Hành Tín  Đông', 'Hành Tín Đông'], 'hiệp hòa': ['Hiệp Hoà', 'Hiệp Hòa'], 'hòa bắc': ['Hoà Bắc', 'Hòa Bắc'], 'hòa bình': ['Hoà Bình', 'Hòa Bình'], 'hòa châu': ['Hoà Châu', 'Hòa Châu'], 'hòa hải': ['Hoà Hải', 'Hòa Hải'], 'hòa hiệp trung': ['Hoà Hiệp Trung', 'Hòa Hiệp Trung'], 'hòa liên': ['Hoà Liên', 'Hòa Liên'], 'hòa lộc': ['Hoà Lộc', 'Hòa Lộc'], 'hòa lợi': ['Hoà Lợi', 'Hòa Lợi'], 'hòa long': ['Hoà Long', 'Hòa Long'], 'hòa mạc': ['Hoà Mạc', 'Hòa Mạc'], 'hòa minh': ['Hoà Minh', 'Hòa Minh'], 'hòa mỹ': ['Hoà Mỹ', 'Hòa Mỹ'], 'hòa phát': ['Hoà Phát', 'Hòa Phát'], 'hòa phong': ['Hoà Phong', 'Hòa Phong'], 'hòa phú': ['Hoà Phú', 'Hòa Phú'], 'hòa phước': ['Hoà Phước', 'Hòa Phước'], 'hòa sơn': ['Hoà Sơn', 'Hòa Sơn'], 'hòa tân': ['Hoà Tân', 'Hòa Tân'], 'hòa thuận': ['Hoà Thuận', 'Hòa Thuận'], 'hòa tiến': ['Hoà Tiến', 'Hòa Tiến'], 'hòa trạch': ['Hoà Trạch', 'Hòa Trạch'], 'hòa vinh': ['Hoà Vinh', 'Hòa Vinh'], 'hương hòa': ['Hương Hoà', 'Hương Hòa'], 'ích hậu': ['ích Hậu', 'Ích Hậu'], 'ít ong': ['ít Ong', 'Ít Ong'], 'khánh hòa': ['Khánh Hoà', 'Khánh Hòa'], 'krông á': ['Krông Á', 'KRông á'], 'lộc hòa': ['Lộc Hoà', 'Lộc Hòa'], 'minh hòa': ['Minh Hoà', 'Minh Hòa'], 'mường ải': ['Mường ải', 'Mường Ải'], 'mường ẳng': ['Mường ẳng', 'Mường Ẳng'], 'nậm ét': ['Nậm ét', 'Nậm Ét'], 'nam hòa': ['Nam Hoà', 'Nam Hòa'], 'na ư': ['Na ư', 'Na Ư'], 'ngã sáu': ['Ngã sáu', 'Ngã Sáu'], 'nghi hòa': ['Nghi Hoà', 'Nghi Hòa'], 'nguyễn úy': ['Nguyễn Uý', 'Nguyễn úy', 'Nguyễn Úy'], 'nhân hòa': ['Nhân Hoà', 'Nhân Hòa'], 'nhơn hòa': ['Nhơn Hoà', 'Nhơn Hòa'], 'nhơn nghĩa a': ['Nhơn nghĩa A', 'Nhơn Nghĩa A'], 'phúc ứng': ['Phúc ứng', 'Phúc Ứng'], 'phước hòa': ['Phước Hoà', 'Phước Hòa'], 'sơn hóa': ['Sơn Hoá', 'Sơn Hóa'], 'tạ an khương đông': ['Tạ An Khương  Đông', 'Tạ An Khương Đông'], 'tạ an khương nam': ['Tạ An Khương  Nam', 'Tạ An Khương Nam'], 'tăng hòa': ['Tăng Hoà', 'Tăng Hòa'], 'tân hòa': ['Tân Hoà', 'Tân Hòa'], 'tân hòa thành': ['Tân Hòa  Thành', 'Tân Hòa Thành'], 'tân khánh trung': ['Tân  Khánh Trung', 'Tân Khánh Trung'], 'tân lợi': ['Tân lợi', 'Tân Lợi'], 'thái hòa': ['Thái Hoà', 'Thái Hòa'], 'thiết ống': ['Thiết ống', 'Thiết Ống'], 'thuận hòa': ['Thuận Hoà', 'Thuận Hòa'], 'thượng ấm': ['Thượng ấm', 'Thượng Ấm'], 'thụy hương': ['Thuỵ Hương', 'Thụy Hương'], 'thủy xuân': ['Thuỷ Xuân', 'Thủy Xuân'], 'tịnh ấn đông': ['Tịnh ấn Đông', 'Tịnh Ấn Đông'], 'tịnh ấn tây': ['Tịnh ấn Tây', 'Tịnh Ấn Tây'], 'triệu ái': ['Triệu ái', 'Triệu Ái'], 'triệu ẩu': ['Triệu ẩu', 'Triệu Ẩu'], 'trung hòa': ['Trung Hoà', 'Trung Hòa'], 'trung ý': ['Trung ý', 'Trung Ý'], 'tùng ảnh': ['Tùng ảnh', 'Tùng Ảnh'], 'úc kỳ': ['úc Kỳ', 'Úc Kỳ'], 'ứng hòe': ['ứng Hoè', 'Ứng Hoè'], 'vĩnh hòa': ['Vĩnh Hoà', 'Vĩnh Hòa'], 'vũ hòa': ['Vũ Hoà', 'Vũ Hòa'], 'xuân ái': ['Xuân ái', 'Xuân Ái'], 'xuân áng': ['Xuân áng', 'Xuân Áng'], 'xuân hòa': ['Xuân Hoà', 'Xuân Hòa'], 'xuất hóa': ['Xuất Hoá', 'Xuất Hóa'], 'ỷ la': ['ỷ La', 'Ỷ La']}
groups_ward.update({1: ['1', '01'], 2: ['2', '02'], 3: ['3', '03'], 4: ['4', '04'], 5: ['5', '05'], 6: ['6', '06'], 7: ['7', '07'], 8: ['8', '08'], 9: ['9', '09']})
def to_same(groups):
    same = {ele: k for k, v in groups.items() for ele in v}
    return same
same_province = to_same(groups_province)
same_district = to_same(groups_district)
same_ward = to_same(groups_ward)
def normalize(text, same_dict):
    return same_dict.get(text, text)

In [8]:

TEAM_NAME = 'HK251'
EXCEL_FILE = f'{TEAM_NAME}.xlsx'

import json
import time
with open('/content/drive/MyDrive/Test/test.json') as f:
    data = json.load(f)

summary_only = False

df = []
solution = Solution()
timer = []
correct = 0
for test_idx, data_point in enumerate(data):
    address = data_point["text"]

    ok = 0
    try:
        answer = data_point["result"]
        answer["province_normalized"] = normalize(answer["province"], same_province)
        answer["district_normalized"] = normalize(answer["district"], same_district)
        answer["ward_normalized"] = normalize(answer["ward"], same_ward)

        start = time.perf_counter_ns()
        result = solution.process(address)
        finish = time.perf_counter_ns()
        timer.append(finish - start)
        result["province_normalized"] = normalize(result["province"], same_province)
        result["district_normalized"] = normalize(result["district"], same_district)
        result["ward_normalized"] = normalize(result["ward"], same_ward)

        province_correct = int(answer["province_normalized"] == result["province_normalized"])
        district_correct = int(answer["district_normalized"] == result["district_normalized"])
        ward_correct = int(answer["ward_normalized"] == result["ward_normalized"])
        ok = province_correct + district_correct + ward_correct

        if not summary_only and ok !=3:
            print(f"--- Trường hợp #{test_idx+1} ---")
            print(f"Input    : '{address}'")
            print(f"Expected : {answer}")
            print(f"Actual   : {result}")
            print(f"Correct  : {ok}/3")
            print(f"Time     : {timer[-1] / 1_000_000_000:.6f}s")
            print("-" * 30)
        # ----------------------------------------------------

        df.append([
            test_idx,
            address,
            answer["province"],
            result["province"],
            answer["province_normalized"],
            result["province_normalized"],
            province_correct,
            answer["district"],
            result["district"],
            answer["district_normalized"],
            result["district_normalized"],
            district_correct,
            answer["ward"],
            result["ward"],
            answer["ward_normalized"],
            result["ward_normalized"],
            ward_correct,
            ok,
            timer[-1] / 1_000_000_000,
        ])
    except Exception as e:
        print(f"Lỗi tại trường hợp #{test_idx+1} với input: '{address}'")
        print(f"Exception: {e}")
        df.append([
            test_idx,
            address,
            answer.get("province", "N/A"), "EXCEPTION", answer.get("province_normalized", "N/A"), "EXCEPTION", 0,
            answer.get("district", "N/A"), "EXCEPTION", answer.get("district_normalized", "N/A"), "EXCEPTION", 0,
            answer.get("ward", "N/A"), "EXCEPTION", answer.get("ward_normalized", "N/A"), "EXCEPTION", 0,
            0, 0,
        ])
        pass
    correct += ok


print(f"--- TỔNG KẾT ---")
total = len(data) * 3
score_scale_10 = round(correct / total * 10, 2)
if len(timer) == 0:
    timer = [0]
max_time_sec = round(max(timer) / 1_000_000_000, 4)
avg_time_sec = round((sum(timer) / len(timer)) / 1_000_000_000, 4)

import pandas as pd

df2 = pd.DataFrame(
    [[correct, total, score_scale_10, max_time_sec, avg_time_sec]],
    columns=['correct', 'total', 'score / 10', 'max_time_sec', 'avg_time_sec',],
)


columns = [
    'ID', 'text',
    'province', 'province_student', 'province_normalized', 'province_student_normalized', 'province_correct',
    'district', 'district_student', 'district_normalized', 'district_student_normalized', 'district_correct',
    'ward', 'ward_student', 'ward_normalized', 'ward_student_normalized', 'ward_correct',
    'total_correct', 'time_sec',
]

df = pd.DataFrame(df)
df.columns = columns

print(f'{TEAM_NAME = }')
print(f'{EXCEL_FILE = }')
print(df2)

!pip install -q xlsxwriter
writer = pd.ExcelWriter(EXCEL_FILE, engine='xlsxwriter')
df2.to_excel(writer, index=False, sheet_name='summary')
df.to_excel(writer, index=False, sheet_name='details')
writer.close()

print(f"\nĐã xuất kết quả chi tiết ra file {EXCEL_FILE}")

Initializing Solution...
Solution initialized in 1.74s.
--- Trường hợp #1 ---
Input    : 'TT Tân Bình Huyện Yên Sơn, Tuyên Quang'
Expected : {'province': 'Tuyên Quang', 'district': 'Yên Sơn', 'ward': 'Tân Bình', 'province_normalized': 'Tuyên Quang', 'district_normalized': 'Yên Sơn', 'ward_normalized': 'Tân Bình'}
Actual   : {'province': 'Tuyên Quang', 'district': 'Yên Sơn', 'ward': '', 'province_normalized': 'Tuyên Quang', 'district_normalized': 'Yên Sơn', 'ward_normalized': ''}
Correct  : 2/3
Time     : 0.000796s
------------------------------
--- Trường hợp #2 ---
Input    : '357/28,Ng-T- Thuật,P1,Q3,TP.HồChíMinh.'
Expected : {'province': 'Hồ Chí Minh', 'district': '', 'ward': '', 'province_normalized': 'Hồ Chí Minh', 'district_normalized': '', 'ward_normalized': ''}
Actual   : {'province': 'Bình Định', 'district': '', 'ward': '', 'province_normalized': 'Bình Định', 'district_normalized': '', 'ward_normalized': ''}
Correct  : 2/3
Time     : 0.000942s
------------------------------
--