In [1]:
import numpy as np
import random
import scipy.io as sio
import matplotlib.pyplot as plt

In [2]:
def loadLDPC(filename):
    A = sio.loadmat(filename)
    G = A['G']
    H = A['H']
    return G, H

In [40]:
def apply_channel_noise(y, epsilon):
    ## TODO, complement each bit with probability epsilon
    return np.array([i if np.random.rand() > epsilon else int(not i) for i in y])

In [4]:
def encode_message(x, G):
    new_message = np.dot(G, x) % 2
    ## TODO, implement Gx % 2 :-))
    return new_message

---

In [5]:
class FactorTypeOne():
    def __init__(self, y_til, epsilon):
        self.y_til = y_til
        self.epsilon = epsilon
    
    def calculate_value(self, y):
        return self.epsilon if y != self.y_til else 1 - self.epsilon

In [6]:
class FactorTypeTwo():
    def __init__(self, scope):
        # consider a factor: \phi(1, 4, 6), so in this case scope = [1,4,6]
        self.scope = np.array(scope)
    
    def calculate_value(self, scope_assignment):
        # if sum(scope_assignment) is even, the value = 1, O.W 0
        return 1 if sum(scope_assignment) % 2 == 0 else 0


In [41]:
class FactorGraph():
    
    def __init__(self, H, epsilon, y_tilde):
        self.factors_type1 = [] # list of FactorTypeOne
        self.factors_type2 = [] # list of FactorTypeTwo
        self.var_to_factor = {} # map --> (var, [factors related to this var])
        self.factor_to_var = {} # map --> (factor, [vars related to this factor])
        self.messagesVarToFactor = {}
        self.messagesFactorToVar = {}
        
        for i,b in enumerate(y_tilde):
            self.factors_type1.append(FactorTypeOne(y_tilde[i], epsilon))
            self.var_to_factor[i] = [(1, len(self.factors_type1) - 1), ] # 1 means that the factor is from the first type
            self.factor_to_var[(1, len(self.factors_type1) - 1)] = [i, ] # 1 means that the factor is from the first type
        
        for row in H:
            scope = [var for var in range(len(y_tilde)) if row[var] == 1]
            self.factors_type2.append(FactorTypeTwo(scope))
            
            for i in scope:
                self.var_to_factor[i].append((2, len(self.factors_type2) - 1)) # 2 means that the factor is from the 2nd type
                
            self.factor_to_var[(2, len(self.factors_type2) - 1)] = scope       # 2 means that the factor is from the 2nd type
        
        
    ############################################################################################################       
        
        
    def assignment_probability(self, assignment):
        prob = 1
        
        # For unary Factors:
        for i, b in enumerate(assignment):
            prob_this_bit = self.factors_type1[i].calculate_value(b)
            #  TODO: implement the easy single line to compute the value of this factor
            prob *= prob_this_bit
        
        # Second Type
        for f2 in self.factors_type2:
            #TODO: compute the scope assignment of this factor, due to the given assignment
            prob *= f2.calculate_value(assignment[f2.scope])
            
        return prob
    
    
    ############################################################################################################       
    
    
    def LoopyBP(self, n_iteration):
        
        for ite in range(n_iteration):

            prevMessagesVarToFactor = {}
            prevMessagesFactorToVar = {}
            
            for i, vrbs in enumerate(self.var_to_factor):
                factors = self.var_to_factor[vrbs]
                for s in factors:
                    if (vrbs, s) not in self.messagesVarToFactor:
                        self.messagesVarToFactor[(vrbs, s)] = np.array([0.5, 0.5])
                    prevMessagesVarToFactor[(vrbs, s)] = self.messagesVarToFactor[(vrbs, s)]
                    
            for s, fcts in enumerate(self.factor_to_var):
                variables = self.factor_to_var[fcts]
                for x in variables:
                    if (fcts, x) not in self.messagesFactorToVar:
                        self.messagesFactorToVar[(fcts, x)] = np.array([0.5, 0.5])
                    prevMessagesFactorToVar[(fcts, x)] = self.messagesFactorToVar[(fcts, x)]
                    
            # Update the message var -> factor
            for xm in self.var_to_factor:
                factors = self.var_to_factor[xm]
                for fs in factors:
                    m0, m1 = 1.0 , 1.0
                    for l in factors:
                        if l == fs:
                            continue
                        m0 *= self.messagesFactorToVar[(l, xm)][0]
                        m1 *= self.messagesFactorToVar[(l, xm)][1]
                        
                    self.messagesVarToFactor[(xm, fs)] = np.array([m0 / (m0 + m1), m1 / (m0 + m1)])
                # TODO: complete this loop

            # Update the message factor -> var
            for fs in self.factor_to_var:
                variables = self.factor_to_var[fs]
        
                factor_type = fs[0]
                
                if factor_type == 1:
                    m0 = self.factors_type1[fs[1]].calculate_value(0)
                    m1 = self.factors_type1[fs[1]].calculate_value(1)
                    
                    self.messagesFactorToVar[(fs, variables[0])] = np.array([m0 / (m0 + m1), m1 / (m0 + m1)])
                    continue
                    
                

                for i, x in enumerate(variables):
                    s0, s1 = 0.0 , 0.0
                    for state in range(2 ** len(variables)):
                        
                        y = [int(j) for j in bin(state)[2:]]
                        for k in range(len(variables) - len(y)):
                            y.insert(0, 0)
                        y = np.array(y)    
                        factor_value = self.factors_type2[fs[1]].calculate_value(y)
                        for index, xm in enumerate(variables):
                            if x == xm:
                                continue    
                            factor_value *= prevMessagesVarToFactor[(xm, fs)][y[index]]
                        
                        if (y[i]) == 0:
                            s0 += factor_value
                        else:
                            s1 += factor_value
                    
                    self.messagesFactorToVar[(fs, x)] = np.array([s0 / (s0 + s1), s1 / (s0 + s1)])
            
            
            if ite % 10 == 0 and ite > 0:
                print("Finished Loopy Iteration %s" % ite)
    
    
    ############################################################################################################       
    
    
    def estimate_marginal_probability(self, var):
        '''
        This method assumes LoopyBP has been run
        '''
        res = np.array([1.0, 1.0])
        for factor in self.var_to_factor[var]:
            for i in range(2):
                res[i] *= self.messagesFactorToVar[(factor, var)][i]
        
        if sum(res) == 0:
            return res
        else:
            return res / sum(res)
        
        
    ############################################################################################################       
    
    
    def get_marginal_MAP(self):
        output = np.zeros(256)
        for i, var in enumerate(range(256)):
            output[i] = np.argmax(self.estimate_marginal_probability(i))
        return output
    
        
    

