# Introduction 

This assignment is mainly to practice the implementation and application of hashmap.

* First, you will be asked to implement your own hash table with collision handling. 
  * Two hash functions `murmur3` and `fnv` are given
  * You could use other hash functions at your will

* Next, there is a basic interview question `two sum` that can be solved efficiently using hashmap 

* Finally, another classic design question, `LRU cache`, is given as a bonus problem.

In [1]:
#Here is the definition of Node class for singly linked list
class Node:
  """
  Node for singly linked list
  val: value for the node
  next: reference to the next node 
  """
  def __init__(self, val, next=None):
    self.val=val
    self.next=next 

# The implementation of murmur3 hash function is given below
def murmur3(data, seed = 0):
  '''
  MurmurHash3 32 bit: https://en.wikipedia.org/wiki/MurmurHash
  data: str
  return: int
  '''
  c1 = 0xcc9e2d51
  c2 = 0x1b873593

  length = len(data)
  h1 = seed
  roundedEnd = (length & 0xfffffffc)  # round down to 4 byte block
  for i in range(0, roundedEnd, 4):
    # little endian load order
    k1 = (ord(data[i]) & 0xff) | ((ord(data[i + 1]) & 0xff) << 8) | \
          ((ord(data[i + 2]) & 0xff) << 16) | (ord(data[i + 3]) << 24)
    k1 *= c1
    k1 = (k1 << 15) | ((k1 & 0xffffffff) >> 17) # ROTL32(k1,15)
    k1 *= c2

    h1 ^= k1
    h1 = (h1 << 13) | ((h1 & 0xffffffff) >> 19)  # ROTL32(h1,13)
    h1 = h1 * 5 + 0xe6546b64

  # tail
  k1 = 0

  val = length & 0x03
  if val == 3:
      k1 = (ord(data[roundedEnd + 2]) & 0xff) << 16
  # fallthrough
  if val in [2, 3]:
      k1 |= (ord(data[roundedEnd + 1]) & 0xff) << 8
  # fallthrough
  if val in [1, 2, 3]:
      k1 |= ord(data[roundedEnd]) & 0xff
      k1 *= c1
      k1 = (k1 << 15) | ((k1 & 0xffffffff) >> 17)  # ROTL32(k1,15)
      k1 *= c2
      h1 ^= k1

  # finalization
  h1 ^= length

  # fmix(h1)
  h1 ^= ((h1 & 0xffffffff) >> 16)
  h1 *= 0x85ebca6b
  h1 ^= ((h1 & 0xffffffff) >> 13)
  h1 *= 0xc2b2ae35
  h1 ^= ((h1 & 0xffffffff) >> 16)

  return h1 & 0xffffffff

def fnv(data, hval_init=0x811c9dc5):
    """
    FNV hash: https://en.wikipedia.org/wiki/Fowler%E2%80%93Noll%E2%80%93Vo_hash_function
    
    Returns the 32 bit FNV-1a hash value for the given data.
    
    data: str
    return: int
    """

# The implementation of fnva hash function is given below
    def fnva(data, hval_init, fnv_prime, fnv_size):
      """
      Alternative FNV hash algorithm used in FNV-1a.
      """
      data=data.encode()

      hval = hval_init
      for byte in data:
          hval = hval ^ byte
          hval = (hval * fnv_prime) % fnv_size
      return hval


    return fnva(data, hval_init, 0x01000193, 2**32)    

In [2]:

#Test of above-defined hash functions
murmur3('hello world'), fnv('hello world')

(1586663183, 3582672807)

# Problem 1: implement hash table with separate chaining (20 points)

Requirement:
* use separate chaining with linked list to resolve hash collision. 
* do not use built-in `dict` class in Python
* O(1) time complexity for both `contains`, `get`, `put`, `remove` methods 
* you don't have to implement dynamic resizing when load factor is high.

Remark:
* Note that the provided hash function takes **str** as input
* The key of hashmap can be any type. However in this exercise, it is safe to assume it is either **str** or **int**.

In [16]:
#implement using this signature. Add anything else necessary.
class SCHashMap(object):
  
  def __init__(self):
    self.size = 0
    self.capacity = 50
    self.buckets = [None] * self.capacity
  
  def put(self, key, val) -> None:
    index = murmur3(str(key)) % self.capacity
    if self.buckets[index] is None:
        self.size += 1
        self.buckets[index] = Node((key, val))
    else:
        node = self.buckets[index]
        while node:
            if node.val[0] == key:
                node.val = (key, val)
                return
            node = node.next
        node = Node ((key, val))
        node.next =  self.buckets[index]
        self.buckets[index] = node
        self.size += 1
    pass

  def remove(self,key):
    index = murmur3(str(key)) % self.capacity
    node = self.buckets[index]
    prev = None
    while node:
        if node.val[0] == key:
            if node == self.buckets[index]:
                self.buckets[index] = node.next
            else:
                prev.next = node.next
            node.next = None
            self.size -= 1
            return
   
        prev = node
        node = node.next
        pass
    
  def get(self, key):
        index = murmur3(str(key)) % self.capacity
        node = self.buckets[index]
        while node:
            if node.val[0] == key:
                return node.val[1]
            node = node.next
        return None

  def contains(self, key) -> bool:
        index = murmur3(str(key)) % self.capacity
        node = self.buckets[index]
        while node:
            if node.val[0] == key:
                return True
            node = node.next
        return False

  def is_empty(self) -> bool:
        return self.size == 0

