## Notes to self:
DictListStorage takes in (string key, bytes value)
DictSetStorage takes in (bytes key, tuple value), e.g. a value of ('hello',)

In [32]:
import datasketch as ds
import happybase as hb
import struct
import pickle
import base64
import time
import Hbase_thrift
import random
import pickle

In [106]:
def hbase_safe_b64_encode(data):
    """Return the base64 text of ``data`` with every ``=`` character removed.

    ``data`` must be bytes-like (it is passed to ``base64.b64encode``); the
    result is a ``str``.
    """
    # NOTE(review): the standard base64 alphabet can still emit '+' and '/';
    # confirm that these are acceptable wherever the result is used as a name.
    encoded_text = base64.b64encode(data).decode('utf8')
    return ''.join(encoded_text.split('='))

def safely_create_table(pool, tablename, max_retries=10):
    """Make sure the HBase table ``tablename`` exists, creating it if needed.

    The table is created with a single column family, ``fvalue``.

    Args:
        pool: Connection pool. ``pool.connection()`` must be usable as a
            context manager that yields an object offering
            ``is_table_enabled(name)`` and ``create_table(name, families)``.
        tablename: Name of the table to check for and, if missing, create.
        max_retries: Number of failed creation attempts tolerated before
            giving up. Defaults to 10.

    Raises:
        ValueError: If the table still needs creating once the retry budget
            is exhausted.
        Hbase_thrift.IOError: If the existence check fails for a reason other
            than ``TableNotFoundException``.
    """
    needs_create_table = True
    retries = 0

    while needs_create_table and retries < max_retries:
        # Re-check on every pass: the table may have appeared since the
        # previous attempt.
        try:
            with pool.connection() as c:
                needs_create_table = not c.is_table_enabled(tablename)
        except Hbase_thrift.IOError as e:
            message = e.message
            if isinstance(message, bytes):
                message = message.decode('utf8')
            # A missing table is the expected case; anything else is a real error.
            if not message.startswith('org.apache.hadoop.hbase.TableNotFoundException'):
                raise

        if not needs_create_table:
            break

        # Random delay before creating -- presumably to stagger concurrent
        # creators of the same table; confirm against the callers.
        sleep_time = random.uniform(0.01, 2)
        print(f"Need to create table:  {tablename}, sleeping {sleep_time} seconds")
        time.sleep(sleep_time)
        print(f"Finished sleeping, attempting to create table ({retries}/{max_retries} retries): {tablename}")

        try:
            with pool.connection() as c:
                families = {
                    'fvalue': dict(),
                }
                c.create_table(tablename, families)
            needs_create_table = False
            print(f"Successfully create table: {tablename}")
        except Exception as e:
            # Best effort: count the failure and loop; the next pass re-checks
            # whether the table exists before trying again.
            retries += 1
            print(f"Failed to create table: {tablename} ({e})")

    if needs_create_table:
        raise ValueError(f"Failed to create table {tablename} with {retries} retries")

class HBaseDictListStorage(ds.storage.OrderedStorage):
    """Write-only, HBase-backed implementation of the ordered storage API.

    Only ``insert`` and ``empty_buffer`` do real work. Every read or delete
    operation raises ``ValueError('Not implemented')`` and ``has_key`` always
    reports ``False``.
    """

    def __init__(self, config, name):
        """Remember the target table and connection pool.

        Args:
            config: Storage configuration; ``config['hbase_pool']`` supplies
                the connection pool used for every write.
            name: Raw storage name. It is base64-encoded to derive the table
                name, so it must be bytes-like.
        """
        self._name = name
        self._table = hbase_safe_b64_encode(name)
        self._pool = config['hbase_pool']
        # Pending (key, values) pairs queued by buffered inserts; stays None
        # until the first buffered insert.
        self.buffer = None

    # The unimplemented operations keep raising ValueError (rather than
    # NotImplementedError) so existing callers see the same exception type.
    def keys(self):
        raise ValueError('Not implemented')

    def get(self, key):
        raise ValueError('Not implemented')

    def remove(self, *keys):
        raise ValueError('Not implemented')

    def remove_val(self, key, val):
        raise ValueError('Not implemented')

    def _insert_batch(self, batch, value):
        """Queue one ``(key, values)`` pair on an open batch.

        Each element of ``values`` becomes its own row: the row key is the
        base64 form of the element, and the row stores the element under
        ``fvalue:value`` and the (utf8-encoded) key under ``fvalue:name``.
        """
        key, values = value

        name = key.encode('utf8') if isinstance(key, str) else key

        # NOTE(review): the row key depends on the value only, so the same
        # value stored under two different keys targets the same row --
        # confirm this is intended.
        for v in values:
            batch.put(hbase_safe_b64_encode(v), {b'fvalue:value': v, b'fvalue:name': name})

    def _insert(self, values):
        """Write a list of ``(key, values)`` pairs in one transactional batch."""
        try:
            with self._pool.connection() as c:
                table = c.table(self._table)
                with table.batch(transaction=True) as b:
                    for value in values:
                        self._insert_batch(b, value)
        except Exception as e:
            print(f"Failed inserting with : {e}")
            raise

    def insert(self, key, *vals, **kwargs):
        """Store ``vals`` under ``key``.

        Args:
            key: Storage key (``str`` or bytes).
            *vals: Values to store; each must be bytes-like because it is
                base64-encoded to form its row key.
            **kwargs: ``buffer`` (optional, default ``False``) -- when truthy
                the insert is queued until ``empty_buffer`` is called instead
                of being written immediately.
        """
        if kwargs.get('buffer', False):
            if self.buffer is None:
                self.buffer = []
            self.buffer.append((key, vals))
        else:
            self._insert([(key, vals)])

    def size(self):
        raise ValueError('Not implemented')

    def itemcounts(self, **kwargs):
        raise ValueError('Not implemented')

    def has_key(self, key):
        """Membership checks are not implemented yet; always ``False``."""
        # Needs implementation
        return False

    def empty_buffer(self):
        """Write all buffered inserts as one batch, then clear the buffer.

        Does nothing when nothing has been buffered. The buffer is cleared
        only after a successful write, so a failed flush can be retried.
        """
        if not self.buffer:
            return
        self._insert(self.buffer)
        self.buffer = []