---

In [None]:
y_tilde = np.array([[1, 1, 1, 1, 1, 1]]).reshape(6, 1)
H = np.array([
        [0, 1, 1, 0, 1, 0],
        [1, 0, 1, 0, 1, 1],
        [0, 1, 0, 1, 1, 0]])
epsilon = 0.05

Graph = FactorGraph(H, epsilon, y_tilde)

ytest1 = np.array([0, 1, 1, 0, 1, 0])
ytest2 = np.array([1, 0, 1, 1, 0, 1])
ytest3 = np.array([1, 0, 1, 1, 1, 1])

print(Graph.assignment_probability(ytest1))
print(Graph.assignment_probability(ytest2))
print(Graph.assignment_probability(ytest3))


In [49]:
for i in [ytest1, ytest2, ytest3]:
    print(np.dot(H, i) % 2)

[1 0 0]
[1 1 1]
[0 0 0]


The third one is zero and therefore, the probability is not zero, but the two others are 0.

---

In [46]:
G, H = loadLDPC('GH.mat')

epsilon = 0.05
N = G.shape[1]
x = np.ones((N, 1), dtype='int32')

y = encode_message(x, G)

yTilde = apply_channel_noise(y, epsilon)

G = FactorGraph(H, epsilon, yTilde)

G.LoopyBP(50)


best_estimation = G.get_marginal_MAP()
print('Actual: \n', y.reshape(256))
print()
print('Predicted: \n', best_estimation)
sum_ = 0
for i in range(256):
    if best_estimation[i] == y[i]:
        sum_ += 1
print('Accuracy: ', (sum_ / 256) * 100)

Finished Loopy Iteration 10
Finished Loopy Iteration 20
Finished Loopy Iteration 30
Finished Loopy Iteration 40
Actual: 
 [1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1
 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 1 0 1 0 0 1 0 1 1 1 0 0 1 1 0 1 0 1 1 1 0
 1 0 1 0 1 0 1 1 1 1 1 1 1 1 1 0 1 1 1 1 0 1 1 0 1 0 1 1 1 0 1 0 0 0 0 1 0
 1 0 1 1 1 0 0 0 1 0 0 1 1 0 1 1 1 0 1 1 0 0 0 0 1 1 1 1 0 1 1 0 1 1 0 1 1
 0 0 0 0 0 1 0 1 1 1 0 0 1 1 0 0 0 0 0 0 0 1 0 0 1 0 1 0 1 1 1 0 0 1]

Predicted: 
 [1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1.
 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 1. 