## Deadline: 3 May 2023

### 1. Estimate the worst case time complexity

1 point

In [1]:
def func0(n):
    if n == 0: return 1
    return func0(n-1) + 5

def func1(n):
    i = 1
    while i < n:
        i = i*2
    
def func2(n):
    if n == 1:
        return False
    i = 2
    while i*i <= n:
        if n % i  == 0:
            return False
        i+=1
    return True   


1) $T(n)=1$, if $n=0$ so $T(0)=1$ \\
   $T(n)=T(n-1)+O(1)$ \\
   Let's show that geometric progression takes place, use induction rule \\

   $T(n)=T(n-1)+O(1)=T(n-2)+O(1)+O(1)=...\\
   =T(1)+O(1)+...+O(1)$ \\
   Since we looking for longest call - worst time complexity, each adding term is a power of 2

   $T(n)=T(1)+O(1)+...+O(1)=O(2^0)+O(2^1)+O(2^2)+...+O(2^{n-1})= \\
   =\sum_{j=0}^{n-1}O(j)=O(2^n -1)=O(\exp(n \cdot \ln2))$ 

   So the worst is like $T(n)=O(\exp(n))$

   

   

2) The loop works untill $T(2^k) < O(n) \leq T(2^{k+1}) $ because at each step i is getting doubled, so $T(k) < O(\ln n) \leq T(k+1) $. So the worst is $O(\ln n)$

3) Since we have $i\cdot i \leq n$ true the loop works, so $T(k^2)\leq O(n) < T((k+1)^2)$

$T(k)\leq O(n^{0.5}) < T(k+1)$
So the worst case is $O(n^{\frac{1}{2}})$



### 1. Hash table

Implement k-mer index that stores positions of all k-mers of an input nucleotide sequence in a hash table. Write your own implementation of hash table with the interface: add(), delete(), get() using polynomial hash function. Provide some tests for your class.

2 points

In [2]:
class HashTable:
    '''
    Class takes two arguments: size and p. size is the size of the hash table 
    and p is a prime number used for the polynomial hash function.
    '''
    def __init__(self, size=10000, p=1):
        '''
        Set p = 1 for instance, size = 1000s0
        '''
        self.size = size
        self.p = p
        self.table = [[] for _ in range(size)]

    def add(self, key, value):
        '''
        The method first hashes the key and then adds the key-value pair to the appropriate bucket in the hash table. 
        If the key already exists in the bucket, appends the new value to the list of values associated with the key.
        '''
        index = self.hash(key)
        for pair in self.table[index]:
            if pair[0] == key:
                pair[1].append(value)
                return 
        self.table[index].append((key, [value]))

    def delete(self, key):
        '''
        Method hashes the key and then searches for the key in the appropriate bucket. 
        If the key is found, the method removes the key-value pair from the bucket.
        '''
        index = self.hash(key)
        for i, pair in enumerate(self.table[index]):
            if pair[0] == key:
                del self.table[index][i]
                return self.table[index]

    def get(self, key):
        '''
        The method hashes the key and then searches for the key in the appropriate bucket. 
        If the key is found,  returns the list of values associated with the key. 
        If the key is not found - returns None.
        '''
        index = self.hash(key)
        for pair in self.table[index]:
            if pair[0] == key:
                return pair[1]
        return None

    def hash(self, key):
        '''
        The method applies polynomial hash function
        '''
        h = 0
        for char in key:
            h = (h * self.p + ord(char)) % self.size
        return h

In [24]:
# class HashTable:
#     def __init__(self, size):
#         self.size = size
#         self.table = [[] for _ in range(size)]
    
#     def _hash(self, key):
#         # Polynomial hash function
#         p = 31
#         m = self.size
#         hash_value = 0
#         for i in range(len(key)):
#             hash_value = (hash_value * p + ord(key[i])) % m
#         return hash_value
    
#     def add(self, key, value):
#         hash_value = self._hash(key)
#         bucket = self.table[hash_value]
#         for i in range(len(bucket)):
#             if bucket[i][0] == key:
#                 bucket[i][1].append(value)
#                 return
#         bucket.append([key, [value]])
    
