In [1]:
import pandas as pd
import numpy as np
import math

## Data

In [2]:
# Read the file
def read_file(txt_file):
    lines = []
    with open(txt_file, "r") as file:
        l = file.readlines()
        for i in l:
            lines.append(i.strip())
    return lines

encoding_txt_file = read_file("encoding_regions.txt")
non_encoding_txt_file = read_file("non_encoding_regions.txt")
unlabelled_txt_file = read_file("unlabelled.txt")
genome_txt_file = read_file("genome.txt")
len(genome_txt_file[0])
# genome_txt_file

1000000

## 1. Emission Probabilities

In [3]:
def freq_of_nucleotides(region_data):
    frequency = dict()
    for line in range(len(region_data)):
        for char in region_data[line]:
            if char not in frequency:
                frequency[char] = 1 
            else:
                frequency[char] += 1 
    return frequency


def get_emission_probs(freq_dict):
    emission_probabilities = dict()
    count_probabilities = sum(freq_dict.values())
    for key in freq_dict:
        emission_probabilities[key] = freq_dict[key]/count_probabilities
    return emission_probabilities

a = freq_of_nucleotides(encoding_txt_file)    
b = freq_of_nucleotides(non_encoding_txt_file) 
emi_encoding = get_emission_probs(a)
emi_noncoding = get_emission_probs(b)
print("Emission probabiliites in the encoding txt file", emi_encoding)
print("Emission probabiliites in the non encoding txt file", emi_noncoding)

Emission probabiliites in the encoding txt file {'G': 0.3999818, 'C': 0.3999538, 'T': 0.1000112, 'A': 0.1000532}
Emission probabiliites in the non encoding txt file {'C': 0.2501792, 'G': 0.249929, 'T': 0.2499335, 'A': 0.2499583}


## 2. Gene-finding

In [4]:
def gene_finding(encoding_emi_probs, non_encoding_emi_probs, txt_file):
    gene_find_txt = open("HW2 2.txt", "a")
    # 0 = non encoding, 1 = encoding
    for gene in txt_file:
        prob_encoding = 0
        prob_non_encoding = 0 
        for char in gene:
            prob_encoding += math.log(encoding_emi_probs[char])
            prob_non_encoding += math.log(non_encoding_emi_probs[char])
        if prob_encoding > prob_non_encoding:
            gene_find_txt.write("1\n")
        else:
            gene_find_txt.write("0\n")
    gene_find_txt.close()

# gene_finding(emi_encoding, emi_noncoding, unlabelled_txt_file)


## 3. Building Hidden Markov Models

In [5]:
def viterbi_decoding(txt_file, transition_matrix, emission_matrix, start_dict, states):
    
    viterbi_matrix = np.zeros((len(txt_file[0]),2)) # 100000 x 2
    
    for char_index in range(len(txt_file[0])):
        for state in range(len(states)):
            if char_index == 0:
                viterbi_matrix[char_index, state] = math.log(start_dict[state]) + math.log(emission_matrix[state][txt_file[0][char_index]])
            else:
                first_term = viterbi_matrix[char_index-1, 0] + math.log(transition_matrix[0][state])
                second_term = viterbi_matrix[char_index-1, 1] + math.log(transition_matrix[1][state])
                max_probability = max(first_term, second_term)
                viterbi_matrix[char_index, state] = max_probability + math.log(emission_matrix[state][txt_file[0][char_index]])

    trace_path_matrix = np.zeros((len(txt_file[0])), dtype=int)
    trace_path_matrix[-1] = np.argmax([viterbi_matrix[-1]])
    for i in range(len(viterbi_matrix)-2, -1,-1): 
        trace_path_matrix[i] = np.argmax([viterbi_matrix[i, 0]+ math.log(transition_matrix[0][trace_path_matrix[i+1]]), viterbi_matrix[i,1] + math.log(transition_matrix[1][trace_path_matrix[i+1]])])
    
    return trace_path_matrix
    


# 1 - encoding region, 0 - non-encoding region
states = ['1', '0']

observations = ('A', 'C', 'G', 'T')

start_probability = {1: 0.5, 0: 0.5}

transition_probability = [[0.999, 0.001],[0.001, 0.999]]

emission_probability = {
1 : {'A': 0.1000532, 'C': 0.3999538, 'G': 0.3999818, 'T': 0.1000112},
0 : {'A': 0.2499583, 'C': 0.2501792, 'G': 0.249929, 'T': 0.2499335},
}

trace_path_array = viterbi_decoding(genome_txt_file, transition_probability, emission_probability, start_probability, states)
np.savetxt('HW2_3.txt', trace_path_array, fmt='%d', delimiter='', newline='')

## 4. Learning HMMs

In [6]:
def get_state_indices(trace_path): 
    num = trace_path
    changes = [[int(num[0]), 0]]
    for i in range(1, len(num)):
        if num[i] != num[i-1]:
            changes[-1].append(i-1)
            changes.append([int(num[i]), i])
    changes[-1].append(len(num)-1)

    indices = {1:[],0:[]}
    for change in changes:
        indices[change[0]].append((change[1], change[-1]))
    return indices
        

