In [74]:
import random
from collections import MutableMapping
from random import randrange

In [82]:
def random_seq(bases = "ATCG", k = 21):
    return "".join(random.choices(bases, k = k))

'''consider a DNA sequence with fixed length 21:
there are at most: pow(4, 21) = 4^21 = 4398046511104 possible sequences.
'''


In [183]:
# MyProbHashMap class using MAD (multiply-add-and-divide, MAD) as compression 
# function and linear probing collision-handling scheme
class MyProbeHashMap(MutableMapping):
    class _Item:
        __slots__ = '_key', '_value'
        def __init__( self, k, v = None ):
            self._key = k
            self._value = v

        def __eq__( self, other ):
            return self._key == other._key

        def __ne__( self, other ):
            return not( self == other )

        def __lt__( self, other ):
            return self._key < other._key

        def __ge__( self, other ):
            return self._key >= other._key

        def __str__( self ):
            return "<" + str( self._key ) + "," + str( self._value ) + ">"

        def key( self ):
            return self._key

        def value( self ):
            return self._value    


    _AVAIL = object()
    _MIN_CAP = 11               # minimal capacity of the Map
    
    def __init__( self, cap = 11, p = 109345121, probing = 'linear' ):
        cap = max(MyProbeHashMap._MIN_CAP, cap)  # minimal capacity of Map
        # is this prime p big enough for k = 21?
        self._T = cap * [None]               # bucket array with cap entries
        self._n = 0                          # number of elements in array
        self._prime = p                      # prime number for MAD compression
        # a: scale
        self._scale = 1 + randrange( p - 1 ) # scale(also called: a) in [1, p-1]
        # seems we don't need this loop as p is prime
        trouve = False
        while not trouve:
            self._scale = 1 + randrange( p - 1 )
            if not ( self._scale % p ) == 0:
                trouve = True
        
        self._shift = randrange( p )         # shift(also called: b) in [0, p-1]
        self._mask = cap                     # N
        self._q = 7                          # used for double hash
        self._prob_func = self._probing_linear
        if probing == "quadratic":
            self._prob_func = self._probing_quadratic
        elif probing == "double_hash":
            self._prob_func = self._probing_double_hash
        elif probing == "random":
            self._prob_func = self._probing_random
            
        self._collisions = 0

        
    def _hash_code(self, seq: str) -> int:
        """convert a dna sequence (as a key) to a hash code
        params
            seq: dna sequence with bases from: 'A','T','C','G'
        returns
            code: hash code for key k, int 
        """
        bases = "ATCG"
        codes = ["1"] # highest bit, if len(seq) is quarantee to fixed 21
                      # we may not use highest bit 1.
        for base in seq:
            # use two bits represent each base: A:00, T:01, C:10, G:11
            codes.append(bin(bases.index(base))[2:].zfill(2)) # binary code

        code = int("".join(codes), 2) # converto binary code to decimal
        return code


    # hash function using _hash_code and MAD compression function
    def _hash_function( self, k ):
        tmp = self._hash_code(k) * self._scale + self._shift 
        tmp = tmp  % self._prime % self._mask
        return tmp        
        

    # length (number of elements) in the table(T)
    def __len__( self ):
        return self._n

    def _is_available( self, j ):
        return self._T[j] is None or self._T[j] is MyProbeHashMap._AVAIL
    

    # looking for an entry for a key k start from index j
    # we didn't use this function
    # Instead, we use _find_slot2 to support different probing methods
    # and to count collisions
    def _find_slot( self, j, k ):
        first_avail = None
        while True:
            if self._is_available( j ):
                if first_avail is None:
                    first_avail = j
                if self._T[j] is None:
                    return ( False, first_avail )

            elif k == self._T[j]._key:
                return ( True, j )
            # loop in the table
            j = (j + 1) % len( self._T )
            self._collisions += 1
            
    # three different probing methods to solve collision
    def _probing_linear(self, i, *args):
        return i
    
    def _probing_quadratic(self, i, *args):
        """probing quadratic"""
        return i*i
    
    def _probing_double_hash(self, i, k, *args):
        """double hashing
        f(i) = i * h'(k)
        h'(k) = q - (k mod q)
        """
        h_prime = self._q - (self._hash_code(k) % self._q)
        return i * h_prime

    def _probing_random(self, i, k, *args):
        random.seed(1)
        return i * random.randint(1, 7)
    
    def get_collisions(self):
        return self._collisions
    
    def reset_collisions(self):
        self._collisions = 0
        
    def _find_slot2(self, j, k):
        """compared with _find_slot, this method support three different
        probing methods, and we can count the collision numbers
        """
        first_avail = None
        i = 0
        while True:
            new_j = (j + self._prob_func(i, k)) % len(self._T)
            if self._is_available(new_j):# May be None or _AVAIL
                if first_avail is None:
                    first_avail = new_j
                if self._T[new_j] is None:
                    return (False, first_avail)
            elif k == self._T[new_j]._key:
                return (True, new_j)
            # collision:
            i += 1
            self._collisions += 1
                
            
    def _bucket_getitem( self, j, k ):
        found, s = self._find_slot2( j, k )
        if not found:
            return False

        return self._T[s]._value

    def _bucket_setitem( self, j, k, v ):
        found, s = self._find_slot2( j, k )
        if not found:
            self._T[s] = self._Item( k, v )
            self._n += 1

        else:
            self._T[s]._value = v

    def _bucket_delitem( self, j, k ):
        found, s = self._find_slot2( j, k )
        if not found:
            return False
        value = self._T[s]._value
        self._T[s] = MyProbeHashMap._AVAIL
        return value

    def __iter__( self ):
        for j in range( len( self._T ) ):
            if not self._is_available( j ):
                yield self._T[j]._key

    def __getitem__( self, k ):
        j = self._hash_function( k )
        tmp = self._bucket_getitem( j, k )
        if not tmp:
            raise KeyError
        else:
            return tmp

    def __setitem__( self, k, v ):
        j = self._hash_function( k )
        self._bucket_setitem( j, k, v )
        if self._n > len( self._T ) // 2:
            self._resize( 2 * len( self._T ) - 1 )

    def __delitem__( self, k ):
        j = self._hash_function( k )
        succes = self._bucket_delitem( j, k )
        if succes:
            self._n -= 1
            # need resize? minimal size = 
            if self._n < len(self._T) //4:
                new_cap = max(self._min_cap, (len(self._T)+1)/2)
                self._resize(new_cap)
        else:
            raise KeyError

    def _resize( self, c ):
        old = list( self.items() )
        self._T = c * [None]
        self._n = 0
        self._mask = c
        for (k,v) in old:
            self[k] = v

    def is_empty( self ):
        return len( self ) == 0

    def __str__( self ):
        if self.is_empty():
            return "{}"
        pp = "{"
        for item in self.items():
            pp += str( item )
        pp += "}"
        pp += " size = "
        pp += str( len( self ) )
        return pp

    def get( self, k, d = None ):
        try:
            tmp = self[k]
            return tmp
        except KeyError:
            return d

    def setdefault( self, k, d = None ):
        try:
            tmp = self[k]
            return tmp
        except:
            self[k] = d
            return d

