<a href="https://colab.research.google.com/github/montest/stochastic-methods-optimal-quantization/blob/pytorch_implentation_dim_1/Lloyd_with_pytorch.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [2]:
import os
import sys
import time
import torch
import itertools

import numpy as np
import pandas as pd
import torch.nn.functional as F

from tqdm import trange

# Display configuration: never summarise arrays/tensors with "..." and allow very wide lines,
# so that centroids and probabilities can be printed in full.
np.set_printoptions(linewidth=10_000, threshold=np.inf)
torch.set_printoptions(linewidth=10_000, profile="full")

### Optimized numpy implementation

In [3]:
def lloyd_method_dim_1(N: int, M: int, nbr_iter: int, seed: int = 0):
    """
    Apply `nbr_iter` iterations of the Randomized Lloyd algorithm in order to build an optimal quantizer of size `N`
    for a Gaussian random variable. This implementation is done using numpy.

    N: number of centroids (must be >= 1)
    M: number of samples to generate (must be >= 1)
    nbr_iter: number of iterations of fixed point search
    seed: numpy seed for reproducibility (the global numpy generator is re-seeded)

    Returns: a tuple `(centroids, probabilities, distortion)` where
        centroids: array of shape (N,) holding the quantization levels
        probabilities: array of shape (N,), fraction of the samples assigned to each centroid
        distortion: half of the mean squared distance between each sample and its closest centroid

    Raises: ValueError if `N` or `M` is smaller than 1
    """
    if N < 1 or M < 1:
        raise ValueError(f"N and M must both be at least 1 (got N={N}, M={M})")

    np.random.seed(seed)  # Set seed in order to be able to reproduce the results

    # Draw M samples of gaussian variable
    xs = np.random.normal(0, 1, size=M)

    # Initialize the Voronoi Quantizer randomly and sort it
    centroids = np.random.normal(0, 1, size=N)
    centroids.sort(axis=0)

    with trange(nbr_iter, desc='Lloyd method (numpy)') as t:
        for _ in t:
            # Compute the vertices that separate the centroids
            vertices = 0.5 * (centroids[:-1] + centroids[1:])

            # The index of the cell of a sample is the number of vertices that are smaller or equal to it
            index_closest_centroid = np.sum(xs[:, None] >= vertices[None, :], axis=1)

            # Number of samples and sum of the samples falling in each cell, computed in a single pass
            counts = np.bincount(index_closest_centroid, minlength=N)
            sums = np.bincount(index_closest_centroid, weights=xs, minlength=N)

            # New quantization levels are the cell means. A cell that received no sample keeps its
            # previous centroid: averaging an empty cell would give NaN and poison later iterations.
            centroids = np.where(counts > 0, sums / np.maximum(counts, 1), centroids)

    # Compute, for each sample, the distance to each centroid (array of shape (M, N))
    dist_centroids_points = np.abs(xs[:, None] - centroids[None, :])
    # Find the index of the centroid that is closest to each sample using the previously computed distances
    index_closest_centroid = dist_centroids_points.argmin(axis=1)
    # Compute the probability of each centroid; `minlength` guarantees one entry per centroid
    # even when the last centroids attract no sample
    probabilities = np.bincount(index_closest_centroid, minlength=N) / float(M)
    # Compute the final distortion between the samples and the quantizer
    distortion = np.mean(dist_centroids_points[np.arange(M), index_closest_centroid] ** 2) * 0.5
    return centroids, probabilities, distortion

### PyTorch implementation

In [4]:
def lloyd_method_dim_1_pytorch(N: int, M: int, nbr_iter: int, device: str, seed: int = 0):
    """
    Apply `nbr_iter` iterations of the Randomized Lloyd algorithm in order to build an optimal quantizer of size `N`
    for a Gaussian random variable. This implementation is done using torch.

    N: number of centroids (must be >= 1)
    M: number of samples to generate (must be >= 1)
    nbr_iter: number of iterations of fixed point search
    device: device on which to perform the computations: "cuda" or "cpu"
    seed: torch seed for reproducibility

    Returns: a tuple `(centroids, probabilities, distortion)` where
        centroids: numpy array of shape (N,) holding the quantization levels
        probabilities: numpy array of shape (N,), fraction of the samples assigned to each centroid
        distortion: half of the mean squared distance between each sample and its closest centroid

    Raises: ValueError if `N` or `M` is smaller than 1

    NOTE: according to the PyTorch reproducibility notes, `torch.bincount` with `weights` is not bit-wise
    deterministic on CUDA, so results on "cuda" may differ in the last digits between two runs.
    """
    if N < 1 or M < 1:
        raise ValueError(f"N and M must both be at least 1 (got N={N}, M={M})")

    torch.manual_seed(seed=seed)  # Set seed in order to be able to reproduce the results

    with torch.no_grad():
        # Draw M samples of gaussian variable (on CPU) and send them to the correct device
        xs = torch.randn(M).to(device)

        # Initialize the Voronoi Quantizer randomly, sort it and send it to the correct device
        centroids, _ = torch.randn(N).sort()
        centroids = centroids.to(device)

        with trange(nbr_iter, desc=f'Lloyd method (pytorch: {device})') as t:
            for _ in t:
                # Compute the vertices that separate the centroids
                vertices = 0.5 * (centroids[:-1] + centroids[1:])

                # The index of the cell of a sample is the number of vertices that are smaller or equal to it
                index_closest_centroid = torch.sum(xs[:, None] >= vertices[None, :], dim=1).long()

                # Number of samples and sum of the samples falling in each cell. Everything stays on
                # `device`: no Python loop over the centroids and no device-to-host copy per centroid.
                counts = torch.bincount(index_closest_centroid, minlength=N)
                sums = torch.bincount(index_closest_centroid, weights=xs, minlength=N).to(xs.dtype)

                # New quantization levels are the cell means. A cell that received no sample keeps its
                # previous centroid: averaging an empty cell would give NaN and poison later iterations.
                centroids = torch.where(counts > 0, sums / counts.clamp(min=1), centroids)

        # Compute, for each sample, the distance to each centroid (tensor of shape (M, N))
        dist_centroids_points = (xs[:, None] - centroids[None, :]).abs()
        # Distance to, and index of, the centroid that is closest to each sample
        min_dist, index_closest_centroid = dist_centroids_points.min(dim=1)
        # Compute the probability of each centroid; `minlength` guarantees one entry per centroid
        # even when the last centroids attract no sample
        probabilities = torch.bincount(index_closest_centroid, minlength=N).to('cpu').numpy() / float(M)
        # Compute the final distortion between the samples and the quantizer
        distortion = torch.mean(min_dist ** 2).item() * 0.5
        return centroids.to('cpu').numpy(), probabilities, distortion

