# 案例：中文文本大数据Python程序处理全流程——以年报为例
## 案例背景
以实际案例来讲解如何利用Python对中文文本数据进行处理，整个案例以中文的年报数据为基础，处理流程主要分为三个主要部分：数据预处理、文档表示和特征抽取

## 数据预处理

对中文文本进行预处理，对文本信息内容进行筛选和标注，将不必要的内容进行剔除。

In [2]:
import re
import os
import pdb
import tqdm
from pathlib import Path
import re
import pickle
from itertools import chain
from functools import reduce
from multiprocessing import pool, cpu_count
import jieba

### 文本定位

定义抽取的规则，其中head_of_MDA中的关键词一般作为年报中MD&A部分的标题，同样这些关键词也会出现在年报的标题部分，这会对提取MD&A部分造成干扰，但如果这些关键词出现在目录中，那么它们一定是成对出现的，可以利用这个规律对目录部分进行清洗。

pattern_of_MDA是用来提取MD&A部分的规则，按照开始关键词和结束关键词进行两两配对，提取年报中的MD&A部分。

In [3]:
head_of_MDA = ["管理层讨论与分析", "经营情况讨论与分析", "董事会报告", "董事局报告"]
pattern_of_MDA = [
    "第.节\s*?管理层讨论与分析(?!\s*?\.)(?!\w)(?!”)[\s\S]+?(?=第.*?股份变动及股东情况(?!\”)(?!\s*?\.)\s+?)",
    "第.节\s*?管理层讨论与分析(?!\s*?\.)(?!\w)(?!”)[\s\S]+?(?=第.*?重要事项(?!\”)(?!\s*?\.)\s+?)",
    "第.节\s*?经营情况讨论与分析(?!\s*?\.)(?!\w)(?!”)[\s\S]+?(?=第.*?重要事项(?!\”)(?!\s*?\.)\s+?)",
    "第.节\s*?董事会报告(?!\s*?\.)(?!\w)(?!”)[\s\S]+?(?=第.*?重要事项(?!\”)(?!\s*?\.)\s+?)",
    "第.节\s*?董事局报告(?!\s*?\.)(?!\w)(?!”)[\s\S]+?(?=第.*?重要事项(?!\”)(?!\s*?\.)\s+?)",
    "第.节\s*?董事会报告(?!\s*?\.)(?!\w)(?!”)[\s\S]+?(?=第.*?监事会报告(?!\”)(?!\s*?\.)\s+?)",
    "第.节\s*?董事局报告(?!\s*?\.)(?!\w)(?!”)[\s\S]+?(?=第.*?监事会报告(?!\”)(?!\s*?\.)\s+?)",
]

 用来清除年报的目录部分，因目录部分也会存在我们想要的提取的结构，

In [4]:
def remove_catalogue(path, dealed_path, file_list):  # 清除年报的目录，以方便截取关键部分
    for filename in tqdm.tqdm(file_list, desc="清除年报目录"):
        try:
            with open(filename, "r", encoding="utf-8") as f1, open(dealed_path.joinpath(filename.name), "w", encoding="utf-8") as f2:
                text = f1.read()
                for i in range(len(head_of_MDA)):
                    target = head_of_MDA[i] + "[\s\S]+?" + head_of_MDA[i]
                    content = re.search(target, text)
                    if content != None:
                        content = content.group()[: -len(head_of_MDA[i])]
                        text = text.replace(content, "")
                        break
                f2.write(text + "\n")
                f2.flush()
        except:
            pass

在完成对目录的清除后，按照规则对年报的内容进行提取

In [5]:
def extract(dealed_path, caught_path, file_list):  # 根据规则抓取数据
    for filename in tqdm.tqdm(file_list, desc="提取MD&A"):
        pattern_of_MDA_file = filename.name
        try:
            with open(dealed_path.joinpath(filename.name), "r", encoding="utf-8") as f1, open(caught_path.joinpath(pattern_of_MDA_file), "w", encoding="utf-8") as f2:
                text = f1.read()
                for i in range(len(pattern_of_MDA)):
                    target = pattern_of_MDA[i]
                    data = re.search(target, text)
                    if data != None:
                        f2.write(data.group())
                        f2.flush()
                        break
        except:
            pass

 获取提取失败的年报列表，判断方法为读取匹配文件的内容，如该文件内容过小，则判断其为匹配失败

In [6]:
def get_todo_list(caught_path):
    caught_file = caught_path.glob("*.txt")
    error_list = []
    for file in caught_file:
        if file.stat().st_size < 5 * 1024:  # 如果文件大小不足5KB，就判断为匹配失败
            error_list.append(file)
    return error_list

 再次对截取失败的年报进行处理，大部分失败的原因是没有成功的清除掉目录部分

