In [1]:
def minDist(clusters, item):
    dists = map(lambda cl: (sum((cl.center - item) ** 2) ** (1/2), cl), clusters)
    d, cl = min(dists, key=lambda x: x[0])
    return d, cl

In [2]:
import numpy as np
from minas.map_minas_support import *
np.random.seed(300)

# cria lista de meta-classes contendo etiqueta, centro e desvio padrão
classes = list(map(mkClass, range(1000)))

# a partir das classes, cria objetos <minas.Cluster>
clusters = sampleClusters(classes)

# a partir das classes, cria objetos <minas.Example>
inputStream = loopExamplesIter(classes)

In [3]:
init = time.time()
counter = 0
while time.time() - init < 1.0:
    counter += 1
    example = next(inputStream)
    minDist(clusters, example.item)
elapsed = time.time() - init
print(f'minasOnline testSamples {elapsed} seconds, consumed {counter} items, {int(counter / elapsed)} i/s')

minasOnline testSamples 1.0055515766143799 seconds, consumed 145 items, 144 i/s


In [4]:
examples = list(zip(range(200), inputStream))

In [5]:
%%timeit
counter = 0
results = []
init = time.time()
for i, example in examples:
    counter += 1
    result = minDist(clusters, example.item)
    results.append(result)
elapsed = time.time() - init
len(results)
print(f'minasOnline testSamples {elapsed} seconds, consumed {counter} items, {int(counter / elapsed)} i/s')

minasOnline testSamples 1.486374855041504 seconds, consumed 200 items, 134 i/s
minasOnline testSamples 1.365499496459961 seconds, consumed 200 items, 146 i/s
minasOnline testSamples 1.3667986392974854 seconds, consumed 200 items, 146 i/s
minasOnline testSamples 1.3588063716888428 seconds, consumed 200 items, 147 i/s
minasOnline testSamples 1.3360075950622559 seconds, consumed 200 items, 149 i/s
minasOnline testSamples 1.3422062397003174 seconds, consumed 200 items, 149 i/s
minasOnline testSamples 1.3449153900146484 seconds, consumed 200 items, 148 i/s
minasOnline testSamples 1.3675806522369385 seconds, consumed 200 items, 146 i/s
1.35 s ± 12.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [6]:
import dask.distributed
cli = dask.distributed.Client('tcp://192.168.15.14:8786', )
cli

0,1
Client  Scheduler: tcp://192.168.15.14:8786  Dashboard: http://192.168.15.14:8787/status,Cluster  Workers: 4  Cores: 8  Memory: 1.20 GB


In [7]:
import dask
import dask.bag as db

@dask.delayed
def minDistDelayed(clusters, item):
    dists = map(lambda cl: (sum((cl[1] - item) ** 2) ** (1/2), cl[0]), clusters)
    d, cl = min(dists, key=lambda x: x[0])
    return d, cl
# init = time.time()
# counter = 0
# simpleClusters = [(id(cl), cl.center) for cl in clusters]
# simpleClusters = db.from_sequence(simpleClusters)
# cli.persist(simpleClusters)
# simpleClusters = cli.scatter(simpleClusters)
# results = []
# while time.time() - init < 1.0:
#     counter += 1
#     example = next(inputStream)
#     results.append(minDistDelayed(simpleClusters, example.item))
# dask.compute(results)
# elapsed = time.time() - init
# print(f'minasOnline testSamples {elapsed} seconds, consumed {counter} items, {int(counter / elapsed)} i/s')

In [8]:
%%timeit
counter = 0
simpleClusters = [(id(cl), cl.center) for cl in clusters]
simpleClusters = db.from_sequence(simpleClusters)
cli.persist(simpleClusters)
simpleClusters = cli.scatter(simpleClusters)
results = []
init = time.time()
for i, example in examples:
    counter += 1
    result = minDistDelayed(simpleClusters, example.item)
    results.append(result)
elapsed = time.time() - init
print(f'loop {elapsed} seconds')
init = time.time()
dask.compute(results)
elapsed = time.time() - init
print(f'minasOnline testSamples {elapsed} seconds, consumed {counter} items, {int(counter / elapsed)} i/s')

