In [1]:
# pip install --upgrade google-cloud-documentai

In [2]:
# pip install --upgrade google-cloud-documentai-toolbox

In [3]:
# pip install --upgrade google-cloud-storage

In [1]:
import os

# Set the environment variable that points to the service-account key file.
# NOTE(review): the key file name is hard-coded here; keep the JSON key itself out of version control.

os.environ["GOOGLE_APPLICATION_CREDENTIALS"] =  "dabamien-2228a0999974.json"  # replace with your own key file name

In [2]:
# Notebook-wide configuration for the Document AI batch job.
# NOTE(review): the comments on the two GCS URLs ask for a trailing slash, but neither
# value below ends with "/" -- confirm the values are what the API call expects.
PROJECT_ID =  "dabamien"  # <your project_id>
LOCATION = "us"  # Format is "us" or "eu"
PROCESSOR_ID = "ef432a9e6acb161f"  # <processor_id of the processor you created>
GCS_INPUT_URL = "gs://bamien/hsindian"                                   # Must end with a trailing slash `/`. Format: gs://bucket/directory/subdirectory/
GCS_OUTPUT_URL =  "gs://bamien/results"  # <Cloud Storage path where results are stored>. Must end with a trailing slash `/`. Format: gs://bucket/directory/subdirectory/
BUCKET_NAME = "bamien"                 # Bucket name on Google Cloud Storage

In [None]:
# for page in document_object.pages:
#     for paragraph in page.paragraphs:
#       start_index = paragraph.layout.text_anchor.text_segments[0].start_index
#       end_index = paragraph.layout.text_anchor.text_segments[0].end_index
#       paragraph_text = document_object.text[start_index:end_index]
#       print(f"段落: {paragraph_text}")
#     print("==========")

In [3]:
import re
import pandas as pd
from datetime import datetime
from openpyxl import Workbook

def parse_document(document_object):
    """Extract address, birth-date and ID-number records from a parsed document.

    Every paragraph of every page is read as text and checked against three
    independent rules; one paragraph may therefore produce up to three records.

    Args:
        document_object: Object exposing ``text`` (str) and ``pages``; each page
            exposes ``paragraphs`` whose ``layout.text_anchor.text_segments``
            items carry ``start_index`` / ``end_index`` offsets into ``text``.

    Returns:
        Tuple ``(addresses, birth_dates, id_numbers)``; each is a list of dicts
        that include the 1-based page number under the key "頁碼".

    Side effects:
        Prints every non-empty paragraph followed by a separator line.
    """
    addresses = []
    birth_dates = []
    id_numbers = []

    for page_number, page in enumerate(document_object.pages, start=1):
        for paragraph in page.paragraphs:
            # A text anchor may hold several segments, or none at all; joining all
            # of them avoids an IndexError on empty anchors and avoids truncating
            # paragraphs that span more than one segment.
            paragraph_text = "".join(
                document_object.text[int(segment.start_index):int(segment.end_index)]
                for segment in paragraph.layout.text_anchor.text_segments
            ).strip()
            if not paragraph_text:
                continue  # nothing to classify

            print(paragraph_text)
            print("================")

            # Address: the paragraph must mention both "里" and "鄰".
            if "里" in paragraph_text and "鄰" in paragraph_text:
                addresses.append({
                    "地址": paragraph_text,
                    "里": extract_specific_field(paragraph_text, r"([一-龥]{1,2})里"),
                    "鄰": extract_specific_field(paragraph_text, r"(\d{1,2})鄰"),
                    "路/街": extract_specific_field(paragraph_text, r"([一-龥]{1,3})(路|街)(?!鄰)"),
                    "頁碼": page_number
                })

            # Birth date, plus the age derived from it.
            birth_date = extract_birth_date(paragraph_text)
            if birth_date:
                birth_dates.append({
                    "出生年月日": birth_date,
                    "年齡": calculate_age(birth_date),
                    "頁碼": page_number
                })

            # ID number: an uppercase letter followed by digits; the whole
            # paragraph is stored, as before.
            if re.search(r"[A-Z][0-9]+", paragraph_text):
                id_numbers.append({
                    "身分證字號": paragraph_text,
                    "性別": determine_gender(paragraph_text),
                    "頁碼": page_number
                })

    return addresses, birth_dates, id_numbers


