In [34]:
# Rank of each nucleotide; a k-mer is read as a base-4 number over these digits.
mapping = {letter: rank for rank, letter in enumerate("ACGT")}


def order_kmer(seq):
    """Return the base-4 rank of a k-mer.

    Every letter is a base-4 digit (A=0, C=1, G=2, T=3) and the leftmost
    letter is the most significant one, e.g. ``order_kmer("CCC")`` is
    ``1*16 + 1*4 + 1 == 21``.

    Args:
        seq: sliceable sequence of letters (typically a ``str``).

    Returns:
        The rank as a non-negative ``int``; 0 for an empty ``seq``.

    Raises:
        ValueError: if a letter is not one of A, C, G, T (case-sensitive).
    """
    rank = 0
    place_value = 1  # 4**0 for the last letter, then 4**1, 4**2, ...
    for letter in seq[::-1]:
        digit = mapping.get(letter)
        if digit is None:
            raise ValueError("Letter not in ACGT")
        rank += place_value * digit
        place_value *= 4
    return rank


seq = "CCC"
order_kmer(seq)


21

In [35]:
def find_minimizer_in_window(seq_window, k):
    """Return the start offset of the minimizer k-mer inside one window.

    The minimizer is the k-mer with the smallest ``order_kmer`` rank; when
    several k-mers share that rank the leftmost one wins.

    Args:
        seq_window: nucleotide string holding at least one k-mer.
        k: k-mer length, a positive integer.

    Returns:
        Zero-based offset (relative to ``seq_window``) of the minimizer.

    Raises:
        ValueError: if ``k`` is not positive, if ``seq_window`` is shorter
            than ``k`` (no k-mer exists), or if ``order_kmer`` rejects a
            letter.
    """
    if k < 1:
        raise ValueError("k must be a positive integer")
    n_kmers = len(seq_window) - k + 1
    if n_kmers < 1:
        raise ValueError("seq_window must be at least k letters long")

    # min() returns the first minimal item, so ties resolve to the leftmost
    # k-mer; each k-mer is ranked exactly once.
    return min(range(n_kmers),
               key=lambda start: order_kmer(seq_window[start: start + k]))

seq = "CCAAAACC"
k = 4
find_minimizer_in_window(seq, k)

2

In [53]:
def find_minimizers(seq, w, k):
    """Naive minimizer scan: one minimizer position per window.

    A window spans ``w`` consecutive k-mers, i.e. ``w + k - 1`` letters. Each
    window is handed to ``find_minimizer_in_window`` and the offset it returns
    is shifted back into ``seq`` coordinates.

    Args:
        seq: nucleotide string.
        w: number of consecutive k-mers per window.
        k: k-mer length.

    Returns:
        List with one minimizer start position (in ``seq``) per window, in
        window order; empty when ``seq`` is shorter than one window.
    """
    span = w + k - 1  # letters covered by w consecutive k-mers
    n_windows = len(seq) - span + 1
    return [
        find_minimizer_in_window(seq[start: start + span], k) + start
        for start in range(n_windows)
    ]

seq = "CCAAACCCC"
w = 4
k = 3
find_minimizers(seq, w, k)

[2, 2, 2, 3]

In [16]:
from collections import deque

class MonotonicQueue:
    """Queue of values kept in non-decreasing order from front to back.

    The front element is always the minimum of the values still tracked.
    Equal values are all retained, which is what lets ``pop`` expire
    elements by value.
    """

    def __init__(self):
        # Underlying storage; the front (index 0) holds the current minimum.
        self.deque = deque()

    def push(self, value):
        """Append ``value`` after discarding every larger value at the back."""
        items = self.deque
        while len(items) > 0 and items[-1] > value:
            items.pop()
        items.append(value)

    def min(self):
        """Return the current minimum (raises IndexError when empty)."""
        return self.deque[0]

    def pop(self, value):
        """Drop the front element, but only if it equals ``value``."""
        if not self.deque:
            return
        if self.deque[0] == value:
            self.deque.popleft()

