In [None]:
import numpy as np
import matplotlib.pyplot as plt

import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset

%matplotlib inline
%config InlineBackend.figure_format='svg'

In [None]:
# M, N, L = 180, 180, 1   # these are the values mentioned in the paper
M, N, L = 145, 145, 200
# M = height of HSI
# N = width of HSI
# L = number of spectral bands in HSI

In [None]:
# original image
X = np.random.randn(L, M*N)

# White Gaussian noise
mu = 0     # mean
sigma = 1  # standard deviation
white_gaussian_noise = np.random.normal(mu, sigma**2, size=(L, M*N))

# Sparse noise (drawn from zero-mean Laplace distribution)
sparse_noise = np.random.laplace(loc=0, scale=1, size=(L, M*N))

# Y = X + N + S
Y = X + white_gaussian_noise + sparse_noise

# note both X and Y are from the standard normal distribution

print(f"X's shape: {X.shape}")
print(f"Y's shape: {Y.shape}")

X's shape: (200, 21025)
Y's shape: (200, 21025)


**Do not plot X and Y as they hang your laptop.**

Information in this cell, **do not delete**

<!-- # Ignore this... (just for demonstrating MLE)

Example showing maximum likelihood estimation. But the authors said that it is ill posed.

```python
import numpy as np

# Generate some sample data
np.random.seed(0)
data = np.random.normal(loc=5, scale=2, size=100)

# Define the likelihood function for a normal distribution
def normal_likelihood(data, mu, sigma):
    n = len(data)
    likelihood = (1/(np.sqrt(2*np.pi*sigma**2)))**n * np.exp(-1/(2*sigma**2) * np.sum((data-mu)**2))
    return likelihood

# Define the initial guesses for mu and sigma
mu_0 = 0
sigma_0 = 1

# Define the tolerance level for convergence
tol = 0.01

# Initialize the parameters
mu = mu_0
sigma = sigma_0

# Initialize the change in parameters
change = tol + 1

# Define the learning rate
learning_rate = 0.001

# Perform the maximum likelihood estimation
while change > tol:
    # Compute the likelihood
    likelihood = normal_likelihood(data, mu, sigma)
    
    # Compute the gradient of the likelihood function with respect to mu and sigma
    grad_mu = np.sum(data-mu)/sigma**2
    grad_sigma = -n/(sigma) + np.sum((data-mu)**2)/sigma**3
    
    # Update the parameters
    mu_new = mu + learning_rate*grad_mu
    sigma_new = sigma + learning_rate*grad_sigma
    
    # Compute the change in parameters
    change = np.abs(mu_new - mu) + np.abs(sigma_new - sigma)
    
    # Update the parameters
    mu = mu_new
    sigma = sigma_new

# Print the estimated parameters
print("Estimated mu:", mu)
print("Estimated sigma:", sigma)
``` -->

### About LSMM, Abundance matrix, Endmember matrix

In the Linear Spectral Mixing Model (LSMM), an abundance matrix and an endmember matrix are two important components used to represent and analyze the multi-spectral image data.

- An endmember matrix is a matrix of pure spectral signatures representing the different materials present in the scene. Each column in the endmember matrix corresponds to a single endmember and each row corresponds to a single wavelength or band of the electromagnetic spectrum. The endmember matrix is usually obtained by manually selecting a set of pixels that are known to correspond to a specific material, and then averaging their spectral signatures.

- An abundance matrix is a matrix of mixing coefficients, where each coefficient represents the proportion of a specific endmember in a pixel. Each column in the abundance matrix corresponds to a single pixel, and each row corresponds to a single endmember. The abundance matrix is usually obtained by solving a system of linear equations, where the observed spectral pixel vectors are modeled as a linear combination of the true signals (endmembers), and the mixing coefficients are the unknown parameters that need to be estimated.

In summary, the endmember matrix provides a set of spectral signatures that represent the different materials present in the scene. While, the abundance matrix provides information about the proportions of these materials in each pixel of the image. 

