In [1]:
# NOTE: you CAN change this cell
# If you want to use your own database, download it here
!gdown --fuzzy https://drive.google.com/file/d/16DjApJs5CMpTh5FySV7Qok8JA95dWs94/view?usp=sharing -O file.json

Downloading...
From: https://drive.google.com/uc?id=16DjApJs5CMpTh5FySV7Qok8JA95dWs94
To: /content/file.json
  0% 0.00/2.11M [00:00<?, ?B/s]100% 2.11M/2.11M [00:00<00:00, 66.8MB/s]


In [2]:
# NOTE: you CAN change this cell
# Add more to your needs
# you must place ALL pip install here
!pip install editdistance
!pip install numpy



In [3]:
# NOTE: you CAN change this cell
# import your library here
import time
import numpy as np
import editdistance
import math
import json
import re


In [4]:
# NOTE: you MUST change this cell
# New methods / functions must be written under class Solution.
class Solution:
    def __init__(self):
        # list provice, district, ward for private test, do not change for any reason
        self.province_path = 'list_province.txt'
        self.district_path = 'list_district.txt'
        self.ward_path = 'list_ward.txt'

        # write your preprocess here, add more method if needed
        class BM25:
            def __init__(self, k1=1.5, b=0.75):
                self.k1 = k1
                self.b = b
                self.tf = []
                self.df = {}
                self.idf = {}
                self.doc_len = []
                self.corpus_size = 0
                self.avg_doc_len = 0
                self.corpus = []

            def fit(self, corpus):
                self.corpus = corpus
                self._compute_corpus_stats()
                return self

            def search(self, query):
                scores = []
                for index in range(self.corpus_size):
                    scores.append(self._score(query, index))
                return scores

            def _compute_corpus_stats(self):
                for document in self.corpus:
                    self.corpus_size += 1
                    self.doc_len.append(len(document))
                    frequencies = {}
                    for term in document:
                        term_count = frequencies.get(term, 0) + 1
                        frequencies[term] = term_count
                    self.tf.append(frequencies)
                    for term in frequencies:
                        self.df[term] = self.df.get(term, 0) + 1
                for term, freq in self.df.items():
                    self.idf[term] = math.log(1 + (self.corpus_size - freq + 0.5) / (freq + 0.5))
                self.avg_doc_len = sum(self.doc_len) / self.corpus_size

            def _score(self, query, index):
                score = 0.0
                doc_len = self.doc_len[index]
                frequencies = self.tf[index]

                for term in query:
                    if term in frequencies:
                        freq = frequencies[term]
                        numerator = self.idf[term] * freq * (self.k1 + 1)
                        denominator = freq + self.k1 * (1 - self.b + self.b * doc_len / self.avg_doc_len)
                        score += numerator / denominator

                return score

        self.k = 5
        self.threshold = 2
        self.addresses = []
        self.stop_word = ['thị trấn', 'quận', 'tỉnh', 'tp', 'xã', 'thị xã', 'thành phố', 'x ', 'thị trấn', 'q ', 'huyện', 'f', 'tt', 'tx', 'phường', ' t ', 'tnh', ' h ',',','.']
        # Load our dataset
        with open("/content/file.json") as f:
              json_data = f.read()
              data = json.loads(json_data)
              text = []
              for address in data:
                  full = json.loads(address)['full_address']
                  self.addresses.append(eval(address))

                  for rm in self.stop_word:
                     full = full.replace(rm, " ")
                  txt = []
                  for item in full.split(" "):
                     if item == "":
                        continue
                     else:
                        txt.append(item)
                  text.append(txt)

        self.bm25 = BM25()
        self.bm25.fit(text)

    def get_best(self, query,text, k=5):
        scores = self.bm25.search(query)
        scores_index = np.argsort(scores)[::-1]
        top_results = np.array([self.addresses[i] for i in scores_index][:k])

        # Calculate edit distance for each result and select the one with the smallest distance
        final_result = min(top_results, key=lambda result: editdistance.eval(result['full_address'].replace(",",""), text))
        return final_result


    def process(self, s: str):
        text, query = self.preprocess(s)
        result = self.get_best( query,text, k=5)
        result = self.postprocess(text, result)
        return result

    def detect_quan_huyen(self, text):
        quan_huyen = []
        i = 0
        while i < len(text) - 1:
            if text[i].upper() in 'QXHTP' and text[i+1] in 'BCDFGHJKLMNPQRSTVWXYZAĂÂEÊIOÔƠUƯY0123456789':
                quan_huyen.append(text[i:i+2])
                i += 2
            else:
                i += 1
        return [item for item in quan_huyen if item not in ["TP", "TT", "TX"]]

    def preprocess(self, text: str):
        t = self.detect_quan_huyen(text)
        replace_dict = {temp: f"{temp[0]} {temp[1]}" for temp in t}

        for char, replacement in replace_dict.items():
            text = text.replace(char, replacement)

        text = text.lower()

        rm = ['thị trấn', 'quận', 'tỉnh', 'tp', 'xã', 'thị xã', 'thành phố', 'x ', 'thị trấn', 'q ', 'huyện', 'f', 'tt', 'tx', 'phường', ' t ', 'tnh', ' h ',',','.']


        for r in rm:
            text = text.replace(r, " ")

        query = [item.strip() for item in text.split(" ") if item != ""]

        return text, query

    def util_edit_distance(self, text, name):
        similar_word = None
        min_distance = float('inf')

        for i in range(len(text) - len(name) + 1):
            res_string = text[i:i + len(name)]
            distance = editdistance.eval(res_string, name)

            if distance <= self.threshold and distance < min_distance:
                similar_word = res_string
                min_distance = distance

        return similar_word

    def remove_address_component(self, text, component):
        if component in text:
            return text.replace(component, "", 1)
        return text

    def postprocess(self, text: str, best_result):
        levels = ["province", "district", "ward"]
        for level in levels:
            name = best_result[level].lower()
            if name == "":
                continue

            if name in text:
                text = self.remove_address_component(text, name)
            else:
                same = self.util_edit_distance(text, name)
                if same:
                    text = self.remove_address_component(text, same)
                else:
                    best_result[level] = ""

        return best_result



