In [23]:
import re
import os
import json
import docx
import time
import requests
import datetime
import urllib.parse as urlparse
from bs4 import BeautifulSoup
from tqdm import tqdm

def timestamp():
    """Return the current Unix time as an integer number of milliseconds.

    The value is a 13-digit integer, the same shape as JavaScript's
    ``Date.now()``.

    Returns:
        int: milliseconds elapsed since the Unix epoch.
    """
    # Read the epoch clock directly. Going through
    # datetime.now().timetuple() + time.mktime() discards the sub-second
    # part, so the "millisecond" value would always end in 000.
    return int(time.time() * 1000)


def get_data(url, headers, data=None, max_retries=None, delay=1, timeout=5):
    """Fetch ``url`` and return the decoded JSON body, retrying on failure.

    A GET is issued when ``data`` is None, otherwise a POST carrying ``data``.
    An attempt counts as failed when the request raises any
    ``requests.exceptions.RequestException`` (timeouts, connection errors, ...)
    or when the response status is not 200.

    Args:
        url: Address to request.
        headers: Mapping of HTTP headers sent with the request.
        data: Optional request body; its presence switches the call to POST.
        max_retries: Number of retries allowed after the first attempt.
            ``None`` (the default) retries forever, as the original did.
        delay: Seconds to sleep after every attempt (throttle / back-off).
        timeout: Per-request timeout in seconds.

    Returns:
        The value of ``resp.json()`` for the first 200 response.

    Raises:
        RuntimeError: only when ``max_retries`` is given and exhausted.
    """
    attempts = 0
    while True:
        attempts += 1
        failure = None
        try:
            if data is None:
                resp = requests.get(url, headers=headers, timeout=timeout)
            else:
                resp = requests.post(url, headers=headers, data=data, timeout=timeout)
        except requests.exceptions.RequestException as exc:
            # Catch the base class: a connection reset should be retried like
            # a timeout instead of aborting the whole crawl.
            failure = repr(exc)
        else:
            if resp.status_code != 200:
                failure = f"HTTP {resp.status_code}"

        # Sleep after every attempt: keeps the original one-second pause
        # between requests and also backs off before a retry.
        time.sleep(delay)

        if failure is None:
            return resp.json()

        # Only real failures are reported; a success is no longer logged as "failed".
        print(f"failed {url} data: {data} ({failure})")
        if max_retries is not None and attempts > max_retries:
            raise RuntimeError(f"giving up on {url} after {attempts} attempts ({failure})")


# Root of the working tree. The FLK_BASE_DIR environment variable overrides
# the default so the notebook can run on another machine without editing it.
base_dir = os.environ.get("FLK_BASE_DIR", "/data/peitian/Data/legal/flk")
# Downloaded source documents, plus the pages.txt progress log.
raw_dir = os.path.join(base_dir, "raw")
# Per-law JSONL files written by the export cell.
output_dir = os.path.join(base_dir, "output")

In [2]:
# Site roots: `host` serves the listing and detail APIs, `download_host`
# serves the document files referenced by the detail API.
host = "https://flk.npc.gov.cn"
download_host = "https://wb.flk.npc.gov.cn"
# Resolves to "<host>/api/detail".
api_url = urlparse.urljoin(host, "./api/detail")

# Base request headers shared by every API call. The cookie keeps two
# "{timestamp}" placeholders that the crawl cell fills in via str.format.
# NOTE(review): the header set looks like a captured browser XHR, including
# a fixed session cookie - confirm it is still valid before a new crawl.
headers = dict([
    ("accept", "application/json, text/javascript, */*; q=0.01"),
    ("accept-language", "zh-CN,zh;q=0.9,en;q=0.8,en-GB;q=0.7,en-US;q=0.6"),
    ("sec-ch-ua", '"Microsoft Edge";v="131", "Chromium";v="131", "Not_A Brand";v="24"'),
    ("sec-ch-ua-mobile", "?0"),
    ("sec-ch-ua-platform", '"macOS"'),
    ("sec-fetch-dest", "empty"),
    ("sec-fetch-mode", "cors"),
    ("sec-fetch-site", "same-origin"),
    ("x-requested-with", "XMLHttpRequest"),
    ("cookie", "; ".join((
        "wzws_sessionid=gmZhYjg1ZqBnZNRogDM2LjExMC4xNjMuNjWBY2E5ZDdi",
        "Hm_lvt_54434aa6770b6d9fef104d146430b53b={timestamp}",
        "HMACCOUNT=6B3CB05578CC9581",
        "Hm_lpvt_54434aa6770b6d9fef104d146430b53b={timestamp}",
    ))),
    ("referer", "https://flk.npc.gov.cn/fl.html"),
    ("referrer-policy", "strict-origin-when-cross-origin"),
])