loop 0.008395671844482422 seconds
minasOnline testSamples 4.998846530914307 seconds, consumed 200 items, 40 i/s
loop 0.007643461227416992 seconds
minasOnline testSamples 4.948642253875732 seconds, consumed 200 items, 40 i/s
loop 0.008357763290405273 seconds
minasOnline testSamples 5.433340311050415 seconds, consumed 200 items, 36 i/s
loop 0.010403156280517578 seconds
minasOnline testSamples 5.19452977180481 seconds, consumed 200 items, 38 i/s
loop 0.010891199111938477 seconds
minasOnline testSamples 5.343637943267822 seconds, consumed 200 items, 37 i/s
loop 0.008081912994384766 seconds
minasOnline testSamples 5.514289379119873 seconds, consumed 200 items, 36 i/s
loop 0.010305404663085938 seconds
minasOnline testSamples 5.370033502578735 seconds, consumed 200 items, 37 i/s
loop 0.01108694076538086 seconds
minasOnline testSamples 4.7529261112213135 seconds, consumed 200 items, 42 i/s
5.35 s ± 258 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)


In [9]:
import dask
import dask.bag as db

def minDistSimple(clusters, item):
    dists = map(lambda cl: (sum((cl[1] - item) ** 2) ** (1/2), cl[0]), clusters)
    d, cl = min(dists, key=lambda x: x[0])
    return d, cl

In [10]:
init = time.time()
counter = 0
simpleClusters = [(id(cl), cl.center) for cl in clusters]
simpleClusters = db.from_sequence(simpleClusters)
cli.persist(simpleClusters)
simpleClusters = cli.scatter(simpleClusters)
futures = []
while time.time() - init < 1.0:
    counter += 1
    example = next(inputStream)
    future = cli.submit(minDistSimple, simpleClusters, example.item)
    futures.append(future)

In [11]:
results = cli.gather(futures)
elapsed = time.time() - init
print(f'minasOnline testSamples {elapsed} seconds, consumed {counter} items, {int(counter / elapsed)} i/s')

minasOnline testSamples 64.23640584945679 seconds, consumed 2191 items, 34 i/s


In [13]:
# %%timeit
# import pandas as pd
# from sklearn.datasets import fetch_covtype
# covtype = fetch_covtype()
# total = len(covtype.data)

# zipToMap = lambda x: {'item': x[0], 'label': str(x[1])}
# onePercent = int(total*0.01)
# baseMap = map(zipToMap, zip(covtype.data[:onePercent], covtype.target[:onePercent]))
# onPercentDataFrame = pd.DataFrame(baseMap)

# clusters = minasOffline(onPercentDataFrame)
# print(len(clusters))

In [14]:
counter = 0
simpleClusters = [(id(cl), cl.center) for cl in clusters]
simpleClusters = db.from_sequence(simpleClusters)
cli.persist(simpleClusters)
simpleClusters = cli.scatter(simpleClusters)
futures = []
init = time.time()
for i, example in examples:
    counter += 1
    future = cli.submit(minDistSimple, simpleClusters, example.item)
    futures.append(future)
elapsed = time.time() - init
print(f'loop submit {elapsed} seconds')
init = time.time()
results = cli.gather(futures)
elapsed = time.time() - init
print(f'minasOnline submit {elapsed} seconds, consumed {counter} items, {int(counter / elapsed)} i/s')

loop submit 0.06345081329345703 seconds
minasOnline submit 5.4314656257629395 seconds, consumed 200 items, 36 i/s


In [16]:
# counter = 0
# localClusters = [(id(cl), cl.center) for cl in clusters]
# simpleClusters = db.from_sequence(localClusters)
# cli.persist(simpleClusters)
# simpleClusters = cli.scatter(simpleClusters)
# futures = []
# init = time.time()
# for i, example in examples:
#     counter += 1
#     item = example.item
#     #
#     def d
#     for cl in simpleClusters:
#         for c, x in zip(cl[1], item):
#             s = (c - x) ** 2
#         d = s ** (1/2), cl[0]
        
#     dists = cli.map(lambda cl: (), localClusters)
#     future = cli.submit(min, dists, key=lambda x: x[0])
#     #
#     # future = cli.submit(minDistSimple, simpleClusters, example.item)
#     futures.append(future)
# elapsed = time.time() - init
# print(f'loop submit {elapsed} seconds')
# init = time.time()
# results = cli.gather(futures)
# elapsed = time.time() - init
# print(f'minasOnline submit {elapsed} seconds, consumed {counter} items, {int(counter / elapsed)} i/s')