In [4]:
#TEST CODE
#DO NOT MODIFY
import unittest
import random
import string 
class BuiltinHashMap:
  def __init__(self):
    self.data={}
  
  def put(self, key, val) -> None:
    self.data[key]=val 

  def remove(self,key):
    if key in self.data:
      self.data.pop(key)

  def get(self, key):
    return self.data.get(key,None)

  def contains(self, key):
    return key in self.data 

  def is_empty(self) -> bool:
    return not self.data

class TestHashMap(unittest.TestCase):
    
    @classmethod
    def setUpClass(cls):
      print("Test using class:" + str(cls.user_class), flush=True)
      cls.CHARS=string.ascii_uppercase + string.digits+string.ascii_lowercase
    
    def generate(self):
      if random.random()>0.5:
        k=random.randint(1,100)
        return ''.join(random.choices(self.CHARS, k=k))
      else:
        return random.randint(1,10**9)

    def test_method(self):
      obj1=BuiltinHashMap()
      obj2=self.user_class()
      for _ in range(10**6):
        self.assertEqual(obj1.is_empty(), obj2.is_empty())
        rnd=random.random()
        if rnd>2/3:
          k,v=self.generate(),self.generate() 
          obj1.put(k,v)
          obj2.put(k,v)
          self.assertTrue(obj1.contains(k))
          self.assertTrue(obj2.contains(k))
          self.assertEqual(v, obj1.get(k))
          self.assertEqual(v, obj2.get(k))
        elif rnd>1/3:
          if obj1.data:
            for k in obj1.data:
              self.assertTrue(obj1.contains(k))
              self.assertTrue(obj2.contains(k))
              obj1.remove(k)
              obj2.remove(k)
              self.assertTrue(not obj1.contains(k))
              self.assertTrue(not obj2.contains(k))
              break 
        else:
          k=self.generate()
          self.assertEqual(obj1.contains(k), obj2.contains(k))
          self.assertEqual(obj1.get(k), obj2.get(k))

        
TestHashMap.user_class=BuiltinHashMap    
suite = unittest.TestSuite([TestHashMap('test_method')])
unittest.TextTestRunner().run(suite)

Test using class:<class '__main__.BuiltinHashMap'>


.
----------------------------------------------------------------------
Ran 1 test in 14.143s

OK


<unittest.runner.TextTestResult run=1 errors=0 failures=0>

In [5]:
#TEST BLOCK, DO NOT MODIFY
#Your code needs to pass the unittests
TestHashMap.user_class=SCHashMap    
suite = unittest.TestSuite([TestHashMap('test_method')])
unittest.TextTestRunner().run(suite)

Test using class:<class '__main__.SCHashMap'>


E
ERROR: test_method (__main__.TestHashMap)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-4-fbed8cdf257f>", line 51, in test_method
    self.assertTrue(obj2.contains(k))
  File "<ipython-input-3-814eaa0ffe5f>", line 55, in contains
    index = murmur3(str(key)) % capacity
NameError: name 'capacity' is not defined

----------------------------------------------------------------------
Ran 1 test in 0.004s

FAILED (errors=1)


<unittest.runner.TextTestResult run=1 errors=1 failures=0>

# Problem 2: implement hash table with open addressing (30 points)

Requirement:
* Use open addressing to resolve hash collision. 
* You may choose any open addressing techniques (e.g. linear probing etc)
* Do not use built-in `dict` class in Python
* O(1) time complexity for both `contains`, `get`, `put`, `remove` methods


Remark:
* Note that the provided hash function takes **str** as input
* The key of hashmap can be any type. However, in this exercise, it is safe to assume it is **str** or **int**.

Hints:
* `linear probing` is probably the easiest method
* The trickest part is the `remove` function.
* You need to do table doubling as the load factor greater than 0.5.

  

