In [None]:
import re

import pandas as pd
from tqdm import tqdm

In [None]:
import sys
from pathlib import Path
project_root = Path.cwd().parent  # 找出根目錄：Path.cwd()找出現在所在目錄(/run).parent(上一層是notebook).parent(再上層一層business_district_discovery)
print(project_root)
sys.path.append(str(project_root))

In [None]:
from utils.addr_parser import AddressParser
from utils.addr_splitter import AddressSplitter

from utils.id_category import MediumCategory, Subcategory, DetailedSubcategory

In [None]:
# 讀取儲存的原始資料
def csv_extractor(file_name: str) -> pd.DataFrame:
    try:
        total_rows = sum(1 for _ in open(file_name, 'r', encoding='utf8'))
        print(f'total_rows: {total_rows}')
        
        df_list = []
        with tqdm(total=total_rows, desc="Extracting rows") as pbar:
            for chunk in pd.read_csv(file_name, encoding='utf8', chunksize=10000):
                df_list.append(chunk)
                pbar.update(len(chunk))
        
        df = pd.concat(df_list, ignore_index=True)
        print('\nExtracting finished')
        return df
    except FileNotFoundError:
        print(f'Error: The file {file_name} was not found.')
    except Exception as e:
        print(f'An error occurred: {e}')
    return pd.DataFrame()

In [None]:
#新增及轉換欄位   
def convert_columns( df: pd.DataFrame) -> pd.DataFrame:
    # 新增「縣市」欄位
    # 假設前3個字為縣市，只要其中含有「市」或「縣」
    df['縣市'] = df['營業地址'].apply(
        lambda x: x[:3] if ('市' in x[:3] or '縣' in x[:3]) else None
    )
    

    # 新增行政區欄位
    def extract_district(address):
        """
        從地址中提取行政區
        args:
            address(str): 完整地址

        returns:
            str: 行政區名稱，若無匹配則回傳none
        """
        if not address:
            return None
        
        # 縣市列表
        counties = [
        '台北市', '臺北市', '新北市', '桃園市', '台中市', '臺中市', 
        '台南市', '臺南市', '高雄市', '基隆市', '新竹市', '嘉義市',
        '新竹縣', '苗栗縣', '彰化縣', '南投縣', '雲林縣', '嘉義縣',
        '屏東縣', '宜蘭縣', '花蓮縣', '台東縣', '臺東縣', '澎湖縣',
        '金門縣', '連江縣'
        ]

        # 移除縣市名稱
        remaining = address
        for county in counties:
            if address.startswith(county):
                remaining = address[len(county): ]
                break

        # 使用正則表達式匹配行政區
        # 考慮多可能的模式
        patterns = [
            r'^(.{1,3}[鄉鎮市區])',
            r'^(.{1,2}[鄉鎮市區])'
        ]

        for pattern in patterns:
            match = re.match(pattern, remaining)
            if match:
                district = match.group(1)
                
                # 移除所有空格（全形和半形）和特殊字元
                district = district.replace(' ', '')  # 半形空格
                district = district.replace('　', '')  # 全形空格
                district = district.replace('\t', '')  # Tab
                district = district.replace('\n', '')  # 換行
                district = district.replace('\r', '')  # 回車

                return district

        return None

        
    df['行政區'] = df['營業地址'].apply(extract_district)

    # 轉換格式
    
    df['總機構統一編號'] = pd.to_numeric(df['總機構統一編號']).astype('Int64')  # 欄位中有空值
    df['資本額'] = pd.to_numeric(df['資本額']).astype('Int64')
    df['組織別名稱'] = pd.Categorical(df['組織別名稱'])
    df['使用統一發票'] = pd.Categorical(df['使用統一發票'])
    df['名稱'] = pd.Categorical(df['名稱'])

    # 先轉換為數值
    df['行業代號'] = pd.to_numeric(df['行業代號'], errors='coerce')
    df['統一編號'] = pd.to_numeric(df['統一編號'], errors='coerce')
    # 補0到6/8位數
    df['行業代號'] = df['行業代號'].apply(lambda x: str(int(x)).zfill(6) if pd.notna(x) else x)
    df['統一編號'] = df['統一編號'].apply(lambda x: str(int(x)).zfill(8) if pd.notna(x) else x)
    # 轉為 Categorical
    df['行業代號'] = pd.Categorical(df['行業代號'])
    df['統一編號'] = pd.Categorical(df['統一編號'])

    # 「設立日期」欄位
    def parse_roc_date(val):
        """
        將民國年月日數值（如 1040413.0、400711.0、1130503.0）轉成西元日期。
        - 自動處理 float / int / str 類型
        - 不合法值轉為 NaT
        """
        if pd.isna(val):
            return pd.NaT
        try:
            # 先轉字串並補零到 7 碼（例如 400711 -> '0400711'）
            s = str(int(val)).zfill(7)
            y, m, d = int(s[:3]) + 1911, int(s[3:5]), int(s[5:7])
            return pd.Timestamp(f"{y}-{m:02d}-{d:02d}")
        except Exception:
            return pd.NaT
        
    df['設立日期'] = df['設立日期'].apply(parse_roc_date)
    df['設立季度'] = df['設立日期'].dt.to_period('Q')

    
    return df