def min_sliding_window(nums, k):
    """Return the minimum of every length-``k`` window of ``nums``.

    Uses a value-based ``MonotonicQueue``: the element leaving the window is
    expired by value, the incoming element is pushed, and the queue front is
    the window minimum.

    Args:
        nums: indexable sequence of comparable values.
        k: window length.

    Returns:
        List of window minima, left to right; empty when ``nums`` has fewer
        than ``k`` elements.
    """
    window = MonotonicQueue()
    minima = []

    for idx, current in enumerate(nums):
        # The element that just slid out of the window sits k positions back.
        if idx >= k:
            window.pop(nums[idx - k])
        window.push(current)
        # The first full window is complete once k elements have been seen.
        if idx >= k - 1:
            minima.append(window.min())

    return minima

# Example usage:
nums = [1, 3, -1, -3, 5, 3, 6, 7]
k = 3
print(min_sliding_window(nums, k))

[-1, -3, -3, -3, 3, 3]


In [None]:
from collections import deque

class MonotonicQueue:
    """Index-tracking monotonic queue over a fixed sequence of values.

    The queue stores *indices* into ``values``; the values they point at are
    non-decreasing from front to back, so the front index always points at
    the minimum of the indices still tracked (leftmost on ties).
    """

    def __init__(self, values):
        self.deque = deque()   # indices into ``values``
        self.values = values   # sequence the indices refer to

    def push(self, index):
        """Append ``index`` after dropping back indices with larger values."""
        while self.deque and self.values[self.deque[-1]] > self.values[index]:
            self.deque.pop()
        self.deque.append(index)

    def min(self):
        """Return the *index* of the current minimum (IndexError when empty)."""
        return self.deque[0]

    def pop(self, value):
        """Drop the front entry if it equals ``value``.

        Despite its name, ``value`` is compared against the stored indices,
        so callers must pass the index that is leaving the window.
        """
        if self.deque and self.deque[0] == value:
            self.deque.popleft()


def min_sliding_window(nums, k):
    """Return the minimum of every length-``k`` window of ``nums``.

    Args:
        nums: indexable sequence of comparable values.
        k: window length, a positive integer.

    Returns:
        List of window minima (values, not indices), left to right; empty
        when ``nums`` has fewer than ``k`` elements.

    Raises:
        ValueError: if ``k`` is not positive.
    """
    if k < 1:
        raise ValueError("k must be a positive integer")

    # The queue needs the backing sequence and works purely on indices.
    mq = MonotonicQueue(nums)
    result = []

    for i in range(len(nums)):
        if i >= k:
            mq.pop(i - k)       # index i - k has just left the window
        mq.push(i)
        if i >= k - 1:
            result.append(nums[mq.min()])  # translate the index back to a value

    return result

# Example usage:
nums = [1, 3, -1, -3, 5, 3, 6, 7]
k = 3
print(min_sliding_window(nums, k))

In [27]:
from collections import deque
from typing import List

def minSlidingWindow(nums: List[int], k: int, debug: bool = False) -> List[int]:
    """Return the minimum of every length-``k`` window of ``nums``.

    A deque of indices is maintained whose values are non-decreasing from
    front to back, so the front index always points at the window minimum
    (the leftmost one on ties).

    Args:
        nums: values to scan.
        k: window length, a positive integer.
        debug: when True, print the deque state each time a window minimum
            is recorded. Off by default so the function has no output side
            effects.

    Returns:
        List of window minima, left to right; empty when ``nums`` has fewer
        than ``k`` elements.

    Raises:
        ValueError: if ``k`` is not positive.
    """
    if k < 1:
        raise ValueError("k must be a positive integer")

    deq = deque()
    result = []

    for right, value in enumerate(nums):
        # Larger values at the back can never be a minimum again.
        while deq and value < nums[deq[-1]]:
            deq.pop()

        deq.append(right)

        # The window is [right - k + 1, right]; at most one index expires
        # per step, and it can only be the front one.
        if deq[0] <= right - k:
            deq.popleft()

        # Emit a result once the first full window has been seen.
        if right >= k - 1:
            if debug:
                print(f"deq: {deq}, deq[0]: {deq[0]}, value: {nums[deq[0]]}")
            result.append(nums[deq[0]])

    return result


