In [1]:
import geopandas as gpd
import json
import pandas as pd
from typing import Literal
import re
from shapely import Point

HOUSENUMBER_PATTERN = r'[０-９之臨]+號'
POI_COLUMNS = ['Name', 'Pure', 'Informal', 'Street', 'Lane', 'Alley', 'Housenumber', 'X', 'Y', 'Category']

In [2]:
df = pd.read_csv('../data/demand/registration.csv',
                 dtype={
                     '行業代號': str,
                     '行業代號1': str,
                     '行業代號2': str,
                     '行業代號3': str
                 },
                 encoding='utf-8',
                 usecols=['營業地址', '營業人名稱', '行業代號', '行業代號1', '行業代號2', '行業代號3'])
df = df.loc[df['營業地址'].str.contains('臺北市')].reset_index(drop=True).rename(columns={
    '營業地址': 'Address',
    '營業人名稱': 'Name',
    '行業代號': 'Code_1',
    '行業代號1': 'Code_2',
    '行業代號2': 'Code_3',
    '行業代號3': 'Code_4',
})
df['Codes'] = df[['Code_1', 'Code_2', 'Code_3', 'Code_4']].apply(
    lambda row: [x for x in row if pd.notna(x)],
    axis=1
)
all_codes = pd.concat([df['Code_1'], df['Code_2'], df['Code_3'], df['Code_4']])
df = df.drop(['Code_1', 'Code_2', 'Code_3', 'Code_4'], axis=1)

# Drop the records whose codes are not required
def drop_invalid_records(registration: pd.DataFrame) -> pd.DataFrame:
    valid_codes = pd.read_csv('../data/demand/mof-to-simple-cats.csv', #'../data/demand/mof-to-categories.csv'
                              dtype={'Prefix': str},
                              encoding='utf-8',
                              usecols=['Prefix', 'Code'])
    refs = valid_codes.set_index('Prefix')['Code'].to_dict()
    registration['Category'] = registration['Codes'].apply(lambda x: get_category(refs, x))
    registration = registration[registration['Category'] != '']
    return registration.reset_index(drop=True)
    
def get_category(references: dict, codes: list[str]):
    for code in codes:
        for key, value in references.items():
            if code.startswith(key):
                return value
            
    return ''

df = drop_invalid_records(df).drop(['Codes'], axis=1)
display(df.head(10))
print(f'{len(df)} rows x {len(df.columns)} cols')

Unnamed: 0,Address,Name,Category
0,臺北市中山區一江街３８之１號１樓,一江鎖印店,零售
1,臺北市中山區下埤里五常街８９號１樓及龍江路３６７號,凱迪亞餐廳,餐飲
2,臺北市中山區下埤里五常街９１號２樓,得商有限公司,批發
3,臺北市中山區下埤里五常街９７號１樓,滴二石文創有限公司,餐飲
4,臺北市中山區下埤里五常街９７號１樓,碧達拉有限公司,批發
5,臺北市中山區下埤里五常街９９號１樓,大馨花藝工作室,零售
6,臺北市中山區下埤里復興北路４２０號１１樓,嘉橙有限公司,批發
7,臺北市中山區下埤里復興北路４２０號１１樓,萬安國際股份有限公司,批發
8,臺北市中山區下埤里復興北路４２０號１２樓,洪裕國際股份有限公司,批發
9,臺北市中山區下埤里復興北路４２０號１３樓,旺普電通股份有限公司,批發


142858 rows x 3 cols


In [3]:
reg = pd.DataFrame(df)