Both matrices are essential for understanding the composition of the scene and for performing further analysis and interpretation of the multi-spectral image data.

### LSMM (Linear Spectral Mixing Model) implementation

1. Load the multi-spectral image data: You can use libraries such as GDAL or rasterio to read the image data into a numpy array.

2. Extract the spectral pixel vectors: For each pixel in the image, extract the reflectance or radiance values for each band and store them as a spectral pixel vector. This can be done using numpy array slicing and indexing.

3. Estimate the mixing coefficients and true signals: There are several algorithms and methods that can be used to perform this estimation, such as linear unmixing, linear regression, and principal component analysis. You can use libraries such as scikit-learn to implement these algorithms.

```python
from sklearn.linear_model import LinearRegression

# Linear unmixing algorithm
def linear_unmixing(data, endmembers):
    """
    data: (N, M) array, where N is the number of pixels and M is the number of bands
    endmembers: (K, M) array, where K is the number of endmembers and M is the number of bands
    """
    model = LinearRegression()
    model.fit(endmembers, data)
    return model.coef_, model.intercept_

# Extract the spectral pixel vectors
data = image.reshape(-1, n_bands)

# Estimate the mixing coefficients and true signals
coef, intercept = linear_unmixing(data, endmembers)
```

It is assummed in model that `X` can be factorized into 2 non-negative matrices.  

Model in TensorFlow:

```python
# Input as specified in the paper
# M, N, L = 180, 180, 1   # these are the values mentioned in the paper
M, N, L = 145, 145, 200
input_shape = (1, M, N, L)
input_layer = Input(shape=input_shape)

# Model starts
model_input = input_layer
# Block 1
x = Conv3D(filters=64, kernel_size=(3,3,1), activation='relu', padding='same', strides=1, 
           dilation_rate=1, name='block1')(model_input)
# Block 2
x = Conv3D(filters=64, kernel_size=(3,3,64), activation='relu', padding='same', strides=1, 
           dilation_rate=2, name='block2')(x)
x = BatchNormalization()(x)
# Block 3
x = Conv3D(filters=64, kernel_size=(3,3,64), activation='relu', padding='same', strides=1,
           dilation_rate=4, name='block3')(x)
x = BatchNormalization()(x)
# Block 4
x = Conv3D(filters=64, kernel_size=(3,3,64), activation='relu', padding='same', strides=1,
           dilation_rate=3, name='block4')(x)
x = BatchNormalization()(x)
# Block 5
x = Conv3D(filters=64, kernel_size=(3,3,64), activation='relu', padding='same', strides=1,
           dilation_rate=4, name='block5')(x)
x = BatchNormalization()(x)
# Block 6
x = Conv3D(filters=64, kernel_size=(3,3,64), activation='relu', padding='same', strides=1,
           dilation_rate=2, name='block6')(x)
x = BatchNormalization()(x)
# Block 7
x = Conv3D(filters=64, kernel_size=(3,3,64), activation='relu', padding='same', strides=1,
           dilation_rate=1, name='block7')(x)
# Skip Connection
x = Concatenate(axis=-1)([x, input_layer])
# x = tf.activation.relu()(x)
# x = BatchNormalization()(x)

# Block 8 -- (not in the paper) I added this so as to match the 
# spectral/channels dimension with the dimensions of the original image
output = Conv3D(filters=200, kernel_size=(3,3,64), activation='relu', padding='same', strides=1, name='block8')(x)
```

Model in PyTorch:

In [None]:
# Network architecture
class MyModel(nn.Module):
    def __init__(self, spectral_dim):
        """
        spectral_dim = number of spectral bands or number of channels in the 
        input image 
        """
        super(MyModel, self).__init__()
        self.block1 = nn.Conv3d(in_channels=spectral_dim, out_channels=64, 
                                kernel_size=(3,3,1), stride=1, padding="same", 
                                dilation=1)
        self.block2 = nn.Conv3d(in_channels=64, out_channels=64, kernel_size=(3,3,64),
                                stride=1, padding="same", dilation=2)
        self.block3 = nn.Conv3d(in_channels=64, out_channels=64, kernel_size=(3,3,64),
                                stride=1, padding="same", dilation=4)
        self.block4 = nn.Conv3d(in_channels=64, out_channels=64, kernel_size=(3,3,64),
                                stride=1, padding="same", dilation=3)
        self.block5 = nn.Conv3d(in_channels=64, out_channels=64, kernel_size=(3,3,64),
                                stride=1, padding="same", dilation=4)
        self.block6 = nn.Conv3d(in_channels=64, out_channels=64, kernel_size=(3,3,64),
                                stride=1, padding="same", dilation=2)
        self.block7 = nn.Conv3d(in_channels=64, out_channels=64, kernel_size=(3,3,64),
                                stride=1, padding="same", dilation=1)
        # block8 is not in the paper - I added it so as to match the spectral/channels 
        # dimension with the dimensions of the original paper
        self.block8 = nn.Conv3d(in_channels=64, out_channels=200, kernel_size=(3,3,64),
                                stride=1, padding="same", dilation=1)
        self.bnorm_64 = nn.BatchNorm2d(num_features=64)
        self.relu = nn.ReLU()

    def forward(self, x):
        x_copy = x.clone()
        x = self.relu(self.block1(x))
        x = self.bnorm_64(self.relu(self.block2(x)))
        x = self.bnorm_64(self.relu(self.block3(x)))
        x = self.bnorm_64(self.relu(self.block4(x)))
        x = self.bnorm_64(self.relu(self.block5(x)))
        x = self.bnorm_64(self.relu(self.block6(x)))
        x = self.relu(self.block7(x))
        x = self.relu(x + x_copy)
        return x

In [None]:
model1 = MyModel(spectral_dim=L)
model1

MyModel(
  (block1): Conv3d(200, 64, kernel_size=(3, 3, 1), stride=(1, 1, 1), padding=same)
  (block2): Conv3d(64, 64, kernel_size=(3, 3, 64), stride=(1, 1, 1), padding=same, dilation=(2, 2, 2))
  (block3): Conv3d(64, 64, kernel_size=(3, 3, 64), stride=(1, 1, 1), padding=same, dilation=(4, 4, 4))
  (block4): Conv3d(64, 64, kernel_size=(3, 3, 64), stride=(1, 1, 1), padding=same, dilation=(3, 3, 3))
  (block5): Conv3d(64, 64, kernel_size=(3, 3, 64), stride=(1, 1, 1), padding=same, dilation=(4, 4, 4))
  (block6): Conv3d(64, 64, kernel_size=(3, 3, 64), stride=(1, 1, 1), padding=same, dilation=(2, 2, 2))
  (block7): Conv3d(64, 64, kernel_size=(3, 3, 64), stride=(1, 1, 1), padding=same)
  (block8): Conv3d(64, 200, kernel_size=(3, 3, 64), stride=(1, 1, 1), padding=same)
  (bnorm_64): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
  (relu): ReLU()
)

In [None]:
# batch_dim, channels, height, width
X_tensor = torch.from_numpy(X).reshape(1, L, M, N)
Y_tensor = torch.from_numpy(Y).reshape(1, L, M, N)
print(X_tensor.shape, Y_tensor.shape, sep='\n')

torch.Size([1, 200, 145, 145])
torch.Size([1, 200, 145, 145])


In [None]:
# # create a new tensor to fit the input requirements of the model
# model1(X_tensor)
# model_output = model1(X_tensor)

# HSI Denoising with NMF-DPR Algorithm

First we initialize E and A, which are endmember and abundance matrices, respectively. Now, there are 5 steps inside a while loop:
1. Update V using eqn. 12
2. Update A using eqn. 19
3. Update S using eqn. 9
4. Update U using eqn. 10
5. Update E using eqn. 11

Output is X = EA, which is denoised image.

**initialization E and A**