In [25]:
#implement using this signature. Add anything else necessary.
class OAHashMap(object):
  
  def __init__(self):
    self.size = 0
    self.capacity = 1000
    self.buckets = [None] * self.capacity
    
  
  def put(self, key, val) -> None:
    """
    If key already exists, update the value. Otherwise put (key,val) into the hash table.
    """
    index = murmur3(str(key)) % self.capacity
    while index < self.capacity:
        if self.buckets[index] is None:
            self.size += 1
            self.buckets[index] = (key, val)
            
            if self.size / self.capacity >= 0.5:
                self.resize()
            return
        elif self.buckets[index][0] == key:
            self.buckets[index] = (key, val)
            return
        index += 1
    self.expand()
    self.put(key, val)
    return

  def expand(self):
        self.capacity = self.capacity * 2
        old = self.buckets
        self.buckets = [None] * self.capacity
        self.size = 0
        for old_new in old:
            if old_new is not None:
                self.put(old_new[0], old_new[1])

  def remove(self, key):
    """
    If key is in the table, remove key and value. Otherwise do nothing.
    """
    index = murmur3(str(key)) % self.capacity
    idx = None
    while index < self.capacity:
        if self.buckets[index] is None:
            break
        index += 1
    if idx is None:
        print('the key does not exist!')
        raise KeyError('The key does not exist!')
        self.buckets[idx] = None
    self.size -= 1
    next_key = idx +1

    while next_key < self.capacity:
        if self.buckets[next_key] is None:
            break
        new_hash = murmur3(str(self.buckets[next_key][0])) % self.capacity
        if new_hash <= idx:
            self.buckets[idx] = self.buckets[next_key]
            self.buckets[next_key] = None
            idx = next_key
        next_key += 1

  def get(self,key):
    """
    return the value of key. If not found return None
    """
    index = murmur3(str(key)) % self.capacity
    for i in range(index, self.capacity):
        if self.buckets[i] is None:
            break
        elif self.buckets[i][0] == key:
            return self.buckets[i][1]
    return None

  def contains(self, key) -> bool:
    """
    return True if key found in the hash table, otherwise return False
    """
    index = murmur3(str(key)) % self.capacity
    for i in range(index, self.capacity):
        if self.buckets[i] is None:
            break
        elif self.buckets[i][0] == key:
            return True
    return False 

  def is_empty(self) -> bool:
    """
    return True if the table is empty, otherwise return False
    """
    return self.size == 0

In [26]:
#TEST BLOCK, DO NOT MODIFY
#Your code needs to pass the unittests
TestHashMap.user_class=OAHashMap    
suite = unittest.TestSuite([TestHashMap('test_method')])
unittest.TextTestRunner().run(suite)

Test using class:<class '__main__.OAHashMap'>


E

the key does not exist!



ERROR: test_method (__main__.TestHashMap)
----------------------------------------------------------------------
Traceback (most recent call last):
  File "<ipython-input-4-fbed8cdf257f>", line 60, in test_method
    obj2.remove(k)
  File "<ipython-input-25-2f831e875f73>", line 52, in remove
    raise KeyError('The key does not exist!')
KeyError: 'The key does not exist!'

----------------------------------------------------------------------
Ran 1 test in 0.006s

FAILED (errors=1)


<unittest.runner.TextTestResult run=1 errors=1 failures=0>

# Problem 3: Two Sum (20 points)

Given an array of integers $nums$, return the count of index pairs $(i,j)$ such that $i \lt j$ and $nums[i]+nums[j]=0$

Example:
```
input = [-1,1,0,1,0]
output = 3
```
```
input = []
output = 0
```

Requirements:
* If hashmap is used, use one of  hashmaps you implemented above.
* State the complexity of the algorithm.

In [14]:
#implement using this signature. Add anything else necessary.
#ignore the self parameter, it is just for the test cases to work.
from typing import List
def twoSum(self, nums : List[int]) -> int:
    hash_fun = SCHashMap()
    count = 0
    for i in nums:
        if hash_fun.contains(-i):
            count += hash_fun.get(-i)
        if hash_fun.contains(i):
            hash_fun.put(i, hash_fun.get(i) + 1)
        else:
            hash_fun.put(i, 1)
    return count 

In [17]:
#TEST CODE
#DO NOT MODIFY
#Your code needs to pass the unittests
import unittest
import random

class TestTwoSum(unittest.TestCase):
    
    @classmethod
    def setUpClass(cls):
      print("Test using class:" + str(cls.user_class), flush=True)
   
    def test_method(self):
      fun=self.user_class 
      a=[-1,1,0,1,0]
      b=3
      self.assertEqual(b,fun(a))
      
      a=[]
      b=0
      self.assertEqual(b,fun(a))
      
      a=[0]
      b=0
      self.assertEqual(b,fun(a))

      k=10**6
      a=[0]*k
      b=(k-1)*(k)//2
      self.assertEqual(b,fun(a))

      k=10**6//3
      a=[0]*k+[-1]*k+[1]*k
      random.shuffle(a)
      b=(k-1)*(k)//2+k*k 
      self.assertEqual(b,fun(a))
      
        
