Elasticsearch example

b-Bit Minwise Hashing

http://research.microsoft.com/pubs/120078/wfc0398-lips.pdf


Initialize

In [1]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import scan as Scan
from elasticsearch.helpers import bulk as Bulk
from elasticsearch_dsl import Search, Q

import numpy as np

from base64 import standard_b64decode as b64decode
import BitVector
import random
import struct

import matplotlib.pyplot as plt
from bokeh.plotting import *
import toyplot

Connect

In [2]:
# Elasticsearch is reached via the hostname 'elasticsearch'
# (presumably a linked-container / compose service name — confirm for your setup).
ES_HOST = 'elasticsearch'
es = Elasticsearch([{'host': ES_HOST}])

Delete the old index if it exists

In [3]:
# Start from a clean slate: drop any 'my_index' left over from a previous run.
index_exists = es.indices.exists(index='my_index')
if index_exists:
    res = es.indices.delete(index='my_index')
    print(" response: '{}'".format(res))

 response: '{'acknowledged': True}'
 response: '{'acknowledged': True}'


Create and show new index with minhash:

In [4]:
# Index settings: a custom analyzer using the standard tokenizer followed by a
# "minhash" token filter (presumably provided by an Elasticsearch minhash
# plugin — confirm it is installed on the node).
my_index_settings_body = '''
{
  "index":{
    "analysis":{
      "analyzer":{
        "minhash_analyzer":{
          "type":"custom",
          "tokenizer":"standard",
          "filter":["minhash"]
        }
      }
    }
  }
}
'''
# Create the index, then read it back to confirm the analyzer was registered.
res = es.indices.create(index='my_index', body=my_index_settings_body)
print(" response: '{}'".format(res))
res = es.indices.get(index='my_index')
print(" response: '{}'".format(res))

 response: '{'acknowledged': True}'
 response: '{'my_index': {'mappings': {}, 'aliases': {}, 'settings': {'index': {'creation_date': '1446392764189', 'number_of_shards': '5', 'uuid': 'IZL-D8UFRr6SqCDy2X5zyg', 'analysis': {'analyzer': {'minhash_analyzer': {'tokenizer': 'standard', 'type': 'custom', 'filter': ['minhash']}}}, 'number_of_replicas': '1', 'version': {'created': '1070399'}}}, 'warmers': {}}}'
 response: '{'acknowledged': True}'
 response: '{'my_index': {'mappings': {}, 'aliases': {}, 'settings': {'index': {'creation_date': '1446392764189', 'number_of_shards': '5', 'uuid': 'IZL-D8UFRr6SqCDy2X5zyg', 'analysis': {'analyzer': {'minhash_analyzer': {'tokenizer': 'standard', 'type': 'custom', 'filter': ['minhash']}}}, 'number_of_replicas': '1', 'version': {'created': '1070399'}}}, 'warmers': {}}}'


Update and show mapping:

In [5]:
# Mapping for my_type: 'message' is copied into 'minhash_value', whose
# "minhash" field type (plugin-provided, presumably) analyzes it with
# minhash_analyzer and copies the resulting bits into the stored 'bits' field.
my_type_mapping_body = '''
{
  "my_type" : {
    "properties" : {
      "message" : {
        "type" : "string",
        "copy_to" : [
          "minhash_value"
        ]
      },
      "bits" : {
        "type" : "string",
        "store" : true
      },
      "minhash_value" : {
        "type" : "minhash",
        "minhash_analyzer" : "minhash_analyzer",
        "copy_bits_to" : "bits"
      }
    }
  }
}
'''
# Apply the mapping, then read it back to check it was accepted.
res = es.indices.put_mapping(index='my_index', doc_type='my_type', body=my_type_mapping_body)
print(" response: '{}'".format(res))
res = es.indices.get_mapping(index='my_index', doc_type='my_type')
print(" response: '{}'".format(res))

 response: '{'acknowledged': True}'
 response: '{'my_index': {'mappings': {'my_type': {'properties': {'minhash_value': {'minhash_analyzer': 'minhash_analyzer', 'copy_bits_to': 'bits', 'type': 'minhash'}, 'bits': {'store': True, 'type': 'string'}, 'message': {'copy_to': ['minhash_value'], 'type': 'string'}}}}}}'
 response: '{'acknowledged': True}'
 response: '{'my_index': {'mappings': {'my_type': {'properties': {'minhash_value': {'minhash_analyzer': 'minhash_analyzer', 'copy_bits_to': 'bits', 'type': 'minhash'}, 'bits': {'store': True, 'type': 'string'}, 'message': {'copy_to': ['minhash_value'], 'type': 'string'}}}}}}'


Add data to index

In [6]:
def _make_action(doc_id, message):
    """Build one bulk-index action for a document in my_index/my_type.

    A fresh dict (with a fresh ``_source``) is returned on every call, so a
    consumer that buffers yielded actions never sees one mutated afterwards.
    """
    return {
        '_index': 'my_index',
        '_type': 'my_type',
        '_id': doc_id,
        '_source': {'message': message},
    }


def batch1(start, count):
    """Yield ``count`` actions with ids ``start .. start+count-1``.

    Messages share a fixed prefix/suffix around a random float in [0, 1000).
    """
    for a in range(count):
        yield _make_action(start + a,
                           "Prefix something %f suffix other" % (random.random()*1000))


def batch2(start, count):
    """Yield ``count`` "ERROR" actions with ids ``start .. start+count-1``.

    Each message embeds ``a // 5`` (so groups of five consecutive documents
    share that number) and a random float in [0, 1000).
    """
    for a in range(count):
        yield _make_action(start + a,
                           "ERROR: Why we get this, now %d %f" % (int(a/5), random.random()*1000))


def batch3(start, count):
    """Yield ``count`` actions with ids ``start .. start+count-1`` that all carry
    the same constant "DEBUG" message."""
    for a in range(count):
        yield _make_action(start + a,
                           'DEBUG: This message is entirely not for any use in any way.')

# Index 10000 documents per synthetic class: ids 0-9999 from batch1,
# 10000-19999 from batch2 and 20000-29999 from batch3.
batches = ((batch1, 0), (batch2, 10000), (batch3, 20000))
bulk_results = [Bulk(es, make_batch(count=10000, start=first_id))
                for make_batch, first_id in batches]
bulk_results[-1]

(10000, [])

(10000, [])

Construct the search using the DSL, then scan

In [7]:
# Query every document of type my_type, returning only the 'message' and
# 'minhash_value' fields.
s = (
    Search(using=es, index="my_index")
    .query("term", _type="my_type")
    .fields(['message', 'minhash_value'])
)
# Scan() returns a one-shot iterator over all hits; later cells re-create it.
response = Scan(es, query=s.to_dict())


Initialize structures

In [None]:
# Accumulator dimensions:
#   maxlen - bit positions per minhash value (assumed 128 — TODO confirm)
#   maxid  - total number of indexed documents
#   cm     - number of classes, one per block of 10000 ids
#   bm     - number of distinct bit values (0 and 1)
maxlen = 128
maxid = 30000
cm = (maxid - 1) // 10000 + 1
bm = 2
bfreq = np.zeros([maxlen, bm])      # bfreq[i, b]: how often bit i had value b
cnt = np.zeros([maxlen, bm, cm])    # cnt[i, b, t]: the same, for class t only
resp_limit = None                   # optional cap on processed docs; None = all

Collect results

In [None]:
# For every bit position, count how often each bit value occurs overall
# (bfreq) and per document class (cnt).
for doc in response:
    if resp_limit is not None:
        # Process at most `resp_limit` documents.
        if resp_limit <= 0:
            break
        resp_limit -= 1
    doc_id = int(doc['_id'])
    # Class index: ids 0-9999 -> 0, 10000-19999 -> 1, 20000-29999 -> 2,
    # matching the id ranges used when the batches were indexed.
    t = doc_id // 10000
    for mhv in doc['fields']['minhash_value']:
        bits = BitVector.BitVector(rawbytes=b64decode(mhv))
        for i, b in enumerate(bits):
            bfreq[i, b] += 1
            cnt[i, b, t] += 1

Optionally print the full result (disabled by default)

In [None]:
# Set to True to dump, for each bit position i and bit value b, the total
# count followed by each class's count and its fraction of that total.
PRINT_FULL_RESULT = False
if PRINT_FULL_RESULT:
    for i in range(maxlen):
        for b in range(bm):
            total = bfreq[i, b]
            print("%d %d %d" % (i, b, total))
            if total != 0:
                for t, c in enumerate(cnt[i, b]):
                    print("  %d %d %f" % (t, c, c / total))

print(bfreq.shape)

Print binary distribution 

In [None]:
# (maxlen x 2) table: column 0 counts zero bits, column 1 counts one bits, per position.
print(bfreq)

Do a plot

In [None]:
output_notebook()

# Count of 0 bits (default colour) and 1 bits (red) at every bit position.
x = np.arange(maxlen)  # transposing a 1-D range is a no-op
y0 = bfreq[x, 0]
y1 = bfreq[x, 1]
p = figure()
p.circle(x, y0, legend="0")
p.circle(x, y1, fill_color="red", legend="1")
show(p)

In [None]:
# For each bit position: the share of its zero-bit occurrences that came from
# class 0, 1 and 2, plotted against that position's total zero-bit count.
r = np.arange(maxlen)
x = bfreq[r, 0]
y0, y1, y2 = (cnt[r, 0, cls] / bfreq[r, 0] for cls in range(3))
p = figure()
p.circle(x, y0, legend="c0-0")
p.circle(x, y1, legend="c0-1", fill_color="red")
p.circle(x, y2, legend="c0-2", fill_color="cyan")
show(p)

Try toyplot too; somehow x is not what it should be.

In [None]:
# The same three ratio series, drawn with toyplot.
# NOTE: x is the zero-bit count per position (bfreq[:, 0]), not the bit position.
canvas = toyplot.Canvas(width=640, height=480)
axes = canvas.axes()
x = bfreq[r, 0]
mark0, mark1, mark2 = (axes.scatterplot(x, series) for series in (y0, y1, y2))

Try AdaBoost
http://scikit-learn.org/stable/auto_examples/ensemble/plot_adaboost_hastie_10_2.html
First train

In [None]:
from sklearn.tree import DecisionTreeClassifier
from sklearn.metrics import zero_one_loss
from sklearn.ensemble import AdaBoostClassifier

n_estimators = 400
train_cut = 2000  # the first `train_cut` ids of each 10000-id class block are used for training

ada_real = AdaBoostClassifier(n_estimators=n_estimators)

# One row per training document; rows [t*train_cut, (t+1)*train_cut) hold class t.
xm = train_cut * cm
X_train = np.zeros([xm, maxlen], dtype=np.byte)
y_train = np.zeros([xm])

response = Scan(es, query=s.to_dict())
for doc in response:
    t, offset = divmod(int(doc['_id']), 10000)
    if offset < train_cut:
        row = offset + train_cut * t
        for mhv in doc['fields']['minhash_value']:
            X_train[row] = BitVector.BitVector(rawbytes=b64decode(mhv))
            y_train[row] = t

ada_real.fit(X_train, y_train)

And then test it

In [None]:
# Evaluate on the held-out documents: offsets train_cut..9999 inside each
# 10000-id class block (training used offsets 0..train_cut-1).
xm = maxid - train_cut * cm
X_test = np.zeros([xm, maxlen], dtype=np.byte)
y_test = np.zeros([xm])

response = Scan(es, query=s.to_dict())
for doc in response:
    doc_id = int(doc['_id'])
    t, offset = divmod(doc_id, 10000)
    if offset < train_cut:
        continue
    # Each earlier class contributes (10000 - train_cut) test rows, so
    # row = t * (10000 - train_cut) + (offset - train_cut).
    row = doc_id - train_cut * (t + 1)
    for mhv in doc['fields']['minhash_value']:
        X_test[row] = BitVector.BitVector(rawbytes=b64decode(mhv))
        y_test[row] = t

# Test error after each boosting stage. The array is built only from the
# stages that were actually fitted: AdaBoost can stop before n_estimators
# (e.g. on a perfect fit), and a pre-sized zeros array would then report a
# spurious 0.0 error for the missing stages. zero_one_loss(y_true, y_pred).
ada_real_err = np.array([zero_one_loss(y_test, y_pred)
                         for y_pred in ada_real.staged_predict(X_test)])

print(ada_real_err)

Plot the test predictions; the prediction seems okay.

In [None]:
# Predicted class per test row; rows are grouped by true class (0, then 1, then 2).
pred = ada_real.predict(X_test)
p = figure()
x = np.arange(len(pred))
p.circle(x, pred)
show(p)

Try AdaBoost on the data as 64-bit values

In [None]:
ada_real2 = AdaBoostClassifier(n_estimators=n_estimators)

# Same training split as the bit-level model, but each minhash value is read
# as `xlen` unsigned 64-bit integers instead of `maxlen` separate bits.
xm = train_cut * cm
xlen = maxlen // 64
word_fmt = '%dQ' % xlen  # one native-order unsigned 64-bit word per 64 bits
X_train = np.zeros([xm, xlen], dtype=np.uint64)
y_train = np.zeros([xm])

response = Scan(es, query=s.to_dict())
for doc in response:
    t, offset = divmod(int(doc['_id']), 10000)
    if offset >= train_cut:
        continue
    row = offset + train_cut * t
    for mhv in doc['fields']['minhash_value']:
        X_train[row] = struct.unpack(word_fmt, b64decode(mhv))
        y_train[row] = t

ada_real2.fit(X_train, y_train)

In [None]:
# Held-out documents, encoded as in the 64-bit training cell. Use the same
# dtype as X_train (uint64) so values are not rounded through float64.
# NOTE(review): scikit-learn trees cast features to float32 anyway, so each
# 64-bit word loses most of its bits — likely why this variant performs poorly.
xm = maxid - train_cut * cm
word_fmt = '%dQ' % xlen
X_test = np.zeros([xm, xlen], dtype=np.uint64)
y_test = np.zeros([xm])

response = Scan(es, query=s.to_dict())
for doc in response:
    doc_id = int(doc['_id'])
    t, offset = divmod(doc_id, 10000)
    if offset < train_cut:
        continue
    row = doc_id - train_cut * (t + 1)
    for mhv in doc['fields']['minhash_value']:
        X_test[row] = struct.unpack(word_fmt, b64decode(mhv))
        y_test[row] = t

# One error value per boosting stage actually fitted (AdaBoost may stop early);
# zero_one_loss takes (y_true, y_pred).
ada_real_err2 = np.array([zero_one_loss(y_test, y_pred)
                          for y_pred in ada_real2.staged_predict(X_test)])

print(ada_real_err2)

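This does not seem to work (likely because each 64-bit word is treated as one numeric feature, so the bit-level structure is lost).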

Try something else

In [None]:
# Keep every document's bits (row = document id) and build a histogram of how
# many bits are set per minhash value.
X_full = np.zeros([maxid, maxlen], dtype=np.byte)
# A maxlen-bit value can have 0..maxlen bits set, so maxlen + 1 bins are needed.
X_popc = np.zeros([maxlen + 1], dtype=np.uint32)

response = Scan(es, query=s.to_dict())
for doc in response:
    doc_id = int(doc['_id'])
    for mhv in doc['fields']['minhash_value']:
        rawbytes = b64decode(mhv)
        X_full[doc_id] = BitVector.BitVector(rawbytes=rawbytes)
        # Population count of the whole value (stdlib only, any byte length).
        popc = bin(int.from_bytes(rawbytes, 'big')).count('1')
        X_popc[popc] += 1


In [None]:
# Bit row stored for document id 0, then the histogram of set-bit counts.
print(X_full[0])
print(X_popc)

In [None]:
# Source code for timeit's setup step. timeit executes it in its own namespace,
# so it imports everything it needs. It defines 32-bit popcount helpers built
# on a 16-bit lookup table (one table of int, one of np.byte), a gmpy2-based
# variant, and the benchmark input v1 (1000 integers).
# NOTE(review): gmpy2 is imported unconditionally here, so every benchmark that
# uses this setup fails if gmpy2 is not installed.
setup = """
import numpy as np
import gmpy2
from gmpy2 import mpz

POPCOUNT_TABLE16 = np.zeros(2**16, dtype=int) #has to be an array
POPCOUNT_TABLE16b = np.zeros(2**16, dtype=np.byte) #has to be an array

for index in range(len(POPCOUNT_TABLE16)):
    POPCOUNT_TABLE16[index] = (index & 1) + POPCOUNT_TABLE16[index >> 1]
    POPCOUNT_TABLE16b[index] = (index & 1) + POPCOUNT_TABLE16b[index >> 1]

def popcount32_table16(v):
    return (POPCOUNT_TABLE16[ v        & 0xffff] +
            POPCOUNT_TABLE16[(v >> 16) & 0xffff])

def popcount32_table16b(v):
    return (POPCOUNT_TABLE16b[ v        & 0xffff] +
            POPCOUNT_TABLE16b[(v >> 16) & 0xffff])

def count1s(v):
    return popcount32_table16(v).sum()

def count1sb(v):
    return popcount32_table16b(v).sum()

def gmpy2count1s(v):
    return sum(gmpy2.popcount(mpz(int(a))) for a in v)

v1 = np.arange(1000)*1234567                       #numpy array
"""
# timeit(stmt, setup=...) runs `setup` once, then times `stmt`.
from timeit import timeit

In [None]:
# timeit's default number=1000000 repetitions would keep this cell busy for a
# very long time; 1000 calls give a usable estimate. Prints seconds per call.
n_timeit_runs = 1000
print(timeit("count1s(v1)", setup=setup, number=n_timeit_runs) / n_timeit_runs)

In [None]:
# Same measurement as for count1s, using the np.byte lookup table; 1000 calls
# instead of timeit's default 1000000. Prints seconds per call.
n_timeit_runs = 1000
print(timeit("count1sb(v1)", setup=setup, number=n_timeit_runs) / n_timeit_runs)

In [None]:
#print(timeit("gmpy2count1s(v1)", setup=setup))

In [None]:
from sklearn.cluster import DBSCAN
from sklearn.preprocessing import StandardScaler

# Standardise each bit column (explicit float conversion avoids an int8 ->
# float conversion warning), then cluster the standardised data. Previously
# X_full_std was computed but the raw X_full was clustered.
X_full_std = StandardScaler().fit_transform(X_full.astype(np.float64))
db = DBSCAN(eps=0.3, min_samples=3).fit(X_full_std)