In [7]:
def update_frequencies(txt_file_non_encoded, indices):
    encoding_frequencies = dict()
    non_encoding_frequencies = dict()
    for i in indices: 
        for tup in indices[i]:
            start_index = tup[0]
            end_index = tup[-1]
            for char in txt_file_non_encoded[0][start_index:end_index+1]:
                if i == 1:
                    if char not in encoding_frequencies:
                        encoding_frequencies[char] = 1 
                    else:
                        encoding_frequencies[char] += 1
                elif i == 0:
                    if char not in non_encoding_frequencies:
                        non_encoding_frequencies[char] = 1 
                    else:
                        non_encoding_frequencies[char] += 1
    return encoding_frequencies, non_encoding_frequencies
    
def update_transition_matrix(trace_path_matrix):
    new_mat = [[0,0],[0,0]]
    for i in range(len(trace_path_matrix)-1):
        new_mat[trace_path_matrix[i]][trace_path_matrix[i+1]] += 1
    denom = [sum(new_mat[0]), sum(new_mat[1])]
    for i in range(2):
        for j in range(2):
            new_mat[i][j] *= 1/denom[i]
    return new_mat    


In [8]:
def learning_hmm(txt_file, start_dict, states, trace_path): 
    for i in range(5):
        # Get indices 
        indices = get_state_indices(trace_path)
        
        # Update transition matrix
        transition_matrix = update_transition_matrix(trace_path)
        
        # Update emission probabilities 
        en_freq, non_en_freq = update_frequencies(txt_file, indices)
        em_mat_encoding = get_emission_probs(en_freq)
        em_mat_non_encoding = get_emission_probs(non_en_freq)
        emission_matrix = {
                    1 : {'A': em_mat_encoding['A'], 'C': em_mat_encoding['C'], 'G': em_mat_encoding['G'], 'T': em_mat_encoding['T']},
                    0 : {'A': em_mat_non_encoding['A'], 'C': em_mat_non_encoding['C'], 'G': em_mat_non_encoding['G'], 'T': em_mat_non_encoding['T']},
                    }
        print(f"Iteration Number {i}")
        print("Transition Matrix\n", np.array(transition_matrix))
        print("Emission Matrix \n " ,emission_matrix)
        print("\n")
        # Viterbi Decoding  
        trace_path = viterbi_decoding(txt_file, transition_matrix, emission_matrix, start_dict, states)

learning_hmm(genome_txt_file, start_probability, states, trace_path_array)

Iteration Number 0
Transition Matrix
 [[9.99859006e-01 1.40993715e-04]
 [3.86819889e-03 9.96131801e-01]]
Emission Matrix 
  {1: {'A': 0.11373069429934778, 'C': 0.38413756105824887, 'G': 0.3878363497755315, 'T': 0.11429539486687185}, 0: {'A': 0.22934366456800503, 'C': 0.27038212367416803, 'G': 0.2706827717262278, 'T': 0.22959144003159915}}


Iteration Number 1
Transition Matrix
 [[9.99893635e-01 1.06364742e-04]
 [3.28770588e-03 9.96712294e-01]]
Emission Matrix 
  {1: {'A': 0.11269876394904056, 'C': 0.38361837321784215, 'G': 0.3891189580501375, 'T': 0.1145639047829798}, 0: {'A': 0.22892560361928896, 'C': 0.27084359545502895, 'G': 0.2710986640395635, 'T': 0.22913213688611858}}


Iteration Number 2
Transition Matrix
 [[9.99894686e-01 1.05314130e-04]
 [3.27316639e-03 9.96726834e-01]]
Emission Matrix 
  {1: {'A': 0.11224100673700267, 'C': 0.3839455955256133, 'G': 0.38890301258421256, 'T': 0.11491038515317148}, 0: {'A': 0.22892067582692158, 'C': 0.27085217628328234, 'G': 0.27112578624144584, 

## 5. HMM Duration Distributions

In [13]:
def get_duration_distributions(regions):
    encoding_num_observerations = 0
    non_encoding_num_observations = 0
    for i in regions: 
        if i == 1:
            encoding_num_regions = len(regions[i])
            for tup in regions[i]:
                encoding_num_observerations += tup[-1] - tup[0]
            lambda_encoding = encoding_num_regions/encoding_num_observerations
        elif i == 0:
            non_encoding_num_regions = len(regions[i])
            for tup in regions[i]:
                non_encoding_num_observations += tup[-1] - tup[0]
            lambda_non_encoding = non_encoding_num_regions/non_encoding_num_observations
    return lambda_encoding, lambda_non_encoding

pairs = get_state_indices(trace_path_array)
lamba_MLE_encoding, lamba_MLE_non_encoding = get_duration_distributions(pairs)
print("Lambda for encoding regions", lamba_MLE_encoding)
print("Lambda for non encoding regions", lamba_MLE_non_encoding)

Lambda for encoding regions 0.0038832199546485263
Lambda for non encoding regions 0.00014205046213059102