def sanitize_address(address: pd.Series) -> pd.DataFrame:
    df = pd.DataFrame(columns=['Original', 'Pure', 'Informal'])
    df['Original'] = address
    add = (address
                  # Remove the city name
                  .str.replace(r'(?:台|臺)北市', '', regex=True)

                  # Remove the district name
                  .str.replace(r'(?:北投|士林|大同|中山|萬華|中正|松山|大安|信義|內湖|南港|文山|龍山|延平|古亭|建成|城中|雙園|景美|木柵)區', '', regex=True)  

                  # Remove the village name  
                  .str.replace(r'\w\w里', '', regex=True)

                  # Re-format the scetion name
                  .str.replace('１段', '一段')
                  .str.replace('２段', '二段')
                  .str.replace('３段', '三段')
                  .str.replace('４段', '四段')
                  .str.replace('５段', '五段')
                  .str.replace('６段', '六段')
                  .str.replace('７段', '七段')
                  .str.replace('８段', '八段')
                  .str.replace('９段', '九段')  # Yanping North Rd. has the most sections in Taipei City, up to nine sections

                  # Remove the neighborhood name
                  .str.replace('０', '0')
                  .str.replace('１', '1')
                  .str.replace('２', '2')
                  .str.replace('３', '3')
                  .str.replace('４', '4')
                  .str.replace('５', '5')
                  .str.replace('６', '6')
                  .str.replace('７', '7')
                  .str.replace('８', '8')
                  .str.replace('９', '9')
                  .str.replace(r'\d+鄰', '', regex=True)

                  # Revert the conversion of full-width numbers
                  .str.replace('0', '０')
                  .str.replace('1', '１')
                  .str.replace('2', '２')
                  .str.replace('3', '３')
                  .str.replace('4', '４')
                  .str.replace('5', '５')
                  .str.replace('6', '６')
                  .str.replace('7', '７')
                  .str.replace('8', '８')
                  .str.replace('9', '９')
          )
    
    df['Pure'] = add

    # The addresses that need special care
    df['Informal'] = df['Original'].str.contains(r'（|）|\(|\)|\.|﹒|、|,|，|;|；|\-|―|－|\~|～|及|(?<!安|中|雲|萬|民|雙)和(?!平|豐|興)|至(?!善|誠)', regex=True) | (df['Pure'] == '')
    return df

reg_address = sanitize_address(reg['Address'])
display(reg_address.head(10))

Unnamed: 0,Original,Pure,Informal
0,臺北市中山區一江街３８之１號１樓,一江街３８之１號１樓,False
1,臺北市中山區下埤里五常街８９號１樓及龍江路３６７號,五常街８９號１樓及龍江路３６７號,True
2,臺北市中山區下埤里五常街９１號２樓,五常街９１號２樓,False
3,臺北市中山區下埤里五常街９７號１樓,五常街９７號１樓,False
4,臺北市中山區下埤里五常街９７號１樓,五常街９７號１樓,False
5,臺北市中山區下埤里五常街９９號１樓,五常街９９號１樓,False
6,臺北市中山區下埤里復興北路４２０號１１樓,復興北路４２０號１１樓,False
7,臺北市中山區下埤里復興北路４２０號１１樓,復興北路４２０號１１樓,False
8,臺北市中山區下埤里復興北路４２０號１２樓,復興北路４２０號１２樓,False
9,臺北市中山區下埤里復興北路４２０號１３樓,復興北路４２０號１３樓,False


In [4]:
# Create the hierachy structure of the addresses
def build_hierarchy(address_df: pd.DataFrame) -> dict | Literal['']:
    def clean_value(val: str) -> str:
        return '' if pd.isna(val) else val
    
    def collapse(d):
        if isinstance(d, dict):
            if not d:
                return ''
            return {k: collapse(v) for k, v in d.items()}
        return d
    
    def get_housenumber(val: str) -> str:
        match = re.match(HOUSENUMBER_PATTERN, str(val))
        return match[0] if match is not None else str(val)

    nested = {}

    for _, row in address_df.iterrows():
        street = clean_value(row['Street'])
        lane   = clean_value(row['Lane'])
        alley  = clean_value(row['Alley'])
        housenumber = get_housenumber(row['Housenumber'])
        coord  = [float(row['X']), float(row['Y'])]

        if street not in nested:
            nested[street] = {}

        if lane:
            if lane not in nested[street]:
                nested[street][lane] = {}

        if lane and alley:
            if alley not in nested[street][lane]:
                nested[street][lane][alley] = {}

        if lane and alley:
            nested[street][lane][alley].setdefault('numbers', dict())[housenumber] = coord

        elif lane:
            nested[street][lane].setdefault('numbers', dict())[housenumber] = coord
            
        else:
            nested[street].setdefault('numbers', dict())[housenumber] = coord

    return collapse(nested)

address = (pd.read_csv('../data/demand/address.csv', usecols=['街路段', '巷', '弄', '號', '橫座標', '縱座標'])
             .rename(columns={'街路段': 'Street', '巷': 'Lane', '弄': 'Alley', '號': 'Housenumber', '橫座標': 'X', '縱座標': 'Y'}))

