In [None]:
import re
from opencc import OpenCC
from bs4 import BeautifulSoup
import jieba
from glob import glob

import torch
from tqdm.auto import tqdm

import sys
# IPython shell escape (notebook-only syntax, not valid in a plain .py file):
# lists the contents of ../package/.
!ls ../package/
# Put ../package/ at the front of the module search path so it is searched
# first by the imports that follow (presumably this is where `ltp` lives).
sys.path.insert(0, "../package/")
from ltp import LTP
# Module-level LTP instance; TextCleaner.tokenizer_batch_text calls nlp.seg().
nlp = LTP(path="base")

class TextCleaner:
    """Batch cleaner for Chinese text.

    Each cleaning step is switched on or off by a constructor flag.
    ``clean_single_text`` applies the character-level steps to one string;
    ``clean_text`` applies them to a list of strings and, when stop-word
    removal is enabled, additionally segments the texts with the
    module-level ``nlp`` LTP instance and drops stop words.
    """

    # Matches every character that remove_sentiment_character deletes, i.e.
    # anything that is NOT a CJK ideograph in U+4E00..U+9FA5, an ASCII letter
    # or digit, or one of , . ! ? ， 。 ？ ！
    # Only the leading '^' negates a character class; the original pattern
    # repeated '^' before every item, which whitelisted the caret itself.
    _NON_KEPT_CHARS = re.compile("[^\u4e00-\u9fa5,.!，。?？！a-zA-Z0-9]")

    def __init__(self,
                 remove_space=True,  # strip ASCII spaces
                 remove_suspension=True,  # turn '...' into a Chinese full stop
                 only_zh=False,  # keep Chinese characters only
                 remove_sentiment_character=True,  # drop emoticons/symbols
                 to_simple=True,  # convert Traditional to Simplified Chinese
                 remove_html_label=True,  # strip HTML markup
                 remove_stop_words=False,  # segment and drop stop words
                 stop_words_dir="./停用词/",  # directory of *.txt stop lists
                 with_space=False,  # join kept tokens with ' ' instead of ''
                 batch_size=256):  # number of texts per nlp.seg() call
        self._remove_space = remove_space
        self._remove_suspension = remove_suspension
        self._remove_sentiment_character = remove_sentiment_character

        self._only_zh = only_zh
        self._to_simple = to_simple

        self._remove_html_label = remove_html_label
        self._remove_stop_words = remove_stop_words
        self._stop_words_dir = stop_words_dir

        self._with_space = with_space
        self._batch_size = batch_size

        # OpenCC converters keyed by config name, built lazily on first use so
        # a converter is not re-created for every sentence.
        self._converters = {}

    def clean_single_text(self, text):
        """Apply every enabled character-level cleaning step to ``text``.

        :param text: the raw string to clean.
        :return: the cleaned string.
        """
        # HTML must be stripped first: the symbol filter below deletes '<',
        # '>', '/' and '&', after which tags and entities can no longer be
        # recognised and their names would leak into the text.
        if self._remove_html_label:
            text = self.remove_html(text)
        if self._remove_space:
            text = self.remove_space(text)
        if self._remove_suspension:
            text = self.remove_suspension(text)
        if self._remove_sentiment_character:
            text = self.remove_sentiment_character(text)
        if self._to_simple:
            text = self.to_simple(text)
        if self._only_zh:
            text = self.get_zh_only(text)
        return text

    def clean_text(self, text_list):
        """Clean a list of texts.

        :param text_list: list of raw strings.
        :return: list of cleaned strings. When stop-word removal is enabled
            each string is re-assembled from the tokens that are not stop
            words (joined with a space if ``with_space`` was set).
        """
        text_list = [self.clean_single_text(text) for text in tqdm(text_list)]
        if not self._remove_stop_words:
            # Segmentation is only needed for stop-word removal; skip the
            # model calls entirely when their result would be discarded.
            return text_list

        # Read the stop-word files once for the whole batch.
        stop_words = self._load_stop_words(self._stop_words_dir)
        tokenized_words_list = self.tokenizer_batch_text(text_list)
        return [self._join_kept_words(words_list, stop_words, self._with_space)
                for words_list in tokenized_words_list]

    def remove_space(self, text):
        """Return ``text`` with every ASCII space character removed."""
        return text.replace(' ', '')

    def remove_suspension(self, text):
        """Return ``text`` with each '...' replaced by a Chinese full stop."""
        return text.replace('...', '。')

    def get_zh_only(self, text):
        """Return only the characters of ``text`` in U+4E00..U+9FA5."""
        return ''.join(ch for ch in text if u'\u4e00' <= ch <= u'\u9fa5')

    def remove_sentiment_character(self, sentence):
        """Delete everything except Chinese, ASCII alphanumerics and , . ! ?

        Both the ASCII and the full-width forms of the comma, full stop,
        exclamation mark and question mark are kept. Whitespace is not in
        the kept set, so it is removed as well.

        :param sentence: the string to filter.
        :return: the filtered string.
        """
        return self._NON_KEPT_CHARS.sub('', sentence)

    def _convert(self, config, sentence):
        """Convert ``sentence`` with the cached OpenCC converter ``config``."""
        converter = self._converters.get(config)
        if converter is None:
            converter = self._converters[config] = OpenCC(config)
        return converter.convert(sentence)

    def to_simple(self, sentence):
        """Convert Traditional Chinese to Simplified Chinese (OpenCC 't2s')."""
        return self._convert('t2s', sentence)

    def to_tradition(self, sentence):
        """Convert Simplified Chinese to Traditional Chinese (OpenCC 's2t')."""
        return self._convert('s2t', sentence)

    def remove_html(self, text):
        """Return the text content of ``text`` with HTML markup removed."""
        return BeautifulSoup(text, 'html.parser').get_text()

    def tokenizer_batch_text(self, text_list):
        """Segment ``text_list`` with ``nlp.seg`` in chunks of ``batch_size``.

        :param text_list: list of strings to segment.
        :return: the concatenation of element ``[0]`` of every ``nlp.seg``
            result (expected to hold one token list per input text).
        """
        tokenized_text = []
        step = self._batch_size
        with torch.no_grad():
            for start_idx in tqdm(range(0, len(text_list), step)):
                # Slicing clamps at the end of the list, so the final short
                # batch needs no special case.
                batch = text_list[start_idx:start_idx + step]
                tokenized_text += nlp.seg(batch)[0]
        return tokenized_text

    def _load_stop_words(self, stop_words_dir):
        """Return the union of the stop words in ``stop_words_dir``/*.txt.

        Each line (with trailing whitespace removed) is one stop word. Files
        are read as UTF-8; a leading byte-order mark is ignored. An empty
        set is returned when the directory contains no ``*.txt`` file.
        """
        stop_words = set()
        for stop_word_filepath in glob(stop_words_dir + "/*.txt"):
            with open(stop_word_filepath, encoding="utf-8-sig") as fp:
                stop_words.update(line.rstrip() for line in fp)
        return stop_words

    @staticmethod
    def _join_kept_words(words_list, stop_words, with_space):
        """Join the tokens of ``words_list`` that are not in ``stop_words``."""
        separator = ' ' if with_space else ''
        return separator.join(w for w in words_list if w not in stop_words)

    def remove_stop_words(self, words_list, stop_words_dir, with_space=False):
        """Drop stop words from a token list and join the rest into a string.

        :param words_list: tokens of one text.
        :param stop_words_dir: directory whose ``*.txt`` files list the stop
            words, one per line; the lists of all files are merged.
        :param with_space: join the kept tokens with a space when true,
            otherwise concatenate them directly.
        :return: the joined string of tokens that are not stop words.
        """
        stop_words = self._load_stop_words(stop_words_dir)
        return self._join_kept_words(words_list, stop_words, with_space)