In [None]:
# Millisecond timestamp reused for the cookie placeholders and for the `_`
# query parameter of every listing request.
ts = timestamp()
# NOTE: this rewrites the shared `headers` dict in place. Once the
# placeholders are filled, re-running the cell leaves the cookie unchanged.
headers["cookie"] = headers["cookie"].format(timestamp=ts)

num_files = len(os.listdir(raw_dir))

# pages.txt holds one finished page number per line; its last line is the
# page the crawl resumes from (that page is fetched again, which is harmless
# because files are overwritten).
page_path = os.path.join(raw_dir, "pages.txt")

# Fall back to page 1 when the log is missing or empty instead of crashing.
# NOTE(review): assumes the listing API is 1-based - confirm.
start_idx = 1
if os.path.exists(page_path):
    with open(page_path, "r") as f:
        recorded_pages = f.read().split()
    if recorded_pages:
        start_idx = int(recorded_pages[-1])

with open(page_path, "a+") as g:
    for i in range(start_idx, 70):
        print(f"Curling page {i}...")

        page_url = f"https://flk.npc.gov.cn/api/?type=flfg&searchType=title%3Bvague&sortTr=f_bbrq_s%3Bdesc&gbrqStart=&gbrqEnd=&sxrqStart=&sxrqEnd=&sort=true&page={i}&size=10&_={ts}"
        page_data = get_data(page_url, headers)

        for item in tqdm(page_data["result"]["data"], desc="Enumerating Items"):
            title = item["title"]

            detail_url = urlparse.urljoin(host, item["url"])

            # Per-item copy so the shared headers are not polluted.
            api_headers = headers.copy()
            api_headers["origin"] = host
            api_headers["referer"] = detail_url
            api_headers["content-type"] = "application/x-www-form-urlencoded; charset=UTF-8"

            detail = get_data(api_url, api_headers, data=urlparse.urlencode({"id": item["id"]}))
            attachments = detail["result"]["body"]
            word_entries = [x for x in attachments if x["type"] == "WORD"]
            html_entries = [x for x in attachments if x["type"] == "HTML"]
            # Prefer the Word document; fall back to the HTML rendition.
            if word_entries:
                path = word_entries[0]["path"]
                ext = "docx"
            elif html_entries:
                path = html_entries[0]["url"]
                ext = "html"
            else:
                # Previously an IndexError; skip the item and keep crawling.
                print(f"skipped {title}: no WORD or HTML attachment")
                continue
            download_url = urlparse.urljoin(download_host, path)

            try:
                # A timeout keeps one stalled download from hanging the crawl,
                # and the status check keeps error pages from being saved as
                # documents.
                download_resp = requests.get(download_url, timeout=30)
                download_resp.raise_for_status()
            except requests.exceptions.RequestException as exc:
                print(f"download failed {download_url}: {exc}")
                time.sleep(1)
                continue

            try:
                with open(os.path.join(raw_dir, f"{title}.{ext}"), "wb") as f:
                    f.write(download_resp.content)
            except OSError as exc:
                # Still best-effort (e.g. a title that is not a valid file
                # name), but the failure is now visible.
                print(f"could not save {title}.{ext}: {exc}")
            time.sleep(1)

        time.sleep(1)

        # Record the finished page and flush, so an interrupted kernel does
        # not lose progress.
        g.write(str(i) + "\n")
        g.flush()

In [19]:
# Chapter heading: "第<numeral>章", whitespace, then the rest of the line.
# Raw strings avoid the invalid "\s" escape warning. The numeral class now
# includes 零/〇 so numbers such as "一百零一" match.
SECTION_PATTERN = r"(\s*第[零〇一二三四五六七八九十百千]+章\s+.*?\n)"
# Article heading: "第<numeral>条" at the start of the text or of a line.
# The anchor keeps in-sentence references such as "依照本法第十条" from being
# treated as headings. Exactly one capturing group is kept, so re.split still
# returns the headings between the text pieces.
ARTICLE_PATTERN = r"((?:^|\n)\s*第[零〇一二三四五六七八九十百千]+条)"

def read_docx_file(file_path):
    """Return the text of a .docx file, one paragraph per line.

    Args:
        file_path: Path handed to ``docx.Document``.

    Returns:
        str: paragraph texts joined with "\\n", or "" when the file cannot be
        opened as a Word document.
    """
    try:
        doc = docx.Document(file_path)
    except Exception:
        # Best-effort: unreadable files yield no text. `Exception` rather
        # than a bare `except`, so Ctrl-C / SystemExit still stop the run.
        return ""
    return "\n".join(paragraph.text for paragraph in doc.paragraphs)