In [9]:
def removecatalogue_and_catch(dealed_path, caught_path, try_num):
    print("try 3 more times")
    todo_list = get_todo_list(caught_path)
    while len(todo_list) > 0 and try_num > 0:
        for file in tqdm.tqdm(todo_list, desc="再次处理"):
            pattern_of_MDA_path = dealed_path.joinpath("_".join(file.name.split("_")[-2:]))
            with pattern_of_MDA_path.open(mode="r+", encoding="utf-8") as f:
                text = f.read()  # 读取文件的内容，再次清除目录
                f.truncate()  # 清除掉文件的内容
                for i in range(len(head_of_MDA)):
                    target = head_of_MDA[i] + "[\s\S]+?" + head_of_MDA[i]
                    content = re.search(target, text)
                    if content != None:
                        content = content.group()[: -len(head_of_MDA[i])]
                        text = text.replace(content, "")
                        break
                f.write(text + "\n")
        extract(dealed_path, caught_path, todo_list)
        todo_list = get_todo_list(caught_path)
        try_num -= 1

对1961个年报进行MD&A的提取

In [10]:
path = Path("../data/annual_report")  # 原始年报存放路径
dealed_path = Path("../data/catalogue_removed")  # 删除了目录部分的年报要存储的目录
caught_path = Path("../data/md&a")  # 年报截取的部分要存储的目录

dealed_path.mkdir(exist_ok=True)  # 创建需要的文件夹
caught_path.mkdir(exist_ok=True)  # 创建需要的文件夹

file_list = list(path.glob("*.txt"))  # 获得所有要提取的年报的名字

remove_catalogue(path, dealed_path, file_list)
extract(dealed_path, caught_path, file_list)
removecatalogue_and_catch(dealed_path, caught_path, 3)
for i in get_todo_list(caught_path):
    i.unlink()  # 删除掉提取失败的文本

清除年报目录: 100%|███████████████████████████████████████████████████████████████| 5227/5227 [00:10<00:00, 497.24it/s]
提取MD&A: 100%|███████████████████████████████████████████████████████████████████| 5227/5227 [00:11<00:00, 461.35it/s]


try 3 more times


再次处理: 100%|██████████████████████████████████████████████████████████████████| 2006/2006 [00:01<00:00, 1815.13it/s]
提取MD&A: 100%|███████████████████████████████████████████████████████████████████| 2006/2006 [00:09<00:00, 209.08it/s]
再次处理: 100%|██████████████████████████████████████████████████████████████████| 2004/2004 [00:01<00:00, 1294.94it/s]
提取MD&A: 100%|███████████████████████████████████████████████████████████████████| 2004/2004 [00:06<00:00, 319.45it/s]
再次处理: 100%|███████████████████████████████████████████████████████████████████| 2004/2004 [00:02<00:00, 904.95it/s]
提取MD&A: 100%|███████████████████████████████████████████████████████████████████| 2004/2004 [00:15<00:00, 125.95it/s]


 截取年报的MD&A部分，放入到md&a目录下， 供后续数据分析使用

### 中文的分词

In [11]:
all_mda_file = Path("../data/md&a")  # 声明放置匹配MD&A内容的文件夹
all_files = list(all_mda_file.glob("*.txt"))  # 获得所有MD&A文件的文件名
all_word = []  # 用来存储所有的分词后的单词
doc_word = []  # 用来存储每个文档对应的单词列表，每个文档为一个列表，每个列表对应一篇文档的分词结果
doc_sentences = []  # 用来存储每个文档对应的句子列表，每个文档为多个列表，每个列表对应一个句子的分词结果

In [12]:
from process_func import process_cut # 将函数放到另外的文件中，是为了能够在windows环境下运行multiprocessing进行加速

In [13]:
process_pool = pool.Pool(processes=cpu_count())  # 使用多进程加速处理,使用进程池
res = process_pool.map_async(process_cut, tqdm.tqdm(all_files, desc="分词中"))  # 使用进程池的异步map方法
process_pool.close()  # 关闭进程池，不再加入新的进程
process_pool.join()  # 等待子进程执行结束
res = res.get()  # 获得分词的结果

分词中: 100%|█████████████████████████████████████████████████████████████████████| 3223/3223 [00:24<00:00, 131.30it/s]


### 去停用词

In [14]:
from process_func import filter_word # 将函数放到另外的文件中，是为了能够在windows环境下运行multiprocessing进行加速

In [15]:
filter_pool = pool.Pool(processes=4)  # 使用多进程加速处理,
filter_res = filter_pool.starmap_async(filter_word, tqdm.tqdm(res, desc="去停用词"))
filter_pool.close()
filter_pool.join()
for sent_res, doc_word_res in tqdm.tqdm(filter_res.get(), desc="填充列表"):
    doc_sentences.append(sent_res)
    all_word.extend(doc_word_res)
    doc_word.append(doc_word_res)

# 将预处理之后的结果缓存起来，供后续数据分析使用
with open("../data/all_word.pth", "wb") as f1, open("../data/doc_word.pth", "wb") as f2, open("../data/doc_sentences.pth", "wb") as f3:
    pickle.dump(all_word, f1)
    pickle.dump(doc_word, f2)
    pickle.dump(doc_sentences, f3)

去停用词: 100%|████████████████████████████████████████████████████████████████████| 3223/3223 [00:39<00:00, 81.42it/s]
填充列表: 100%|██████████████████████████████████████████████████████████████████| 3223/3223 [00:00<00:00, 5634.39it/s]
