In [1]:
import numpy as np
from scipy.stats import gaussian_kde, gamma, beta, gmean, norm
import pprint
from bokeh.plotting import figure, show
from bokeh.io import push_notebook
from bokeh.models import ColumnDataSource, HoverTool
from ipywidgets import interact
import hail as hl
hl.plot.output_notebook()

In [None]:
import plotly.plotly as py
from plotly.offline import iplot, init_notebook_mode
import plotly.graph_objs as go
init_notebook_mode(connected=True)

In [None]:
mt = hl.utils.range_matrix_table(100000, 100, n_partitions=1)
mt = mt.annotate_entries(k = mt.col_idx).cache()

In [None]:
mt2 = mt.annotate_rows(med = hl.agg.approx_quantiles(mt.k, 0.5))
%timeit mt2.aggregate_rows(hl.agg.mean(mt2.med))

In [None]:
t = hl.utils.range_table(100000, n_partitions=1)
t = t.annotate(array = hl.range(0, 100)).cache()

In [None]:
t2 = t.annotate(med = hl.median(t.array))
%timeit t2.aggregate(hl.agg.mean(t2.med))

In [None]:
t = hl.utils.range_table(10000000, n_partitions=8)
t = t.annotate(k = hl.rand_gamma(9,.5))
data = t.aggregate(hl.agg.approx_cdf(t.k, k=50))

p = [go.Scatter(x = [*data.values, data.values[-1]], y = data.ranks, mode = 'lines', line=dict(shape='hv'))]
iplot(p)

New `hl.plot.cdf` and `hl.plot.pdf` aim to be easy to use black boxes.

In [None]:
t = hl.utils.range_table(10000000, n_partitions=8)
t = t.annotate(k = hl.rand_gamma(9,.5))

hl.plot.show(hl.plot.cdf(t.k))

In [None]:
p = hl.plot.cdf(t.k, k=50)
x_d = np.linspace(0, 15, 1000)
p.line(x=x_d, y=gamma.cdf(x_d, 9, scale=.5), color='green', line_width=2)
hl.plot.show(p)

In [None]:
(p, i) = hl.plot.pdf(t.k, interactive=True)
hl.plot.show(p, interact=i)

Rather than passing a Hail expression like `t.k` to the plotting functions, you can also perform the `hl.agg.approx_cdf` aggregation yourself, and pass the result to `hl.plot.{cdf, pdf, histogram}`.

In [None]:
data = t.aggregate(hl.agg.approx_cdf(t.k, k=350))
print(len(data.values))
(p, i) = hl.plot.pdf(data, interactive=True)
x_d = np.linspace(data.values[0], data.values[-1], 1000)
p.line(x=x_d, y=gamma.pdf(x_d, 9, scale=.5), color='green', line_width=2)
hl.plot.show(p, interact=i)

The PDF is smoothing using a variable bandwidth kernel density estimator. For comparison, here is a fixed bandwidth estimator.

In [None]:
from scipy.stats import gaussian_kde
values = np.array(data.values)
weights = np.diff(data.ranks)
kde = gaussian_kde(values, .05, weights)
x_d = np.linspace(values[0], values[-1], 1000)
p = figure()
p.line(x=x_d, y=gamma.pdf(x_d, 9, scale=.5), color='green', line_width=2)
l = p.line(x_d, kde(x_d), line_width=2, line_color='black')

handle = show(p, notebook_handle=True)

def update(smoothing=.05):
    kde = gaussian_kde(values, smoothing, weights)
    l.data_source.data = {'x': x_d, 'y': kde(x_d)}
    push_notebook(handle)
    
interact(update, smoothing=(.001, .2, .0005))

Finally, without any smoothing...

In [None]:
data = t.aggregate(hl.agg.approx_cdf(t.k, k=350))
p = figure()
hist, edges = np.histogram(data.values, data.values, weights=np.diff(data.ranks), density=True)
p.quad(bottom=0, top=hist, left=edges[:-1], right=edges[1:])
x_d = np.linspace(data.values[0], data.values[-1], 1000)
p.line(x=x_d, y=gamma.pdf(x_d, 9, scale=.5), color='green', line_width=2)
show(p)