def extract_names(document_object, file_name):
    """Collect paragraphs that look like a personal name, with their location.

    A paragraph counts as a name when, after stripping whitespace, it consists
    of exactly two or three characters in the range U+4E00..U+9FA5.

    Args:
        document_object: Object exposing ``text`` and ``pages``; each page
            exposes ``paragraphs`` whose ``layout.text_anchor.text_segments``
            items carry ``start_index`` / ``end_index`` offsets into ``text``.
        file_name: Name recorded with every hit under the key "檔案名稱".

    Returns:
        List of dicts with the keys "檔案名稱", "頁碼" (1-based) and "姓名".
    """
    names = []

    for page_number, page in enumerate(document_object.pages, start=1):
        for paragraph in page.paragraphs:
            # Join every segment of the anchor: an anchor can be empty (the old
            # ``[0]`` lookup raised IndexError) or split into several segments
            # (the old code kept only the first piece).
            paragraph_text = "".join(
                document_object.text[int(segment.start_index):int(segment.end_index)]
                for segment in paragraph.layout.text_anchor.text_segments
            ).strip()

            if re.match(r"^[\u4e00-\u9fa5]{2,3}$", paragraph_text):
                names.append({
                    "檔案名稱": file_name,
                    "頁碼": page_number,
                    "姓名": paragraph_text
                })

    return names


def extract_specific_field(text, pattern):
    """Return the first match of ``pattern`` in ``text`` with every "鄰" removed.

    The whole match (group 0) is used, so a suffix that is part of the pattern,
    such as "里" or "路", stays in the result; only "鄰" characters are dropped.
    Returns None when the pattern does not occur.
    """
    found = re.search(pattern, text)
    return None if found is None else found.group(0).replace("鄰", "")

def extract_birth_date(text):
    """Find a birth date written with a 2-3 digit year and normalise it.

    Recognised forms:
        * dotted shorthand      ``85.10.12``
        * full form             ``民國85年10月12日``
        * full form, no prefix  ``85年10月12日``

    The year must not be the tail of a longer digit run, so a four-digit year
    such as ``2025年3月1日`` or ``1990.5.3`` is no longer misread as year 25 / 90.
    The dotted form must also not be followed directly by another digit.

    Returns:
        ``"<year>.<month>.<day>"`` with the year passed through
        ``correct_year``, or None when no date is found.
    """
    match = re.search(
        r"(?<!\d)(\d{2,3}\.\d{1,2}\.\d{1,2})(?!\d)"
        r"|民國(\d{2,3})年(\d{1,2})月(\d{1,2})日"
        r"|(?<!\d)(\d{2,3})年(\d{1,2})月(\d{1,2})日",
        text,
    )
    if not match:
        return None

    if match.group(1):  # dotted shorthand
        year, month, day = match.group(1).split(".")
    elif match.group(2):  # full form with the 民國 prefix
        year, month, day = match.group(2, 3, 4)
    else:  # full form without the prefix
        year, month, day = match.group(5, 6, 7)

    return f"{correct_year(year)}.{month}.{day}"

def correct_year(year):
    """Convert ``year`` to int, keeping only the last two digits of some values.

    When the integer's decimal text is three characters long and does not start
    with "1", only its last two characters are kept (e.g. "285" -> 85).
    Anything else is returned as the plain integer.
    """
    digits = str(int(year))
    if len(digits) == 3 and not digits.startswith("1"):
        return int(digits[-2:])
    return int(digits)

