In [8]:
import numpy as np
import commpy as cp
import scipy.signal as sig
import scipy.linalg as la
import math
import matplotlib.pyplot as plt
from multiprocessing import Pool

import traceback
import warnings
import sys

%matplotlib inline
DEFAULT_SEED = 100
np.random.seed(DEFAULT_SEED)#set the random generator seed to default

In [9]:

def modulate(data, mod_scheme='BPSK', demod=False):
    """  1. Modulates (or demodulates) data according to the modulation scheme """
    mod_schemes = ['BPSK', 'QPSK']
    data = data.flatten()
    if mod_scheme not in mod_schemes:
        raise ValueError('Unknown modulation scheme, please choose from: '+ ' '.join(mod_schemes))
    elif mod_scheme == 'QPSK':
        modulator = cp.modulation.QAMModem(4)
        if demod:
            return modulator.demodulate(data, "hard")
        return modulator.modulate(data)
    elif mod_scheme == 'BPSK':
        def bpsk_one(x):
            if demod:
                return 0 if x < 0 else 1
            return -1 if x==0 else 1
        bpsk = np.vectorize(bpsk_one)
        return bpsk(data)

def apply_channel(signal, channel_function):
    """  2. Convolves signal with channel_function """
    channel_output = sig.convolve(signal, channel_function, mode='full') # convolve input complex data with the channel transfer function
    return channel_output

def add_awgn_noise(signal, SNR_dB):
    """  3. Adds AWGN noise vector to signal  
            to generate a resulting signal vector y of specified SNR in dB
    """
    L=len(signal)
    SNR = 10**(SNR_dB/10.0) #SNR to linear scale
    Esym=np.sum(np.square(np.abs(signal)))/L #Calculate actual symbol energy
    N0=Esym/SNR; #Find the noise spectral density
    if(isinstance(signal[0], complex)):
        noiseSigma=np.sqrt(N0/2.0)#Standard deviation for AWGN Noise when x is complex
        n = noiseSigma*(np.random.randn(1,L)+1j*np.random.randn(1,L))#computed noise 
    else:
        noiseSigma = np.sqrt(N0);#Standard deviation for AWGN Noise when x is real
        n = noiseSigma*np.random.randn(1,L)#computed noise
    y = signal + n #received signal
    return y.flatten()

def num_bit_errs(in_bits, out_bits):
    total = 0
    for i in range(len(in_bits)):
        if in_bits[i] != out_bits[i]:
            total += 1
    return total


In [125]:

class Baseline:
    def __init__(self, params={}):
        pass
    
    def train(self, x, y, params={}):
        pass
    
    def predict(self, x):
        return x
    
class LeastMeanSquares:
    def __init__(self, init_params={'equalizer_order':None,'random_starts':True, 'learning_rate':0.01}):
        if (not init_params['equalizer_order']):
            raise ValueError("LeastMeanSquares: init_params['equalizer_order'] is missing")
        self.order = init_params['equalizer_order']
        if (self.order % 2 == 0):
            raise ValueError("LeastMeanSquares: init_params['equalizer_order'] must be odd")
        if (self.order < 3):
            raise ValueError("LeastMeanSquares: init_params['equalizer_order'] must be at least 3")
        self.h = None
        self.random_starts = init_params['random_starts']
        self.L = (self.order-1)//2
        self.mu = init_params['learning_rate']
        
    def train(self, x, y, train_params={}):
        mu = self.mu
        if (self.h is None):
            if (self.random_starts):
                self.h = np.random.randn(order) + 1j*np.random.randn(self.order) if isinstance(x[0], complex) else np.random.randn(self.order)
            else:
                self.h = np.zeros(self.order, dtype=np.complex_ if isinstance(x[0], complex) else np.float32)
        x = np.pad(x, self.L, 'constant', constant_values=(0j if isinstance(x[0], complex) else 0.0))
        for i in range(len(y)):
            r = np.flip(x[i: i + self.order],0)
            symbol = y[i]
            self.h = self.h + mu * (symbol - np.dot(r, self.h)) * r.conj()
    
    def predict(self, x):
        if (self.h is None):
            if (self.random_starts):
                self.h = np.random.randn(self.order) + 1j*np.random.randn(self.order) if isinstance(x[0], complex) else np.random.randn(self.order)
            else:
                self.h = np.zeros(self.order, dtype=np.complex_ if isinstance(x[0], complex) else np.float32)
        return sig.convolve(x, self.h , mode="full")[self.L:]
    
