In [1]:
import numpy as np
import pandas as pd

from os import listdir

from dask.dataframe import from_pandas
from multiprocessing import cpu_count, Pool

from config import PURE_TRAIN_PATH, PURE_VAL_PATH, ORIG_DIR

# Do the train and validation files have the same columns? (If so, we can skip reading the header row from now on.)

In [None]:
# Compare only the header rows of the two files (nrows=0 parses no data rows).
train_header = pd.read_csv(PURE_TRAIN_PATH, nrows=0)
val_header = pd.read_csv(PURE_VAL_PATH, nrows=0)
# Index.equals also copes with differing lengths, where elementwise `==` would raise.
assert train_header.columns.equals(val_header.columns), "train/val columns differ"

# float64 seems like overkill, so read the float columns as float32

In [2]:
# Peek at the first data row only (header row skipped, so columns are labelled
# 0..N-1) and map every column pandas inferred as float64 to float32.
# NOTE(review): the inference is based on a single row; a column whose first
# value looks like an integer is not included in the mapping.
train_1 = pd.read_csv(PURE_TRAIN_PATH, header=None, nrows=1, skiprows=1)
float32_cols = {
    col: np.float32
    for col, col_dtype in train_1.dtypes.items()
    if col_dtype == 'float64'
}

# Are there any null values?

In [34]:
# header=None is needed together with skiprows=1: without it pandas treats the
# first *data* row as the header, so that row is dropped from the null check.
train = pd.read_csv(PURE_TRAIN_PATH,
                    skiprows=1,
                    header=None,
                    engine='c',
                    delimiter=',',
                    low_memory=False,
                    dtype=float32_cols,
                   )

print(train.isnull().values.any())

False


In [35]:
# header=None is needed together with skiprows=1: without it pandas treats the
# first *data* row as the header, so that row is dropped from the null check.
val = pd.read_csv(PURE_VAL_PATH,
                  skiprows=1,
                  header=None,
                  engine='c',
                  delimiter=',',
                  low_memory=False,
                  dtype=float32_cols,
                 )

print(val.isnull().values.any())

False


# Let's see the improvements (if any)

In [5]:
%timeit pd.read_csv(PURE_TRAIN_PATH, skiprows=1, nrows=10, engine='c', delimiter=',', na_filter=False, low_memory=False, dtype=float32_cols)

18.9 s ± 24.4 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [6]:
# Baseline for comparison: pandas defaults (header row parsed, no dtype overrides).
%timeit pd.read_csv(PURE_TRAIN_PATH, nrows=10)

26.1 s ± 270 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [41]:
# The same 10-row sample read two ways. `new` skips the header row and must
# therefore pass header=None; otherwise its first data row is consumed as the
# header and the sample is shifted by one row relative to `old`.
new = pd.read_csv(PURE_TRAIN_PATH,
                  skiprows=1,
                  header=None,
                  nrows=10,
                  engine='c',
                  delimiter=',',
                  na_filter=False,
                  low_memory=False,
                  dtype=float32_cols,
                 )

old = pd.read_csv(PURE_TRAIN_PATH,
                  nrows=10,
                 )

In [42]:
# Deep memory footprint (values plus row index) of the two samples.
new_bytes = new.memory_usage(deep=True).sum()
old_bytes = old.memory_usage(deep=True).sum()
ratio = new_bytes / old_bytes
print(f"The new way to read in the train data uses {1-ratio:.2%} less memory")

The new way to read in the train data uses 50.00% less memory


...most likely because the float columns are stored as float32 (4 bytes per value) instead of float64 (8 bytes).

In [3]:
%timeit pd.read_csv(PURE_TRAIN_PATH, skiprows=1, engine='c', delimiter=',', na_filter=False, low_memory=False, dtype=float32_cols)

51.2 s ± 650 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [2]:
# Same full read using NumPy's text loader instead (header skipped, every column as float32).
%timeit np.loadtxt(PURE_TRAIN_PATH, skiprows=1, delimiter=',', dtype=np.float32)

2min 14s ± 328 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


# Separating the features from the targets
We want to try transformations on the features X (but not on the targets y), so we write them to separate files.

In [23]:
# Full training set: header row skipped (columns labelled 0..N-1), float
# columns read as float32, NA detection off (the null check above found none).
train = pd.read_csv(
    PURE_TRAIN_PATH,
    header=None,
    skiprows=1,
    delimiter=',',
    engine='c',
    low_memory=False,
    na_filter=False,
    dtype=float32_cols,
)

In [24]:
# The last column holds the target; every column before it is a feature.
train_x = train.iloc[:, :-1]
train_y = train.iloc[:, -1]
for part, file_name in ((train_x, 'train_data.csv'), (train_y, 'train_targets.csv')):
    part.to_csv(ORIG_DIR / file_name, header=False, index=False)

