In [39]:
import ujson
import re
from os.path import dirname, abspath, exists, isdir
from os import remove, mkdir, walk
import time
from collections import defaultdict

from matplotlib import pyplot as plt
import codecs, csv
import pandas as pd 
import numpy as np
from rich import progress
from rich.table import Table
from rich.console import Console
from fastparquet import ParquetFile, write
import pyarrow.parquet as pq
from opencc import OpenCC

import sys
sys.path.extend(['.','..'])

from logger import Logger
from config import PROJECT_ROOT
from utils.functions import get_path_of_suffix_files, DropDatasetDuplicate

log = Logger('data_process', save2file=True, file_name=PROJECT_ROOT + '/logs/raw_data_process.log')

punctuation = set("!\"#$%&'()*+,-./:;<=>?@[\]^_`{|}~.,;《》？！“”‘’@#￥%…&×（）——+【】{};；●，。&～、|\s:：\n")
en_punctuation = ",().!;:"
zh_punctuation = "，（）。！；："

In [23]:
def delete_file(file: str)-> bool:
    '''
    询问删除文件
    '''
    if exists(file):
        ans = input('delete file: {} ? Yes (y) or No (n)'.format(file))
        ans = ans.lower()
        if ans in ('yes', 'y'):
            remove(file)
            print('deleted.')
            return True
    return False

def remove_duplicate_punctuation(sentence: str) -> str:
    '''
    删除句子中重复的标点符号、重复的空格，同时将换行变为特殊字符'\n'
    '''
    # 将空格（全角空格）替换为逗号, 可能会有重复的空客，下面删除重复标点会删除
    sentence = re.sub(' |　', '，', sentence) 

    ans = ''
    n = len(sentence)
    p = 0
    while p < n:
        ans += sentence[p]

        while p + 1 < n and sentence[p] in punctuation and sentence[p + 1] in punctuation:
            p += 1
        p += 1

    return ans

def convert_en_punctuation_to_zh_punct(sentence: str) -> str:
    '''
    将句子中的英文标点替换文中文标点
    '''
    n = len(zh_punctuation)
    for i in range(n):
        sentence = sentence.replace(en_punctuation[i], zh_punctuation[i])
    return sentence

def get_sentences_dice_similarity(st_a: str, st_b: str) -> float:
    '''
    获取两个句子的Dice相似度（Dice similarity）
    s(a, b) =  2 * len( set(a) & set(b) ) / (len(set(a)) + len(set(b)))
    '''
    set_a, set_b = set(st_a), set(st_b)
    total_len  = len(set_a) + len(set_b)
    
    if total_len == 0: return 0.0

    inter_set =  set_a & set_b
    
    return ( 2 * len(inter_set)) / total_len

def write_single_parquet_file(file_name: str, data_frame: pd.DataFrame) -> None:
    '''
    将dataframe写到单独的parquet file中
    '''
    append = False
    if exists(file_name):
        append = True 

    write(file_name, data_frame, compression='GZIP',append=append)


def read_and_write_template(read_file: str, write_to_file: str, call_back: object, group_cnt: int=10000) -> None:
    '''
    处理数据读写模板，需要提供一个回调函数call_back，
    read_file: 原始数据文件
    write_to_file：处理后的要保存数据文件
    call_back：函数输入一个字符串，输出一个处理后的字典dict，如果输入的字符串为无效数据，请返回None
    group_cnt: parquet file分割行数
    如：
    >>> def call_back(inputs: str) -> dict:
    >>>     if check(inputs) not valid:
    >>>         return None
    ...    
    ...    do something for inputs
    ...
    >>>     my_dict = {
    >>>             'prompt': inputs['p'],
    >>>             'response': inputs['a1'] + inputs['a2'],
    >>>             ...
    >>>         }
    >>>     return my_dict
    '''

    log.info('process file:{}'.format(read_file), save_to_file=True)
    start = time.time()
    
    raw_line_cnt = 0
    keep_line_cnt = 0
    
    with progress.open(read_file, 'r', encoding='utf-8') as f_read:
        cur_rows = []
        append = cur_rows.append
        for line in f_read:
            try:
                raw_line_cnt += 1

                write_dict = call_back(line)

                if write_dict is None: continue

                keep_line_cnt += 1
                append(write_dict)
                # ujson.dump(write_obj, f_write, indent=4, ensure_ascii=False)
                # ujson.dump(write_obj, f_write,  ensure_ascii=False,)
                # f_write.write('\n')

                if len(cur_rows) >= group_cnt:
                    df = pd.DataFrame(cur_rows)
                    write_single_parquet_file(write_to_file, df)
                    cur_rows = []
                    append = cur_rows.append

            except Exception as e:
                # log.error('处理文件异常：{}, content:{}'.format(str(e), line))
                print(line)
                raise e
        
        # end for
        # 处理末尾部分
        if len(cur_rows) > 0:
            df = pd.DataFrame(cur_rows)
            write_single_parquet_file(write_to_file, df)
            cur_rows = []
    
    end = time.time()

    log.info('原始文件:{}，共{}行，处理后剩余{}行，保存到文件：{}。耗时：{:.6}s'\
                .format(read_file, raw_line_cnt, keep_line_cnt, write_to_file, end - start), save_to_file=True)



