在做NLP的深度学习任务时，一个关键问题是如何构建模型的输入。本文介绍如何在内存有限的情况下进行大规模数据处理，主要包括：

1. 建立词典
2. 将单词转换为id
3. 训练集/验证集切分

## 方法一


参考： https://state-of-art.top/2018/11/28/%E6%96%87%E6%9C%AC%E9%A2%84%E5%A4%84%E7%90%86/


In [None]:
import os
import random
import pickle
import operator
from glob import glob
from tqdm import tqdm
from collections import Counter

import config
from Logginger import init_logger

# Module-wide logger named "torch"; its log files go under config.LOG_PATH.
logger = init_logger(logger_name="torch", logging_path=config.LOG_PATH)


def sent_label_split(line):
    """
    Split one raw data line into its tokens and its label.

    A line looks like ``<label>@<space separated words>``. Only the FIRST ``@``
    separates the label from the sentence, so an ``@`` inside the sentence
    (e-mail addresses, mentions, ...) stays part of the text.

    :param line: raw line (a trailing ``\\n`` or ``\\r\\n`` is removed)
    :return: (words, label) -- tokens split on single spaces, and the label string
    :raises IndexError: if the line contains no ``@`` separator
    """
    parts = line.strip('\r\n').split('@', 1)
    label = parts[0]
    sent = parts[1].split(' ')
    return sent, label

def word_to_id(word, word2id):
    """
    Map a single word to its id, falling back to the unknown-word id.

    Vocabularies built from ``config.flag_words`` contain ``'<unk>'``, so that
    key is used for out-of-vocabulary words. A legacy plain ``'unk'`` key is
    still honoured when ``'<unk>'`` is absent.

    :param word: the word to look up
    :param word2id: word -> id mapping @type: dict
    :return: id of ``word``, or the unknown-word id
    :raises KeyError: if the word is unknown and the mapping has no unknown key
    """
    if word in word2id:
        return word2id[word]
    if '<unk>' in word2id:
        return word2id['<unk>']
    return word2id['unk']


def _load_stop_words(path):
    """Return the set of stop words stored one per line in the file at *path*."""
    with open(path, 'r', encoding='utf-8') as fr:
        return {line.strip('\r\n') for line in fr}