In [107]:
class HBaseDictSetStorage(ds.storage.UnorderedStorage, HBaseDictListStorage):
    '''Unordered-storage flavour of ``HBaseDictListStorage``.

    Values are pickled before being handed to the parent's ``insert``;
    ``get`` is a stub that returns ``None``.
    '''
    def __init__(self, config, name=None):
        # Delegate explicitly to the HBase-backed parent's initialiser.
        HBaseDictListStorage.__init__(self, config, name)

    def get(self, key):
        # Lookups are not implemented.
        return None

    def insert(self, key, *vals, **kwargs):
        # Serialise every value with pickle before delegating to the parent.
        HBaseDictListStorage.insert(self, key, *map(pickle.dumps, vals), **kwargs)

In [108]:
def hbase_ordered_storage(config, name=None):
    """Build an ordered storage for ``config``.

    A ``config['type']`` of ``'hbase'`` yields an ``HBaseDictListStorage``;
    any other type is handed to ``ds.storage.ordered_storage``.
    """
    if config['type'] != 'hbase':
        return ds.storage.ordered_storage(config, name=name)
    return HBaseDictListStorage(config, name=name)


def hbase_unordered_storage(config, name=None):
    """Build an unordered storage for ``config``.

    A ``config['type']`` of ``'hbase'`` yields an ``HBaseDictSetStorage``;
    any other type is handed to ``ds.storage.unordered_storage``.
    """
    if config['type'] != 'hbase':
        return ds.storage.unordered_storage(config, name=name)
    return HBaseDictSetStorage(config, name=name)

In [109]:
def override_lsh__init__(self, threshold=0.9, num_perm=128, weights=(0.5, 0.5),
                 params=None, storage_config=None, prepickle=None, hashfunc=None):
        """Constructor body for an LSH index whose storages may live in HBase.

        Validates the arguments, derives the band/row parameters ``b`` and
        ``r`` (from ``params`` when given, otherwise via
        ``ds.lsh._optimal_param``), and builds ``self.b`` bucket storages plus
        one key storage through ``hbase_unordered_storage`` /
        ``hbase_ordered_storage``.

        Raises:
            ValueError: If ``threshold`` or any weight is outside [0.0, 1.0],
                ``num_perm`` is below 2, the weights do not sum to 1.0, or
                ``b * r`` from ``params`` exceeds ``num_perm``.
        """
        # Fall back to the in-memory 'dict' storage when no config is supplied.
        storage_config = {'type': 'dict'} if not storage_config else storage_config
        self._buffer_size = 50000
        if threshold > 1.0 or threshold < 0.0:
            raise ValueError("threshold must be in [0.0, 1.0]")
        if num_perm < 2:
            raise ValueError("Too few permutation functions")
        if any(w < 0.0 or w > 1.0 for w in weights):
            raise ValueError("Weight must be in [0.0, 1.0]")
        # NOTE(review): exact float comparison -- weights must sum to exactly 1.0.
        if sum(weights) != 1.0:
            raise ValueError("Weights must sum to 1.0")
        self.h = num_perm
        if params is not None:
            self.b, self.r = params
            # Note: b * r == num_perm is accepted even though the message
            # below says "less than".
            if self.b * self.r > num_perm:
                raise ValueError("The product of b and r in params is "
                        "{} * {} = {} -- it must be less than num_perm {}. "
                        "Did you forget to specify num_perm?".format(
                            self.b, self.r, self.b*self.r, num_perm))
        else:
            false_positive_weight, false_negative_weight = weights
            self.b, self.r = ds.lsh._optimal_param(threshold, num_perm,
                    false_positive_weight, false_negative_weight)

        # Unless given explicitly, prepickle is on only for the 'redis' storage type.
        self.prepickle = storage_config['type'] == 'redis' if prepickle is None else prepickle

        self.hashfunc = hashfunc
        if hashfunc:
            self._H = self._hashed_byteswap
        else:
            self._H = self._byteswap

        # basename must be bytes: it is joined with bytes literals below.
        # The random fallback name is computed even when 'basename' is present.
        basename = storage_config.get('basename', ds.storage._random_name(11))
        # One unordered storage per band, named <basename>_bucket_<big-endian uint16 index>.
        self.hashtables = [
            hbase_unordered_storage(storage_config, name=b''.join([basename, b'_bucket_', struct.pack('>H', i)]))
            for i in range(self.b)]
        # Half-open [start, end) slice bounds of each band, r entries wide.
        self.hashranges = [(i*self.r, (i+1)*self.r) for i in range(self.b)]
        self.keys = hbase_ordered_storage(storage_config, name=b''.join([basename, b'_keys']))
        