hierarchy = build_hierarchy(address)

# Save to the intermediate file
with open('../data/demand/address-hierarchy.json', 'w', encoding='utf-8') as f:
    json.dump(hierarchy, f, ensure_ascii=False)

del address

In [4]:
# Load the hierarchy file
with open('../data/demand/address-hierarchy.json', 'r', encoding='utf-8') as f:
    hierarchy = json.load(f)

# Perform the address match
def match_address(hierarchy: dict, address_str: str):
    address_str_copy = address_str
    street_str, lane_str, alley_str, housenumber_str = None, None, None, None
    coord: list[float] | None = None
    isValid, hasLane, hasAlley = False, False, False

    for street in hierarchy.keys():
        if re.search(street, address_str_copy):
            street_str = street
            address_str_copy = re.sub(street, '', address_str_copy)
            isValid = True
            break

    if isValid and isinstance(hierarchy.get(street_str), dict):
        for lane in hierarchy[street_str].keys():
            if lane and re.search(lane, address_str_copy):
                lane_str = lane
                address_str_copy = re.sub(lane, '', address_str_copy)
                hasLane = True
                break

    if hasLane and isinstance(hierarchy[street_str].get(lane_str), dict):
        for alley in hierarchy[street_str][lane_str].keys():
            if alley and re.search(alley, address_str_copy):
                alley_str = alley
                address_str_copy = re.sub(alley, '', address_str_copy)
                hasAlley = True
                break

    if isValid:
        housenumber_match = re.match(HOUSENUMBER_PATTERN, address_str_copy)
        if housenumber_match is not None:
            housenumber_str = housenumber_match[0]
        
    if housenumber_str is not None:
        if hasAlley:
            coord_dict: dict = hierarchy[street_str][lane_str][alley_str]

        elif hasLane:
            coord_dict: dict = hierarchy[street_str][lane_str]

        else:
            coord_dict: dict = hierarchy[street_str]

        number_dict = coord_dict.get('numbers', {})
        coord = number_dict.get(housenumber_str, None)

    return {
        'Street': street_str,
        'Lane': lane_str,
        'Alley': alley_str,
        'Housenumber': housenumber_str,
        'X': None if coord is None else coord[0],
        'Y': None if coord is None else coord[1]
    }

match_result = pd.DataFrame(reg_address['Pure'].apply(lambda addr: match_address(hierarchy, addr)).tolist())
match_locations = pd.concat([reg[['Name', 'Category']], reg_address[['Pure', 'Informal']], match_result], axis=1)
match_count = match_locations['X'].isna().value_counts().iloc[0]
print(f'Records with match: {match_count} ({round(match_count / len(reg) * 100)}%)')
print(f'Records without match: {len(reg) - match_count} ({round((len(reg) - match_count) / len(reg) * 100)}%)')
reg = match_locations[POI_COLUMNS].rename(columns={'Pure': 'Address'})

# Export match results.
valid_filter = ~reg['Informal'] & ~reg['X'].isna()
tmp = reg[valid_filter][['Name', 'Address', 'Category', 'X', 'Y']].reset_index(drop=True)
tmp['geometry'] = tmp.agg(lambda x: Point(float(x['X']), float(x['Y'])), axis=1)
tmp = gpd.GeoDataFrame(tmp[['Name', 'Address', 'Category', 'geometry']], crs=3826, geometry='geometry')
tmp.to_file('../data/demand/poi_mof.geojson', index=False)
display(tmp.head(10))

del match_count, match_locations, match_result, reg_address, valid_filter

Records with match: 126741 (89%)
Records without match: 16117 (11%)


Unnamed: 0,Name,Address,Category,geometry
0,一江鎖印店,一江街３８之１號１樓,零售,POINT (303619.526 2771844.873)
1,得商有限公司,五常街９１號２樓,批發,POINT (304602.2 2773024.951)
2,滴二石文創有限公司,五常街９７號１樓,餐飲,POINT (304623.681 2773024.845)
3,碧達拉有限公司,五常街９７號１樓,批發,POINT (304623.681 2773024.845)
4,大馨花藝工作室,五常街９９號１樓,零售,POINT (304637.096 2773019.919)
5,嘉橙有限公司,復興北路４２０號１１樓,批發,POINT (304867.453 2772823.414)
6,萬安國際股份有限公司,復興北路４２０號１１樓,批發,POINT (304867.453 2772823.414)
7,洪裕國際股份有限公司,復興北路４２０號１２樓,批發,POINT (304867.453 2772823.414)
8,旺普電通股份有限公司,復興北路４２０號１３樓,批發,POINT (304867.453 2772823.414)
9,旺普網路資訊股份有限公司,復興北路４２０號１３樓,批發,POINT (304867.453 2772823.414)