In [None]:
# 全國營業(稅籍)登記資料集原始資料
csv_path = r"C:\labs\geo-grid\data\raw\National_Business_Registration_Dataset_202510.csv"
extracted =csv_extractor(csv_path)
print(f" 逐筆交易資料載入成功: {extracted.shape}  記憶體使用: {extracted.memory_usage(deep=True).sum() / 1024**2:.2f} MB")

In [None]:
df = extracted.loc[extracted['統一編號'].notna(),
    ['營業地址', '統一編號', '總機構統一編號', '營業人名稱', '資本額', '設立日期', '組織別名稱', '使用統一發票', '行業代號', '名稱']
].copy()

In [None]:
df = convert_columns(df)

In [None]:
# 全形轉半形
def fullwidth_to_halfwidth(text: str) -> str:
    """
    將文字中的全形字元轉為半形，並清理非法字元與空白。
    適用於欄位如「營業地址」「公司名稱」等。

    處理內容：
    1. 全形空白（12288）→ 半形空白（32）
    2. 一般全形字元（65281–65374）→ 對應半形
    3. 移除非法 Unicode 字元
    4. 將「―」替換為 "-"
    5. 移除多餘空白
    """
    if pd.isna(text):
        return text

    # 全形轉半形
    converted = ''.join(
        chr(32) if ord(uchar) == 12288 else
        chr(ord(uchar) - 65248) if 65281 <= ord(uchar) <= 65374 else
        uchar
        for uchar in text
    )

    # 處理特殊字元和格式
    converted = re.sub(r'[^\u0000-\uFFFF]', '', converted)  # 移除非法 Unicode
    converted = converted.replace("―", "-")  # 處理破折號
    converted = re.sub(r'\s+', '', converted)  # 移除所有空白（含全形空格）

    return converted

In [None]:
df["營業地址"] = df["營業地址"].apply(fullwidth_to_halfwidth)

In [None]:
splitter = AddressSplitter(
    input_column='營業地址',
    output_column='正規化營業地址',
    status_column='地址狀態',
    delimiters=None,             # 使用預設分隔符
    batch_size=10000,
    print_report=True,
    preprocessor=None          # 預設不做任何前處理
)

df_proc, changes = splitter.process(df)

In [None]:
parser = AddressParser()
df_proc = parser.process(df_proc)

