##### 本节使用 datasets 加载数据, hugging face 官网文档:https://huggingface.co/docs/datasets/v2.18.0/en/loading
###### 1. 注意流加载的shuffle与普通datasets shuffle的不同, 流加载的shuffle 是对切片打乱后,对 前buffer_size的数据随机抽取, 普通datasets shuffle 是对全局进行打乱,因此流加载shuffle没有标准shuffle充分, 但随着切片数的提升, 流加载的shuffle也会逐渐均匀
###### 2. datasets 普通加载 会自动保存 .cache 的arrow文件

In [1]:
# 导入huggingface datasets 包
# 详情见:https://huggingface.co/docs/datasets/v2.18.0/en/loading
import datasets
from datasets import Dataset
from datasets import load_dataset,Features,Value

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
# datasets 清除缓存cache
# 默认的cache路径在 ~/.cache/huggingface/datasets, 手动清理所有缓存,也可以进入目录后清除指定的缓存
# jupyter 中使用 linux 命令,必须在前面加入!
!rm -rf ~/.cache/huggingface/datasets
# # 使用 cache 可以使得优化再次加载数据的速度, 但也暂用了大量的硬盘资源
# # 在下载数据集后，可以通过 `load_dataset()` 函数的 `download_mode` 参数来控制加载方式。默认情况下，🤗 Datasets 会重用已存在的数据集。但是如果您需要原始数据集而不应用任何处理函数，请按照以下示例重新下载文件：
# # download_mode = "reuse_cache_if_exists", 具体见: https://huggingface.co/docs/datasets/v2.18.0/en/cache
# 例如: my_dataset = load_dataset('text',data_files=files,num_proc=9,
#                           download_mode = "reuse_cache_if_exists")

In [3]:
# 定于 hugging face .cache 的目录
cache_dir = "/data/temp/julyedu_634415/.cache/huggingface/datasets"

# 定义文件相关的根目录
file_root = "/data/temp/julyedu_634415/testdatas/"
file_root

'/data/temp/julyedu_634415/testdatas/'

In [4]:
# # 创建数据
# line_num = 750000
# for i in range(100):
#     with open(f"{file_root}{i}.txt",'w',encoding='utf-8') as f:
#         for j in range(line_num):
#             f.write(f'这是第{i*line_num+j+1}行数据+++++++++++++++++++++++++++++++++++++++++')
#             f.write('\n')
#     print(f"finish: doc {i}")

In [5]:
# 遍历要加载的数据
import glob
files = glob.glob(f"{file_root}*.txt")
files