In [25]:
# Full validation set, read with exactly the same options as the training set.
val = pd.read_csv(
    PURE_VAL_PATH,
    header=None,
    skiprows=1,
    delimiter=',',
    engine='c',
    low_memory=False,
    na_filter=False,
    dtype=float32_cols,
)

In [26]:
# The last column holds the target; every column before it is a feature.
val_x = val.iloc[:, :-1]
val_y = val.iloc[:, -1]
for part, file_name in ((val_x, 'val_data.csv'), (val_y, 'val_targets.csv')):
    part.to_csv(ORIG_DIR / file_name, header=False, index=False)

# Parallelization

In [28]:
# Feature-only CSVs written by the separation cells above (no header, no index).
X_TRAIN_PATH = ORIG_DIR / 'train_data.csv'
X_VAL_PATH = ORIG_DIR / 'val_data.csv'
# save_split writes each chunk to SPLIT_DIR/<title>/NN.csv.
SPLIT_DIR = ORIG_DIR / 'mp_split'
# Number of chunks each matrix is cut into; the benchmarks below compare 4 to 64.
NUM_SPLITS = 64

In [7]:
def enum_splits(title, X, axis=0, *, num_splits):
    """Cut ``X`` into ``num_splits`` near-equal chunks along ``axis``.

    Returns a list of ``(index, title, chunk)`` tuples in the order produced by
    ``np.array_split``; each tuple is the single argument ``save_split`` unpacks.
    """
    chunks = np.array_split(X, num_splits, axis=axis)
    jobs = []
    for idx, chunk in enumerate(chunks):
        jobs.append((idx, title, chunk))
    return jobs

def save_split(i_title_split):
    """Write one ``(i, title, split)`` item from ``enum_splits`` to disk.

    The chunk is saved as comma-separated text to ``SPLIT_DIR/<title>/<i:02d>.csv``.
    It takes a single tuple argument so it can be passed straight to ``Pool.map``.
    """
    i, title, split = i_title_split
    out_dir = SPLIT_DIR / title
    # Create the target folder on demand; exist_ok tolerates several pool
    # workers racing to create it at the same time.
    out_dir.mkdir(parents=True, exist_ok=True)
    # '%.9g' (9 significant digits) round-trips float32 values exactly, whereas
    # fixed-point '%f' keeps only 6 decimals and writes tiny values as 0.
    np.savetxt(out_dir / f'{i:02d}.csv', split, delimiter=',', fmt='%.9g')
    
def read_csv_custom(path, *, asarray=True, **kwargs):
    """Read a header-less, comma-separated, all-numeric CSV as float32.

    Parameters
    ----------
    path : str, path-like or file-like
        Passed straight to ``pd.read_csv``.
    asarray : bool, default True
        Return a NumPy array instead of a DataFrame (same default as ``read_csv_mp``).
    **kwargs
        Extra ``pd.read_csv`` options; they override the defaults set below
        (previously, passing e.g. ``header=`` raised a duplicate-keyword TypeError).

    Returns
    -------
    numpy.ndarray or pandas.DataFrame
    """
    options = dict(
        header=None,
        engine='c',
        delimiter=',',
        na_filter=False,      # files hold no missing values; skipping NA detection is faster
        low_memory=False,
        dtype=np.float32,
    )
    options.update(kwargs)
    X = pd.read_csv(path, **options)
    return np.asarray(X) if asarray else X

def _read_csv_custom_df(path):
    """Load one CSV chunk as a float32 DataFrame (``read_csv_custom`` with ``asarray=False``).

    Presumably kept as a named module-level function (not a lambda) because
    ``Pool.map`` must pickle the callable it sends to worker processes.
    """
    frame = read_csv_custom(path, asarray=False)
    return frame

def read_csv_mp(split_subdir, *, axis=0, asarray=True, processes=None):
    """Read the ``NN.csv`` chunks in ``split_subdir`` in parallel and reassemble them.

    Parameters
    ----------
    split_subdir : pathlib.Path
        Folder holding the chunk files written by ``save_split``.
    axis : {0, 1}, default 0
        Axis the data was split on: 0 stacks chunks as rows, 1 as columns.
    asarray : bool, default True
        Return a NumPy array instead of a DataFrame.
    processes : int, optional
        Worker count for ``multiprocessing.Pool`` (None lets Pool decide).

    Raises
    ------
    ValueError
        If the folder contains no ``.csv`` files.

    Notes
    -----
    Every ``*.csv`` in the folder is used, so chunks left over from an earlier
    split with more pieces would be concatenated as well.
    """
    csv_names = [name for name in listdir(split_subdir) if name.endswith('.csv')]
    if not csv_names:
        raise ValueError(f"no .csv splits found in {split_subdir}")
    # Order by the integer index in the file name: a plain lexicographic sort
    # would put '100.csv' before '11.csv' once there are 100+ chunks.
    csv_names.sort(key=lambda name: int(name[:-len('.csv')]))
    path_list = [split_subdir / name for name in csv_names]
    with Pool(processes=processes) as pool:
        df_list = pool.map(_read_csv_custom_df, path_list)
    X = pd.concat(df_list, axis=axis, ignore_index=True)
    return np.asarray(X) if asarray else X