# Monkey-patch: every MinHashLSH constructed from here on runs override_lsh__init__.
ds.lsh.MinHashLSH.__init__ = override_lsh__init__

In [110]:
# Connection pool (first argument 10) against host 'datanode2', then an LSH
# index using the 'hbase' storage type with the fixed basename b'test1'.
pool = hb.ConnectionPool(10, host='datanode2')
lsh = ds.lsh.MinHashLSH(storage_config={'type': 'hbase', 'basename': b'test1', 'hbase_pool': pool}, prepickle=False)

Initializing table with name: b'test1_bucket_\x00\x00', safe: dGVzdDFfYnVja2V0XwAA
Initializing table with name: b'test1_bucket_\x00\x01', safe: dGVzdDFfYnVja2V0XwAB
Initializing table with name: b'test1_bucket_\x00\x02', safe: dGVzdDFfYnVja2V0XwAC
Initializing table with name: b'test1_bucket_\x00\x03', safe: dGVzdDFfYnVja2V0XwAD
Initializing table with name: b'test1_bucket_\x00\x04', safe: dGVzdDFfYnVja2V0XwAE
Initializing table with name: b'test1_keys', safe: dGVzdDFfa2V5cw


In [111]:
def create_hash(string):
    """Return a ``ds.MinHash`` updated with each element of ``string``.

    Every element (one character when ``string`` is a ``str``) is utf8-encoded
    and fed to the sketch separately.
    """
    sketch = ds.MinHash()
    for encoded in (ch.encode('utf8') for ch in string):
        sketch.update(encoded)
    return sketch

In [112]:
# Smoke test: sketch the string "hello" and insert it under the key "hello".
test = create_hash("hello")
lsh.insert("hello", test)

