In [None]:
import torch
import numpy as np

In [None]:
N = 1_000_000
w = np.random.uniform(-3,3, size=(N,))
w = np.exp(w) / np.sum(np.exp(w))
z = list(range(N))

In [None]:
%%timeit
np.random.choice(z, 1024, p=w)

In [None]:
import random
cw = np.cumsum(w)

In [None]:
%%timeit
np.array(random.choices(z, cum_weights=cw, k=1024))

In [None]:
import h5py
import numpy as np

In [None]:
class BufferedH5Writer:

    def __init__(self, file, dataset, buffer_size) -> None:
        self.file = file
        self.dataset = dataset
        self.buffer_size = buffer_size
        self.buffer = []
        self.index = 0
        
    def _len(self):
        if len(self.buffer) == 0:
            return 0
        return sum(len(xi) for xi in self.buffer)
        
    def _write(self):
        if self._len() == 0:
            return
        h, w = self.file[self.dataset].shape
        n = self._len()
        if n > h - self.index:
            self.file[self.dataset].resize((self.index + n, w))
        data = np.concatenate((self.buffer), axis=0).reshape(n, w)
        self.file[self.dataset][self.index:self.index+n,:] = data
        self.index += n
        self.file.flush()
        self.buffer = []

    def append(self, data: np.ndarray):
        self.buffer.append(data)
        if self._len() >= self.buffer_size:
            self._write()

In [None]:
with h5py.File("test.h5", "w") as f:
    f.create_dataset("ds", (5,2), maxshape=(None, 2))
    bw = BufferedH5Writer(f, "ds", 3)
    for i in range(10):
        z = np.array([[1, 1]] * 4) * i
        bw.append(z)
    bw._write()

In [None]:
import h5py
import numpy as np

f = h5py.File("../data/kcore/skipgrams.h5", "r")

In [None]:
%%timeit

randids = np.sort(np.random.randint(0, len(f["data"]), size=(2048,)))

In [None]:
data = f["data"][:50000000]
data.shape

In [None]:
%%timeit

randids = np.sort(np.random.randint(0, 50000000, size=(2048,)))
f["data"][randids]

In [None]:
%%timeit

randids = np.sort(np.random.randint(0, 50000000, size=(2048,)))
data[randids]

In [None]:
BS = 2048
arr = np.zeros((BS, 2))
with h5py.File("../data/kcore/skipgrams.h5", "r") as f:
    l = len(f["data"])
    for i in range(0, l, BS):
        arr[:] = 

In [None]:
f = h5py.File("../data/kcore/skipgrams.h5", "r")

In [None]:
%%timeit
z = np.random.randint(0, len(f["data"]))
f["data"][z]

In [None]:
ds = H5Dataset("../data/kcore")

In [None]:
%%time
ds[1]

In [12]:
with h5py.File("../data/kcore/skipgrams.h5", "r") as f:
    print(len(f["data"]))

358451802


In [86]:
import logging
logging.basicConfig()
logger = logging.getLogger()
logger.setLevel(logging.ERROR)

In [178]:
import os
import json
import h5py
import math
import torch
import random
import logging
import numpy as np
from torch.utils.data import Dataset, Sampler, DataLoader


class constants:
    FNAME_FREQUENCIES = "frequencies.json"
    FNAME_SKIPGRAMS = "skipgrams.h5"
    FNAME_IDX2SONG = "idx2song.json"