In [None]:
# @dataclass
# class occurance:
#     exist: bool
#     positions: list[int]

#     def __repr__(self):
#         return f'{self.exist} ({len(self.positions)})'
    
#     def __str__(self):
#         return self.__repr__()

# def get_occurances(text: str, sub: str) -> list[int]:
#     res = list()
#     pos = 0
#     while (pos >= 0) and (pos < len(text)):
#         pos = text.find(sub, pos)
#         if pos >= 0:
#             res.append(pos)
#         elif pos == -1:
#             break
#         pos += 1
#     return res

# def is_complicated_address(address: str) -> tuple[bool, str]:
#     number_occ = get_occurances(address, '號')
#     number_pos = 0

#     def is_before_number(add: str, text: str, pos: int) -> occurance:
#         occs = get_occurances(add, text)
#         flag = False
#         if len(occs) == 0:
#             return occurance(flag, occs)
#         else:
#             for char_pos in occs:
#                 if char_pos <= pos:
#                     flag = True
#                     break
        
#         return occurance(flag, occs)

#     # If there are multiple address, only take the first address.
#     if len(number_occ) >= 1:
#         number_pos = number_occ[0]
    
#     # If there are no '號' in the address, remove it.
#     else:
#         return True, ''
    
#     # Replace all '-' before '號' with '之'.
#     add = address[:number_pos + 1]
#     add = add.replace('-', '之').replace('―', '之').replace('－', '之')

#     # Check whether the seperator characters present before the '號' character
#     base_regex = '(.*?(?:大道|路|街|段|巷|弄))([０-９臨之]+)(?={})'

#     ideo_comma = is_before_number(add, '、', number_pos)    # U+3001
#     if ideo_comma.exist:
#         ideo_comma_match = re.match(base_regex.format('、'), add)
#         if ideo_comma_match is not None:
#             add = f'{ideo_comma_match.group(1)}{ideo_comma_match.group(2)}號'
#         else:
#             return True, ''
    
#     ideo_dot = is_before_number(add, '丶', number_pos)  # U+4E36
#     if ideo_dot.exist:
#         ideo_dot_match = re.match(base_regex.format('丶'), add)
#         if ideo_dot_match is not None:
#             add = f'{ideo_dot_match.group(1)}{ideo_dot_match.group(2)}號'
#         else:
#             return True, ''

#     and_char = is_before_number(add, '及', number_pos)  # U+53CA
#     if and_char.exist:
#         and_char_match = re.search(base_regex.format('及'), add)
#         if and_char_match is not None:
#             add = f'{and_char_match.group(1)}{and_char_match.group(2)}號'
#         else:
#             return True, ''
        
#     full_comma = is_before_number(add, '，', number_pos)  # U+FF0C
#     if full_comma.exist:
#         full_comma_match = re.search(base_regex.format('，'), add)
#         if full_comma_match is not None:
#             add = f'{full_comma_match.group(1)}{full_comma_match.group(2)}號'
#         else:
#             return True, ''
        
#     full_f_stop = is_before_number(add, '．', number_pos)  # U+FF0E
#     if full_f_stop.exist:
#         full_f_stop_match = re.search(base_regex.format('．'), add)
#         if full_f_stop_match is not None:
#             add = f'{full_f_stop_match.group(1)}{full_f_stop_match.group(2)}號'
#         else:
#             return True, ''
        
#     full_semi = is_before_number(add, '；', number_pos)  # U+FF1B
#     if full_semi.exist:
#         full_semi_match = re.search(base_regex.format('；'), add)
#         if full_semi_match is not None:
#             add = f'{full_semi_match.group(1)}{full_semi_match.group(2)}號'
#         else:
#             return True, ''
    
#     return False, add

# n = second_batch_original.apply(lambda x: is_complicated_address(x['Address']), axis=1, result_type='expand')
# display(pd.concat([second_batch_original[['Address']], n], axis=1))
