In [1]:
import pandas as pd
import numpy as np
from numba import jit
import math
import random

In [2]:
@jit
def delta(a,b):
    if a == b:
        return 1
    else:
        return 0

In [3]:
#functions to open the file and integer encode the sequences

def integer_encode_seq(sequence): #we can use instead of this the one-hot encoded function
    alphabet = ['A', 'C', 'D', 'E', 'F', 'G','H', 'I', 'K', 'L', 'M', 'N', 'P', 'Q', 'R', 'S', 'T', 'V', 'W', 'Y', '-']
    char_to_int = dict((c, i) for i, c in enumerate(alphabet))
    integer_encoded = [char_to_int[char] for char in sequence]
    return integer_encoded

def integer_encode_fasta_file(path_to_file):
    y = []
    X = []
    with open(path_to_file) as f:
        for line in f:
            line = line.strip() #removes blank spaces at the beginning and at the end of each line
            if not line:
                continue #does nothing if the line is empty
            if line.startswith(">"): #handles a label line
                text_sequence = '' #initializes the string in which we will store our sequence - trick to avoid messing up the two line sequence :)
                if line.endswith("true"):
                    y.append(1)
                elif line.endswith("false"):
                    y.append(0)
            else: #case in which we have the first line of a sequence or the second line of a sequence
                if len(text_sequence) == 0:
                    text_sequence = text_sequence + line
                else:
                    text_sequence = text_sequence + line
                    X.append(integer_encode_seq(text_sequence))
    return np.array(y), np.array(X)

In [4]:
y, X = integer_encode_fasta_file('MSA_nat_with_annotation.faa')

In [5]:
#general parameters
L = len(X[1])
print("length of the sequences, L = ", L)
M = len(X)
print("number of sequences, M = ", M)
N = 21
print("number of characters/AAs, N = ", N)

length of the sequences, L =  96
number of sequences, M =  1130
number of characters/AAs, N =  21


In [6]:
@jit
def f(a, i, X):
    tot = 0
    for m in range(M):
        tot = tot+delta(a, X[m,i])
    return tot/M

@jit
def A(i, X):
    f_array = []
    for a in range(N):
        F = f(a, i, X)
        f_array.append(F)
    f_array = np.array(f_array)
    return np.argmax(f_array)

In [17]:
#energy function
@jit
def energy(h_array, J, sequence,T):
    energy = 0
    for i in range(len(sequence)):
        energy -= h_array[i]*sequence[i]
    for i in range(len(sequence)-1):
        for j in range(i, len(sequence)):
            energy -= J*sequence[i]*sequence[j]
    return energy/T

In [18]:
@jit
def trial(sequence): #select 1 spins at random and flip it
    new_sequence = np.copy(sequence)
    for i in range(1):
        p = random.randint(0, len(sequence)-1)
        new_sequence[p] *= -1
    return new_sequence

def MC_sampling(h_array, J, L, n_steps, sequence_zero, T): #USE np.array() for h ELSE JIT DOES NOT WORK
    #compute energy of the randomly initialized sequence
    energy_zero = energy(h_array, J, sequence_zero,T)
    #array to store my sample
    sample = []
    for i in range(n_steps):
        #propose a move by flipping one of the spins
        sequence_new = trial(sequence_zero)
        #compute energy of the proposed sequence
        energy_new = energy(h_array, J, sequence_new,T)
        alpha = np.exp(-energy_new/83)/np.exp(-energy_zero/83)
        if alpha > 1:
            alpha = 1
        if alpha > random.random(): #to accept with a probability equal to the acceptance
            #update the sequence
            sequence_zero = np.copy(sequence_new)
            energy_zero = energy_new
            #update sample
            sample.append(sequence_zero)
        else:
            sample.append(sequence_zero)
    return np.array(sample), sequence_zero

In [9]:
from numpy import loadtxt
h_array_conv = loadtxt('hHarrayconv_sMax50000_nMC100000_eps0.5_a0.05.txt', unpack=False, comments='P')
J_conv = loadtxt('Jconv_sMax50000_nMC100000_eps0.5_a0.05.txt', unpack=False, comments='P')

In [10]:
def integer_transform(X, N, X_train):
    X_integer = X.copy()
    for m in range(len(X)):
        for i in range(len(X[0])):
            if X[m,i] == +1:
                X_integer[m,i] = A(i,X_train)
            else:
                AA_array = [i for i in range(0,N)]
                ind = AA_array.index(A(i,X_train))
                AA_array.pop(ind)
                p_array = [f(a, i, X_train) for a in range(N)]
                p_array.pop(ind)
                p_array = np.array(p_array)
                if p_array.sum() == 0: #deal with the case in which all p are 0
                    p_array = np.ones(len(AA_array))
                p_array /= p_array.sum()
                chosenAA = np.random.choice(a=AA_array, p=p_array)
                X_integer[m,i]= chosenAA
    return X_integer

In [11]:
def seq_from_int_to_onehot(sequence_int):
    sequence_onehot = []
    for i in range(len(sequence_int)):
        addition = np.zeros(20, dtype=int)
        if sequence_int[i] != 20:
            addition[sequence_int[i]]=1
        sequence_onehot.append(addition)
    return np.array(sequence_onehot).flatten()