def calculate_age(birth_date, today=None):
    """Compute the age in whole years for a ``year.month.day`` birth date.

    Args:
        birth_date: String such as ``"85.10.12"``; 1911 is added to the year
            before the date is built.
        today: Optional reference date (anything with ``year``, ``month`` and
            ``day``). Defaults to ``datetime.today()``; passing it explicitly
            makes the result reproducible.

    Returns:
        The age as an int, or None when ``birth_date`` does not have the
        expected shape or is not a valid calendar date (a message is printed
        in the latter case).
    """
    try:
        if not re.match(r"^\d{2,3}\.\d{1,2}\.\d{1,2}$", birth_date):
            return None

        year, month, day = map(int, birth_date.split("."))
        born = datetime(year + 1911, month, day)  # ValueError on impossible dates

        if today is None:
            today = datetime.today()

        # Subtract one year when this year's birthday has not happened yet.
        birthday_pending = (today.month, today.day) < (born.month, born.day)
        return today.year - born.year - (1 if birthday_pending else 0)
    except (TypeError, ValueError) as e:
        # TypeError: birth_date is not a string; ValueError: invalid calendar date.
        print(f"無法計算年齡: {e}")
        return None

def determine_gender(id_number):
    """Derive the gender from the digit that follows the ID number's letter.

    The first "uppercase letter + digit" pair found in ``id_number`` decides:
    digit 1 gives "男性", digit 2 gives "女性". Any other digit now yields None
    instead of being reported as "女性".

    NOTE(review): newer resident-certificate numbers reportedly use 8/9 as the
    gender digit -- extend the mapping if those must be supported.

    Returns:
        "男性", "女性", or None when no usable pair is found or ``id_number``
        is not a string (a message is printed in the latter case).
    """
    try:
        match = re.search(r"[A-Z]([0-9])", id_number)
    except TypeError as e:
        print(f"無法判斷性別: {e}")
        return None

    if match is None:
        return None
    return {"1": "男性", "2": "女性"}.get(match.group(1))

# 主程式
def main(document_object, FILE_PATH, output_excel_path="parsed_results.xlsx"):
    """Parse one document and write the extracted records to an Excel workbook.

    Args:
        document_object: Parsed document passed on to ``parse_document`` and
            ``extract_names``.
        FILE_PATH: Path of the source file; the part after the last "/" is
            recorded as the file name next to every extracted name.
        output_excel_path: Workbook to write. Defaults to the previously
            hard-coded "parsed_results.xlsx".

    Side effects:
        Writes ``output_excel_path`` with one sheet per record type and prints
        a confirmation line.
    """
    file_name = FILE_PATH.split("/")[-1]

    addresses, birth_dates, id_numbers = parse_document(document_object)
    names = extract_names(document_object, file_name)

    # One sheet per record type, in this order.
    sheets = {
        "地址": pd.DataFrame(addresses),
        "出生年月日": pd.DataFrame(birth_dates),
        "身分證字號": pd.DataFrame(id_numbers),
        "名字": pd.DataFrame(names),
    }

    with pd.ExcelWriter(output_excel_path, engine="openpyxl") as writer:
        for sheet_name, frame in sheets.items():
            frame.to_excel(writer, sheet_name=sheet_name, index=False)

    print(f"結果已存入 {output_excel_path}")

# 假設 document_object 是 Document AI 的解析結果
# main(document_object, "example_file.pdf")

## Batch Processing Request to Document AI

In [5]:
"""
Makes a Batch Processing Request to Document AI
"""

import re

from google.api_core.client_options import ClientOptions
from google.api_core.exceptions import InternalServerError
from google.api_core.exceptions import RetryError
from google.cloud import documentai
from google.cloud import storage

# TODO(developer): Fill these variables before running the sample.