In [5]:
# NOTE: DO NOT change this cell
# This cell is for downloading private test
!rm -rf test.json
!gdown --fuzzy https://drive.google.com/file/d/1PBt3U9I3EH885CDhcXspebyKI5Vw6uLB/view?usp=sharing -O test.json

Downloading...
From: https://drive.google.com/uc?id=1PBt3U9I3EH885CDhcXspebyKI5Vw6uLB
To: /content/test.json
  0% 0.00/79.4k [00:00<?, ?B/s]100% 79.4k/79.4k [00:00<00:00, 70.1MB/s]


In [6]:
# NOTE: DO NOT change this cell
# This cell is for scoring

TEAM_NAME = 'GROUP_1'  # This should be your team name
EXCEL_FILE = f'{TEAM_NAME}.xlsx'

import json
import time
with open('test.json') as f:
    data = json.load(f)

summary_only = True
df = []
solution = Solution()
timer = []
correct = 0
for test_idx, data_point in enumerate(data):
    address = data_point["text"]

    ok = 0
    try:
        start = time.perf_counter_ns()
        result = solution.process(address)
        answer = data_point["result"]
        finish = time.perf_counter_ns()
        timer.append(finish - start)
        ok += int(answer["province"] == result["province"])
        ok += int(answer["district"] == result["district"])
        ok += int(answer["ward"] == result["ward"])
        df.append([
            test_idx,
            address,
            answer["province"],
            result["province"],
            int(answer["province"] == result["province"]),
            answer["district"],
            result["district"],
            int(answer["district"] == result["district"]),
            answer["ward"],
            result["ward"],
            int(answer["ward"] == result["ward"]),
            ok,
            timer[-1] / 1_000_000_000,
        ])
    except Exception as e:
        df.append([
            test_idx,
            address,
            answer["province"],
            "EXCEPTION",
            0,
            answer["district"],
            "EXCEPTION",
            0,
            answer["ward"],
            "EXCEPTION",
            0,
            0,
            0,
        ])
        # any failure count as a zero correct
        pass
    correct += ok


    if not summary_only:
        # responsive stuff
        print(f"Test {test_idx:5d}/{len(data):5d}")
        print(f"Correct: {ok}/3")
        print(f"Time Executed: {timer[-1] / 1_000_000_000:.4f}")


print(f"-"*30)
total = len(data) * 3
score_scale_10 = round(correct / total * 10, 2)
if len(timer) == 0:
    timer = [0]
max_time_sec = round(max(timer) / 1_000_000_000, 4)
avg_time_sec = round((sum(timer) / len(timer)) / 1_000_000_000, 4)

import pandas as pd

df2 = pd.DataFrame(
    [[correct, total, score_scale_10, max_time_sec, avg_time_sec]],
    columns=['correct', 'total', 'score / 10', 'max_time_sec', 'avg_time_sec',],
)

columns = [
    'ID',
    'text',
    'province',
    'province_student',
    'province_correct',
    'district',
    'district_student',
    'district_correct',
    'ward',
    'ward_student',
    'ward_correct',
    'total_correct',
    'time_sec',
]

df = pd.DataFrame(df)
df.columns = columns

print(f'{TEAM_NAME = }')
print(f'{EXCEL_FILE = }')
print(df2)

!pip install xlsxwriter
writer = pd.ExcelWriter(EXCEL_FILE, engine='xlsxwriter')
df2.to_excel(writer, index=False, sheet_name='summary')
df.to_excel(writer, index=False, sheet_name='details')
writer.close()


------------------------------
TEAM_NAME = 'GROUP_1'
EXCEL_FILE = 'GROUP_1.xlsx'
   correct  total  score / 10  max_time_sec  avg_time_sec
0     1092   1350        8.09        0.0795         0.021
Collecting xlsxwriter
  Downloading XlsxWriter-3.2.0-py3-none-any.whl (159 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m159.9/159.9 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: xlsxwriter
Successfully installed xlsxwriter-3.2.0