def bulid_vocab(vocab_size, min_freq=3, stop_word_list=None,
                is_debug=False):
    """
    Build the vocabulary and pickle the word -> id mapping.

    Word frequencies are counted over ``config.RAW_DATA`` (under
    ``config.ROOT_DIR``). Stop words are removed, words seen fewer than
    ``min_freq`` times are dropped, and the ``vocab_size`` MOST frequent
    remaining words are kept. ``config.flag_words`` are prepended, so
    ``'<pad>'`` must be the first flag word and gets id 0.

    :param vocab_size: maximum number of regular (non-flag) words to keep
    :param min_freq: minimum frequency for a word to enter the vocabulary
    :param stop_word_list: falsy -> no stop-word filtering; a str -> path of a
        stop-word file (one word per line, joined with ``config.ROOT_DIR``);
        any other truthy value -> use ``config.STOP_WORD_LIST``
    :param is_debug: if True, only about the first 10000 lines are read
    :return: word2id @type: dict
    :raises ValueError: if ``'<pad>'`` does not end up with id 0
    """
    count = Counter()
    with open(os.path.join(config.ROOT_DIR, config.RAW_DATA), 'r', encoding='utf-8') as fr:
        logger.info('Building vocab')
        for size, line in enumerate(tqdm(fr, desc='Build vocab'), start=1):
            words, _ = sent_label_split(line)
            count.update(words)
            # Debug mode: stop after ~10000 lines so the pipeline can be tried quickly.
            if is_debug and size > 10000:
                break

    if stop_word_list:
        # A string is the stop-word file path; any other truthy value falls
        # back to the configured stop-word file.
        stop_path = stop_word_list if isinstance(stop_word_list, str) else config.STOP_WORD_LIST
        stop_words = _load_stop_words(os.path.join(config.ROOT_DIR, stop_path))
        count = Counter({w: c for w, c in count.items() if w not in stop_words})

    flag_words = list(config.flag_words)
    # Most frequent first, so truncating to vocab_size keeps the most common
    # words. Flag words are excluded here so they can never get a second id.
    vocab = [w for w, c in count.most_common()
             if c >= min_freq and w not in flag_words]
    vocab = flag_words + vocab[:vocab_size]
    logger.info('vocab_size is %d', len(vocab))

    word2id = {w: i for i, w in enumerate(vocab)}
    if word2id.get('<pad>') != 0:
        raise ValueError("'<pad>' id is not 0: config.flag_words must start with '<pad>'")

    # Save under ROOT_DIR, which is where data_helper looks for the file.
    word2id_path = os.path.join(config.ROOT_DIR, config.WORD2ID_FILE)
    out_dir = os.path.dirname(word2id_path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(word2id_path, 'wb') as fw:
        pickle.dump(word2id, fw)
    return word2id


def train_val_split(X, y, valid_size=0.3, random_state=2018, shuffle=True):
    """
    Split paired samples into a training part and a validation part.

    :param X: sentences (iterable aligned element-wise with ``y``)
    :param y: labels
    :param valid_size: fraction of samples that go to the validation part;
        the count is ``int(N * valid_size)`` (rounded down)
    :param random_state: seed for the shuffle
    :param shuffle: whether to shuffle before splitting
    :return: (train, valid), each a list of (x, y) tuples
    """
    logger.info('train val split')
    data = list(zip(X, y))
    n_valid = int(len(data) * valid_size)

    if shuffle:
        # A private RNG gives the same permutation as random.seed(random_state)
        # followed by random.shuffle, without reseeding the global random module.
        random.Random(random_state).shuffle(data)

    valid = data[:n_valid]
    train = data[n_valid:]
    return train, valid


def _write_pairs(path, pairs):
    """Write (id_list, label) pairs to *path*, one ``label<TAB>id id id`` line each."""
    out_dir = os.path.dirname(path)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)
    with open(path, 'w', encoding='utf-8') as fw:
        for sent, label in pairs:
            fw.write("\t".join([str(label), " ".join(str(s) for s in sent)]) + '\n')


def text2id(word2id, maxlen=None, valid_size=0.3, random_state=2018, shuffle=True, is_debug=False):
    """
    Convert the raw corpus to id sequences and write the train/valid TSV files.

    Nothing is done if the train file already exists under ``config.ROOT_DIR``.
    Both output files are written under ``config.ROOT_DIR``, the same place
    the existence check looks.

    :param word2id: word -> id mapping (unknown words handled by ``word_to_id``)
    :param maxlen: if given, truncate every sentence to at most ``maxlen`` ids
    :param valid_size: fraction of samples written to the validation file
    :param random_state: seed for the train/valid shuffle
    :param shuffle: whether to shuffle before splitting
    :param is_debug: if True, only about the first 10000 lines are read
    """
    train_path = os.path.join(config.ROOT_DIR, config.TRAIN_FILE)
    valid_path = os.path.join(config.ROOT_DIR, config.VALID_FILE)
    if os.path.exists(train_path):
        logger.info('Text to id file existed')
        return
    logger.info('Text to id')

    sentences, labels = [], []
    with open(os.path.join(config.ROOT_DIR, config.RAW_DATA), 'r', encoding='utf-8') as fr:
        for size, line in enumerate(tqdm(fr, desc='text_to_id'), start=1):
            words, label = sent_label_split(line)
            sent = [word_to_id(word=word, word2id=word2id) for word in words]
            if maxlen:
                sent = sent[:maxlen]
            sentences.append(sent)
            labels.append(label)
            # Debug mode: stop after ~10000 lines (same limit as bulid_vocab).
            if is_debug and size > 10000:
                break

    train, valid = train_val_split(sentences, labels,
                                   valid_size=valid_size,
                                   random_state=random_state,
                                   shuffle=shuffle)
    # Release the unsplit lists before writing to keep peak memory down.
    del sentences, labels

    _write_pairs(train_path, train)
    logger.info('Writing train to file done')

    _write_pairs(valid_path, valid)
    logger.info('Writing valid to file done')