# Lower-case aliases of the notebook configuration, kept for parity with the sample code.
project_id = PROJECT_ID
location = LOCATION  # Format is "us" or "eu"; reuse the config value instead of a second literal
processor_id = PROCESSOR_ID  # Create processor before running sample
gcs_output_uri = GCS_OUTPUT_URL  # Must end with a trailing slash `/`. Format: gs://bucket/directory/subdirectory/
# processor_version_id = (
#     "YOUR_PROCESSOR_VERSION_ID"  # Optional. Example: pretrained-ocr-v1.0-2020-09-23
# )

# TODO(developer): If `gcs_input_uri` is a single file, `mime_type` must be specified.
gcs_input_uri = GCS_INPUT_URL  # Format: `gs://bucket/directory/file.pdf` or `gs://bucket/directory/`
input_mime_type = "application/pdf"
# NOTE(review): with this mask the output presumably omits page paragraphs, which
# parse_document()/extract_names() iterate over -- confirm before using them on batch output.
field_mask = "text,entities,pages.pageNumber"  # Optional. The fields to return in the Document object.


def batch_process_documents(
    project_id: str,
    location: str,
    processor_id: str,
    gcs_input_uri: str,
    gcs_output_uri: str,
    processor_version_id: str = None,
    input_mime_type: str = None,
    field_mask: str = None,
    timeout: int = 400,
):
    """Run a Document AI batch job and load the resulting documents.

    Args:
        project_id: Google Cloud project that owns the processor.
        location: Processor location; used to build the API endpoint.
        processor_id: Processor to run.
        gcs_input_uri: Either a single file (no trailing "/" and containing a
            ".") or a prefix under which every object is processed.
        gcs_output_uri: Cloud Storage location the service writes results to.
        processor_version_id: Optional processor version; when omitted the
            processor itself is addressed.
        input_mime_type: MIME type, used only for single-file input.
        field_mask: Optional field mask forwarded to the output configuration.
        timeout: Seconds to wait for the long-running operation.

    Returns:
        List of ``documentai.Document`` objects, one per JSON output file.
        (The function used to return None after discarding them.)

    Raises:
        ValueError: The operation metadata does not report the SUCCEEDED state.
    """
    # You must set the api_endpoint if you use a location other than "us".
    opts = ClientOptions(api_endpoint=f"{location}-documentai.googleapis.com")

    client = documentai.DocumentProcessorServiceClient(client_options=opts)

    if not gcs_input_uri.endswith("/") and "." in gcs_input_uri:
        # Specify specific GCS URIs to process individual documents
        gcs_document = documentai.GcsDocument(
            gcs_uri=gcs_input_uri, mime_type=input_mime_type
        )
        # Load GCS Input URI into a List of document files
        gcs_documents = documentai.GcsDocuments(documents=[gcs_document])
        input_config = documentai.BatchDocumentsInputConfig(gcs_documents=gcs_documents)
    else:
        # Specify a GCS URI Prefix to process an entire directory
        gcs_prefix = documentai.GcsPrefix(gcs_uri_prefix=gcs_input_uri)
        input_config = documentai.BatchDocumentsInputConfig(gcs_prefix=gcs_prefix)

    # Cloud Storage URI for the Output Directory
    gcs_output_config = documentai.DocumentOutputConfig.GcsOutputConfig(
        gcs_uri=gcs_output_uri, field_mask=field_mask
    )

    # Where to write results
    output_config = documentai.DocumentOutputConfig(gcs_output_config=gcs_output_config)

    if processor_version_id:
        # The full resource name of the processor version, e.g.:
        # projects/{project_id}/locations/{location}/processors/{processor_id}/processorVersions/{processor_version_id}
        name = client.processor_version_path(
            project_id, location, processor_id, processor_version_id
        )
    else:
        # The full resource name of the processor, e.g.:
        # projects/{project_id}/locations/{location}/processors/{processor_id}
        name = client.processor_path(project_id, location, processor_id)

    request = documentai.BatchProcessRequest(
        name=name,
        input_documents=input_config,
        document_output_config=output_config,
    )

    # BatchProcess returns a Long Running Operation (LRO)
    operation = client.batch_process_documents(request)

    # Continually polls the operation until it is complete.
    # This could take some time for larger files
    # Format: projects/{project_id}/locations/{location}/operations/{operation_id}
    try:
        print(f"Waiting for operation {operation.operation.name} to complete...")
        operation.result(timeout=timeout)
    # Catch exception when operation doesn't finish before timeout.
    # Execution continues; the state check below raises if the job did not succeed.
    except (RetryError, InternalServerError) as e:
        print(e.message)

    # NOTE: Can also use callbacks for asynchronous processing
    #
    # def my_callback(future):
    #   result = future.result()
    #
    # operation.add_done_callback(my_callback)

    # Once the operation is complete,
    # get output document information from operation metadata
    metadata = documentai.BatchProcessMetadata(operation.metadata)

    if metadata.state != documentai.BatchProcessMetadata.State.SUCCEEDED:
        raise ValueError(f"Batch Process Failed: {metadata.state_message}")

    storage_client = storage.Client()

    # Every Document loaded from the output bucket, in listing order.
    documents = []

    print("Output files:")
    # One process per Input Document
    for process in list(metadata.individual_process_statuses):
        # output_gcs_destination format: gs://BUCKET/PREFIX/OPERATION_NUMBER/INPUT_FILE_NUMBER/
        # The Cloud Storage API requires the bucket name and URI prefix separately
        matches = re.match(r"gs://(.*?)/(.*)", process.output_gcs_destination)
        if not matches:
            print(
                "Could not parse output GCS destination:",
                process.output_gcs_destination,
            )
            continue

        output_bucket, output_prefix = matches.groups()

        # Get List of Document Objects from the Output Bucket
        output_blobs = storage_client.list_blobs(output_bucket, prefix=output_prefix)

        # Document AI may output multiple JSON files per source file
        for blob in output_blobs:
            # Document AI should only output JSON files to GCS
            if blob.content_type != "application/json":
                print(
                    f"Skipping non-supported file: {blob.name} - Mimetype: {blob.content_type}"
                )
                continue

            # Download JSON File as bytes object and convert to Document Object
            print(f"Fetching {blob.name}")
            document = documentai.Document.from_json(
                blob.download_as_bytes(), ignore_unknown_fields=True
            )
            documents.append(document)

            # For a full list of Document object attributes, please reference this page:
            # https://cloud.google.com/python/docs/reference/documentai/latest/google.cloud.documentai_v1.types.Document

            # Read the text recognition output from the processor
            # print("The document contains the following text:")
            # print(document.text)

    return documents