In [None]:
def Pretreatment(data_path='sohu.txt',
                 stopwords_path='stopwords.txt',
                 cut_words_path='cutWords_list.txt',
                 feature_path='tfidf_feature.pkl',
                 min_df=40,
                 max_df=0.3):
    """Segment the corpus, build TF-IDF features and save them to disk.

    Steps: read the tab-separated corpus (label column, article column),
    segment every article with jieba while dropping stop words, write the
    segmented articles to ``cut_words_path`` (one article per line, tokens
    separated by spaces), fit a TF-IDF vectorizer on the segmented articles,
    label-encode the categories and pickle both to ``feature_path``.

    NOTE(review): ``pd``, ``TfidfVectorizer``, ``LabelEncoder`` and ``pickle``
    are not imported in the visible part of this notebook; they are assumed
    to be imported in another cell.

    :param data_path: tab-separated file without header: label, article.
    :param stopwords_path: UTF-8 file with one stop word per line.
    :param cut_words_path: output file for the segmented articles.
    :param feature_path: output pickle holding
        ``{'featureMatrix': X, 'label': y}``.
    :param min_df: ``min_df`` passed to ``TfidfVectorizer``.
    :param max_df: ``max_df`` passed to ``TfidfVectorizer``.
    :return: None
    """
    # Load the corpus.
    data_df = pd.read_csv(data_path, sep='\t', header=None)
    data_df.columns = ['分类', '文章']

    # Read the stop-word list; the context manager closes the file handle.
    with open(stopwords_path, encoding='utf8') as file:
        stopword_list = [k.strip() for k in file if k.strip() != '']
    # A set gives O(1) membership tests inside the per-token loop below.
    stopword_set = set(stopword_list)

    # Segment every article with jieba and drop stop words. Whitespace-only
    # tokens are dropped too: a newline token would break the
    # one-article-per-line layout of the file written below.
    cutWords_list = [
        [k for k in jieba.cut(article) if k.strip() and k not in stopword_set]
        for article in data_df['文章']
    ]

    # Segmentation is slow, so save the result for later reuse. It can be
    # reloaded with: [line.split() for line in open(cut_words_path, encoding='utf8')]
    # (The original wrote the last article's tokens on every line because
    # the loop variable was misspelled.)
    with open(cut_words_path, 'w', encoding='utf8') as file:
        file.writelines(' '.join(cutWords) + '\n' for cutWords in cutWords_list)

    # Feature extraction. The vectorizer's first parameter is ``input``, not
    # the documents, so the token lists must not be passed to the
    # constructor; the space-joined segmented articles go to fit_transform
    # so that the features are built from the jieba tokens.
    tfidf = TfidfVectorizer(stop_words=stopword_list, min_df=min_df, max_df=max_df)
    X = tfidf.fit_transform(' '.join(cutWords) for cutWords in cutWords_list)
    labelEncoder = LabelEncoder()
    y = labelEncoder.fit_transform(data_df['分类'])

    # Persist the feature matrix and the labels so later steps can load them
    # directly instead of recomputing.
    with open(feature_path, 'wb') as file:
        save = {'featureMatrix': X, 'label': y}
        pickle.dump(save, file)

    print('文本分类数据预处理完成！')