def data_from_int_to_onehot(X):
    X_onehot=[]
    for i in X:
        X_onehot.append(seq_from_int_to_onehot(i))
    return np.array(X_onehot)

In [20]:
"""
The aim of this function is:
-run n_samples independent MC sampling 
-the number of iterations is equal to n_gen
-take from each one k generated sequences
"""

def generate_samples(n_gen,k,n_samples,T):
    samples=[]
    for i in range(n_samples):
        #MC sampling
        sequence_zero = np.array([random.randrange(-1,2,2) for i in range(L)])
        generated_sample, seq = MC_sampling(h_array_conv, J_conv, L, n_gen, sequence_zero,T)
        
        #extract k sequences from the sampled ones and saved the on a file
        # take 1000 sequences as far as possible from each other from the second half of generated sequences
        # minimize correlations between sequences and between sequences and sequence_zero
        step = int(n_gen/(2*k))  
        sample = np.array([generated_sample[int(n_gen/2)+j*(step-1),:] for j in range(k)])
        
        #convert to integer and to onehot
        int_encoded = integer_transform(sample, N, X)
        samples.append(data_from_int_to_onehot(int_encoded))
    
    return np.array(samples)
        

In [14]:
#implement PML 'pseudo maximum likelyhood'
#the function works on integer encoded data

import random as rd
from sklearn.linear_model import LogisticRegression
softmax = LogisticRegression(multi_class='multinomial',max_iter=100000)

# add n_seq composed of one repeating aminoacid for each kind
# useful to regularize and to constrain the softmax classifier to work on 21 possible values per site
def regularization(X, n_seq, N_amino):
    X_reg = np.copy(X)
    for i in range(n_seq):
        for j in range(N_amino):
            add = np.full(X.shape[1],j)
            X_reg = np.vstack((X_reg,add))
    return np.array(X_reg)

def pop(X,i):
    y = X[:,i]
    X_copy = np.copy(X)
    X_copy = np.delete(X_copy,i,1)
    return X_copy, y

#fit all the marginals needed for PML
def marginals_PML(X, reg, N):
    Z = regularization(X,reg,N)
    marginals = []
    for i in range(L):
        #use solver = sag otherwise the regularized problem does not converge
        s = LogisticRegression(max_iter=1000000, multi_class='multinomial', solver='sag')
        s.fit(pop(Z,i)[0],pop(Z,i)[1])
        marginals.append(s)
    return marginals

#T is the computational temperature, set to 1 is ininfluent
def set_T(prob, T):
    p = np.array([np.exp(np.log(a)/T) for a in prob])
    norm = np.sum(p)
    p = p / norm
    return p
    

def PML(marginals,n_steps,sequence_zero, N, T):
    seq = np.copy(sequence_zero)
    sim = [sequence_zero]
    div = 0
    for i in range(n_steps):
        site = rd.randint(0,L-1)
        prob = marginals[site].predict_proba([np.delete(seq,site,0)])
        prob = set_T(prob, T)
        prob = [item for sublist in prob for item in sublist] #flatten the array
        new_site = np.random.choice(range(N),p = prob)
        if seq[site] != new_site:
            div += 1
            seq[site] = new_site
        sim.append(np.copy(seq))
                
    print(div)
    return np.array(sim)

In [15]:
"""
The aim of this function is:
-run n_samples independent PML generated 
-the number of iterations is equal to n_gen
-take from each one k generated sequences
"""

def generate_samples_PML(marginals,n_gen,k,n_samples,T):
    samples=[]
    for i in range(n_samples):
        #PML sampling
        sequence_zero = X[rd.randint(0,M-1)]
        generated_sample = PML(marginals, n_gen, sequence_zero, N, T)
        
        #extract k sequences from the sampled ones and saved the on a file
        # take k sequences as far as possible from each other from the second half of generated sequences
        # minimize correlations between sequences and between sequences and sequence_zero
        step = int(n_gen/(2*k))  
        sample = np.array([generated_sample[int(n_gen/2)+j*(step-1),:] for j in range(k)])
        
        #convert to integer and to onehot
        samples.append(data_from_int_to_onehot(sample))
    
    return np.array(samples)

def generate_samples_PML_memory(marginals,n_gen,k,n_samples,T):
    samples = []
    for i in range(n_samples):
        #PML sampling
        sample = []
        sequence_zero = X[rd.randint(0,M-1)]
        step = int(n_gen/(2*k))  
        for i in range(2*k):
            generated_sample = PML(marginals, step-1, sequence_zero, N, T)
            sequence_zero = generated_sample[-1]
            if i > k-1:
                sample.append(np.copy(sequence_zero))     
        
        sample = np.array(sample)
        #convert to integer and to onehot
        samples.append(data_from_int_to_onehot(sample))
    
    return np.array(samples)

In [16]:
"""
reg = 2
m = marginals_PML(X,reg,N)
"""
from joblib import dump, load
m = load('marginals.joblib')