# Start the batch job using the notebook-level configuration constants;
# `input_mime_type` and `field_mask` are the module-level values set earlier in this cell.
if __name__ == "__main__":
    batch_process_documents(
        project_id=PROJECT_ID,
        location=LOCATION,
        processor_id=PROCESSOR_ID,
        gcs_input_uri=GCS_INPUT_URL,
        gcs_output_uri=GCS_OUTPUT_URL,
        input_mime_type=input_mime_type,
        field_mask=field_mask,
    )

RetryError: Timeout of 120.0s exceeded, last exception: 503 failed to connect to all addresses; last error: UNKNOWN: ipv4:74.125.23.95:443: tcp handshaker shutdown

### 步驟 1：處理批量處理的結果

batch_process_documents() 方法的輸出結果會存儲在 Google Cloud Storage 中，通常是 JSON 格式的文件。您需要下載這些 JSON 文件，然後對每個文件進行資料萃取。

### 步驟 2：下載批量處理的結果
使用 Google Cloud Storage 的 Python 客戶端庫下載處理結果：

下載結果的程式碼

In [8]:
from google.cloud import storage
import os
import json

def download_batch_results(bucket_name, output_prefix, local_output_dir):
    """Copy the JSON results of a batch run from Cloud Storage to a local folder.

    Args:
        bucket_name: Bucket that holds the results.
        output_prefix: Only blobs whose name starts with this prefix are listed.
        local_output_dir: Target folder; created when it does not exist.

    Returns:
        List of local paths, one per downloaded blob.

    Note:
        Files are stored flat under their base name, so two blobs that share a
        base name end up at the same local path.
    """
    gcs = storage.Client()
    source_bucket = gcs.bucket(bucket_name)

    candidates = source_bucket.list_blobs(prefix=output_prefix)
    os.makedirs(local_output_dir, exist_ok=True)

    saved_paths = []
    for candidate in candidates:
        if not candidate.name.endswith(".json"):
            continue  # only JSON results are of interest
        destination = os.path.join(local_output_dir, os.path.basename(candidate.name))
        candidate.download_to_filename(destination)
        saved_paths.append(destination)
        print(f"下載文件: {candidate.name} 到 {destination}")

    return saved_paths