# Example usage:
nums = [1, 3, -1, -3, 5, 3, 9, 12]
k = 4

minSlidingWindow(nums, k, debug=True)

deq: deque([3]), deq[0]: 3, value: -3
deq: deque([3, 4]), deq[0]: 3, value: -3
deq: deque([3, 5]), deq[0]: 3, value: -3
deq: deque([3, 5, 6]), deq[0]: 3, value: -3
deq: deque([5, 6, 7]), deq[0]: 5, value: 3


[-3, -3, -3, -3, 3]

In [37]:
from collections import deque
from typing import List

def minimizerSlidingWindow(seq: str, w: int, k: int) -> List[int]:
    """Find the minimizer of every window of ``w`` consecutive k-mers.

    Slides over the k-mer start positions of ``seq`` with a deque of
    ``(start, rank)`` pairs whose ranks are non-decreasing from front to
    back, so the front pair is always the window's minimizer (the leftmost
    one on ties). Each k-mer is ranked with ``order_kmer`` exactly once.

    Args:
        seq: nucleotide string.
        w: number of consecutive k-mers per window, a positive integer.
        k: k-mer length, a positive integer.

    Returns:
        List with one minimizer start position (in ``seq``) per window, in
        window order; empty when ``seq`` holds fewer than ``w`` k-mers.

    Raises:
        ValueError: if ``w`` or ``k`` is not positive, or if ``order_kmer``
            rejects a letter of ``seq``.
    """
    if w < 1 or k < 1:
        raise ValueError("w and k must be positive integers")

    deq = deque()  # (k-mer start, k-mer rank) pairs
    result = []

    for right in range(len(seq) - k + 1):
        # Rank the incoming k-mer once; it is reused for every comparison
        # below and stored alongside its position.
        rank = order_kmer(seq[right: right + k])

        # Higher-ranked k-mers at the back can never be a minimizer again.
        while deq and rank < deq[-1][1]:
            deq.pop()

        deq.append((right, rank))

        # The window covers k-mer starts [right - w + 1, right]; at most one
        # entry expires per step, and it can only be the front one.
        if deq[0][0] <= right - w:
            deq.popleft()

        # Emit a result once the first full window has been seen.
        if right >= w - 1:
            result.append(deq[0][0])

    return result


# Example usage:
seq = "TCAAAT"
w = 4
k = 3


minimizerSlidingWindow(seq, w, k)

[2]

In [45]:
# Random test sequence for comparing the naive and mono-queue minimizer finders
import random

SEED = 42     # fixed so Restart & Run All regenerates the same sequence
SEQ_LEN = 10  # number of nucleotides to draw

# A dedicated generator keeps the global random state untouched.
rng = random.Random(SEED)
seq = "".join(rng.choice("ATGC") for _ in range(SEQ_LEN))
w = 4
k = 3

seq

'CAGCACACGA'

In [52]:
# Spot-check one k-mer rank by hand: "CAC" -> C*16 + A*4 + C*1 = 1*16 + 0*4 + 1 = 17
order_kmer("CAC")

17

In [47]:
# Cross-check: the deque-based and the naive finder must report the same
# minimizer positions (both resolve ties to the leftmost k-mer).
# w and k come from the sequence-setup cell above.
seq = "CAGCACACGA"

fast_minimizers = minimizerSlidingWindow(seq, w, k)
print(fast_minimizers)

naive_minimizers = find_minimizers(seq, w, k)
print(naive_minimizers)

# Fail loudly instead of relying on eyeballing the two printed lists.
assert fast_minimizers == naive_minimizers, "deque-based and naive minimizers disagree"

[1, 4, 4, 4, 4]
[4, 3, 2, 1, 0]