In [185]:
# test three probing methods and view their total collisions
# check all key and corresponding value are the same as they are stored into map
# chech there is no repeated keys in the map.
def check_probings():
    probe_methods = ['linear', 'quadratic', 'double_hash', 'random']
    #probe_methods = ['random']
    for probe in probe_methods:
        m = MyProbeHashMap(probing = probe)
        m.reset_collisions()
        n_keys_test = 10000
        for i in range(n_keys_test):
            seq21 = random_seq()
            m[seq21] = seq21

        print("{}/{} collisions/items occurs using probing {}".format(
            m.get_collisions(), n_keys_test, probe))

        for key, value in m.items():
            if not key == value:
                print("Error! key is not euqal to value")
                return False   

        keys = set(m.keys())
        if len(keys) != len(m):
            print("found repeated keys in map")

        print("test OK, map length:{}", len(m))
              
check_probings()
print(m)

15017/10000 collisions/items occurs using probing linear
test OK, map length:{} 10000
13101/10000 collisions/items occurs using probing quadratic
test OK, map length:{} 10000
11922/10000 collisions/items occurs using probing double_hash
test OK, map length:{} 10000
0/10000 collisions/items occurs using probing random
test OK, map length:{} 2
{('GTATTACATCCCTCAAGATAT', 'GTATTACATCCCTCAAGATAT')('ACACCAGAGTATCTACATAAG', 'ACACCAGAGTATCTACATAAG')('AAGGCGACAAAACTCCGGCGC', 'AAGGCGACAAAACTCCGGCGC')('TGCCATAAGTATAACCCCACG', 'TGCCATAAGTATAACCCCACG')('TCCGACCTTCCCGGAACGAAC', 'TCCGACCTTCCCGGAACGAAC')('GCAGAGCAAAATCCACTACGC', 'GCAGAGCAAAATCCACTACGC')('GTAGCTTAATCTGGTTGAGGC', 'GTAGCTTAATCTGGTTGAGGC')('AAAGAATTTGCTCACCTTGCT', 'AAAGAATTTGCTCACCTTGCT')('AGCCTACTGGAATCACAGTAT', 'AGCCTACTGGAATCACAGTAT')('TAGGGTCTTCTGCTATGTAGG', 'TAGGGTCTTCTGCTATGTAGG')('TGGATGGACCTGATTGGAGTA', 'TGGATGGACCTGATTGGAGTA')('GTGCAACAGCGTATCATTAAG', 'GTGCAACAGCGTATCATTAAG')('GCTAAAATTACTGTGTCGGCG', 'GCTAAAATTACTGTGTCGGCG')('TAC