# 功能整合，提供给外部调用的函数接口
def data_helper(vocab_size, min_freq=3, stop_list=None,
                valid_size=0.3, random_state=2018, shuffle=True, is_debug=False,
                maxlen=None):
    """
    Public entry point: load or build the vocabulary, then create the id files.

    The word2id pickle at ``config.ROOT_DIR/config.WORD2ID_FILE`` is reused if
    present. Otherwise it is built with ``bulid_vocab``. ``text2id`` then
    writes the train/valid files (it skips the work if they already exist).

    :param vocab_size: maximum number of regular words in a newly built vocab
    :param min_freq: minimum word frequency for a newly built vocab
    :param stop_list: stop-word setting forwarded to ``bulid_vocab``
    :param valid_size: fraction of samples used for validation
    :param random_state: seed for the train/valid shuffle
    :param shuffle: whether to shuffle before splitting
    :param is_debug: use only a small prefix of the data
    :param maxlen: optional truncation length forwarded to ``text2id``
    :return: the word2id mapping that was used
    """
    word2id_path = os.path.join(config.ROOT_DIR, config.WORD2ID_FILE)
    if os.path.exists(word2id_path):
        logger.info('Word to id file existed')
        # NOTE: pickle.load can execute code from the file; only load
        # vocab files produced locally by bulid_vocab.
        with open(word2id_path, 'rb') as fr:
            word2id = pickle.load(fr)
    else:
        word2id = bulid_vocab(vocab_size=vocab_size, min_freq=min_freq,
                              stop_word_list=stop_list, is_debug=is_debug)
    text2id(word2id, maxlen=maxlen, valid_size=valid_size, random_state=random_state,
            shuffle=shuffle, is_debug=is_debug)
    return word2id

config.py

In [None]:
# ---------PATH------------
# Relative paths below are joined with ROOT_DIR by the data-preparation code.
ROOT_DIR = '/home/daizelin/pytorch/'
RAW_DATA = 'data/data_for_test.csv'
TRAIN_FILE = 'outputs/intermediate/train.tsv'
VALID_FILE = 'outputs/intermediate/valid.tsv'
# Pickled word -> id mapping, written by bulid_vocab and reused by data_helper.
# NOTE(review): this name is referenced by the data code but was not defined;
# the path is a placeholder, adjust to the real location.
WORD2ID_FILE = 'outputs/intermediate/word2id.pkl'
# Stop-word file (one word per line), read only when stop-word filtering is on.
# NOTE(review): placeholder path, confirm.
STOP_WORD_LIST = 'data/stop_words.txt'
LOG_PATH = 'outputs/logs'
is_debug = False
# Special tokens prepended to the vocabulary; '<pad>' must be first so it gets id 0.
flag_words = ['<pad>', '<unk>']

Logginger.py

In [None]:
import logging
from logging import Logger
from logging.handlers import TimedRotatingFileHandler

def init_logger(logger_name, logging_path):
    """
    Create (once) and return a logger that writes to the screen and to files.

    Handlers attached on the first call:
      * ``<logging_path>/all.log``   - INFO and above, rotated daily, 7 backups kept
      * console (stderr)             - INFO and above
      * ``<logging_path>/error.log`` - ERROR and above, rotated daily, 7 backups kept

    Later calls with the same name return the configured logger unchanged, so
    handlers are never duplicated.

    Usage::

        from Logginger import init_logger
        logger = init_logger("dataset", logging_path="outputs/logs")

        def your_function():
            logger.info("...")
            logger.error("...")

    :param logger_name: name passed to ``logging.getLogger``
    :param logging_path: directory for the log files; created if missing,
        ``''`` means the current directory
    :return: the configured ``logging.Logger``
    """
    import os

    logger = logging.getLogger(logger_name)
    # Configure only once. Checking for handlers (not just name registration)
    # also sets up loggers that were created elsewhere without handlers.
    if logger.handlers:
        return logger

    logger.setLevel(logging.DEBUG)
    formatter = logging.Formatter(
        '[%(asctime)s]: %(name)s %(filename)s[line:%(lineno)s] %(levelname)s  %(message)s',
        '%Y-%m-%d %H:%M:%S',
    )
    if logging_path:
        # TimedRotatingFileHandler fails if the directory does not exist.
        os.makedirs(logging_path, exist_ok=True)

    def _attach(handler, level):
        handler.setFormatter(formatter)
        handler.setLevel(level)
        logger.addHandler(handler)

    _attach(TimedRotatingFileHandler(filename=os.path.join(logging_path, "all.log"),
                                     when='D', backupCount=7), logging.INFO)
    _attach(logging.StreamHandler(), logging.INFO)
    _attach(TimedRotatingFileHandler(filename=os.path.join(logging_path, "error.log"),
                                     when='D', backupCount=7), logging.ERROR)
    return logger

