<a href="https://colab.research.google.com/github/mh-sarkar/singleBaseDNAArithmaticEncodeDecode/blob/main/SingleBaseArithmeticEncodeDecoseWithDeepDNA_05_06_2022_for_github.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Connecting Google Drive

---


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Import Library

---



In [None]:
!pip install Bio
!pip install fuzzywuzzy

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting Bio
  Downloading bio-1.3.9-py3-none-any.whl (270 kB)
[K     |████████████████████████████████| 270 kB 5.3 MB/s 
[?25hCollecting biopython>=1.79
  Downloading biopython-1.79-cp37-cp37m-manylinux_2_5_x86_64.manylinux1_x86_64.whl (2.3 MB)
[K     |████████████████████████████████| 2.3 MB 68.8 MB/s 
[?25hCollecting mygene
  Downloading mygene-3.2.2-py2.py3-none-any.whl (5.4 kB)
Collecting biothings-client>=0.2.6
  Downloading biothings_client-0.2.6-py2.py3-none-any.whl (37 kB)
Installing collected packages: biothings-client, mygene, biopython, Bio
Successfully installed Bio-1.3.9 biopython-1.79 biothings-client-0.2.6 mygene-3.2.2
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting fuzzywuzzy
  Downloading fuzzywuzzy-0.18.0-py2.py3-none-any.whl (18 kB)
Installing collected packages: fuzzywuzzy
Successfully installed fuzzy

In [None]:
import random
import numpy as np
import sys
import io
import math
import tensorflow as tf
import keras

from __future__ import print_function
from itertools import product

from Bio import SeqIO
from Bio.Seq import Seq
# from Bio.Alphabet import generic_dna

from keras import backend as K
from keras.callbacks import LambdaCallback, Callback
from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation, Flatten
from keras.layers import LSTM
from keras.layers import Conv1D, MaxPooling1D
from keras.utils.data_utils import get_file
from keras.models import model_from_json
from keras.models import load_model
from keras.callbacks import EarlyStopping
from keras.callbacks import ModelCheckpoint
from sklearn.model_selection import train_test_split

from tensorflow.keras.optimizers import RMSprop

#Define Constant

---



In [None]:
SEQ_MAX_LEN = 16400
CHARS = "ACGT"
# print('total chars:', len(CHARS))
# print('chars:', CHARS)
CHAR_INDICES = dict((c, i) for i, c in enumerate(CHARS))
INDICES_CHAR = dict((i, c) for i, c in enumerate(CHARS))
MAXLEN = 64
STEP = 1
BATCH_SIZE = 4096
INITIAL_LR = 0.001
GLOBAL_DECAY_STEPS = 10000
GLOBAL_DECAY_RATE = 0.9
EPOCHS = 10
INPUT_DIM = len(CHARS)

##Path Define

In [None]:
# early_stopping = EarlyStopping(monitor='val_loss', patience=2)
# np.random.seed(1337) # for reproducibility
train_path = 'TRAIN_FILE_PATH'
valid_path = 'VALID_FILE_PATH'
test_path = 'TEST_FILE_PATH'

##Data Vectorization ▶

---



###Read Data From Path Function

In [None]:
def read_fasta(data_path):
    records = list(SeqIO.parse(data_path, "fasta"))
    text = ""
    for record in records:
        text += str(record.seq)
    return text

###Read Data for Vectorization from Path Function

In [None]:
def read_fasta_vectorization(data_path):
    records = list(SeqIO.parse(data_path, "fasta"))
    text = ""
    for record in records:
        text += str(record.seq)
    text = text + text[0:BATCH_SIZE-len(text)%BATCH_SIZE]
    return text

###Small Data Sequence Split from Full Sequence Data Function 

In [None]:
def read_data(data_path):
    text = read_fasta_vectorization(data_path)
    for i in range(0, len(text) - MAXLEN, STEP):
        sentence = text[i: i + MAXLEN]
        next_char = text[i + MAXLEN]
        yield sentence, next_char

###Data Vectorization Function

In [None]:
def vectorization(sentences, next_chars):
    x = np.zeros((BATCH_SIZE, MAXLEN, len(CHARS)), dtype=np.float32)
    y = np.zeros((BATCH_SIZE, len(CHARS)), dtype=np.float32)
    for i, sentence in enumerate(sentences):
        for t, char in enumerate(sentence):
            x[i, t, CHAR_INDICES[char]] = 1
        y[i, CHAR_INDICES[next_chars[i]]] = 1
    return x, y

###Get Batch Function

In [None]:
def get_batch(stream):
    sentences = []
    next_chars = []
    for sentence, next_char in stream:
        sentences.append(sentence)
        next_chars.append(next_char)
        if len(sentences) == BATCH_SIZE:
            data_tuple = vectorization(sentences,next_chars)
            yield data_tuple
            sentences = []
            next_chars = []
    # Added for padding characters
    if len(sentences) <= BATCH_SIZE:
        data_tuple = vectorization(sentences,next_chars)
        yield data_tuple
        sentences = []
        next_chars = []

###Get Vectorized Data Function

In [None]:
def get_vectorized_data(data_path):
    data_X = []
    data_y = []
    for i, batch in enumerate(get_batch(read_data(data_path))):
        data_X.append(batch[0])
        data_y.append(batch[1])
    data_X = np.array(data_X)
    data_X = data_X.reshape(-1,64,4)
    data_y = np.array(data_y)
    data_y = data_y.reshape(-1,4)
    return data_X, data_y

###Assign Vectorized Data

In [None]:
X_train, y_train = get_vectorized_data(train_path)
X_test, y_test = get_vectorized_data(test_path)
X_valid, y_valid = get_vectorized_data(valid_path)

###Load Previous Model if exists

In [None]:
model = load_model('SAVE_MODEL_PATH')

#**Arithmetic Encoding Decoding** ⏩

---


## Arithmetic Encoding Decoding Class

In [None]:
from decimal import Decimal, getcontext
import mpmath

getcontext().prec = 64
mpmath.mp.dps = 64

class ArithmeticEncoding:
    """
    ArithmeticEncoding is a class for building arithmetic encoding.
    """

    def __init__(self, frequency_table):
        self.probability_table = self.get_probability_table(frequency_table)

    def get_probability_table(self, frequency_table):
        """
        Calculates the probability table out of the frequency table.
        """
        total_frequency = sum(list(frequency_table.values()))

        probability_table = {}
        for key, value in frequency_table.items():
            probability_table[key] = value/total_frequency

        return probability_table

    def get_encoded_value(self, encoder):
        """
        After encoding the entire message, this method returns the single value that represents the entire message.
        """
        last_stage = list(encoder[-1].values())
        last_stage_values = []
        for sublist in last_stage:
            for element in sublist:
                last_stage_values.append(element)

        last_stage_min = np.min(last_stage_values)
        last_stage_max = np.max(last_stage_values)

        return (last_stage_min + last_stage_max)/2

    def process_stage(self, probability_table, stage_min, stage_max):
        """
        Processing a stage in the encoding/decoding process.
        """
        stage_probs = {}
        stage_domain = stage_max - stage_min
        for term_idx in range(len(probability_table.items())):
            term = list(probability_table.keys())[term_idx]
            term_prob = Decimal(probability_table[term])
            cum_prob = term_prob * stage_domain + stage_min
            stage_probs[term] = [stage_min, cum_prob]
            stage_min = cum_prob
        return stage_probs

    def encode(self, msg, probability_table):
        """
        Encodes a message.
        """

        encoder = []
    
        stage_min = Decimal(0.0)
        stage_max = Decimal(1.0)

        for msg_term_idx in range(len(msg)):

          stage_probs = self.process_stage(probability_table, stage_min, stage_max)

          msg_term = msg[msg_term_idx]
          stage_min = stage_probs[msg_term][0]
          stage_max = stage_probs[msg_term][1]

          encoder.append(stage_probs)

        stage_probs = self.process_stage(probability_table, stage_min, stage_max)
        encoder.append(stage_probs)
        encoded_msg = self.get_encoded_value(encoder)

        return encoder, encoded_msg

    def decode(self, encoded_msg, msg_length, probability_table):
        """
        Decodes a message.
        """

        decoder = []
        decoded_msg = ""

        stage_min = Decimal(0.0)
        stage_max = Decimal(1.0)

        for idx in range(msg_length):
            stage_probs = self.process_stage(probability_table, stage_min, stage_max)

            for msg_term, value in stage_probs.items():
                if encoded_msg >= value[0] and encoded_msg <= value[1]:
                    break

            decoded_msg = decoded_msg + msg_term
            stage_min = stage_probs[msg_term][0]
            stage_max = stage_probs[msg_term][1]

            decoder.append(stage_probs)

        stage_probs = self.process_stage(probability_table, stage_min, stage_max)
        decoder.append(stage_probs)

        return decoder, decoded_msg

##Encode Decode Test ▶

---


###Read Test Sequence

In [None]:
test_text = read_fasta(test_path)
X_test_data = X_test[0:len(test_text)]

###Define Constant

In [None]:
encode_len = 64

###64 Base Encode Testing

In [None]:
# from fuzzywuzzy import fuzz
encoding_record = []
# X_test_data = X_test[0:len(test_text)]
probability = []
for i in range(0, math.ceil(len(X_test_data)/encode_len)):
    test_encode_len =len(test_text)%encode_len if(i==math.ceil(len(X_test_data)/encode_len)-1) else encode_len
    probability = model.predict(X_test_data[i*encode_len:(i*encode_len) +test_encode_len])
    # print(probability)
    
    frequent_table = {'A' : np.mean(probability[:][0].sum()), 'C' : np.mean(probability[:][1].sum()), 'G' : np.mean(probability[:][2].sum()), 'T' : np.mean(probability[:][3].sum()), }
    AE = ArithmeticEncoding(frequent_table)
    encoder, encoded_msg = AE.encode(msg= test_text[i*encode_len: (i*encode_len) + test_encode_len],probability_table=AE.probability_table)
    encoding_record.append(str(encoded_msg))
    # decoder, decoded_msg = AE.decode(encoded_msg=encoded_msg, 
    #                              msg_length=encode_len,
    #                              probability_table=AE.probability_table)

    
    print (f'\r[{i}/{math.ceil(len(X_test_data)/encode_len)-1}] encoded msg {test_text[i*encode_len: (i*encode_len) + test_encode_len]} is {encoded_msg}',end="", flush=True)
    
    # print(i)
    # print(encoded_msg)
    # print(decoded_msg)
    # print(test_text[i*encode_len:(i*encode_len) + encode_len])
    # print(fuzz.ratio(decoded_msg,test_text[i*encode_len:(i*encode_len) + encode_len]))

print('\n')
print(encoding_record)
with open('ENODING_DATA_FILE_PATH', 'w') as f:
    for s in encoding_record:
        f.write(s + u'\n')


[2588/2588] encoded msg CCCTTAAATAAGACATCACGATG is 0.3427850147939545880229360546016829930749855653647632570814543218

['0.551066833869477579508015231265808172688638904100685030511432083', '0.9166827898790407528308010696446685195833486798300755125909827695', '0.929660648915651887192821468278127042619731493644126573728268777', '0.1215216954631458739116994270535002465658294801506425400421770092', '0.2867425094639537622531527708360492866576805293197612831701066386', '0.6448101606325265617344185073102380508861009537818035393151684885', '0.3212776789514982963442979390996746017235705103628161140772419092', '0.8123377137605206903403182517651930826717542357982975912278563755', '0.2666682221436909353605119395040165816828025585580322469474544353', '0.924302553966185801261499436838445722114189133754694639865179516', '0.2530974422529953674163674587029884064243930441529223083499760731', '0.9577359635451903290146412435229983274507495285691478742381205845', '0.5720826216973298952599314495043366477460

###64 Base Decode Testing

In [None]:
from fuzzywuzzy import fuzz

decode_record = []
# X_test_data = X_test[0:len(test_text)]
with open('ENODING_DATA_FILE_PATH', 'r') as f:
    decode_record = [Decimal(line.rstrip(u'\n')) for line in f]
probability = []
for i in range(0, math.ceil(len(X_test_data)/encode_len)):
    test_encode_len =len(test_text)%encode_len if(i==math.ceil(len(X_test_data)/encode_len)-1) else encode_len
    probability = model.predict(X_test_data[i*encode_len:(i*encode_len) + test_encode_len])
    
    frequent_table = {'A' : np.mean(probability[:][0].sum()), 'C' : np.mean(probability[:][1].sum()), 'G' : np.mean(probability[:][2].sum()), 'T' : np.mean(probability[:][3].sum()), }
    AE = ArithmeticEncoding(frequent_table)
    
    decoder, decoded_msg = AE.decode(encoded_msg=decode_record[i], 
                                 msg_length=test_encode_len,
                                 probability_table=AE.probability_table)
    
    ratio = fuzz.ratio(decoded_msg,test_text[i*encode_len:(i*encode_len) + encode_len])
    ratio_not_100 = []
    
    if(ratio<100):
      msg = str(i)+" index matches " + str(ratio)
      ratio_not_100.append(msg)
    print (f'\r[{i}/{math.ceil(len(X_test_data)/encode_len)-1}] decoded msg matches {ratio}% is {decoded_msg}',end="", flush=True)
    
    if i==math.ceil(len(X_test_data)/encode_len-1):
      print('\n')
      if(len(ratio_not_100)!=0):
        print(ratio_not_100)
      else:
        print('All Decoded msg are matches 100%')
    # print(i)
    # print(decoded_msg)
    # print(test_text[i*encode_len:(i*encode_len) + encode_len])
    # print(fuzz.ratio(decoded_msg,test_text[i*encode_len:(i*encode_len) + encode_len]))
    # print('\n')

[4/2588] decoded msg matches 100% is CAGCCGCTTTCCACACAGACATCATAACAAAAAATTTCCACCAAACCCCCCCTCCCCCGCTTCT



[2588/2588] decoded msg matches 100% is CCCTTAAATAAGACATCACGATG

All Decoded msg are matches 100%