#     def delete(self, key, value):
#         hash_value = self._hash(key)
#         bucket = self.table[hash_value]
#         for i in range(len(bucket)):
#             if bucket[i][0] == key:
#                 if value in bucket[i][1]:
#                     bucket[i][1].remove(value)
#                     if len(bucket[i][1]) == 0:
#                         bucket.pop(i)
#                 return
    
#     def get(self, key):
#         hash_value = self._hash(key)
#         bucket = self.table[hash_value]
#         for i in range(len(bucket)):
#             if bucket[i][0] == key:
#                 return bucket[i][1]
#         return None

In [25]:
# def build_kmer_index(sequence, k):
#     index = HashTable(len(sequence) - k + 1)
#     for i in range(len(sequence) - k + 1):
#         kmer = sequence[i:i+k]
#         index.add(kmer, i)
#     return index

In [27]:
# # Create a sequence
# sequence = "ATGACATGATGACGTAC"

# # Build k-mer index
# k = 3
# index = build_kmer_index(sequence, k)

# # Test get() function
# assert index.get("ATG") == [0, 6, 9]
# assert index.get("GAC") == [3, 12]
# assert index.get("TAC") == [7, 15]
# assert index.get("CCC") == None

# # Test add() and delete() functions
# index.add("ATG", 13)
# assert index.get("ATG") == [0, 6, 9, 13]
# index.delete("ATG", 9)
# assert index.get("ATG") == [0, 6, 13]

In [3]:
class KmerIndex:
    '''
    The KmerIndex class is a class that implements a k-mer index for a given nucleotide sequence.
    A k-mer index is a data structure that stores the positions of all k-mers of length k in a sequence.
    '''
    def __init__(self, sequence, k):
        '''
        Define sequence itself, k - length of kmer, index via HashTable class
        '''
        self.sequence = sequence
        self.k = k
        self.index = HashTable()
        self.build_index()

    def build_index(self):
        '''
        The kmer index by iterating over the input sequence and storing the positions of each k-mer in a hash table. 
        The hash table stores key-value pairs and allows for constant time access to values by key.
        '''
        for i in range(len(self.sequence) - self.k + 1):
            kmer = self.sequence[i:i+self.k]
            self.index.add(kmer, i)

    def get_kmer_positions(self, kmer):
        '''
        Method takes a k-mer as input and returns a list of positions where the k-mer occurs in the input sequence. 
        If the k-mer is not in the hash table, the method returns None.
        '''
        return self.index.get(kmer)

Applying tests

In [4]:
# kmer of length 2
seq = "AAACCCGGGTTT"
k = 2
ki = KmerIndex(seq, k)
assert ki.get_kmer_positions("AA") == [0, 1]
assert ki.get_kmer_positions("CC") == [3, 4]

# kmer of length 3
seq = "ATCGATCGATCG"
k = 3
ki = KmerIndex(seq, k)
assert ki.get_kmer_positions("ATC") == [0, 4, 8]
assert ki.get_kmer_positions("TTT") == None

### 2. Bloom filter

Implement a Bloom filter class using the input sequence from the previous task. Implement build() and query() interface. Provide some tests for your class.

2 points

In [31]:
!pip install mmh3
!pip install bitarray

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting mmh3
  Downloading mmh3-3.1.0-cp39-cp39-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (37 kB)
Installing collected packages: mmh3
Successfully installed mmh3-3.1.0


In [None]:
import mmh3
from bitarray import bitarray

In [33]:
class BloomFilter:
    def __init__(self, size, num_hashes):
        self.size = size
        self.num_hashes = num_hashes
        self.bit_array = bitarray(size)
        self.bit_array.setall(0)
    
    def _hash(self, key):
        hash_values = []
        for seed in range(self.num_hashes):
            hash_value = mmh3.hash(key, seed) % self.size
            hash_values.append(hash_value)
        return hash_values
    
    def add(self, key):
        hash_values = self._hash(key)
        for hash_value in hash_values:
            self.bit_array[hash_value] = 1
    
    def query(self, key):
        hash_values = self._hash(key)
        for hash_value in hash_values:
            if self.bit_array[hash_value] == 0:
                return False
        return True
    
    def build(self, sequence, k):
        for i in range(len(sequence) - k + 1):
            kmer = sequence[i:i+k]
            self.add(kmer)

In [38]:
# Create a sequence
sequence = "ATCGATCGATCG"