## 方法二

In [None]:

import os
import random

import torch
from torch.utils.data import DataLoader, TensorDataset

# Fixed sequence length: token ids are right-padded with 0 up to this length.
MAX_CONTEXT_LEN = 512
BATCH_SIZE = 128
# Prefer the second GPU, but fall back to the first one on single-GPU machines,
# where "cuda:1" does not exist.
if torch.cuda.is_available():
    device = torch.device("cuda:1" if torch.cuda.device_count() > 1 else "cuda:0")
else:
    device = torch.device("cpu")

class IDS:
    """
    Token vocabulary backed by a pre-trained embedding text file.

    Every useful line of ``fvec`` is ``<token> v1 v2 ... v<dim>`` (single
    spaces). Tokens get ids 1, 2, 3, ... in first-seen order; id 0 is never
    assigned. Lines with a different number of fields are skipped, and a
    repeated token keeps its first vector.
    """

    def __init__(self, fvec, dim=128):
        """
        :param fvec: path of the embedding text file
        :param dim: number of vector components expected per line (default 128)
        """
        self.fvec = fvec
        self.dim = dim
        self.ids, self.vec = self._ids()  # token -> id, id -> vector
        # Returned for any id/token without a vector (e.g. padding id 0, unknown -1).
        self.defaultvec = [0] * dim

    def _ids(self):
        """Parse ``self.fvec`` and return ``(token -> id, id -> vector)`` dicts."""
        token_ids = {}
        vectors = {}
        with open(self.fvec, encoding='utf-8') as fr:
            for line in fr:
                fields = line.strip('\n').strip(' ').split(' ')
                if len(fields) != self.dim + 1 or fields[0] in token_ids:
                    continue
                token_id = len(token_ids) + 1
                token_ids[fields[0]] = token_id
                vectors[token_id] = [float(k) for k in fields[1:]]
        print("\t ... load ids(%s) ..." % (len(token_ids)))
        return token_ids, vectors

    def get_id(self, char):
        """Return the id of ``char``, or -1 if it is not in the vocabulary."""
        return self.ids.get(char, -1)

    def get_vector_byid(self, cid):
        """Return the vector for id ``cid``, or ``self.defaultvec`` if there is none."""
        return self.vec.get(cid, self.defaultvec)

    def get_vector_bychar(self, char):
        """Return the vector for token ``char`` (token -> id -> vector), or the default vector."""
        if char in self.ids:
            return self.vec[self.ids[char]]
        return self.defaultvec

    def get_ids(self, txtlist):
        """Map a sequence of tokens to a new list of ids (-1 for unknown tokens)."""
        return [self.get_id(c) for c in txtlist]

    def get_vectors(self, idlist):
        """Map a sequence of ids to the list of their vectors."""
        return [self.get_vector_byid(i) for i in idlist]

# NOTE: this lr schedule is not suitable here: its adjustment range is too large.
def adjust_learning_rate(optimizer, _batch_num, base_lr=1e-5, decay_steps=2000):
    """
    Set every param group's learning rate to ``base_lr / (_batch_num / decay_steps)``.

    The rate is inversely proportional to the batch counter. It equals
    ``base_lr`` at ``_batch_num == decay_steps``, is larger before that
    (2000x larger at batch 1 with the defaults) and smaller afterwards.
    ``_batch_num`` must be non-zero (0 raises ZeroDivisionError).

    :param optimizer: object with a ``param_groups`` list of dicts holding an
        ``'lr'`` key, e.g. a ``torch.optim`` optimizer (for Adam each group has
        'params', 'lr', 'betas', 'eps', 'weight_decay', 'amsgrad')
    :param _batch_num: current batch counter
    :param base_lr: rate reached when ``_batch_num == decay_steps`` (default 1e-5)
    :param decay_steps: batch count at which the rate equals ``base_lr`` (default 2000)
    :return: the learning rate that was set
    """
    lr = base_lr / (_batch_num / decay_steps)
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr
    return lr

