In [1]:
import os
import numpy as np
import scipy.sparse as sp
import pandas as pd
from glob import glob

import dask
import dask.bag as db
import joblib

from distributed import Client
# Client() is called with no arguments; per the cell output this yields a
# scheduler and workers on 127.0.0.1. The bare `client` expression on the last
# line renders that summary as the cell's output.
client = Client()
client

0,1
Client  Scheduler: tcp://127.0.0.1:46056  Dashboard: http://127.0.0.1:8787,Cluster  Workers: 4  Cores: 4  Memory: 10.00 GB


In [2]:
# Name of the directory that holds the pickled chunks.
folder = 'sparse_chunks'

# Width of the feature space, and how many leading features carry signal.
n_features = 10 ** 5
n_informative = 10 ** 4

# Dataset size: n_chunks chunks of chunk_size rows each.
n_chunks = 10 ** 3
chunk_size = 10 ** 4

# Ground-truth linear model: Gaussian weights on the first n_informative
# features, exactly zero on all remaining ones.
rng = np.random.RandomState(42)
true_coef = rng.randn(n_features)
true_coef[n_informative:] = 0


def make_chunk(n_samples, n_features, true_coef, chunk_idx, format='csr',
               density=0.0001, noise=0.01):
    """Generate one reproducible chunk of a sparse binary classification task.

    The chunk index seeds a private random generator, so a given
    ``chunk_idx`` (with the same other arguments) always yields the same data.

    Parameters
    ----------
    n_samples, n_features : int
        Shape of the random sparse feature matrix.
    true_coef : array-like
        Coefficient vector the feature matrix is multiplied with.
    chunk_idx : int
        Seed of the chunk's random generator; also returned unchanged.
    format : str
        Sparse format forwarded to ``sp.rand``.
    density : float
        Density forwarded to ``sp.rand``.
    noise : float
        Standard deviation of the zero-mean Gaussian noise added to the
        linear response before thresholding.

    Returns
    -------
    tuple
        ``(chunk_idx, X, y)`` where ``X`` is the sparse feature matrix and
        ``y`` is an int32 array holding 1 where the noisy response is
        positive and 0 elsewhere.
    """
    chunk_rng = np.random.RandomState(chunk_idx)
    # Draw order matters for reproducibility: features first, then noise.
    features = sp.rand(n_samples, n_features, format=format,
                       density=density, random_state=chunk_rng)
    jitter = chunk_rng.normal(loc=0, scale=noise, size=n_samples)
    response = features.dot(true_coef).ravel() + jitter
    labels = (response > 0).astype(np.int32)
    return chunk_idx, features, labels


def save_to_disk(chunk_idx, X, y, folder='sparse_chunks'):
    """Pickle one ``(X, y)`` pair into ``folder`` and return the file's name.

    The target directory is created on demand. The file is named after the
    zero-padded chunk index; only that bare file name (without ``folder``)
    is returned.
    """
    os.makedirs(folder, exist_ok=True)
    filename = "sparse_chunk_{:04d}.pkl".format(chunk_idx)
    destination = os.path.join(folder, filename)
    joblib.dump((X, y), destination)
    return filename


def load_from_disk(chunk_idx, filename):
    """Read back a pickled ``(X, y)`` pair and tag it with ``chunk_idx``.

    Returns the triple ``(chunk_idx, X, y)``.
    """
    # joblib.load unpickles the file, so only point this at trusted files.
    features, labels = joblib.load(filename)
    return chunk_idx, features, labels

In [3]:
# Generate the dataset only when the directory is absent. Its existence is the
# only cache check, so delete the directory to force regeneration (for example
# after an interrupted run left it incomplete).
if not os.path.exists(folder):
    print("Generating chunks of sparse data into", folder)
    b = db.from_sequence([(chunk_size, n_features, true_coef, i)
                          for i in range(n_chunks)])
    # Write into `folder` explicitly instead of relying on save_to_disk's
    # default, so the existence check, the writer and the reader always agree.
    b.starmap(make_chunk).starmap(save_to_disk, folder=folder).compute()


print("Lazy loading chunks from", folder)
# sorted() keeps the enumerate() index aligned with the zero-padded chunk
# number embedded in each file name.
b = db.from_sequence(enumerate(sorted(glob(os.path.join(folder, '*.pkl')))))
b = b.starmap(load_from_disk)

Lazy loading chunks from sparse_chunks


In [4]:
# Persist the bag and rebind `b` to the persisted version. NOTE(review): the
# very short wall time in the output suggests the call returns before the
# chunks are actually loaded - confirm on the dashboard.
%time b = b.persist()

CPU times: user 28 ms, sys: 0 ns, total: 28 ms
Wall time: 27.1 ms


In [8]:
%%time
chunk_idx, X_0, y_0 = b.compute()[0]

CPU times: user 2.06 s, sys: 4.13 s, total: 6.19 s
Wall time: 7.33 s


In [7]:
# Show the repr of the first chunk's sparse feature matrix.
X_0

<10000x100000 sparse matrix of type '<class 'numpy.float64'>'
	with 100000 stored elements in Compressed Sparse Row format>