In [None]:
hl.plot.show(hl.plot.histogram(t.k))

In [None]:
(p, i) = hl.plot.histogram(data, interactive=True)
hl.plot.show(p, interact=i)

A demonstration of the improvement over sampling:

In [None]:
n = 100000
t = hl.utils.range_table(n, n_partitions=8)
t = t.annotate(k = hl.rand_gamma(9,.5), s = hl.rand_unif(0, 1))
t = t.key_by(t.s)

samp_size = 2000
sample = t.aggregate(hl.agg.take(t.k, samp_size))

weights = np.full(samp_size, n / samp_size)
values = np.array(sample)
slope = 1 / 15

smoothing = .5

def f(x, prev, smoothing=smoothing):
    inv_scale = (np.sqrt(n * slope) / smoothing) * np.sqrt(prev / weights)
    diff = x[:, np.newaxis] - values
    grid = (3 / (4 * n)) * weights * np.maximum(0, inv_scale - np.power(diff, 2) * np.power(inv_scale, 3))
    return np.sum(grid, axis=1)

round1 = f(values, np.full(len(values), slope))
x_d = np.linspace(0, 15, 1000)
final = f(x_d, round1)

p = figure()
l = p.line(x_d, final, line_width=2, line_color='black')

handle = show(p, notebook_handle=True)

def update(smoothing=smoothing):
    final = f(x_d, round1, smoothing)
    l.data_source.data = {'x': x_d, 'y': final}
    push_notebook(handle)
    
interact(update, smoothing=(.02, .8, .005))

This function computes the Kolmogorov-Smirnov divergence and the total variation distance between the aproximated CDF and the true one, assuming a uniform distribution.

In [None]:
def compute_errors(cdf):
    values = np.array(cdf['values'])
    ranks = np.array(cdf['ranks'])
    n = ranks[-1]
    grid = np.linspace(0, n, 500, endpoint=False).astype('int')[1:]
    if grid[-1] == n:
        grid[-1] = n - 1
    indices = np.searchsorted(ranks, grid)
    
    errors = np.abs(values[indices] - grid)
    return (np.mean(errors) / n, np.max(errors) / n)

In [None]:
t = hl.utils.range_table(10 ** 6, n_partitions=8)
t = t.annotate(k = hl.rand_unif(0, 1))
t = t.key_by(t.k).cache()

In [None]:
cdf = t.aggregate(hl.agg.approx_cdf(t.idx, k=350))

(avg, max) = compute_errors(cdf)
print(f"avg error: {avg}")
print(f"max error: {max}")
print(f"sample size: {len(cdf.values)}")
print(f"space(MB): {len(cdf.values) * 4 / (2 ** 20)}")

In [None]:
samp_size = 2000
sample = t.aggregate(hl.agg.take(t.idx, samp_size))
cdf = {'values': np.sort(sample), 'ranks': np.linspace(0, 1000000, samp_size)}
(avg, max) = compute_errors(cdf)
print(f"avg error: {avg}")
print(f"max error: {max}")
print(f"sample size: {samp_size}")

In [None]:
from bokeh.models import HoverTool
p = figure(
    y_axis_label='Avg. Error',
    y_axis_type='log',
    x_axis_label='Space(MB)',
    x_axis_type='log')
kll_sizes = [10, 31, 100, 316, 1000, 3160, 10000]
reps = 5
cdfs = [[t.aggregate(hl.agg.approx_cdf(t.idx, size)) for i in range(reps)] for size in kll_sizes]
kll_space = [len(cdf[0].values) * 4 / (2 ** 20) for cdf in cdfs]
kll_avg_err = [np.mean([compute_errors(cdf[i])[0] for i in range(reps)]) for cdf in cdfs]
p.circle(
    x = kll_space,
    y = kll_avg_err,
    legend = 'KLL',
    color= 'red',
    size = 7)
p.line(x = kll_space, y = kll_avg_err, color = 'red')
mrl_space =   [.00044, .00085, .002, .015,  .035,  .222]
mrl_avg_err = [.0148,  .01,    .004, .0005, .00026, .000027]
p.circle(
    x = mrl_space,
    y = mrl_avg_err,
    legend = 'MRL',
    color='green',
    size = 7)
