In [1]:
import multiprocessing
import pandas as pd
from utils.model_selection import train_val_test_split

# Get and save training, validation and test data

In [2]:
def get_and_save_data(path='data/raw/Google-Playstore.csv', chunksize=100000, target="Rating",
                      val_size=0.15, test_size=0.15,
                      random_state=42, save_path_prefix='google_playstore'):
    '''
    Split a large CSV into training, validation and test sets chunk by chunk and save them.

    The raw file is read twice in chunks of ``chunksize`` rows. On the first pass every
    chunk is submitted to a process pool, where ``train_val_test_split`` computes the
    split for that chunk. On the second pass each chunk is paired with its split result,
    separated into features (``x``) and target (``y``), and written to
    ``data/raw/{x|y}_{save_path_prefix}_{train|val|test}.csv``. The first chunk creates
    (or overwrites) the files and writes the header; later chunks are appended.

    Parameters
    ----------
    path : str
        Location of the raw CSV file.
    chunksize : int
        Number of rows per chunk passed to ``pd.read_csv``.
    target : str
        Name of the target column. Missing target values are replaced with 0.
    val_size, test_size : float
        Forwarded to ``train_val_test_split`` (presumably the fraction of each chunk
        assigned to the validation and test sets -- confirm against that helper).
    random_state : int
        Forwarded to ``train_val_test_split``; the same value is used for every chunk.
    save_path_prefix : str
        Middle part of the output file names.

    Returns
    -------
    int
        1 when every chunk was split and saved. Failures are never reported through the
        return value: any exception is re-raised after the worker pool is shut down.

    Raises
    ------
    FileNotFoundError
        If ``path`` does not exist.
    multiprocessing.TimeoutError
        If the split of a chunk is not available within 120 seconds.
    '''
    # Open the reader before creating the pool: if the file is missing, the original
    # error propagates unchanged and no worker processes are spawned for nothing.
    chunks = pd.read_csv(path, chunksize=chunksize)
    pool = multiprocessing.Pool(processes=multiprocessing.cpu_count())

    try:
        # First pass: queue one asynchronous split job per chunk.
        pending = [
            pool.apply_async(train_val_test_split, args=(chunk, val_size, test_size, random_state))
            for chunk in chunks
        ]

        # Second pass: re-read the file so the full data set is never held in memory;
        # chunks arrive in the same order as the queued jobs.
        chunks = pd.read_csv(path, chunksize=chunksize)
        first_chunk = True

        for job, chunk in zip(pending, chunks):
            # Each job yields three collections of row labels usable with ``.loc``.
            train_idx, val_idx, test_idx = job.get(timeout=120)

            # Overwrite with a header on the first chunk, append without one afterwards.
            write_options = {
                'mode': 'w' if first_chunk else 'a',
                'header': first_chunk,
                'index': False,
            }

            for split_name, split_idx in (('train', train_idx), ('val', val_idx), ('test', test_idx)):
                split_chunk = chunk.loc[split_idx, :]
                y_split = split_chunk[target].fillna(0)
                x_split = split_chunk.drop(columns=target)

                x_split.to_csv(f'data/raw/x_{save_path_prefix}_{split_name}.csv', **write_options)
                y_split.to_csv(f'data/raw/y_{save_path_prefix}_{split_name}.csv', **write_options)

            first_chunk = False
    except BaseException:
        # Abort outstanding jobs instead of waiting for them: after a timeout a
        # close()/join() pair could block indefinitely on a stuck worker.
        pool.terminate()
        raise
    else:
        pool.close()
    finally:
        pool.join()

    return 1

In [3]:
# Run the split-and-save pipeline with its default arguments.
train_result = get_and_save_data()
# Echo the returned status flag as the cell output (the function returns 1 on success).
train_result

1