# 使用範例
bucket_name = BUCKET_NAME                          # Bucket name on Google Cloud Storage
output_prefix = "results/15560794818710564337/"  # Output directory of the batch run; pick the one shown in Cloud Storage
local_output_dir = "./batch_results"             # Local directory for the downloaded files

# Download every JSON result under the prefix; the returned paths are used by the parsing step.
downloaded_files = download_batch_results(bucket_name, output_prefix, local_output_dir)

下載文件: results/15560794818710564337/0/測試用連署書-0.json 到 ./batch_results\測試用連署書-0.json
下載文件: results/15560794818710564337/1/測試用連署書_手寫-0.json 到 ./batch_results\測試用連署書_手寫-0.json
下載文件: results/15560794818710564337/2/測試用連署書_打字3人-0.json 到 ./batch_results\測試用連署書_打字3人-0.json


### 步驟 3：解析批量處理的結果
每個 JSON 文件代表一個處理結果，您可以使用下方的 parse_document_from_json() 函式來解析這些結果。

解析 JSON 文件的程式碼

In [9]:
import os
import json
import re
import pandas as pd
from datetime import datetime
from openpyxl import Workbook


def parse_document_from_json(json_file_path, file_name):
    """Extract names, addresses, birth dates and ID prefixes from a result JSON file.

    The file's "text" field is split into lines. A line containing
    "罷免案連署人名冊" starts a new page and is otherwise skipped; every other
    non-empty line is checked against four independent rules.

    Args:
        json_file_path: Path of the JSON file to read (UTF-8).
        file_name: Name recorded with every extracted name.

    Returns:
        Tuple ``(addresses, birth_dates, id_numbers, names)`` of lists of dicts.
        Only ``names`` carries the page number; lines that appear before the
        first title line are reported as page 0.
    """
    with open(json_file_path, "r", encoding="utf-8") as f:
        document_data = json.load(f)

    # A missing or null "text" field is treated as an empty document.
    document_text = document_data.get("text") or ""

    addresses = []
    birth_dates = []
    id_numbers = []
    names = []

    current_page = 0
    # Two/three-character strings that look like names but are form labels or place names.
    noise_words = [
        "地址", "先生", "小姐", "姓名",
        "性別", "年齡", "民國", "新北市", "台北市", "中正區",
        "罷免案", "連署人", "名冊", "國統", "樓之","出生年","聯絡","電話","行政區","編姓"
    ]

    for line_text in document_text.split("\n"):
        line_text = line_text.strip()
        if not line_text:
            continue  # skip blank lines

        # The form title marks the beginning of a new page.
        if "罷免案連署人名冊" in line_text:
            current_page += 1
            continue

        # Name: exactly two or three CJK characters that are not a known label.
        if re.match(r"^[\u4e00-\u9fa5]{2,3}$", line_text) and line_text not in noise_words:
            names.append({
                "檔案名稱": file_name,
                "頁碼": current_page,
                "姓名": line_text
            })

        # Address: the line must mention both "里" and "鄰".
        if "里" in line_text and "鄰" in line_text:
            addresses.append({
                "地址": line_text,
                "里": extract_specific_field(line_text, r"([一-龥]{1,2})里"),
                "鄰": extract_specific_field(line_text, r"(\d{1,2})鄰"),
                "路/街": extract_specific_field(line_text, r"([一-龥]{1,3})(路|街)(?!鄰)")
            })

        # Birth date, plus the age derived from it.
        birth_date = extract_birth_date(line_text)
        if birth_date:
            birth_dates.append({
                "出生年月日": birth_date,
                "年齡": calculate_age(birth_date)
            })

        # ID number: keep the first five characters of the matched ID itself.
        # Slicing the whole line returned unrelated characters whenever the ID
        # was not at the very start of the line.
        id_match = re.search(r"[A-Z][0-9]+", line_text)
        if id_match:
            matched_id = id_match.group(0)
            id_numbers.append({
                "身分證字號前五碼": matched_id[:5],
                "性別": determine_gender(matched_id)
            })

    return addresses, birth_dates, id_numbers, names