p.line(x = mrl_space, y = mrl_avg_err, color = 'green')
p.add_tools(HoverTool(tooltips=[("size", "$x"), ("err", "$y")], mode='vline'))
show(p)

In [None]:
p = figure(
    y_axis_label='sample size',
    y_axis_type='log',
    x_axis_label='1/error',
    x_axis_type='log')
sample_sizes = [100, 316, 1000, 3160, 10000, 31600, 100000]
p.circle(
    x = [1 / compute_errors({'values': np.sort(t.aggregate(hl.agg.take(t.idx, size))), 'ranks': np.linspace(0, 1000000, size)})[1] for size in sample_sizes],
    y = sample_sizes,
    legend = 'sampling',
    size = 10)
kll_sizes = [10, 31, 100, 316, 1000, 3160, 10000]
cdfs = [t.aggregate(hl.agg.approx_cdf(t.idx, size)) for size in kll_sizes]
p.circle(
    x = [1 / compute_errors(cdf)[1] for cdf in cdfs],
    y = [len(cdf.values) for cdf in cdfs],
    legend = 'KLL',
    color='green',
    size = 10)
show(p)

In [None]:
t = t = hl.utils.range_table(1000000, n_partitions=8)
t = t.annotate(k = hl.rand_unif(0, 1))
%timeit t.aggregate(hl.agg.approx_cdf(t.k))

In [None]:
t = t = hl.utils.range_table(1000000, n_partitions=8)
t = t.annotate(k = hl.rand_unif(0, 1))
%timeit t.aggregate(hl.agg.sum(t.k))

In [None]:
n = 1500000
cn = [31504, 5691, 1899, 639, 217, 71, 28, 7, 3, 1]
k = 8
d = .0001
c1 = np.sum([cn[l] * (2 ** (l - 1)) for l in range(len(cn))])
c2 = np.sum([cn[l] * np.power((2 ** l) - (2 ** k), 2) for l in range(k + 1, len(cn))])
c = c1 + c2
b = (2 ** k) / 3
e = b + np.sqrt(b * b - 2 * c / np.log(d))
np.log(1 / d) * e / n

In [None]:
e = .0009479
np.exp((-1 / 2) * np.power(e * n, 2) / (c +  e * n * b ))

In [2]:
def compare(x1, y1, x2, y2):
    return x1*y2 - x2*y1

def f(min_x, max_x, x, y, e):
    new_y = np.full_like(x, 0)
    keep = np.full_like(x, False, dtype=np.bool_)
    
    e = e
    fx = min_x
    fy = 0
    li = 0
    ui = 0
    ldx = x[0] - min_x
    udx = ldx
    ldy = y[1] - e
    udy = y[0] + e
    j = 1
    # loop vars: li, ui, j, fx, fy, ldx, udx, ldy, udy
    while ui < len(x) and li < len(x):
        if j == len(x):
            ub = 1
            lb = 1
            xj = max_x
        else:     
            ub = y[j] + e
            lb = y[j+1] - e
            xj = x[j]
        dx = xj - fx
        judy = ub - fy
        jldy = lb - fy
        if compare(ldx, ldy, dx, judy) < 0:
            fx = x[li]
            fy = y[li+1] - e
            new_y[li] = fy
            keep[li] = True
            j = li + 1
            if j >= len(x):
                break
            li = j
            ldx = x[j] - fx
            ldy = y[j+1] - e - fy
            ui = j
            udx = x[j] - fx
            udy = y[j] + e - fy
            j += 1
            continue
        elif compare(udx, udy, dx, jldy) > 0:
            fx = x[ui]
            fy = y[ui] + e
            new_y[ui] = fy
            keep[ui] = True
            j = ui + 1
            if j >= len(x):
                break
            li = j
            ldx = x[j] - fx
            ldy = y[j+1] - e - fy
            ui = j
            udx = x[j] - fx
            udy = y[j] + e - fy
            j += 1
            continue
        if j >= len(x):
            break
        if compare(udx, udy, dx, judy) < 0:
            ui = j
            udx = x[j] - fx
            udy = y[j] + e - fy
        if compare(ldx, ldy, dx, jldy) > 0:
            li = j
            ldx = x[j] - fx
            ldy = y[j+1] - e - fy
        j += 1
    return new_y, keep