In [25]:
def process_web_text(keep_start: int=5, response_less_word: int=10) -> None:
    '''
    处理425万社区问答webtext2019zh知识类数据集
    keep_start: 只保留点赞数大于keep_start的问答
    response_less_word: 答案至少要有response_less_word个字
    '''
    file_names = [
        '/data/raw_data/web_text_zh_test.json',
        '/data/raw_data/web_text_zh_train.json',
        '/data/raw_data/web_text_zh_valid.json',
    ]

    save_file_name = PROJECT_ROOT + '/data/my_data/my_web_text_zh.parquet'

    # 后续append写入，存在文件先删除
    if exists(save_file_name): 
        assert delete_file(save_file_name)

    def process_function(line: str) -> dict:
        item = ujson.loads(line)

        if item['star'] < keep_start or len(item['content']) < response_less_word: 
            return None

        # 数据清洗
        # 去除重复的标点符号
        prompt = remove_duplicate_punctuation(item['title'])
        response = remove_duplicate_punctuation(item['content'])
        write_dict = {
            "prompt": prompt,
            "response": response,
        }
        return write_dict

    for file_name in file_names:
        read_file = PROJECT_ROOT + file_name

        read_and_write_template(read_file, save_file_name, process_function)



In [58]:
processed_file_dir = PROJECT_ROOT + '/data/my_data'
if not exists(processed_file_dir):
    mkdir(processed_file_dir)

In [None]:
process_web_text(keep_start=5, response_less_word=15)

In [3]:
with open('/root/LLM/code/first/data/raw_data/bell_open_source/train_3.5M_CN.json') as file:
    for line in file:
        print(line)
        break
    

{"conversations": [{"from": "human", "value": "针对健身房的新手，设计一套适合他们的健身器械使用指南，包括安全应用、正确姿势等方面。"}, {"from": "assistant", "value": "健身器械使用指南\n1. 开始前，请先进行热身运动。这会帮助你的身体适应运动，并减少受伤的风险。\n2. 在使用健身器械前，确保你已经了解了其使用方法。请阅读说明书或咨询教练以获得正确的使用技巧。\n3. 谨防过度使用或过度挑战你的身体。 如果你觉得有些动作太难或太重，请添加锻炼计划，以逐步提高动作难度。\n4. 使用合适的装备。 确保你拥有合适的运动鞋和舒适的运动服。 不要在裸露的脚或短裤上进行重量训练。\n5. 在健身器械上使用安全装置。 这些通常用于保护你的身体免受不当操作造成的损伤。 例如，重量训练中，你需要使用杠铃和负重时，一定要使用卡子来防止重量滑落。\n6. 注意正确的姿势。 如果你的姿势是错误的，那么你的身体很容易被伤害到，你也可能无法获得最佳的锻炼效果。 至关重要的是，保持直立的身体，保持头部和颈部的稳定，并使用合适的重量。\n7. 保持合理的呼吸方式。 无论何时进行训练，都必须保持正常呼吸。 当你需要用力时，呼气； 当你放松时，吸气。\n8. 安全存放器械。 在使用健身器械后，你需要把它们归还给适当的位置，以便其他人可以使用它们。\n总之，健身器械的正确使用是关键之一，如果不健康和不安全，它们将无法帮助您达到您所需的健康成果。 选择适当的训练计划，并为训练提供足够的时间，以备逐渐适应新方法。 对于任何问题，请向教练咨询。"}], "id": "66182880"}