def extract_specific_field(text, pattern):
    """Return the first match of ``pattern`` in ``text`` with every "鄰" removed.

    Group 0 (the whole match) is returned, so pattern suffixes such as "里" or
    "路" are kept; only "鄰" characters are stripped. Returns None when nothing
    matches.
    """
    hit = re.search(pattern, text)
    if hit is None:
        return None
    whole_match = hit.group(0)
    return whole_match.replace("鄰", "")

def extract_birth_date(text):
    """Find a birth date written with a 2-3 digit year and normalise it.

    Recognised forms:
        * dotted shorthand      ``85.10.12``
        * full form             ``民國85年10月12日``
        * full form, no prefix  ``85年10月12日``

    The year must not be the tail of a longer digit run, so a four-digit year
    such as ``2025年3月1日`` or ``1990.5.3`` is no longer misread as year 25 / 90.
    The dotted form must also not be followed directly by another digit.

    Returns:
        ``"<year>.<month>.<day>"`` with the year passed through
        ``correct_year``, or None when no date is found.
    """
    match = re.search(
        r"(?<!\d)(\d{2,3}\.\d{1,2}\.\d{1,2})(?!\d)"
        r"|民國(\d{2,3})年(\d{1,2})月(\d{1,2})日"
        r"|(?<!\d)(\d{2,3})年(\d{1,2})月(\d{1,2})日",
        text,
    )
    if not match:
        return None

    if match.group(1):  # dotted shorthand
        year, month, day = match.group(1).split(".")
    elif match.group(2):  # full form with the 民國 prefix
        year, month, day = match.group(2, 3, 4)
    else:  # full form without the prefix
        year, month, day = match.group(5, 6, 7)

    return f"{correct_year(year)}.{month}.{day}"

def correct_year(year):
    """Convert ``year`` to int, keeping only the last two digits of some values.

    A value whose decimal text has three characters and does not begin with "1"
    is reduced to its last two characters (e.g. "285" -> 85); every other value
    is returned as the plain integer.
    """
    as_text = str(int(year))
    needs_trim = len(as_text) == 3 and as_text[0] != "1"
    return int(as_text[-2:]) if needs_trim else int(as_text)

