In [21]:
import numpy as np
np.random.seed(5)

def compute_poly(x, m, p, poly_power_limit):
    """
    x : m x 1 complex vector
    PolyW : polynomial layer weights
    m : input dimension
    p : polynomial degree
    poly_power_limit : polynomial power limit

    return : a p x m complex vector
    """
    # out put : a nx1 complex vector

    # polynomial 
    u = np.zeros((p, m), dtype=np.complex128)
    for j in range(p): 
        u[j] = np.power(np.abs(x) * poly_power_limit,2*j) * x 
    return u
def compute_poly_sum(u, PolyW):
    """
    u : p x m complex vector
    PolyW : polynomial layer weights
    return : a m x 1 complex vector
    """
    return np.dot(PolyW, u)

def compute_conv(Us, ConvW):
    """
    Us : k x m  complex vector
    ConvW : convolutional layer weights : k x m x m
    return : a 1 x m complex vector
    """
    k, m, m = ConvW.shape ##
    C = np.zeros((m), dtype=np.complex128)

    for i in range(k):
        C += np.dot(Us[i], ConvW[i])
    return C

In [22]:
### generate complex data 

def generate_data(n_samples, true_coefficients, noise_level=0.1, poly_power_limit=0.05):

    # Generate a random input signal
    k,m, n = true_coefficients[0].shape
    p = true_coefficients[1].shape[0]
    ConvW = true_coefficients[0]
    PolyW = true_coefficients[1]

    x = np.random.randn(n_samples, m) + 1j*np.random.randn(n_samples, m)

    # Step 1: Compute the convolution : in n_samples x m and k x m x n , out n_samples x n
    C = np.zeros((n_samples, n), dtype=np.complex128)
    for i in range(k, n_samples):
        C[i] = compute_conv(x[i-k+1 : i+1], ConvW)
    
    # Step 2: Compute the polynomial features: in n_samples x n , out n_samples x p x nn
    U = np.zeros((n_samples, p, n), dtype=np.complex128)
    for i in range(n_samples):
        U[i] = compute_poly(C[i], n, p, poly_power_limit)
    # Step 3: Compute the polynomial sum : in n_samples x p x n, out n_samples x n
    Us = np.zeros((n_samples, n), dtype=np.complex128)
    for i in range(n_samples):
        Us[i] = compute_poly_sum(U[i], PolyW)

    # Add noise
    # noise = noise_level * np.random.randn(n_samples, n) + 1j*noise_level * np.random.randn(n_samples, n)
    # y += noise
    
    return x, Us


In [23]:
# Example usage
n_samples = 10000

# p : polynomial degree 
# k : influence scope 
# m : output dimension  
# n : input dimension 
p,k,m,n = 3, 2, 3, 2

true_coefficients = [0,1]

### part 1 : poly layer 
true_coefficients[1] = np.random.randn(p) + 1j*np.random.randn(p)
poly_power_limit = 0.05


### part 2 : conv layer k x m x n
true_coefficients[0] = np.random.randn(k, m, n) + 1j*np.random.randn(k, m, n)
noise_level = 0.05


x, y = generate_data(n_samples, true_coefficients, noise_level,poly_power_limit)

print("Input signal:", x[9])
print("Desired signal:", y[9])

Input signal: [2.26 -1.j    0.657-0.353j 0.125+0.952j]
Desired signal: [0.834+1.524j 0.338+0.632j]


In [24]:
def compute_starpart(x, u, p, n ):
    """
    input : x : 1 x n complex vector
            u : p x n complex vector
            p : polynomial degree
            n : output dimension
    return : a p x n complex vector
    """
    U = np.zeros((p, n), dtype=np.complex128)
    for i in range(p):
        TMP1 = np.dot(np.power(np.abs(u[i]), i).T, np.conj(x))
        U[i] = np.dot(TMP1, u[i])
    return U

def compute_nonstarpart(x, u, p, n):
    U = np.zeros((p, n), dtype=np.complex128)
    for i in range(p):
        if(i == 0):
            U[i] = 0
        elif ( i == 1) :
            TMP1 = np.dot(np.power(u[i],2).T, np.conj(x))
            U[i] = TMP1
        else:
            TMP1 = np.power(np.abs(u[i]), i-1).T
            TMP2 = np.dot(np.power(u[i],2).T, np.conj(x))
            U[i] = i * np.dot(TMP1, TMP2)
    return U