TestTwoSum.user_class=twoSum    
suite = unittest.TestSuite([TestTwoSum('test_method')])
unittest.TextTestRunner().run(suite)

Test using class:<function twoSum at 0x00000298204BACA0>


.
----------------------------------------------------------------------
Ran 1 test in 34.251s

OK


<unittest.runner.TextTestResult run=1 errors=0 failures=0>

# Bonus Problem: LRU Cache (Bonus) (30 points)

Design a data structure satisfying  the constraints of a [Least Recently Used (LRU) cache](https://en.wikipedia.org/wiki/Cache_replacement_policies#LRU).

Implement the `LRUCache` class:

* `LRUCache(int capacity)` Initialize the LRU cache with positive size capacity.
* `int get(int key)` Return the value of the key if the key exists, otherwise return -1.
* `void put(int key, int value)` Update the value of the key if the key exists. Otherwise, add the key-value pair to the cache. If the number of keys exceeds the capacity from this operation, evict the least recently used key.

This is a classical question, there are a lot of resources on internet. 

Requirements:
* The functions must each run in $O(1)$ average time complexity.
* use doubly linked list and hashmap to implement it.
* use one of hashmap classes you implemented above. Do not use build-int- `dict` or `OrderedDict` in python

Hints:
* use a doubly linked list to maintain the access order of key-values. key/values are stored in the node.
* additionaly, use a hashmap to store key-nodes for quick query.
* when a key is accessed, find the node by hashmap and move the node to the head or tail of the linked list
* if the size of cache exceeds capacity, remove the head or tail node. 




In [None]:
#implement using this signature. Add anything else necessary.
class LRUCache:

    def __init__(self, capacity: int):
      pass
        

    def get(self, key: int) -> int:
      pass 
        

    def put(self, key: int, value: int) -> None:
      pass 

    def is_empty(self) -> bool:
      """
      True if the cache is empty
      """
      pass 

    def contains(self, key: int) -> bool:
      """
      True if key in the cache
      """
      pass 
    
    def size(self) -> int:
      """
      return the size of cache (actual number of elements in the cache, not the capacity)
      """
      pass 

In [None]:
#TEST CODE
#DO NOT MODIFY
import unittest
import random
from collections import OrderedDict

class BuiltinLRUCache:
  def __init__(self,capacity):
    self.capacity=capacity 
    self.data=OrderedDict()
  
  def get(self, key: int) -> int:
      if key not in self.data: 
        return -1
      else:
        self.data.move_to_end(key,last=True) 
        return self.data[key]
  
  def put(self, key: int, value: int) -> None:
    if key in self.data:
      self.data[key]=value 
      self.data.move_to_end(key,last=True) 
    else:
      while len(self.data)>=self.capacity:
          self.data.popitem(last=False) 
      self.data[key]=value 
      self.data.move_to_end(key,last=True) 
  
  def is_empty(self) -> bool:
    return not self.data 

  def contains(self, key: int) -> bool:
    return key in self.data 
  
  def size(self) -> int:
    return len(self.data)

class TestLRUCache(unittest.TestCase):
    
    @classmethod
    def setUpClass(cls):
      print("Test using class:" + str(cls.user_class), flush=True)
    
    def generate(self):
      return random.randint(1,10**9)

    def test_method(self):
      C=100
      obj1=BuiltinLRUCache(C)
      obj2=self.user_class(C)
      for _ in range(10**6):
        self.assertEqual(obj1.is_empty(), obj2.is_empty())
        self.assertEqual(obj1.size(), obj2.size())
        self.assertTrue(obj1.size()<=C)
        self.assertTrue(obj2.size()<=C)
        rnd=random.random()
        if rnd>1/2:
          k,v=self.generate(),self.generate() 
          obj1.put(k,v)
          obj2.put(k,v)
          self.assertTrue(obj1.contains(k))
          self.assertTrue(obj2.contains(k))
          self.assertEqual(v, obj1.get(k))
          self.assertEqual(v, obj2.get(k))
        else:
          k=self.generate()
          self.assertEqual(obj1.contains(k), obj2.contains(k))
          self.assertEqual(obj1.get(k), obj2.get(k))

        
TestLRUCache.user_class=BuiltinLRUCache    
suite = unittest.TestSuite([TestLRUCache('test_method')])
unittest.TextTestRunner().run(suite)

In [None]:
#TEST BLOCK, DO NOT MODIFY
#Your code needs to pass the unittests
TestLRUCache.user_class=LRUCache    
suite = unittest.TestSuite([TestLRUCache('test_method')])
unittest.TextTestRunner().run(suite)