### Some useful functions for benchmarking

In [6]:
def check_existance(dict_of_values, df):
    """
    Tell whether at least one row of `df` matches every `column: value` pair of `dict_of_values`.

    dict_of_values: dictionary mapping column names to the value expected in that column
    df: DataFrame in which the row is searched

    A key that is not a column of `df` cannot be matched by any row, so the answer is False in that case
    (instead of a KeyError). An empty `df` never contains a match.

    Returns: True if a matching row exists, False otherwise
    """
    # Start from "every row matches" and narrow the selection down one column at a time.
    # A plain boolean array is used so that NaN values in unrelated columns cannot exclude a row.
    matching_rows = np.ones(len(df), dtype=bool)
    for key, value in dict_of_values.items():
        if key not in df.columns:
            return False
        matching_rows &= (df[key] == value).to_numpy()
    return bool(matching_rows.any())


def testing_method(fct_to_test, parameters_grid: dict, path_to_results: str):
    if os.path.exists(path_to_results) and os.path.getsize(path_to_results) > 0:
        df_results = pd.read_csv(path_to_results, index_col=0)
    else:
        df_results = pd.DataFrame()

    keys, values = zip(*parameters_grid.items())
    permutations_dicts = [dict(zip(keys, v)) for v in itertools.product(*values)]

    for permutations in permutations_dicts:
        dict_result = permutations.copy()
        dict_result["method_name"] = fct_to_test.__name__
        if len(df_results) > 0 and check_existance(dict_result, df_results):
            print(f"Skipping {dict_result}")
            continue

        start_time = time.time()
        fct_to_test(**permutations)
        elapsed_time = time.time() - start_time
        dict_result["elapsed_time"] = elapsed_time
        df_results = pd.concat(
            [df_results, pd.DataFrame(dict_result, index=[0])],
            ignore_index=True
        )
        df_results.to_csv(path_to_results)

### Testing methods

In [8]:
# CSV file in which the timings are accumulated
path_to_results = "results.csv"

# Benchmark grid: one run per combination of the values below
# (combinations already stored in the results file are skipped by `testing_method`)
parameters_grid = dict(
    N=[10, 20, 50, 100, 200, 500],
    M=[5000, 10000, 20000, 100000],
    nbr_iter=[10, 100, 500, 1000],
    seed=list(range(5)),
)

# Time the numpy implementation over the whole grid
testing_method(lloyd_method_dim_1, parameters_grid, path_to_results)

Skipping {'N': 10, 'M': 5000, 'nbr_iter': 10, 'seed': 0, 'method_name': 'lloyd_method_dim_1'}
Skipping {'N': 10, 'M': 5000, 'nbr_iter': 10, 'seed': 1, 'method_name': 'lloyd_method_dim_1'}
Skipping {'N': 10, 'M': 5000, 'nbr_iter': 10, 'seed': 2, 'method_name': 'lloyd_method_dim_1'}
Skipping {'N': 10, 'M': 5000, 'nbr_iter': 10, 'seed': 3, 'method_name': 'lloyd_method_dim_1'}
Skipping {'N': 10, 'M': 5000, 'nbr_iter': 10, 'seed': 4, 'method_name': 'lloyd_method_dim_1'}
Skipping {'N': 10, 'M': 5000, 'nbr_iter': 100, 'seed': 0, 'method_name': 'lloyd_method_dim_1'}
Skipping {'N': 10, 'M': 5000, 'nbr_iter': 100, 'seed': 1, 'method_name': 'lloyd_method_dim_1'}
Skipping {'N': 10, 'M': 5000, 'nbr_iter': 100, 'seed': 2, 'method_name': 'lloyd_method_dim_1'}
Skipping {'N': 10, 'M': 5000, 'nbr_iter': 100, 'seed': 3, 'method_name': 'lloyd_method_dim_1'}
Skipping {'N': 10, 'M': 5000, 'nbr_iter': 100, 'seed': 4, 'method_name': 'lloyd_method_dim_1'}
Skipping {'N': 10, 'M': 5000, 'nbr_iter': 500, 'seed': 

Lloyd method (numpy): 100%|██████████| 500/500 [00:03<00:00, 153.16it/s]
Lloyd method (numpy): 100%|██████████| 500/500 [00:03<00:00, 151.47it/s]
Lloyd method (numpy):  79%|███████▊  | 393/500 [00:02<00:00, 156.36it/s]


KeyboardInterrupt: 