- First use above mentioned model (which uses spatial correlation for image denoising) to get initial denoised image Y
- Then use "A variable splitting augmented Lagrangian approach to linear spectral unmixing" method (which uses spectral correlation for image denoising) on the basis of this Y to get E and A. This is rough denoising-preprocessing and is not as good as proposed approach but is very time-efficient, so for a start, we'll use this.



In [None]:
import numpy as np

def linear_spectral_unmixing(Y, E_init, A_init, lambda_init=0.1, max_iter=100, tol=1e-4):
    """
    Linear spectral unmixing using the variable splitting augmented Lagrangian approach.
    :param Y: mixed pixels matrix (m x n)
    :param E_init: initial endmembers matrix (m x k)
    :param A_init: initial abundance fractions matrix (k x n)
    :param lambda_init: initial value of the Lagrange multiplier
    :param max_iter: maximum number of iterations
    :param tol: tolerance for convergence
    :return: estimated endmembers matrix (m x k) and abundance fractions matrix (k x n)
    """
    # Define variables
    E = E_init.copy()
    A = A_init.copy()
    lambda_val = lambda_init

    # Start iterations
    for i in range(max_iter):
        # Update endmembers
        E_prev = E.copy()
        A_T = A.T
        A_T_A = np.dot(A_T, A)
        A_T_Y = np.dot(A_T, Y)
        E = np.dot(A_T_Y, np.linalg.inv(A_T_A + lambda_val * np.eye(A.shape[0])))

        # Update abundance fractions
        A_prev = A.copy()
        Y_E = np.dot(Y, E.T)
        E_E = np.dot(E, E.T)
        A = A_prev * (Y_E / (E_E + lambda_val * np.eye(E.shape[1])))
        A = A / np.sum(A, axis=0)
        A[A < 0] = 0

        # Update lambda
        lambda_prev = lambda_val
        lambda_val = lambda_val * (np.linalg.norm(Y - np.dot(E, A)) / np.linalg.norm(Y - np.dot(E_prev, A_prev)))

        # Check for convergence
        if np.linalg.norm(E - E_prev) < tol and np.linalg.norm(A - A_prev) < tol and abs(lambda_val - lambda_prev) < tol:
            break

    return E, A

In [None]:
# to get initial endmembers and abundance matrix of X, we'll use 
# Non-negative matrix factorization will decompose X, we assume it to be
# non-negative, factor it into the product of two other non-negative matrices, 
# an endmember matrix and an abundance matrix.

from sklearn.decomposition import NMF

# Number of endmembers
n_endmembers = 3

# Create an NMF model
nmf_model = NMF(n_components=n_endmembers, init='random', random_state=0)

# let's create a dummy X matrix
demo_X = np.random.rand(2,5)

# Fit the model to the data
E_init = nmf_model.fit_transform(demo_X)
A_init = nmf_model.components_

# the product of E and A will give a rough approximate of X

In [None]:
print(E_init, A_init, sep='\n\n\n')

[[0.30675988 0.         1.18686128]
 [0.24928935 1.47260379 0.32589397]]


[[0.75950885 0.         1.09125601 0.30748747 0.27245271]
 [0.40922849 0.40536429 0.         0.02090917 0.38203944]
 [0.00267827 0.47574103 0.54685045 0.09147712 0.        ]]


**condition of while loop**

In [None]:
E_old = nmf_model.fit_transform(model_output)
A_old = nmf_model.components_

ValueError: ignored

In [None]:
def frob_norm(matrix):
    """
    We calculate the frobenius norm, which is simply the "Euclidean" distance 
    between all the elements of a matrix. 
    It is defined as the square root of the sum of the squares of all elements 
    in the matrix
    """
    return np.linalg.norm(matrix, "fro")
    # alternate implementation
    # return np.sqrt(np.sum(np.square(matrix)))

matrix = A_new*E_new - A_old*E_old
T_iter = 1  # assuming this means the number of epochs
condition_part1 = frob_norm(matrix) / np.sqrt(L*M*N) < 1e-6
condition_part2 = T_iter <= 150
condition = condition_part1 and condition_part2