def process_html(path):
    """Extract articles from a saved HTML law page.

    Every ``<p class="law-rule-text">`` is visited in document order. A
    paragraph carrying both a ``law-rule-num`` span and a ``rule-text`` span
    starts a new article; any other paragraph is appended, on its own line,
    to the article started most recently. Paragraphs before the first article
    are ignored.

    Args:
        path: Location of the HTML file (read as bytes, parsed with lxml).

    Returns:
        list[dict]: one dict per article with keys ``content``,
        ``law_section`` (always None here) and ``law_article``.
    """
    with open(path, "rb") as handle:
        soup = BeautifulSoup(handle.read(), "lxml")

    articles = []
    for paragraph in soup.find_all("p", class_="law-rule-text"):
        number = paragraph.find("span", class_="law-rule-num")
        body = paragraph.find("span", class_="rule-text")
        if number and body:
            # Start of a new article.
            articles.append({
                "content": body.text.strip(),
                "law_section": None,
                "law_article": number.text.strip(),
            })
        elif articles:
            # Continuation paragraph: merge into the current article.
            articles[-1]["content"] += "\n" + paragraph.text.strip()

    return articles

def _split_articles(text, section):
    """Split ``text`` on article headings and return one record per article.

    ``ARTICLE_PATTERN`` has a single capturing group, so ``re.split`` returns
    ``[preamble, heading, body, heading, body, ...]``. The preamble before the
    first heading is discarded.
    """
    pieces = re.split(ARTICLE_PATTERN, text)
    records = []
    for heading, body in zip(pieces[1::2], pieces[2::2]):
        law_article = heading.strip()
        # Split on whitespace and rejoin with newlines to normalize layout;
        # the heading itself is kept as the first line of the content.
        tokens = [tok for tok in re.split(r"\s+", law_article) if tok]
        tokens.extend(tok for tok in re.split(r"\s+", body.strip()) if tok)
        records.append({
            "content": "\n".join(tokens),
            "law_section": section,
            "law_article": law_article,
        })
    return records


def process_docx(path):
    """Parse a Word law document into a list of article records.

    The document text is read with ``read_docx_file``; full-width spaces
    (U+3000) are replaced by ordinary spaces. When the text contains chapter
    headings (``SECTION_PATTERN``) it is first divided by chapter and each
    article is tagged with the chapter it falls under; otherwise the whole
    text is split by article and ``law_section`` is None.

    Args:
        path: Location of the .docx file.

    Returns:
        list[dict]: records with keys ``content``, ``law_section`` and
        ``law_article``; empty when no text or no article heading is found.
    """
    content = read_docx_file(path)
    content = content.replace("\u3000", " ")

    result = []
    # Does the document contain chapter headings at all?
    if re.search(SECTION_PATTERN, content):
        current_section = None
        # Split by chapter, then by article. The split yields both the
        # chapter headings and the text between them.
        for piece in re.split(SECTION_PATTERN, content):
            section_match = re.search(SECTION_PATTERN, piece)
            if section_match:
                current_section = section_match.group().strip()
            # Every piece, heading pieces included, is scanned for articles,
            # as the original loop did.
            result.extend(_split_articles(piece, current_section))
    else:
        # Split by article only.
        result.extend(_split_articles(content, None))
    return result

In [22]:
# Convert every downloaded law into a JSONL file with one article per line.
os.makedirs(output_dir, exist_ok=True)
for file_name in tqdm(os.listdir(raw_dir)):
    src_path = os.path.join(raw_dir, file_name)
    # splitext splits on the last dot only, so names containing extra dots
    # (or none at all) no longer raise ValueError.
    law_name, ext = os.path.splitext(file_name)
    ext = ext.lstrip(".")
    dest_path = os.path.join(output_dir, f"{law_name}.jsonl")

    # Keep only documents whose title ends with "法"; this also skips
    # bookkeeping files such as pages.txt.
    if not law_name.endswith("法"):
        continue

    # NOTE(review): every file goes through process_docx regardless of `ext`;
    # the process_html dispatch was disabled in the original. A file
    # read_docx_file cannot open yields no records - confirm that is intended
    # for .html downloads.
    # Parse before opening the destination, so a parser error cannot leave a
    # truncated output file behind.
    results = process_docx(src_path)

    with open(dest_path, "w", encoding="utf-8") as f:
        for res in results:
            res["law_name"] = law_name
            f.write(json.dumps(res, ensure_ascii=False) + "\n")

100%|██████████| 559/559 [00:06<00:00, 80.95it/s] 