Inserting key: aGVsbG8AAAAAQihtaAAAAAAodLqRAAAAAEtVgOAAAAAAIjgl+gAAAAAKOY1/AAAAAAVpaOcAAAAAvkPIMAAAAAApTAoSAAAAAEZPRTEAAAAAIueMVgAAAAAHffh9AAAAADK+bfIAAAAACwDzKgAAAAAIb7aDAAAAAAYHqjkAAAAADXauhwAAAAAM00tmAAAAACWsNN0AAAAAaV+QngAAAABIducbAAAAAAJ9qOQAAAAAIwbCtgAAAAACve+TAAAAACVxx9wAAAAABRTsqw==, value: {b'fvalue:value': b'\x00\x00\x00\x00B(mh\x00\x00\x00\x00(t\xba\x91\x00\x00\x00\x00KU\x80\xe0\x00\x00\x00\x00"8%\xfa\x00\x00\x00\x00\n9\x8d\x7f\x00\x00\x00\x00\x05ih\xe7\x00\x00\x00\x00\xbeC\xc80\x00\x00\x00\x00)L\n\x12\x00\x00\x00\x00FOE1\x00\x00\x00\x00"\xe7\x8cV\x00\x00\x00\x00\x07}\xf8}\x00\x00\x00\x002\xbem\xf2\x00\x00\x00\x00\x0b\x00\xf3*\x00\x00\x00\x00\x08o\xb6\x83\x00\x00\x00\x00\x06\x07\xaa9\x00\x00\x00\x00\rv\xae\x87\x00\x00\x00\x00\x0c\xd3Kf\x00\x00\x00\x00%\xac4\xdd\x00\x00\x00\x00i_\x90\x9e\x00\x00\x00\x00Hv\xe7\x1b\x00\x00\x00\x00\x02}\xa8\xe4\x00\x00\x00\x00#\x06\xc2\xb6\x00\x00\x00\x00\x02\xbd\xef\x93\x00\x00\x00\x00%q\xc7\xdc\x00\x00\x00\x00\x05\x14\xec\xab', b'fvalue:name':

In [9]:
# Scratch check: base64 of b'hello' with '=' stripped, re-encoded to bytes.
base64.b64encode(b'hello').decode('utf8').replace('=', '').encode('utf8')

b'aGVsbG8'

In [10]:
# Probe whether the table for bucket 0 exists and is enabled. The literal is
# the safe name printed for b'test1_bucket_\x00\x00' in the output above.
with pool.connection() as c:
    try:
        print(c.is_table_enabled('dGVzdDFfYnVja2V0XwAA'))
    except Hbase_thrift.IOError as e:
        message = e.message.decode('utf8')
        # Only a missing table is reported; any other IOError is silently dropped here.
        if message.startswith('org.apache.hadoop.hbase.TableNotFoundException'):
            print("Table not found")

True


In [11]:
# Set up findspark before pyspark is imported in the next cell; find() echoes
# the location it resolved (shown in the cell output).
import findspark
findspark.init()
findspark.find()

'/usr/local/spark/python/pyspark'

In [12]:
from pyspark.sql import SparkSession

# Session on the "yarn" master with 16 executor instances of 1536m each.
# The dynamic-allocation and shuffle-service settings are left commented out.
spark = (SparkSession
         .builder
         .master("yarn")
         .appName("python-testing")
         .config("spark.executor.instances", 16)
         .config("spark.executor.memory", "1536m")
         #.config("spark.dynamicAllocation.enabled", "true")
         #.config("spark.executor.cores", 1)
         #.config("spark.dynamicAllocation.minExecutors", 4)
         #.config("spark.dynamicAllocation.maxExecutors", 32)
         #.config("spark.shuffle.service.enabled", "true")
         #.config("spark.shuffle.service.port", 7337)
         .getOrCreate())
sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
2022-04-14 11:44:39,746 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2022-04-14 11:44:47,215 WARN yarn.Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


In [13]:
# Presumably lets executor Python processes find the user-site packages -- TODO confirm.
sc.environment['PYTHONPATH'] = '/home/ubuntu/.local/lib/python3.8/site-packages'
# Ship hbase_cv2.py with the job; testFunction below imports it.
sc.addPyFile('hbase_cv2.py')

In [14]:
# Load the sequence file from HDFS and peek at one record; the recorded
# output shows an (int, str) pair.
windows = sc.sequenceFile("hdfs:///files/windowed")
windows.take(1)

                                                                                

[(30126923,
  'CGCATCGTGGCTGGACCTGAGTCCATCTGCCCTGGTGCCTGCATGACTGGCCCTTCTCCTTCACAGACCATGGCCCCAGGCTCCCTTGCTTTCATTTCCCAGCCCGTTATTGGGGCAGGAGAGTAGCAAGCGGGGGAGTTTTGATGAGGCGAGGA')]

In [15]:
# Cached 0.05% sample without replacement, fixed seed; count() forces evaluation.
sample = windows.sample(fraction=0.0005, withReplacement=False, seed=1).cache()
sample.count()

                                                                                

29288

In [16]:
# Host names; not referenced by any other cell visible here.
servers = ['datanode1', 'datanode2', 'datanode3', 'datanode4']

In [17]:
def testFunction(values):
    """Per-partition smoke test: insert one fixed MinHash into an HBase-backed LSH.

    Intended for ``foreachPartition`` (see the commented-out call below).

    Args:
        values: The partition's records. Currently unused -- the function
            always inserts the sketch of the literal string "hello".
    """
    # Imports are local so the function is self-contained when it runs on an
    # executor and does not rely on names defined in the driver notebook.
    import happybase
    import datasketch as ds
    # Presumably imported for its side effects (installing the HBase storage
    # overrides) -- TODO confirm against hbase_cv2.py.
    import hbase_cv2

    def create_hash(string):
        mh2 = ds.MinHash()
        for d in string:
            mh2.update(d.encode('utf8'))
        return mh2

    # Use the locally imported module rather than the driver-global alias `hb`.
    pool = happybase.ConnectionPool(10, host='localhost')
    lsh = ds.lsh.MinHashLSH(storage_config={'type': 'hbase', 'basename': b'test4', 'hbase_pool': pool})
    test = create_hash("hello")
    lsh.insert("hello", test)
    
#sample.foreachPartition(testFunction)

In [19]:
# Scratch cell: `www` is not defined in any cell visible here, so this raises
# NameError (see the recorded output) rather than printing.
if www is None:
    print("e")

NameError: name 'www' is not defined