In [25]:
## LMS filter

def lms_filter(x, y, k, mu, p, true_coefficients, ):
    Ws = [] 
    m, n = x.shape[1], y.shape[1]

    ConvW = np.random.randn(k, m, n) + 1j*np.random.randn(k, m, n)
    PolyW = np.random.randn(p) + 1j*np.random.randn(p)

    # backdoor 
    # PolyW = true_coefficients[1]  
    # ConvW = true_coefficients[0]

    ## iteration
    for _ in range(epoch):
        for i in range(k, len(x)):
            ### compute the error 
            Cx = compute_conv(x[i-k+1 : i+1], ConvW)
            u =  compute_poly(Cx,n,p,poly_power_limit)
            y_hat = compute_poly_sum(u, PolyW)
            e = y_hat - y[i]
            print ("e_0", e)

            ## update the weights 
            ### update the Conv weights
            ### compute conved x : 1 x n


            for l in range(k): # use one x to calcualte
                ThisX = x[i-k+1+l]

                Star_Part = compute_starpart(Cx, u, p, n) ## p x n
                NonStar_Part = compute_nonstarpart(Cx, u, p, n) ## p x n 
                TmpPolyW = np.array([PolyW for _ in range(p)]) ## p x p
                
                coef1 =  np.dot(np.conj(TmpPolyW) ,Star_Part) ## p x n
                coef1 = np.dot(coef1, e) ## 
                coef2 =  np.dot(TmpPolyW ,NonStar_Part)
                ConvW[l] -= mu * np.outer(ThisX, coef1) + mu * np.outer( ThisX, coef2)
                ## calculate the error
                ## calculate the error
                C = compute_conv(x[i-k+1 : i+1], ConvW)
                u =  compute_poly(C,n,p,poly_power_limit)
                y_hat = compute_poly_sum(u, PolyW)
                e = y_hat - y[i]
                print ("e_1", e)

                
            ## calculate the error
            C = compute_conv(x[i-k+1 : i+1], ConvW)
            u =  compute_poly(C,n,p,poly_power_limit)
            y_hat = compute_poly_sum(u, PolyW)
            e = y_hat - y[i]
            print ("e_1", e)

            ### update the Poly weights
            PolyW -= mu * np.dot(np.conj(u), e.T)
            ## calculate the error
            C = compute_conv(x[i-k+1 : i+1], ConvW)
            u =  compute_poly(C,n,p,poly_power_limit)
            y_hat = compute_poly_sum(u, PolyW)
            e = y_hat - y[i]
            print ("e_2", e)

            Ws.append((ConvW.copy(), PolyW.copy()))
    return  Ws



In [26]:
### Example usage
mu = 0.001
Ws = lms_filter(x, y, k, mu, p, true_coefficients)

## print in 3 decimal places
np.set_printoptions(precision=3)

print("Estimated coefficients:", Ws[0][0])
print("True coefficients:", true_coefficients[0])

print("Estimated coefficients:", Ws[0][1])
print("True coefficients:", true_coefficients[1])

e_0 [ 4.416-3.211j -2.443+3.125j]


ValueError: operands could not be broadcast together with shapes (3,2) (3,6) (3,2) 

In [None]:
print("Estimated coefficients:", Ws[-1][0])
print("True coefficients:", true_coefficients[0])

print("Estimated coefficients:", Ws[-1][1])
print("True coefficients:", true_coefficients[1])

Estimated coefficients: [[[nan+nanj nan+nanj]
  [nan+nanj nan+nanj]
  [nan+nanj nan+nanj]]

 [[nan+nanj nan+nanj]
  [nan+nanj nan+nanj]
  [nan+nanj nan+nanj]]]
True coefficients: [[[-0.909-1.511j -0.592+0.645j]
  [ 0.188-0.981j -0.33 -0.857j]
  [-1.193-0.872j -0.205-0.423j]]

 [[-0.359+0.996j  0.603+0.712j]
  [-1.665+0.059j -0.7  -0.363j]
  [ 1.151+0.003j  1.857-0.106j]]]
Estimated coefficients: [nan+nanj nan+nanj nan+nanj]
True coefficients: [ 0.441-0.252j -0.331+0.11j   2.431+1.582j]
