In [30]:
def get_ith_patch(large_shape, patch_shape, i):
    # How many patches can we fit in this large array?
    patch_index_shape = np.array(large_shape) - np.array(patch_shape) + 1

    # What are the coordinates of the starting pixel of the ith patch?
    patch_index = np.unravel_index(i, patch_index_shape)

    # Get the indices for the pixels in the ith patch
    patch_indices = tuple(slice(start, start + size) for start, size in zip(patch_index, patch_shape))

    return patch_indices

def get_num_patches(large_shape, patch_shape):
    patch_index_shape = np.array(large_shape) - np.array(patch_shape) + 1
    return patch_index_shape.prod()


In [0]:
class Sampler:

    def __init__(self):
        #self.seed = 1 # A random seed which will be updated after every use. It is there to ensure reproducibility
        self.paths = [] # A list of the paths where original images/signals are stored

    def add_path(self, path):
        self.paths.append(path)

    def set_paths(self, paths):
        self.paths = paths

In [63]:

#def signal_to_vec(self, signal):

#def vec_to_signal(self, vec):

def sample(self, N):

    # Divide the number of samples requested evenly amongst each original full image
    j = 0
    r = N % len(self.paths)
    d = self.patch_shape.prod()
    Y = np.zeros([d, N], dtype=np.uint8)

    for i, path in enumerate(self.paths):
        img = cv2.imread(path)
        large_shape = img.shape

        for _ in range(N // len(self.paths)):

            num_patches = get_num_patches(large_shape, self.patch_shape)
            patch = get_ith_patch(large_shape, self.patch_shape, random.randint(0, num_patches))
            Y[:, j] = img[patch].flatten().ravel()
            j+=1
        if i<r:
            num_patches = get_num_patches(large_shape, self.patch_shape)
            patch = get_ith_patch(large_shape, self.patch_shape, random.randint(0, num_patches))
            Y[:, j] = img[patch]
            j+=1

    return Y

def sample_noise(self, N, add_noise=False, noise_std=10):
    """
    Randomly sample patches from images in self.paths, with the option to add Gaussian noise.
    Args:
        N: int, number of patches to sample
        add_noise: bool, whether to add Gaussian noise to the patches
        noise_std: float, standard deviation of the Gaussian noise
    Returns:
        Y: np.array, of shape (d, N), where d is the product of the patch dimensions, containing the sampled patches
    """

    # Get the product of the patch dimensions
    d = self.patch_shape.prod()

    # Initialize an array to hold the sampled patches
    Y = np.zeros([d, N], dtype=np.uint8)

    def get_sample(img, add_noise):
        """
        Helper function to get a single patch sample from an image.
        Args:
            img: np.array, representing an image
            add_noise: bool, whether to add Gaussian noise to the patch
        Returns:
            sample: np.array, of shape (d,), representing the sampled patch
        """

        # Get a random patch from the image
        patch = get_ith_patch(large_shape, self.patch_shape, random.randint(0, num_patches))
        sample = img[patch].flatten()

        # Add Gaussian noise to the patch, if specified
        if add_noise:
            noise = np.zeros_like(sample)
            cv2.randn(noise, 0, noise_std)
            sample = np.clip(sample + noise, 0, 255).astype(np.uint8)

        return sample

    # Divide the number of samples requested evenly amongst each original full image
    j = 0
    r = N % len(self.paths)

    for i, path in enumerate(self.paths):
        # Read in the image and get its shape
        img = cv2.imread(path)
        large_shape = img.shape

        # Get the number of patches that can be extracted from the image
        num_patches = get_num_patches(large_shape, self.patch_shape)

        # Sample patches from the image
        for _ in range(N // len(self.paths)):
            sample = get_sample(img, add_noise)
            Y[:, j] = sample
            j += 1

        # If there are any remaining samples, sample them from the current image
        if i < r:
            sample = get_sample(img, add_noise)
            Y[:, j] = sample
            j += 1

    return Y


Sampler.sample = sample
Sampler.sample_noise = sample_noise


#def add_noise(self, signal):

#def drop_pixels(self, signal):




In [91]:
sam = Sampler()
sam.add_path('dinner.jpg')
sam.patch_shape = np.array([1000,1000,3])
Y = sam.sample_noise(3, add_noise=True, noise_std=1000)
cv2.imshow('test', Y[:,0].reshape(sam.patch_shape))
cv2.waitKey(0)
cv2.destroyAllWindows()

In [98]:
import numpy as np

def find_sparse_rep_MP(Y, D, L):
    """
    :param Y: This is a (d x N) matrix representing the N different d-dimensional given signals.
    :param D: This is a (d x K) matrix representing the dictionary of K different atoms, where the atoms are d-dimensional vectors. Each column vector must have already been normalized.
    :param L: This is an integer satisfying 0 < L <= K representing the maximum number of atoms which can be used in a sparse representation.

    Runs in O( N K L ) time.

    Note that we need N > K > d >= L

    :return: A, the (K x N) matrix of the N different K-dimensional sparse representations of the columns of Y.
    """

    (d, N) = Y.shape
    (d1, K) = D.shape
    assert d == d1 # Make sure the dimensions match up

    A = np.zeros((K, N)) # Get our Sparse Representation Matrix

    for j in range(N):  # Iterate over all of the N given signal vectors Y[:,j].

        alpha = np.zeros(K)  # Initialize the sparse representation vector (will be a column vector in A)
        r_vec = Y[:, j] - np.dot(D, alpha)  # Initialize the "residual" vector

        for i in range(L): # Repeat until we utilize L atoms, or no more are needed
            position_coeff_error = []

            for k in range(K): # Find the best atom, D[:,k]

                atom = D[:, k]

                # Project the residual vector r_vec down to the linear subspace defined by the atom
                coeff = np.inner(r_vec, atom)
                error = np.linalg.norm(r_vec - coeff * atom)
                position_coeff_error.append((k, coeff, error))

            position, coeff, error = min(position_coeff_error, key=lambda x: x[2]) # Find the atom whose linear subspace is closest to the residual vector r_vec

            if np.abs(coeff) < 1e-6: #If the coefficient is too small, we don't add it and instead end the iteration
                break

            else:
                alpha[position] += coeff # Update the sparse representation vector, alpha
                r_vec -= coeff * D[:, position] # Update the residual vector, r_vec

        A[:, j] = alpha # Insert the sparse representation vector alpha into the matrix A

    return A


In [99]:
def find_sparse_rep_OMP(Y, D, L):
    """
    Find the sparse representation of the given signals Y over the dictionary D using the Orthogonal Matching Pursuit
    algorithm.
    Runs in O( N K L^2 ) time.

    :param Y: A (d x N) matrix representing the N different d-dimensional given signals.
    :param D: A (d x K) matrix representing the dictionary of K different atoms, where the atoms are d-dimensional
    vectors. Each column vector must have already been normalized.
    :param L: An integer representing the maximum number of atoms which can be used in a sparse representation.

    Note that we need N > K > d >= L

    :return: A, a (K x N) matrix of the N different K-dimensional sparse representations of the columns of Y.
    """

    # Get the shapes of the input matrices
    (d, N) = Y.shape
    (d1, K) = D.shape

    # Ensure that the dimensions match up
    assert d == d1

    # Initialize the sparse representation matrix A
    A = np.zeros((K, N))

    # Iterate over all of the N given signal vectors Y[:,j].
    for j in range(N):

        # Initialize the set of indices for the selected atoms
        idx_set = set()

        # Repeat until we utilize L atoms, or no more are needed
        for i in range(L):

            # Find the remaining unused atoms
            remaining_atoms = set(range(K)).difference(idx_set)

            # Initialize a list to store the coefficients and errors for each candidate atom
            position_coeff_error = []

            # Iterate over the remaining unused atoms and calculate the projection error for each
            for k in remaining_atoms:
                # Create the subspace basis from the remaining unused atoms plus the current candidate atom
                subspace_basis = D[:, list(idx_set) + [k]]

                # Solve for the coefficients of the projection
                coeff = np.linalg.lstsq(subspace_basis, Y[:, j], rcond=None)[0]

                # Calculate the projected vector
                projected = np.dot(subspace_basis, coeff)

                # Calculate the error between the original signal and the projection
                error = np.linalg.norm(Y[:, j] - projected)

                # Store the position, coefficients, and error in the list
                position_coeff_error.append((k, coeff, error))

            # Select the candidate atom with the minimum projection error
            position, coeff, error = min(position_coeff_error, key=lambda x: x[2])

            # Add the selected atom to the set of indices for the selected atoms
            idx_set.add(position)

        # Create the final subspace basis from the selected atoms
        subspace_basis = D[:, list(idx_set)]

        # Solve for the coefficients of the sparse representation using the selected atoms
        coeff = np.linalg.lstsq(subspace_basis, Y[:, j], rcond=None)[0]

        # Initialize the sparse representation vector alpha
        alpha = np.zeros(K)

        # Insert the computed coefficients into the sparse representation vector alpha
        for position, index in enumerate(idx_set):
            alpha[index] = coeff[position]

        # Insert the sparse representation vector alpha into the matrix A
        A[:, j] = alpha

        # Return the matrix of sparse representations
    return A


In [107]:
class DictionaryLearner:

    def __init__(self, L, K):
        self.L = L # The maximum number of atoms a sparse representation can use
        self.K = K # The size of the dictionary
        self.Dictionary = None # The initial guesses for the dictionary

    def set_initial_dictionary(self, D):
        self.Dictionary = D

    def select_algorithm(self, algo):
        if algo == 'MP':
            self.sparse_rep = find_sparse_rep_MP
        elif algo == 'OMP':
            self.sparse_rep = sparse_rep = find_sparse_rep_OMP





In [108]:
def update_dictionary_kSVD(Y, D, A):
    """
    Update the dictionary using the k-SVD algorithm. This

    :param Y: This is the (d x N) matrix representing the N different d-dimensional given signals.
    :param D: This is the (d x K) matrix representing the dictionary of K different atoms, where the atoms are d-dimensional
    vectors. Each column vector must have already been normalized.
    :param A: This is the (K x N) matrix of the N different K-dimensional sparse representations of the columns of Y.

    :return: (D, A), where D is updated and optimized to the given  a (K x N) matrix of the N different K-dimensional sparse representations of the columns of Y.
    """

    # Get the shapes of the input matrices
    (d, N) = Y.shape
    (d1, K) = D.shape
    (K1, N1) = A.shape

    # Ensure that the dimensions match up
    assert d == d1
    assert K == K1
    assert N == N1

    # Iterate over every atom in the dictionary
    for k in range(K):
        # Find the signal vectors, Y[:,j], whose sparse representation, A[:,j], have a non-zero entry in the k^th position. That is, they use the k^th atom.
        non_zero_indices = np.nonzero(A[k, :])[0]

        # Get the k^th "error matrix"
        E = Y - np.dot(D, A) + np.outer(D[:, k], A[k, :])

        # Restrict the matrix to only those non-zero values. The resulting matrix should be KxL
        E = E[:, non_zero_indices]

        # Do the SVD (Singular Value Decomposition) step to the KxL matrix E
        U, S, V = np.linalg.svd(E, full_matrices=False)

        # Update the k^th atom, D[:, k], and the k^th coefficients in the sparse representation, A[k, :].
        # Note: The k-SVD algorithm also converges when run in parallel, only updating the matrix D at the end. However running the algorithm in series, updating the atoms and coefficients after each step, produces more robust results and typically requires more than four times as long to converge.
        D[:, k] = U[:, 0]
        A[k, non_zero_indices] = S[0] * V[0, :]

    return (D, A)

def update_dictionary(self, Y, A):
    return update_dictionary_kSVD(Y, self.D, A)

DictionaryLearner.update_dictionary = update_dictionary



In [109]:
learner = DictionaryLearner(5, 10)
learner.select_algorithm('MP')
learner.update_dictionary()

TypeError: update_dictionary() missing 2 required positional arguments: 'Y' and 'A'

In [None]:
def sparse_dictionary_learning(Y, K, L, iters=10, D_initial=None, algo='OMP', samples=100, with_errors=False):
    """
    This algorithm finds a (d x K) matrix D (the dictionary) and a (K x N) matrix A (the sparse representation) which minimise the L2 distance between Y and D A, ie, minimise ||Y - D A ||, subject to the constraint that each column of A has at most L non-zero elements.

    :param Y: This is the (d x N) matrix representing the N different d-dimensional given signals.
    :param K: An integer representing the size of the dictionary.
    :param L: An integer representing the maximum number of "atoms", D[:, k], in the dictionary that each sparse representation vector, A[:, i], can use.

    Note: This algorithm is written under the assumption that: 0 < L < d < K < N

    :param D_initial: This is the initial guess for the (d x N) matrix D. If not None, the columns of this matrix must be unit length.
    :param algo: This is a string defining the sparse representation algorithm. Either algo = 'OMP' for Orhtogonal Matching Pursuit, or algo = 'MP' for Matching Pursuit.
    :param iters: The number of iterations this will run for
    :param with_errors: A boolean which determines if the output includes the list of the error values at each step of the iteration.
    :param samples: This tells us the number of random samples to take from the training data Y at each step

    :return: (D, A, errors)
        D: This is the (d x K) matrix representing the dictionary of K different atoms, where the atoms are d-dimensional
    vectors.
        A: This is the (K x N) matrix of the N different K-dimensional sparse representations of the columns of Y.
        errors: This is an optional output. It is the list of the error values at each step of the iteration.
    """

    Y_full = Y

    # Get Initial D
    if D_initial == None:
        D = Y[:, random.sample(range(N), k=K)]
        D = D / np.linalg.norm(D, axis=0)

    # Get the correct algorithm
    if algo == 'OMP':
        sparse_rep = find_sparse_rep_OMP
    elif algo == 'MP':
        sparse_rep = find_sparse_rep

    # Initialize the list of error values
    errors = []

    for step in range(iters):
        Y = Y_full[:, random.sample(range(len(Y[0])), k=samples)]
        A = sparse_rep(Y, D, L)
        D = update_dictionary_kSVD(Y, D, A)

        if with_errors:
            errors.append(np.linalg.norm(Y - np.dot(D, A)))

    A = sparse_rep(Y_full, D, L)

    if with_errors:
        return (D, A, errors)
    else:
        return (D, A)

In [None]:

def dictionary_learning(self, Y, )

In [103]:
learner = DictionaryLearner(5, 10)
learner.select_algorithm('MP')
learner.sparse_rep(Y, D, L)

NameError: name 'D' is not defined

In [8]:
import numpy as np

np.array([1,2,3]).prod()

6

In [18]:
import random
random.randint(0,10)

7

In [19]:
2

2

In [55]:
np.random.normal(0, 0.1, np.array([8,8]))

array([[-0.05992862, -0.01217082, -0.01924014,  0.0360319 , -0.00776401,
        -0.000524  ,  0.14573281,  0.07641563],
       [-0.18572422,  0.14079179, -0.15667047,  0.2305574 ,  0.04327394,
         0.03192766, -0.03948167,  0.15574063],
       [-0.18094431,  0.03454035,  0.05829101, -0.04971573, -0.02696382,
        -0.13489656,  0.09165429, -0.09473357],
       [-0.07280259, -0.0614746 , -0.0841094 , -0.10062964,  0.03619999,
        -0.07039527,  0.01951222,  0.04212763],
       [-0.01718095, -0.02416182,  0.14357363, -0.06101194, -0.0752874 ,
         0.05919809,  0.18025043, -0.06981907],
       [ 0.04946283, -0.00567512, -0.03728039, -0.01652367, -0.06759748,
        -0.02250607, -0.03298199, -0.02765604],
       [ 0.06038713, -0.06817364,  0.05721233,  0.06235531,  0.09877378,
         0.06874567,  0.0978708 ,  0.12555003],
       [ 0.07603313, -0.04014293, -0.1132591 ,  0.18838865, -0.01803408,
         0.15091138,  0.0526451 , -0.10075915]])

In [56]:
img

NameError: name 'img' is not defined