# Build Bloom filter
size = 1000
num_hashes = 5
bloom_filter = BloomFilter(size, num_hashes)
bloom_filter.build(sequence, k=3)

# Test query() function
assert bloom_filter.query("ATC") == True
assert bloom_filter.query("TCG") == True
assert bloom_filter.query("GAT") == True
assert bloom_filter.query("CCC") == False

# Test add() and query() functions
bloom_filter.add("CCC")
assert bloom_filter.query("CCC") == True

### 3*. (Optional) SBT

Implement Sequence Bloom Trees algorithm from https://www.ncbi.nlm.nih.gov/pmc/articles/PMC4804353/

4 points


In [12]:
class SBTNode:
    """
    A class representing a node in the Sequence Bloom Tree (SBT).

    Attributes:
        bloom_filter (bitarray): A bit array used to store k-mers.
        children (list): A list of child nodes.
    """
    def __init__(self, bloom_filter=None, children=None):
        """
        Initializes a new instance of the SBTNode class.

        """
        self.bloom_filter = bloom_filter
        self.children = children or []

    def add_child(self, node):
        """
        Adds a child node to the current node.
        """
        self.children.append(node)

    def count_kmers(self, k, sequence):
        """
        Counts the k-mers in the specified sequence and adds them to the node's bloom filter.
        """
        kmers = [sequence[i:i+k] for i in range(len(sequence)-k+1)]
        for kmer in kmers:
            self.bloom_filter.add(hash(kmer))

    def contains_kmer(self, kmer):
        """
        Determines if the node's bloom filter contains the specified k-mer.
        """
        return kmer in self.bloom_filter

In [13]:
class SBT:
    """
    A class representing a Sequence Bloom Tree (SBT).

    Attributes:
        k (int): The length of the k-mer.
        max_children (int): The maximum number of children per node.
        capacity (int): The capacity of the bloom filter.
        error_rate (float): The false positive rate of the bloom filter.
        root (SBTNode): The root node of the tree.
        num_nodes (int): The number of nodes in the tree.
    """

    def __init__(self, k, max_children=2, capacity=1000, error_rate=0.05):
        """
        Initializes a new instance of the SBT class.
        """
        self.k = k
        self.max_children = max_children
        self.capacity = capacity
        self.error_rate = error_rate
        self.root = SBTNode(bloom_filter=bitarray(capacity))
        self.num_nodes = 1

    def insert(self, sequence):
        """
        Inserts a sequence into the SBT.
        """
        node = self.root
        path = deque()
        while True:
            path.append(node)
            if len(node.children) == 0:
                break
            elif len(node.children) == self.max_children:
                node = self._split(node)
            else:
                node = node.children[0]

        for n in reversed(path):
            if n.bloom_filter.count() < self.capacity:
                n.count_kmers(self.k, sequence)
                return

            if len(n.children) < self.max_children:
                child_node = SBTNode(bloom_filter=bitarray(self.capacity))
                child_node.count_kmers(self.k, sequence)
                n.add_child(child_node)
                self.num_nodes += 1
                return

            node = self._split(n)
            if len(path) > 0:
                parent = path.pop()
                parent.children[parent.children.index(n)] = node
                node.add_child(n)
            else:
                new_root = SBTNode(bloom_filter=bitarray(self.capacity))
                new_root.add_child(node)
                new_root.add_child(n)
                self.root = new_root
                return

    def query(self, sequence):
        """
        Inserts a sequence into the SBT.
        """
        node = self.root
        for i in range(len(sequence) - self.k + 1):
            kmer = sequence[i:i+self.k]
            if not node.contains_kmer(kmer):
                return False
            node = node.children[0]
        return True

    def _split(self, node):
        """
        Inserts a sequence into the SBT.
        """
        left = SBTNode(bloom_filter=bitarray(self.capacity))
        right = SBTNode(bloom_filter=bitarray(self.capacity))
        for child in node.children:
            if hash(child.bloom_filter.to01()) % 2 == 0:
                left.add_child(child)
            else:
                right.add_child(child)
        node.children = [left, right]
        return node

In [14]:
sbt = SBT(k=31)
sequence = "ATCGATCGATCGATCG"
sbt.insert(sequence)

In [15]:
query_sequence = "ATCG"
if sbt.query(query_sequence):
    print("Sequence found in SBT")
else:
    print("Sequence not")

Sequence found in SBT