In [17]:
from dask import delayed
@delayed
def sub(a, b):
    return a - b
@delayed
def sqr(a, b):
    return a ** b
@delayed
def summ(a, b):
    return a + b
@delayed
def extractI(x, i):
    return x[i]
#
localClusters = [(id(cl), cl.center) for cl in clusters]
dimentions = len(clusters[0].center)

In [None]:
init = time.time()
outStream = []
scatterClusters = cli.scatter(localClusters)
k = None
result = None
for i, example in examples:
    scatterItem = cli.scatter(example.item)
    dists = []
    for cl in scatterClusters:
        # c = []
#         for i in range(dimentions):
#             ci = extractI(extractI(cl, 1), i)
#             xi = extractI(scatterItem, i)
        ci = extractI(cl, 1)
        xi = scatterItem
        a = sub(ci, xi)
        b = sqr(a, 2)
        # c.append(b)
        s = delayed(sum)(b)
        d = (s ** (1/2), extractI(cl, 0))
        dists.append(d)
    result = delayed(min)(dists, key=lambda x: x[0])
    result = result.compute()
    for cl in clusters:
        if id(cl) == result[1]:
            outStream.append((result[0], cl))
print(time.time() - init, 'secods')

In [None]:
result.visualize(rankdir="LR")

In [None]:
result.compute()

In [None]:
(len(outStream), outStream[0])

In [42]:
%%time
outStream = []
scatterClusters = cli.scatter(localClusters, broadcast=True)
k = None
result = None

CPU times: user 160 ms, sys: 6.31 ms, total: 167 ms
Wall time: 243 ms


In [43]:
%%time
i, example = examples[0]
scatterItem = cli.scatter(example.item, broadcast=True)

CPU times: user 2.9 ms, sys: 3.28 ms, total: 6.17 ms
Wall time: 8.07 ms


In [44]:
%%time
@delayed
def ddd(cl, item):
    ci = cl[1]
    d = sum((ci - item) ** 2) ** (1/2)
    return (d, cl[0])
dists = []
for cl in scatterClusters:
    dists.append(ddd(cl, scatterItem))
result = delayed(min)(dists, key=lambda x: x[0])

CPU times: user 121 ms, sys: 13.4 ms, total: 135 ms
Wall time: 132 ms


In [45]:
# result.visualize(rankdir="LR")

In [46]:
%%time
result = result.compute()

CPU times: user 73.6 ms, sys: 6.72 ms, total: 80.3 ms
Wall time: 2.16 s


In [47]:
%%time
for cl in clusters:
    if id(cl) == result[1]:
        outStream.append((result[0], cl))
        break

CPU times: user 58 µs, sys: 2 µs, total: 60 µs
Wall time: 64.1 µs


In [48]:
outStream[0]

(0.00559067433484581,
 Cluster(label=247, n=1, latest=0, timestamp=1558627743898632960, lastExapleTMS=0, maxDistance=0.0, meanDistance=0.9994292921076391, sumDistance=0.9994292921076391, rolingVarianceSum=0.9994292921076391, stdDev=0.9994292921076391))

In [49]:
init = time.time()
outStream = []
scatterClusters = cli.scatter(localClusters)
k = None
result = None
for i, example in examples:
    scatterItem = cli.scatter(example.item)
    dists = []
    for cl in scatterClusters:
        dists.append(ddd(cl, scatterItem))
    result = delayed(min)(dists, key=lambda x: x[0])
    result = result.compute()
    for cl in clusters:
        if id(cl) == result[1]:
            outStream.append((result[0], cl))
print(time.time() - init, 'secods')

1032.34299325943 secods


In [None]:
init = time.time()
outStream = []
scatterClusters = cli.scatter(localClusters)
k = None
result = None
for i, example in examples:
    scatterItem = cli.scatter(example.item)
    dists = []
    for cl in scatterClusters:
        dists.append(ddd(cl, scatterItem))
    result = delayed(min)(dists, key=lambda x: x[0])
    result = result.compute()
    for cl in clusters:
        if id(cl) == result[1]:
            outStream.append((result[0], cl))
elapsed = time.time() - init
counter =  len(outStream)
print(f'minasOnline submit {elapsed} seconds, consumed {counter} items, {int(counter / elapsed)} i/s')