In [4]:
# Full training feature matrix as a float32 NumPy array; this is the reference
# that the reassembled chunks are compared against below.
X_train = read_csv_custom(X_TRAIN_PATH, asarray=True)

### Split on rows (axis=0)

In [11]:
# Make sure the chunk folder exists and holds no CSVs from an earlier split
# (e.g. one with more chunks): read_csv_mp reads back every CSV it finds there.
train_split_dir = SPLIT_DIR / 'train'
train_split_dir.mkdir(parents=True, exist_ok=True)
for stale_csv in train_split_dir.glob('*.csv'):
    stale_csv.unlink()

# num_splits is a required keyword-only argument of enum_splits.
train_mp_splits = enum_splits('train', X_train, axis=0, num_splits=NUM_SPLITS)

with Pool() as pool:
    pool.map(save_split, train_mp_splits)

In [12]:
# Reassemble the row-wise chunks in parallel, stacking them along axis 0.
X_train_mp = read_csv_mp(split_subdir=SPLIT_DIR / 'train', axis=0)
X_train_mp.shape

(464, 482739)

In [13]:
# Round-trip check: the reassembled matrix should match the original, up to
# any precision lost when the chunks were written as text.
np.allclose(X_train, np.asarray(X_train_mp))

True

In [14]:
%timeit read_csv_mp(SPLIT_DIR/'train')

29.1 s ± 452 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Split on cols (axis=1)

In [23]:
# Make sure the chunk folder exists and drop the row-wise (or any earlier)
# chunks first: read_csv_mp reads back every CSV it finds there.
train_split_dir = SPLIT_DIR / 'train'
train_split_dir.mkdir(parents=True, exist_ok=True)
for stale_csv in train_split_dir.glob('*.csv'):
    stale_csv.unlink()

train_mp_splits = enum_splits('train', X_train, axis=1, num_splits=NUM_SPLITS)

with Pool() as pool:
    pool.map(save_split, train_mp_splits)

In [25]:
# Reassemble the column-wise chunks in parallel, placing them side by side (axis 1).
X_train_mp = read_csv_mp(split_subdir=SPLIT_DIR / 'train', axis=1)
X_train_mp.shape

(464, 482739)

In [26]:
# Round-trip check for the column-wise split, up to any text-format precision loss.
np.allclose(X_train, np.asarray(X_train_mp))

True

##### num_splits = 4

In [10]:
# NOTE(review): presumably timed after re-splitting with num_splits=4 (see heading);
# the split cell above uses NUM_SPLITS, so a top-to-bottom re-run times that split instead.
%timeit read_csv_mp(SPLIT_DIR/'train', axis=1)

17.6 s ± 227 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


##### num_splits = 8

In [17]:
# NOTE(review): presumably timed after re-splitting with num_splits=8 (see heading);
# the split cell above uses NUM_SPLITS, so a top-to-bottom re-run times that split instead.
%timeit read_csv_mp(SPLIT_DIR/'train', axis=1)

15.2 s ± 195 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


##### num_splits = 16

In [20]:
# NOTE(review): presumably timed after re-splitting with num_splits=16 (see heading);
# the split cell above uses NUM_SPLITS, so a top-to-bottom re-run times that split instead.
%timeit read_csv_mp(SPLIT_DIR/'train', axis=1)

14.1 s ± 83 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


##### num_splits = 32

In [22]:
# NOTE(review): presumably timed after re-splitting with num_splits=32 (see heading);
# the split cell above uses NUM_SPLITS, so a top-to-bottom re-run times that split instead.
%timeit read_csv_mp(SPLIT_DIR/'train', axis=1)

13.5 s ± 229 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


##### num_splits = 64

In [24]:
# Timed with the chunks currently on disk; num_splits=64 matches NUM_SPLITS in the config cell.
%timeit read_csv_mp(SPLIT_DIR/'train', axis=1)

13.3 s ± 169 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


### Let's also split the validation data

In [29]:
# Load the validation features directly as a float32 NumPy array.
X_val = read_csv_custom(X_VAL_PATH, asarray=True)

# Make sure the chunk folder exists and holds no CSVs from an earlier split:
# read_csv_mp reads back every CSV it finds there.
val_split_dir = SPLIT_DIR / 'val'
val_split_dir.mkdir(parents=True, exist_ok=True)
for stale_csv in val_split_dir.glob('*.csv'):
    stale_csv.unlink()

val_mp_splits = enum_splits('val', X_val, axis=1, num_splits=NUM_SPLITS)

with Pool() as pool:
    pool.map(save_split, val_mp_splits)