class dataOperator():
    """
    Load labelled text samples, turn them into padded vector sequences and serve batches.

    Each input line is expected as ``__label__<int> tok1 tok2 ...`` (single
    spaces). A kept sample becomes:
      * ``MAX_CONTEXT_LEN`` vectors looked up through ``_idobj`` (token ids are
        right-padded with id 0 before the lookup),
      * a boolean mask of the same length that is True where the id is 0 (padding),
      * an integer label.

    Two ways of iterating are offered: ``shuffle_*_data`` followed by repeated
    ``get_batch_in_*set`` calls, or the torch ``DataLoader``s built by
    ``set_train_dataset`` / ``set_test_dataset``.
    """

    def __init__(self, f_train, f_test, _idobj):
        """
        :param f_train: path of the training file (at most 4,000,000 lines are read)
        :param f_test: path of the test file (at most 50,000 lines are read)
        :param _idobj: object providing ``get_ids`` and ``get_vectors`` (e.g. ``IDS``)
        """
        self.file_train = f_train
        self.file_test = f_test
        self.idobj = _idobj
        (_, self.train_txt, self.train_labels, self.train_mask) = self._load_txt(self.file_train, maxcnt=4000000)
        (_, self.test_txt, self.test_labels, self.test_mask) = self._load_txt(self.file_test, maxcnt=50000)
        # Number of FULL training batches; a trailing partial batch is never served.
        self.epoch_num = len(self.train_txt) // BATCH_SIZE

    def _load_txt(self, fname, maxcnt=100000):
        """
        Read at most ``maxcnt`` lines of ``fname`` and build samples, labels and masks.

        Only lines whose first field starts with ``__label__`` and that have
        fewer than ``MAX_CONTEXT_LEN - 1`` fields (label included) are kept.

        :return: (file_found, samples, labels, masks). ``samples[i]`` holds
            ``MAX_CONTEXT_LEN`` vectors and ``masks[i][j]`` is True where the id is 0.
        """
        samples, labels, masks = [], [], []
        if not os.path.isfile(fname):
            # Always return four items: __init__ unpacks four values.
            return (False, samples, labels, masks)
        print("\t ... load speakers (%s) ..." % (fname))
        with open(fname, encoding='utf-8') as fr:
            for cnt, line in enumerate(fr, start=1):
                if cnt % 100000 == 0:
                    print("\t ... load samples(%s) ..." % (cnt))
                fields = line.strip('\n').strip(' ').split(' ')
                if fields[0].startswith('__label__') and len(fields) < MAX_CONTEXT_LEN - 1:
                    t_ids = self.idobj.get_ids(fields[1:])
                    t_ids.extend([0] * (MAX_CONTEXT_LEN - len(t_ids)))  # pad to MAX_CONTEXT_LEN with id 0
                    samples.append(self.idobj.get_vectors(t_ids))
                    labels.append(int(fields[0].replace('__label__', "")))
                    # True marks padding positions, False marks real tokens.
                    masks.append([k == 0 for k in t_ids])
                if cnt >= maxcnt:
                    break
        return (True, samples, labels, masks)

    def shuffle_train_data(self):
        """Start a training pass: draw a new random sample order and reset the batch counter."""
        self.epoch_train_indexs = list(range(len(self.train_txt)))
        random.shuffle(self.epoch_train_indexs)
        self.batch_cnt_train = 0
        print("\t ... shuffle_train_data done ...")

    def shuffle_test_data(self):
        """Start a test pass: draw a new random sample order and reset the batch counter."""
        self.epoch_test_indexs = list(range(len(self.test_txt)))
        random.shuffle(self.epoch_test_indexs)
        self.batch_cnt_test = 0
        print("\t ... shuffle_test_data done ...")

    @staticmethod
    def _make_batch(indexes, txt, labels, masks):
        """Gather the samples at ``indexes`` into (float data, long labels, bool mask) tensors."""
        # Conversion reminders: tensor -> list: t.numpy().tolist();
        # tensor -> ndarray: t.cpu().numpy() (GPU tensors must be moved to CPU first);
        # ndarray -> tensor: torch.from_numpy(a).
        return (torch.FloatTensor([txt[i] for i in indexes]),
                torch.LongTensor([labels[i] for i in indexes]),
                torch.BoolTensor([masks[i] for i in indexes]))

    def get_batch_in_trainset(self):
        """
        Return the next training batch as (data, labels, mask) tensors.

        Requires a prior ``shuffle_train_data`` call. After the last full batch
        the counter wraps to 0; the order is not reshuffled automatically.
        """
        start = self.batch_cnt_train * BATCH_SIZE
        indexes = self.epoch_train_indexs[start:start + BATCH_SIZE]
        self.batch_cnt_train += 1
        # Wrap only after serving the last full batch (index epoch_num - 1).
        if self.batch_cnt_train >= self.epoch_num:
            self.batch_cnt_train = 0
        return self._make_batch(indexes, self.train_txt, self.train_labels, self.train_mask)

    def get_batch_in_testset(self):
        """
        Return the next test batch as (data, labels, mask) tensors.

        Requires a prior ``shuffle_test_data`` call (and must not follow
        ``set_test_dataset``, which deletes the raw test lists).
        """
        start = self.batch_cnt_test * BATCH_SIZE
        indexes = self.epoch_test_indexs[start:start + BATCH_SIZE]
        self.batch_cnt_test += 1
        # Wrap only after serving the last full batch.
        if self.batch_cnt_test >= len(self.test_txt) // BATCH_SIZE:
            self.batch_cnt_test = 0
        return self._make_batch(indexes, self.test_txt, self.test_labels, self.test_mask)

    def set_train_dataset(self):
        """Wrap the training data in a TensorDataset and a shuffling DataLoader (``self.train_loader``)."""
        print("\t ... set train dataset ...")
        # Convert each list once; converting the nested lists is the expensive part.
        data = torch.FloatTensor(self.train_txt)
        labels = torch.LongTensor(self.train_labels)
        mask = torch.BoolTensor(self.train_mask)
        print(data.shape)    # samples x MAX_CONTEXT_LEN x vector size
        print(labels.shape)  # one label per sample
        print(mask.shape)    # samples x MAX_CONTEXT_LEN
        self.train_set = TensorDataset(data, labels, mask)
        self.train_loader = DataLoader(dataset=self.train_set,
                                       batch_size=int(BATCH_SIZE),
                                       shuffle=True)
        # Deleting the raw lists here would save memory, but get_batch_in_trainset
        # would then stop working:
        # del self.train_txt, self.train_labels, self.train_mask

    def set_test_dataset(self):
        """
        Wrap the test data in a TensorDataset and a shuffling DataLoader (``self.test_loader``).

        Feeding data to a torch model:
          1. build a TensorDataset from tensors sharing their first dimension
             (it indexes all of them together, row by row);
          2. build a DataLoader over it;
          3. iterate the DataLoader to obtain (data, labels, mask) batches.

        The raw test lists are deleted afterwards to save memory, so
        ``shuffle_test_data`` / ``get_batch_in_testset`` cannot be used after this call.
        """
        print("\t ... set test dataset ...")
        self.test_set = TensorDataset(torch.FloatTensor(self.test_txt),
                                      torch.LongTensor(self.test_labels),
                                      torch.BoolTensor(self.test_mask))
        print(self.test_set[:2])    # first two rows of each tensor
        self.test_loader = DataLoader(dataset=self.test_set,
                                      batch_size=BATCH_SIZE,   # samples per batch
                                      shuffle=True)            # reshuffled at every epoch
        # Other DataLoader options: num_workers (loader worker processes),
        # drop_last (drop the final incomplete batch).
        del self.test_txt, self.test_labels, self.test_mask

    def get_test_1batch(self):
        """Return one (data, labels, mask) batch from ``self.test_loader``."""
        # A fresh iterator per call; with shuffle=True each call yields a random batch.
        _data, _target, _mask = next(iter(self.test_loader))
        return (_data, _target, _mask)