class LeastSquares:
    def __init__(self, init_params={'equalizer_order':None,'random_starts':True, 'learning_rate':0.01, 'steps':1000}):
        if (not init_params['equalizer_order']):
            raise ValueError("LeastSquares: init_params['equalizer_order'] is missing")
        self.order = init_params['equalizer_order']
        if (self.order % 2 == 0):
            raise ValueError("LeastSquares: init_params['equalizer_order'] must be odd")
        if (self.order < 3):
            raise ValueError("LeastSquares: init_params['equalizer_order'] must be at least 3")
        self.h = None
        self.order = init_params['equalizer_order']
        self.random_starts = init_params['random_starts']
        self.L = (self.order-1)//2
        self.steps = init_params['steps']
        self.mu = init_params['learning_rate']
        
    def train(self, x, y, params={}):
        mu = self.mu
        steps = self.steps
        if (steps == 0):
            self.train_closed_form(x, y)
        else:
            if (self.h is None):
                if (self.random_starts):
                    self.h = np.random.randn(self.order) + 1j*np.random.randn(self.order) if isinstance(x[0], complex) else np.random.randn(self.order)
                else:
                    self.h = np.zeros(self.order, dtype=np.complex_ if isinstance(x[0], complex) else np.float32)
            constant = 0j if isinstance(x[0], complex) else 0.0
            A = []
            x = np.pad(x, self.L, 'constant', constant_values=(constant))
            for i in range(len(y)):
                A += [np.flip(x[i: i+self.order],0)]
            A = np.array(A)
            while steps > 0:
                grad_update = np.dot(A.T, np.dot(A, self.h) - y)
                if(grad_update[0]== float('inf') or grad_update[0]== float('-inf')): 
                    break
                self.h = self.h - (mu / len(y)) * grad_update
                steps -= 1

    def train_closed_form(self, x, y):
        constant = 0j if isinstance(x[0], complex) else 0.0
        A = []
        x = np.pad(x, self.L, 'constant', constant_values=(constant))
        for i in range(len(y)):
            A += [np.flip(x[i: i+self.order],0)]
        A = np.array(A)
        h,_,_,_ = np.linalg.lstsq(A, y,rcond=-1)
        self.h = h
    
    def predict(self, x):
        if (self.h is None):
            if (self.random_starts):
                self.h = np.random.randn(self.order) + 1j*np.random.randn(self.order) if isinstance(x[0], complex) else np.random.randn(self.order)
            else:
                self.h = np.zeros(self.order, dtype=np.complex_ if isinstance(x[0], complex) else np.float32)
        return sig.convolve(x, self.h , mode="full")[self.L:]
    
class ZeroForcing:
    def __init__(self, init_params={}):
        self.channel = None
    
    def train(self, x, y, channel_taps):
        self.channel = channel_taps
    
    def predict(self, x):
        if (self.channel is None):
            raise ValueError("ZeroForcing.predict: must train with channel first")
        if(isinstance(x[0], complex)):
            freq_domain = np.fft.fft(x, len(x))/np.fft.fft(self.channel, len(x))
            return np.fft.ifft(freq_domain)[0:len(x) - len(self.channel) + 1]
        else:
            freq_domain = np.fft.fft(x, len(x))/np.fft.fft(self.channel, len(x))
            return np.real(np.fft.ifft(freq_domain)[0:len(x) - len(self.channel) + 1])


In [151]:
LMS = LeastMeanSquares(init_params={'equalizer_order':5,'random_starts':True, 'learning_rate':0.01})
LS = LeastSquares(init_params={'equalizer_order':5,'random_starts':True, 'learning_rate':0.01, 'steps':1000})
ZF = ZeroForcing()

channel_length = 2
preamble_length = 400
data_length = 100

modulation_scheme = 'BPSK'

snr = 10

channel_function = np.random.randn(channel_length) 
channel_function = channel_function / np.linalg.norm(channel_function)

# generate training data
preamble_bits = np.random.randint(0,2, preamble_length) 
train_symbols = modulate(preamble_bits, modulation_scheme)
train_signal = add_awgn_noise(apply_channel(train_symbols, channel_function), snr)
# generate testing data
test_bits = np.random.randint(0,2, data_length)
test_symbols = modulate(test_bits, modulation_scheme)
test_signal = add_awgn_noise(apply_channel(test_symbols, channel_function), snr)
# calculating baseline error (no training)
baseline_bits = modulate(test_signal, modulation_scheme, True)


LMS.train(train_signal, train_symbols)
lms_test_output = LMS.predict(test_signal)

LS.train(train_signal, train_symbols)
ls_test_output = LMS.predict(test_signal)

ZF.train(train_signal, train_symbols, channel_function)
zf_test_output = ZF.predict(test_signal)


b_ber = num_bit_errs(test_bits, baseline_bits)
print("Baseline bit error rate: ", b_ber/data_length)

lms_ber = num_bit_errs(test_bits, modulate(lms_test_output, modulation_scheme, demod=True))
print("Least Mean Squares bit error rate: ", lms_ber/data_length)

ls_ber = num_bit_errs(test_bits, modulate(ls_test_output, modulation_scheme, demod=True))
print("Least Squares bit error rate: ", ls_ber/data_length)

zf_ber = num_bit_errs(test_bits, modulate(zf_test_output, modulation_scheme, demod=True))
print("Zero Forcing bit error rate: ", zf_ber/data_length)

print(channel_function)

Baseline bit error rate:  1.0
Least Mean Squares bit error rate:  0.0
Least Squares bit error rate:  0.0
Zero Forcing bit error rate:  0.0
[-0.99868121 -0.05134035]