['/data/temp/julyedu_634415/testdatas/96.txt',
 '/data/temp/julyedu_634415/testdatas/72.txt',
 '/data/temp/julyedu_634415/testdatas/71.txt',
 '/data/temp/julyedu_634415/testdatas/45.txt',
 '/data/temp/julyedu_634415/testdatas/28.txt',
 '/data/temp/julyedu_634415/testdatas/3.txt',
 '/data/temp/julyedu_634415/testdatas/57.txt',
 '/data/temp/julyedu_634415/testdatas/99.txt',
 '/data/temp/julyedu_634415/testdatas/67.txt',
 '/data/temp/julyedu_634415/testdatas/50.txt',
 '/data/temp/julyedu_634415/testdatas/33.txt',
 '/data/temp/julyedu_634415/testdatas/6.txt',
 '/data/temp/julyedu_634415/testdatas/23.txt',
 '/data/temp/julyedu_634415/testdatas/59.txt',
 '/data/temp/julyedu_634415/testdatas/46.txt',
 '/data/temp/julyedu_634415/testdatas/10.txt',
 '/data/temp/julyedu_634415/testdatas/55.txt',
 '/data/temp/julyedu_634415/testdatas/21.txt',
 '/data/temp/julyedu_634415/testdatas/54.txt',
 '/data/temp/julyedu_634415/testdatas/75.txt',
 '/data/temp/julyedu_634415/testdatas/86.txt',
 '/data/temp/ju

##### 加载自己的txt数据

In [6]:
# datasets普通加载 
# 普通加载 会在 .cache 生成 arrow 文件
# 简单来说就是希望大数据的文件切分成多个小文件,文件加载可以使用多进程优化

# 假设每个文本文件只包含一列文本内容，我们要将其命名为"text", 元数据
dataset_features = Features({'text': Value('string')})
# num_proc 进程数
my_dataset = load_dataset('text',
                          data_files=
                                {"train_data":files[:80],
                                 "test_data":files[80:]},
                          cache_dir=cache_dir,
                          features=dataset_features,
                          num_proc=20)
print(my_dataset)

# 进行shuffle, 普通加载直接shuffle即可
train_data = my_dataset['train_data'].shuffle(42)

# 取出前10个元素
train_data[:10]

Resolving data files: 100%|██████████| 80/80 [00:00<00:00, 314769.53it/s]
Resolving data files: 100%|██████████| 20/20 [00:00<00:00, 160087.94it/s]


DatasetDict({
    train_data: Dataset({
        features: ['text'],
        num_rows: 60000000
    })
    test_data: Dataset({
        features: ['text'],
        num_rows: 15000000
    })
})


{'text': ['这是第24294295行数据+++++++++++++++++++++++++++++++++++++++++',
  '这是第17212493行数据+++++++++++++++++++++++++++++++++++++++++',
  '这是第24639339行数据+++++++++++++++++++++++++++++++++++++++++',
  '这是第72398566行数据+++++++++++++++++++++++++++++++++++++++++',
  '这是第21974305行数据+++++++++++++++++++++++++++++++++++++++++',
  '这是第10210943行数据+++++++++++++++++++++++++++++++++++++++++',
  '这是第28969544行数据+++++++++++++++++++++++++++++++++++++++++',
  '这是第58098725行数据+++++++++++++++++++++++++++++++++++++++++',
  '这是第47059035行数据+++++++++++++++++++++++++++++++++++++++++',
  '这是第71791689行数据+++++++++++++++++++++++++++++++++++++++++']}

In [7]:
def func_replace(item):
    item["text"] = item["text"].replace("+++++++++++++++++++","==")
    return item

# map 用法
new_train_data = train_data.map(func_replace,num_proc=20)

new_train_data[:10]

{'text': ['这是第24294295行数据====+++',
  '这是第17212493行数据====+++',
  '这是第24639339行数据====+++',
  '这是第72398566行数据====+++',
  '这是第21974305行数据====+++',
  '这是第10210943行数据====+++',
  '这是第28969544行数据====+++',
  '这是第58098725行数据====+++',
  '这是第47059035行数据====+++',
  '这是第71791689行数据====+++']}

In [8]:
# 转化为 DataFrame 
train_data.set_format("pandas")
train_data[:10]
# 可以看到已经是DataFrame了

Unnamed: 0,text
0,这是第24294295行数据++++++++++++++++++++++++++++++++...
1,这是第17212493行数据++++++++++++++++++++++++++++++++...
2,这是第24639339行数据++++++++++++++++++++++++++++++++...
3,这是第72398566行数据++++++++++++++++++++++++++++++++...
4,这是第21974305行数据++++++++++++++++++++++++++++++++...
5,这是第10210943行数据++++++++++++++++++++++++++++++++...
6,这是第28969544行数据++++++++++++++++++++++++++++++++...
7,这是第58098725行数据++++++++++++++++++++++++++++++++...
8,这是第47059035行数据++++++++++++++++++++++++++++++++...
9,这是第71791689行数据++++++++++++++++++++++++++++++++...


#### 使用 dask + pd 进行处理

In [9]:
from dask.distributed import Client, LocalCluster
import dask.dataframe as dd
 
class DaskTextLoaderWithMultiprocessing:
    def __init__(self, filepath, blocksize=1024 * 1024 * 128, n_workers=4):
        """
        使用 Dask 初始化加载器并设置多处理。
        
        :param filepath: 要读取的文件路径。
        :param blocksize: 单个块(block)读入内存时占用字节大小，默认值设定为128MB。
                          根据系统和硬件配置调整blocksize大小以获得最佳性能，
                          较小值将导致更高I/O频率但容易管理（内存使用上）；
                          较大则减少任务数量但每个任务更耗时及可能引发更高内存消耗压力。
                          
         注意：该参数仅针对文本数据有效，如CSV或JSON格式。如果输入其他格式（比如Parquet）
               DASK将自动管理最佳块划分策略而忽略此设置项。
         :param n_workers: 并行工作线程/进程数，默认为4.
                           增加此数字可并行执行更多操作，但也会增加系统资源消耗。
        """
        
        self.filepath = filepath
        self.blocksize = blocksize
        
        # 创建本地DASK集群
        # 当DaskTextLoaderWithMultiprocessing类初始化时，通过创建LocalCluster和Client，已经配置好了Dask的并行执行环境。
        # 当调用下面的 .load 方法时，dd.read_csv将利用这个并行环境。具体到该方法，它会根据文件的块大小(blocksize)将文件分割为多个部分，并
        # 且每个部分会被分配到不同的工作进程去处理，这是在后台自动发生的，并发/并行处理提高了数据加载效率。  
        cluster = LocalCluster(n_workers=n_workers, processes=True) # 明确指明使用进程而非线程，针对CPU密集型任务提升性能。
        self.client = Client(cluster)
 
    def load(self):
        # 加载txt/csv/json... 文件并返回dask DataFrame对象.
        df = dd.read_csv(self.filepath, 
                         header=None,    # 是否使用头
                         sep='\t', # csv 分隔符
                         blocksize=self.blocksize,
                         names=["text"])
         
        ## 这里可以添加任何必要预处理步骤 ##
 
        return df 
  
    def close_cluster(self):
        # 关闭client和cluster 
        self.client.close()

In a future release, Dask DataFrame will use a new implementation that
contains several improvements including a logical query planning.
The user-facing DataFrame API will remain unchanged.

The new implementation is already available and can be enabled by
installing the dask-expr library:

    $ pip install dask-expr

and turning the query planning option on:

    >>> import dask
    >>> dask.config.set({'dataframe.query-planning': True})
    >>> import dask.dataframe as dd

API documentation for the new implementation is available at
https://docs.dask.org/en/stable/dask-expr-api.html

Any feedback can be reported on the Dask issue tracker
https://github.com/dask/dask/issues 


    # via Python

    # via CLI


  import dask.dataframe as dd


In [10]:
# 切片文件 1024 * 1024 * 1 为 1M
loader = DaskTextLoaderWithMultiprocessing(f"{file_root}*.txt",
                                           blocksize= 1024 * 1024 * 2,
                                           n_workers=20)
df = loader.load()
df

Unnamed: 0_level_0,text
npartitions=2387,Unnamed: 1_level_1
,string
,...
...,...
,...
,...


In [11]:
# 替换 
# regex=True 表示使用正则
df["text"] = df["text"].str.replace(r"\++","==",regex=True)
df["text"] = df["text"].str.strip()
df["text"] = "问题: \n" + df["text"] + "\n答案: \n"
df['num_chars'] = df["text"].str.len()
df.compute()

Unnamed: 0,text,num_chars
0,问题: 这是第1行数据== 答案:,20
1,问题: 这是第2行数据== 答案:,20
2,问题: 这是第3行数据== 答案:,20
3,问题: 这是第4行数据== 答案:,20
4,问题: 这是第5行数据== 答案:,20
...,...,...
31244,问题: 这是第74999996行数据== 答案:,27
31245,问题: 这是第74999997行数据== 答案:,27
31246,问题: 这是第74999998行数据== 答案:,27
31247,问题: 这是第74999999行数据== 答案:,27


In [12]:
loader.close_cluster()