In [None]:
t = hl.utils.range_table(100000, n_partitions=8)
t = t.annotate(k = hl.rand_gamma(9,.5))
cdf = t.aggregate(hl.agg.approx_cdf(t.k, k=50))
y = np.array(cdf.ranks[1:-1]) / cdf.ranks[-1]
x = np.array(cdf.values[1:-1])
min_x = cdf.values[0]
max_x = cdf.values[-1]
min_e = np.max(y[1:] - y[:-1]) / 2.0

p = figure()
c1 = p.circle(x=x, y=y[:-1]+min_e)
c2 = p.circle(x=x, y=y[1:]-min_e, color='green')
new_y, keep = f(min_x, max_x, x, y, min_e)
c3 = p.circle(x=x[keep], y=new_y[keep], color='red')
l = p.line(x=[cdf.values[0], *x[keep], cdf.values[-1]], y=[0, *new_y[keep], 1], line_width=2, color='black')

p.add_tools(HoverTool(tooltips=[("index", "$index"), ("(x, y)", "($x, $y)")]))

def update(smoothing=min_e):
    new_y, keep = f(min_x, max_x, x, y, smoothing)
    new_data = {'x': [cdf.values[0], *x[keep], cdf.values[-1]], 'y': [0, *new_y[keep], 1]}
    l.data_source.data = new_data
    c1.data_source.data = {'x': x, 'y': y[:-1]+smoothing}
    c2.data_source.data = {'x': x, 'y': y[1:]-smoothing}
    c3.data_source.data = {'x': x[keep], 'y': new_y[keep]}
    push_notebook(handle)

handle = show(p, notebook_handle=True)
interact(update, smoothing=(min_e, .05, .00001)), min_e

In [3]:
t = hl.utils.range_table(1000000, n_partitions=8)
t = t.annotate(k = hl.rand_gamma(9,.5))
cdf = t.aggregate(hl.agg.approx_cdf(t.k, k=350))
y = np.array(cdf.ranks[1:-1]) / cdf.ranks[-1]
x = np.array(cdf.values[1:-1])
min_x = cdf.values[0]
max_x = cdf.values[-1]
min_e = np.max(y[1:] - y[:-1]) / 2.0

p = figure(y_range=(0, .3))
new_y, keep = f(min_x, max_x, x, y, min_e)
slopes = np.diff([0, *new_y[keep], 1]) / np.diff([cdf.values[0], *x[keep], cdf.values[-1]])
q = p.quad(left=[cdf.values[0], *x[keep]], right=[*x[keep], cdf.values[-1]], bottom=0, top=slopes)
x_d = np.linspace(0, 15, 1000)
p.line(x=x_d, y=gamma.pdf(x_d, 9, scale=.5), color='green', line_width=2)

def update(smoothing=.00085):
    new_y, keep = f(min_x, max_x, x, y, smoothing)
    slopes = np.diff([0, *new_y[keep], 1]) / np.diff([cdf.values[0], *x[keep], cdf.values[-1]])
    new_data = {'left': [cdf.values[0], *x[keep]], 'right': [*x[keep], cdf.values[-1]], 'bottom': np.full(len(slopes), 0), 'top': slopes}
    q.data_source.data = new_data
    push_notebook(handle)

handle = show(p, notebook_handle=True)
interact(update, smoothing=(min_e, .004, .00001))

Initializing Spark and Hail with default parameters...
Running on Apache Spark version 2.2.0
SparkUI available at http://10.1.1.174:4040
Welcome to
     __  __     <>__
    / /_/ /__  __/ /
   / __  / _ `/ / /
  /_/ /_/\_,_/_/_/   version 0.2.12-7d0a46b5079b
LOGGING: writing to /Users/pschulz/hail/hail/hail-20190412-0905-0.2.12-7d0a46b5079b.log


interactive(children=(FloatSlider(value=0.00085, description='smoothing', max=0.004, min=0.0005120000000000124…

<function __main__.update(smoothing=0.00085)>

In [None]:
compute_errors(cdf)

In [4]:
cdf._compaction_counts

[494763, 7030, 2350, 782, 275, 86, 38, 14, 5, 3, 0]