In [None]:
corrections = {
    '36857915': ('嘉義市西區培元里仁愛路316號1樓', '西區'),
    '93938137': ('嘉義市西區磚里中興路626號1樓', '西區'),
    '47911319': ('屏東縣東港鎮東隆街76號', '東港鎮'),
    '60054638': ('新北市中和區忠孝街124巷8號7樓', '中和區'),
    '60242576': ('新北市中和區捷運路133號1樓', '中和區'),
    '46582802': ('新竹市香山區元培街226號', '香山區'),
    '47319843': ('新竹市香山區埔前里牛埔南路127號', '香山區'),
    '02876359': ('新竹市香山區大庄里5鄰大庄路237巷22號1樓', '香山區'),
    '53279073': ('新竹市東區志平路111號4樓', '東區'),
    '53805842': ('新竹市北區成德路271號', '北區'),
    '23521565': ('新竹市香山區牛埔北路257巷20號', '香山區'),
    '24431057': ('新竹市香山區牛埔南路61號', '香山區'),
    '25025936': ('新竹市北區石坊街1號', '北區'),
    '22702672': ('新竹市香山區茄苳里7鄰茄苳北街835巷16號', '香山區'),
    '26157694': ('新竹市香山區莊敬街49巷8號1樓', '香山區'),
    '27809890': ('新竹市香山區香北路240號3樓', '香山區'),
    '10586261': ('新竹市香山區香山里2鄰瑞光街13號', '香山區'),
    '18210268': ('新竹市香山區2鄰牛埔北路37巷63號', '香山區'),
    '13878328': ('新竹市香山區9鄰元培街179號', '香山區'),
    '88316911': ('臺北市南港區向陽路246號', '南港區'),
    '24573007': ('臺北市南港區重陽路203巷29號二樓', '南港區'),
    '53750375': ('臺北市南港區重陽路203巷29號二樓', '南港區'),
    '95465339': ('臺北市南港區重陽里重陽路125巷16號6樓', '南港區'),
    '43909164': ('臺北市南港區重陽里重陽路203巷29號七樓', '南港區'),
    '44043132': ('臺北市南港區重陽里重陽路203巷29號九樓', '南港區'),
    '50761362': ('臺北市南港區重陽里重陽路203巷29號九樓', '南港區'),
    '24932691': ('臺北市南港區重陽里重陽路223號4樓', '南港區'),
    '94212127': ('臺北市南港區重陽里重陽路263巷3號3樓', '南港區'),
    '60515415': ('臺北市南港區重陽里重陽路269號六樓', '南港區'),
    '88466755': ('雲林縣斗六市文生路120巷10號', '斗六市'),

    # 原 correction_adds
    '20973179': ('臺北市信義區松光里松山路615巷1號', '信義區'),
    '20974178': ('臺北市信義區松光里松山路615巷1號', '信義區'),
}

for uid, (address, district) in corrections.items():
    df_proc.loc[
        df_proc['統一編號'] == uid,
        ['正規化營業地址', '行政區']
    ] = [address, district]


In [None]:

def split_address_and_floor(address):
    """
    將地址拆分成正規化地址（不含樓層）和樓層兩個部分
    
    Parameters:
    address: 原始地址字串
    
    Returns:
    tuple: (正規化地址, 樓層)
    """
    if pd.isna(address) or not address:
        return (address, None)
    
    address = str(address).strip()
    
    # 轉換全形數字為半形
    address = address.translate(str.maketrans('０１２３４５６７８９', '0123456789'))
    
    # 先移除括號內容（如：前鎮第一臨時市場(D41號）
    address_clean = re.sub(r'[\(（〈\[《【][^)\]】》〉）]*[\)）〉\]》】]', '', address)

    # === 新增特殊處理 ===
    # 處理「號之X11樓」這種格式（X是個位數，11是樓層）
    special_pattern = re.search(r'(\d+號之\d)(\d{1,2}樓)', address_clean)
    if special_pattern:
        # 檢查是否為「號之5」+「11樓」的格式
        house_with_suffix = special_pattern.group(1)  # 例如：15號之5
        floor = special_pattern.group(2)               # 例如：11樓
        
        # 取得號之前的部分
        before_match = address_clean[:special_pattern.start()]
        
        # 組合正規化地址和樓層
        normalized_address = before_match + house_with_suffix
        return (normalized_address, floor)
    
    # 定義各種樓層模式（順序很重要，複雜的放前面）
    floor_patterns = [
        (r'(地下街[A-Z]\d+號?)', r'地下街[A-Z]\d+號?'),      # 地下街B70號
        (r'(地下街\d+號?)', r'地下街\d+號?'),                # 地下街70號


        (r'(\d+之\d+樓)', r'\d+之\d+樓'),                    # 4之3樓
        (r'(\d+樓之\d+)', r'\d+樓之\d+'),                    # 4樓之3
        (r'(\d+樓\-\d+)', r'\d+樓\-\d+'),                    # 4樓-3
        (r'([B地下]\d+樓)', r'[B地下]\d+樓'),                # B1樓
        (r'(\d+樓)', r'\d+樓'),                              # 4樓
        (r'([一二三四五六七八九十壹貳參肆伍陸柒捌玖拾百]+之\d+樓)', 
         r'[一二三四五六七八九十壹貳參肆伍陸柒捌玖拾百]+之\d+樓'),  # 四之3樓
        (r'([一二三四五六七八九十壹貳參肆伍陸柒捌玖拾百]+樓之\d+)', 
         r'[一二三四五六七八九十壹貳參肆伍陸柒捌玖拾百]+樓之\d+'),  # 四樓之3
        (r'([一二三四五六七八九十壹貳參肆伍陸柒捌玖拾百]+樓)', 
         r'[一二三四五六七八九十壹貳參肆伍陸柒捌玖拾百]+樓'),      # 三樓
        (r'(地下[一二三四五六七八九十]+樓)', r'地下[一二三四五六七八九十]+樓'),  # 地下一樓
        (r'(地下室)', r'地下室'),                             # 地下室
        (r'([B地下]\d+)', r'[B地下]\d+'),                     # B1（沒有樓字）
    ]
    
    # 處理地號格式（不會有樓層）
    if re.search(r'\d+地號', address_clean):
        # 地號地址，提取到地號為止
        match = re.match(r'^(.*?\d+地號)', address_clean)
        if match:
            return (match.group(1), None)
    
    # 處理一般地址格式
    # 先找到門牌號的位置
    house_no_match = re.search(r'(\d+之\d+|\d+\-\d+|\d+)號', address_clean)
    
    if house_no_match:
        base_address = address_clean[:house_no_match.end()]  # 包含號的基本地址
        after_house_no = address_clean[house_no_match.end():]  # 號之後的部分
        
        # 清理號後面可能的房間號（但不是樓層）
        after_house_no = re.sub(r'^[\s\-之]*(\d+[室房]|[A-Z]\d*室).*$', '', after_house_no)
        
        # 在號之後的部分尋找樓層
        floor_info = None
        clean_after = after_house_no
        
        for floor_capture, floor_pattern in floor_patterns:
            # 尋找樓層資訊
            floor_match = re.search(floor_capture, after_house_no)
            if floor_match:
                floor_info = floor_match.group(1)
                # 移除找到的樓層及其後的房間號
                clean_after = after_house_no[:floor_match.start()]
                # 只保留樓層資訊，移除樓層後的房間號
                remaining = after_house_no[floor_match.end():]
                if not re.match(r'^之\d+|^\-\d+', remaining):  # 不是樓之幾的格式
                    remaining = re.sub(r'^[\s\-之]*\d*[室房號A-Z].*$', '', remaining)
                break
        
        # 組合最終的正規化地址（不含樓層）
        normalized_address = base_address + clean_after.strip()
        normalized_address = normalized_address.strip().rstrip('、').rstrip('，')
        
        return (normalized_address, floor_info)
    
    # 如果沒有找到門牌號，返回原始地址
    return (address_clean, None)


def process_dataframe_addresses(df, address_column='地址', 
                               normalized_column='正規化地址', 
                               floor_column='樓層'):
    """
    處理DataFrame中的地址欄位，拆分成正規化地址和樓層
    
    Parameters:
    df: DataFrame
    address_column: 原始地址欄位名稱
    normalized_column: 正規化地址欄位名稱
    floor_column: 樓層欄位名稱
    
    Returns:
    DataFrame with new columns
    """
    # 應用拆分函數
    df[[normalized_column, floor_column]] = df[address_column].apply(
        lambda x: pd.Series(split_address_and_floor(x))
    )
    
    return df



In [None]:
df_processed = process_dataframe_addresses(
    df_proc,                    # 你的DataFrame
    address_column='正規化營業地址',  # 原始地址欄位名稱
    normalized_column='清理地址',    # 新增的正規化地址欄位名稱
    floor_column='樓層資訊'         # 新增的樓層欄位名稱
)