**eqn 7**

In [3]:
from scipy.optimize import minimize

# Define the log-posterior function
def log_posterior(params, x, y):
    intercept, slope = params
    sigma = 1  # Assume known constant noise level
    log_likelihood = -0.5 * np.sum((y - (intercept + slope * x))**2 / sigma**2 + np.log(2 * np.pi * sigma**2))
    log_prior = -0.5 * (intercept**2 + slope**2)
    return -(log_likelihood + log_prior)

# Define the data
x = np.array([1, 2, 3, 4, 5])
y = np.array([2, 3, 4, 5, 6])

# Find the maximum of the log-posterior using the minimize function
result = minimize(log_posterior, x0=[0, 0], args=(x, y))

# Extract the MAP estimates for the intercept and slope
intercept_MAP, slope_MAP = result.x

print("MAP estimates for the intercept and slope:", intercept_MAP, slope_MAP)

MAP estimates for the intercept and slope: 0.6306305044599195 1.0810811139869003


In this example, we defined the negative logarithm of the posterior function as the function to be minimized, and passed the starting point for the optimization (`[0, 0]`), and the data (`x, y`) as additional arguments to the `minimize` function. The function returns a `OptimizeResult` object, which contains the MAP estimates for the parameters in the `x` attribute.

Note that the only difference between the MAP estimation using `minimize` function instead of `maximize` function is the negative sign that we added in the definition of the log-posterior function, since the `minimize` function finds the minimum of a scalar function, while we want to find the maximum of the log-posterior.

It's also worth mentioning that Scipy library also offers optimization functions such as `minimize_scalar` that can be used to perform similar optimization tasks with scalar functions.

In summary, using the `minimize` function from the `scipy.optimize` module can be a simple way to perform MAP estimation, as it abstracts away some of the implementation details and allows you to focus on defining the log-posterior function.

In [2]:
from scipy.optimize import minimize

# define the function to be minimized on E, A, S
def some_function(x):
    # x[0] is E; x[1] is A; x[2] is S
    sigma = 1
    T = 
    return 0.5 * frob_norm(Y - x[0]*x[1]  - x[2])**2 +  

# initial guess for E, A, S
x0 = np.array([1,2,3])

# find the values of E, A, S that give minimum value of some_function
minimize(some_function, x0)

**eqn 8 - alternate way of solving eqn 7**

**Ignore this -- see the next code cell**

In this example, `A` is the low-rank matrix, and `tau` is the regularization parameter that controls the strength of the shrinkage. The function `shrinkage` first performs a singular value decomposition (SVD) of the low-rank matrix, then shrinks the singular values by subtracting `tau` and setting negative values to zero. Finally, it recompose the matrix by taking the dot product of the left and right singular vectors with the modified singular value matrix.

The output of the function `shrinkage` is the shrunk low-rank matrix. This term can be added to the optimization problem of TV-LRTF to improve the restoration results.

It's worth mentioning that this is a simple example, in practice the shrinkage operator is often used in conjunction with other regularization terms and methods, such as total variation and sparsity constraints.

```python
# Define the shrinkage operator
def shrinkage(A=np.random.rand(256, 256), tau=0.1):
    # A is low-rank matrix. eg: A = np.random.rand(256, 256)
    # tau is regularization parameter. eg: tau = 0.1
    U, S, V = np.linalg.svd(A, full_matrices=False)
    S = np.maximum(S - tau, 0)
    return np.dot(U, np.dot(np.diag(S), V))

# Define the low-rank matrix
A = np.random.rand(256, 256)

# Define the regularization parameter
tau = 0.1

# Apply the shrinkage operator to the low-rank matrix
A_shrunk = shrinkage(A, tau)
```

**see this**

In [6]:
# sign function means signum function
threshold = 0.1
P_threshold = lambda x: np.sign(x) * np.max(np.abs(x) - threshold, 0)  # eqn 8
P_threshold

<function __main__.<lambda>(x)>