def calculate_age(birth_date, today=None):
    """Compute the age in whole years for a ``year.month.day`` birth date.

    Args:
        birth_date: String such as ``"85.10.12"``; 1911 is added to the year
            before the date is built.
        today: Optional reference date (anything with ``year``, ``month`` and
            ``day``). Defaults to ``datetime.today()``; passing it explicitly
            makes the result reproducible.

    Returns:
        The age as an int, or None when ``birth_date`` does not have the
        expected shape or is not a valid calendar date (a message is printed
        in the latter case).
    """
    try:
        if not re.match(r"^\d{2,3}\.\d{1,2}\.\d{1,2}$", birth_date):
            return None

        year, month, day = map(int, birth_date.split("."))
        born = datetime(year + 1911, month, day)  # ValueError on impossible dates

        if today is None:
            today = datetime.today()

        # Subtract one year when this year's birthday has not happened yet.
        birthday_pending = (today.month, today.day) < (born.month, born.day)
        return today.year - born.year - (1 if birthday_pending else 0)
    except (TypeError, ValueError) as e:
        # TypeError: birth_date is not a string; ValueError: invalid calendar date.
        print(f"無法計算年齡: {e}")
        return None

def determine_gender(id_number):
    """Derive the gender from the digit that follows the ID number's letter.

    The first "uppercase letter + digit" pair found in ``id_number`` decides:
    digit 1 gives "男性", digit 2 gives "女性". Any other digit now yields None
    instead of being reported as "女性".

    NOTE(review): newer resident-certificate numbers reportedly use 8/9 as the
    gender digit -- extend the mapping if those must be supported.

    Returns:
        "男性", "女性", or None when no usable pair is found or ``id_number``
        is not a string (a message is printed in the latter case).
    """
    try:
        match = re.search(r"[A-Z]([0-9])", id_number)
    except TypeError as e:
        print(f"無法判斷性別: {e}")
        return None

    if match is None:
        return None
    return {"1": "男性", "2": "女性"}.get(match.group(1))

# 主程式
def main(json_files, output_excel_path):
    """Parse every JSON result file and write one combined Excel workbook.

    Args:
        json_files: Iterable of paths to result JSON files.
        output_excel_path: Despite the name, this is the output *directory*; it
            is created when missing and the workbook is written inside it as
            ``parse_<month>_<day>_<hour>_<minute>.xlsx``.

    Side effects:
        Prints one line per processed file, writes the workbook, and prints the
        output directory.
    """
    os.makedirs(output_excel_path, exist_ok=True)

    # Sheet name -> accumulated records, in the order the sheets are written.
    records_by_sheet = {"地址": [], "出生年月日": [], "身分證字號": [], "名字": []}

    for json_file in json_files:
        file_name = os.path.basename(json_file)
        print(f"處理文件: {json_file}")
        addresses, birth_dates, id_numbers, names = parse_document_from_json(json_file, file_name)
        records_by_sheet["地址"].extend(addresses)
        records_by_sheet["出生年月日"].extend(birth_dates)
        records_by_sheet["身分證字號"].extend(id_numbers)
        records_by_sheet["名字"].extend(names)

    frames = {sheet: pd.DataFrame(rows) for sheet, rows in records_by_sheet.items()}

    # The timestamp is taken after parsing, right before the file is created.
    current_time = datetime.now()
    workbook_path = f"{output_excel_path}/parse_{current_time.strftime('%m_%d_%H_%M')}.xlsx"
    with pd.ExcelWriter(workbook_path, engine="openpyxl") as writer:
        for sheet, frame in frames.items():
            frame.to_excel(writer, sheet_name=sheet, index=False)

    print(f"結果已存入 {output_excel_path}")



### 步驟 4：存成 Excel 文件
將合併後的 DataFrame 存成 Excel 文件（每類資料一個分頁）：

存成 Excel 的程式碼

In [10]:
# The timestamp used in the output file name ("month_day_hour_minute")


# is generated inside main(), not in this cell.

# Run the extraction and write the results to Excel
output_excel_dir = f"parsed_results"
main(downloaded_files, output_excel_dir)

處理文件: ./batch_results\測試用連署書-0.json
處理文件: ./batch_results\測試用連署書_手寫-0.json
處理文件: ./batch_results\測試用連署書_打字3人-0.json
結果已存入 parsed_results