class H5Dataset(Dataset):

    DEFAULT_BATCH_SIZE = 512
    DEFAULT_BUFFER_SIZE = 512 * 4096 * 4
    

    def __init__(self, 
                 path: str, 
                 alpha: float = 0.75, 
                 n_negatives: int = 1, 
                 buffer_size: int = DEFAULT_BUFFER_SIZE, 
                 batch_size: int = DEFAULT_BATCH_SIZE,
                 device: str = "cpu"
        ):
        self.path = path
        self.n_negatives = n_negatives
        self.buffer_size = buffer_size
        self.batch_size = batch_size
        self.device = device
        self._init_negative_distribution(path, alpha)
        self.__data = {
            "chunk_index": -1,
            "cap": buffer_size,
            "data": torch.zeros((buffer_size, 2), dtype=torch.long).to(self.device)
        }
        
    @property
    def _batch_per_chunk(self):
        return math.ceil(self.buffer_size / self.batch_size)
    
    @property
    def _n_chunks(self):
        if not hasattr(self, "__n_chunks"):
            path = os.path.join(self.path, constants.FNAME_SKIPGRAMS)
            with h5py.File(path, "r") as f:
                self.__n_chunks = math.ceil(len(f["data"]) / self.buffer_size)
        return self.__n_chunks 
    
    def __len__(self):
        if not hasattr(self, "__n_batches"):
            path = os.path.join(self.path, constants.FNAME_SKIPGRAMS)
            with h5py.File(path, "r") as f:
                self.__n_batches = math.ceil(len(f["data"]) / self.batch_size)
        return self.__n_batches
        
    def _init_negative_distribution(self, path: str, alpha: float):
        with open(os.path.join(path, constants.FNAME_FREQUENCIES), "r") as f:
            freqs = json.load(f)
            weights = np.array(list(freqs.values())) ** alpha
            weights /= np.sum(weights)
            self.cum_weights = np.cumsum(weights)
            self.ids = np.arange(len(weights), dtype=np.int64)

    def _get(self, batch_index):
        index_start = batch_index * self.batch_size
        chunk_index = index_start // self.buffer_size
        if chunk_index != self.__data["chunk_index"]:
            logging.info(f"Loading chunk {chunk_index}")
            path = os.path.join(self.path, constants.FNAME_SKIPGRAMS)
            with h5py.File(path, "r") as f:
                start_ = chunk_index * self.buffer_size 
                end_ = (chunk_index + 1) * self.buffer_size
                assert start_ < len(f["data"]), "index out of bounds"
                length = min(self.buffer_size, len(f["data"]) - start_)
                buffer = torch.empty((length, 2))
                f["data"].read_direct(buffer.numpy(), source_sel=np.s_[start_:end_])
                self.__data["data"][:length] = buffer
                self.__data["chunk_index"] = chunk_index
                self.__data["cap"] = length
        offset = index_start - chunk_index * self.buffer_size
        return self.__data["data"][min(self.__data["cap"], offset):min(self.__data["cap"], offset+self.batch_size)]    

    def _negatives(self, k):
        return torch.tensor(random.choices(self.ids, cum_weights=self.cum_weights, k=k))

    def __getitem__(self, index):
        pos = self._get(index)
        neg = self._negatives(len(pos) * self.n_negatives)
        neg = torch.stack((pos[:, 0], neg), dim=1).to(pos)
        x = torch.cat((pos, neg), dim=0)
        y = torch.tensor([1] * len(pos) + [0] * len(neg))
        return x, y
    


class RandomOrderSampler(Sampler[int]):

    def __init__(self, from_: int, to_: int, generator=None) -> None:
        assert to_ > from_
        self.from_ = from_
        self.to_ = to_
        self.n_ = to_ - from_
        self.generator = generator

    def __len__(self):
        return self.n_
    
    def __iter__(self):
        for i in torch.randperm(self.n_, generator=self.generator):
            yield self.from_ + i
    

class CustomH5Sampler(Sampler):
    """Sample indices within loaded chunk then skip to next chunk"""

    def __init__(self, data_source, generator=None) -> None:
        super().__init__(data_source)
        assert isinstance(data_source, H5Dataset)
        self.ds = data_source
        self.generator = generator

    def __len__(self):
        return len(self.ds)
    
    def __iter__(self):
        chunk_sampler = RandomOrderSampler(0, self.ds._n_chunks, self.generator)
        for chunk_index in chunk_sampler:
            chunk_index = chunk_index.item()
            start_ = math.floor(chunk_index * self.ds.buffer_size / self.ds.batch_size)
            end_ = min(start_ + self.ds._batch_per_chunk, len(self.ds))
            sampler = RandomOrderSampler(start_, end_, self.generator)
            for i in sampler:
                yield i.item()


def _collate_fn(sample):
    x, y = list(zip(*sample))
    return torch.cat(x, dim=0), torch.cat(y, dim=0)


def get_data_loader(
        path: str, 
        alpha: float = 0.75, 
        n_negatives: int = 1, 
        buffer_size: int = H5Dataset.DEFAULT_BUFFER_SIZE,
        dataset_batch_size: int = H5Dataset.DEFAULT_BATCH_SIZE,
        **loader_kwargs
    ):
    ds = H5Dataset(
        path=path,
        alpha=alpha,
        n_negatives=n_negatives,
        buffer_size=buffer_size
    )
    sampler = CustomH5Sampler(ds)
    return DataLoader(
        dataset=ds,
        sampler=sampler,
        collate_fn=_collate_fn,
        **loader_kwargs
    ), ds

In [150]:
ds = H5Dataset("../data/kcore/")

In [151]:
n = 0
for i in tqdm(range(len(ds)), total=len(ds)):
    x = ds[i]
    n += len(x)
    assert len(x) > 0
n

100%|██████████████████████████████████████████████████████████████████████████| 700102/700102 [00:14<00:00, 49558.52it/s]


358451802

In [152]:
358451802 - n

0

In [179]:
from tqdm import tqdm

In [180]:
loader, ds = get_data_loader("../data/kcore/", batch_size=8)
print(len(loader))

87513


In [182]:
n = 0
for b in tqdm(loader):
    n += len(b)
    print(b[0].shape, b[1].shape)
    asd
print(n)

  0%|                                                                                           | 0/87513 [00:00<?, ?it/s]

torch.Size([8192, 2]) torch.Size([8192])





NameError: name 'asd' is not defined

In [159]:
n - 358451802

0

In [139]:
len(set(ds.x)) - len(ds.x)

0

In [None]:
%%prun

for i, b in enumerate(loader):
    if i > 100:
        break
    pass