In [40]:
import glob
import numpy as np
from numpy import savetxt
from numpy import loadtxt
import tifffile as tif
from scipy import signal
import cv2
from scipy.optimize import minimize

backprojection.py

In [41]:
def gaussian_kernel(size, sigma=None):
    if sigma == None:
        sigma = size // 6
    center = size // 2
    kernel = np.zeros((size, size))
    for i in np.arange(size):
        for j in np.arange(size):
            diff2 = (i-center)**2 + (j-center)**2
            kernel[i,j] = np.exp(-diff2/sigma**2/2)
    return kernel/np.sum(kernel)

def backprojection(high_resolution_img, low_resolution_img, max_iters):
    row_h, col_h = high_resolution_img.shape
    row_l, col_l = low_resolution_img.shape
    p = gaussian_kernel(5,1)
    p = p**2
    p = p/np.sum(p)

    high_resolution_img = high_resolution_img.astype(float)
    low_resolution_img  = low_resolution_img.astype(float)

    for i in np.arange(max_iters):
        temp = cv2.resize(high_resolution_img, (row_l, col_l), interpolation=cv2.INTER_CUBIC)
        image_diff = low_resolution_img - temp

        image_diff = cv2.resize(image_diff, (row_h, col_h), interpolation=cv2.INTER_CUBIC)
        high_resolution_img = high_resolution_img + signal.convolve2d(image_diff, p, 'same')
    return high_resolution_img


extract_low_resolution_features.py

In [42]:
def extract_low_resolution_features(low_resolution_img):
    row, col = low_resolution_img.shape
    features = np.zeros((row,col,4))
    # first order gradient filters
    hf1 = np.array([-1,0,1]).reshape(1,-1)
    vf1 = hf1.T
    features[:,:,0] = signal.convolve2d(low_resolution_img, hf1, 'same')
    features[:,:,1] = signal.convolve2d(low_resolution_img, vf1, 'same')

    # second order gradient filters
    hf2 = np.array([1,0,-2,0,1]).reshape(1,-1)
    vf2 = hf2.T
    features[:,:,2] = signal.convolve2d(low_resolution_img, hf2, 'same')
    features[:,:,3] = signal.convolve2d(low_resolution_img, vf2, 'same')
    return features

get_compact_X.py

In [151]:
def get_compact_X(img_path, num_basis = 512, labda = 0.15, patch_size = 5, num_patches = 10000, upscale = 2, threshold = 10):
    # randomly sample image patches
    X_high, X_low = rnd_smp_patch(img_path, '*.tif', patch_size, num_patches, upscale)

    # prune patches wiith small variances
    # threshold chosen based on the training data
    X_high, X_low = patch_pruning(X_high, X_low, threshold)

    # train coupled sparse coding dictionary
    # D_high, D_low = train_coupled_dict(X_high, X_low, num_basis, labda, upscale)
    X = train_coupled_dict_before_opt(X_high, X_low, num_basis, labda, upscale)

    # save data
    save_path = 'Training/compact'+'_'+str(num_basis)+'_'\
        +str(labda)+'_'+str(upscale)+'_'
    savetxt(save_path+'X', X, delimiter=' ')

    return X

img_super_resolution.py

In [230]:
def lin_scale(hPatch, mNorm):
    hNorm = np.linalg.norm(hPatch)
    if hNorm > 0:
        scale = 1.2 * mNorm / hNorm
        hPatch = scale * hPatch
    return hPatch

def normalize_image(img, L):
    _min, _max = img.min(), img.max()
    img = ((img-_min)/(_max-_min) * (L-1)).astype(np.uint8)
    return img

def img_super_resolution(low_resolution_img, upscale, D_high, D_low, labda, overlap):
    # normalize the dictionary
    D_high = D_high/np.sqrt(np.sum(D_high**2, axis=0))
    D_low  = D_low/np.sqrt(np.sum(D_low**2, axis=0))

    # get patch_size
    patch_size = int(np.sqrt(D_high.shape[0]))

    # bicubic interpolation of the low_resolution_img
    m, n = low_resolution_img.shape
    medium_resolution_img = cv2.resize(low_resolution_img, \
        (m*upscale, n*upscale), interpolation=cv2.INTER_CUBIC)
    M, N = medium_resolution_img.shape

    # initialize high_resolution_img
    high_resolution_img = np.zeros(medium_resolution_img.shape)
    count_map = np.zeros(medium_resolution_img.shape)

    # extract low_resolution_img features
    features = extract_low_resolution_features(medium_resolution_img)

    # patch index for sparse recovery, avoid boundary
    p = patch_size//2
    gridx = np.array(list(range(p,M-patch_size-p,patch_size-overlap))+[M-patch_size-p])
    gridy = np.array(list(range(p,N-patch_size-p,patch_size-overlap))+[N-patch_size-p])

    A = D_low.T @ D_low

    # loop to recover each high resolution patch
    for i in np.arange(len(gridx)):
        for j in np.arange(len(gridx)):
            patch_idx = i*len(gridy)+j
            #print('current patch_idx is {}'.format(patch_idx))
            xx = gridx[i]
            yy = gridy[j]

            # column feature
            mPatch = medium_resolution_img[xx:xx+patch_size, yy:yy+patch_size].reshape(-1)
            mMean  = np.mean(mPatch)
            mPatch = mPatch - mMean
            mNorm  = np.linalg.norm(mPatch)

            mPatchFea = features[xx:xx+patch_size, yy:yy+patch_size, :].reshape(-1)
            mPatchFea = mPatchFea - np.mean(mPatchFea)
            mFeaNorm  = np.linalg.norm(mPatchFea)

            y = mPatchFea / mNorm if mFeaNorm > 1 else mPatchFea
            b = -D_low.T @ y

            # solve for sparce coefficient using feature sign
            w = L1_FeatureSign(labda, A, b)
            #print(len(np.where(w!=0)[0]))

            # recover high resolution patch and scale the contrast
            hPatch = D_high @ w
            hPatch = lin_scale(hPatch, mNorm)
            hPatch = hPatch.reshape(patch_size, patch_size)
            hPatch = hPatch + mMean

            high_resolution_img[xx:xx+patch_size, yy:yy+patch_size] += hPatch
            count_map[xx:xx+patch_size, yy:yy+patch_size] += 1

    # fill in the empty with bicubic interpolation
    plt.subplot(1,2,1)
    plt.imshow(count_map)
    plt.subplot(1,2,2)
    plt.imshow(high_resolution_img)
    high_resolution_img[count_map < 1] = medium_resolution_img[count_map < 1]
    count_map[count_map < 1 ] = 1
    high_resolution_img = high_resolution_img / count_map
    high_resolution_img = normalize_image(high_resolution_img, 256)

    return high_resolution_img


rnd_smp_patch.py

In [48]:
def normalize_image(img, L):
    _min, _max = img.min(), img.max()
    img = ((img-_min)/(_max-_min) * (L-1)).astype(np.uint8)
    return img

def sample_patches(img, patch_size, num_copies, upscale):
    high_resolution_img = img
    high_resolution_img = high_resolution_img.astype(float)
    m, n = high_resolution_img.shape
    # generate low resolution counterparts
    low_resolution_img  = cv2.resize(\
        high_resolution_img, (m//upscale, n//upscale),interpolation=cv2.INTER_CUBIC)
    low_resolution_img  = cv2.resize(\
        low_resolution_img, (m, n), interpolation=cv2.INTER_CUBIC)
    low_resolution_img = low_resolution_img.astype(float)

    x = np.arange(m-2*patch_size)+patch_size
    y = np.arange(n-2*patch_size)+patch_size
    np.random.shuffle(x)
    np.random.shuffle(y)

    X, Y = np.meshgrid(x,y)
    xrow, ycol = X.reshape(-1), Y.reshape(-1)
    if num_copies < len(xrow):
        xrow = xrow[:num_copies]
        ycol = ycol[:num_copies]
    else:
        num_copies = len(xrow)

    # initialize output
    x_high = np.zeros((patch_size**2, num_copies))
    x_low  = np.zeros((4*patch_size**2, num_copies))

    # compute the first and second order gradients
    hf1 = np.array([-1,0,1]).reshape(1,-1)
    vf1 = hf1.T
    hf2 = np.array([1,0,-2,0,1]).reshape(1,-1)
    vf2 = hf2.T

    # get low_resolution_img features
    low_resolution_feature1 = signal.convolve2d(low_resolution_img, hf1, 'same')
    low_resolution_feature2 = signal.convolve2d(low_resolution_img, vf1, 'same')
    low_resolution_feature3 = signal.convolve2d(low_resolution_img, hf2, 'same')
    low_resolution_feature4 = signal.convolve2d(low_resolution_img, vf2, 'same')

    # collect patches from sample
    for i in np.arange(num_copies):
        row, col = xrow[i], ycol[i]
        Hpatch  = high_resolution_img[row:row+patch_size, col:col+patch_size].reshape(-1)
        Lpatch1 = low_resolution_feature1[row:row+patch_size, col:col+patch_size].reshape(-1)
        Lpatch2 = low_resolution_feature2[row:row+patch_size, col:col+patch_size].reshape(-1)
        Lpatch3 = low_resolution_feature3[row:row+patch_size, col:col+patch_size].reshape(-1)
        Lpatch4 = low_resolution_feature4[row:row+patch_size, col:col+patch_size].reshape(-1)
        Lpatch  = np.concatenate([Lpatch1,Lpatch2,Lpatch3,Lpatch4],axis=0)
        x_high[:,i] = Hpatch-np.mean(Hpatch)
        x_low[:,i]  = Lpatch

    return x_high, x_low

def rnd_smp_patch(img_path, type, patch_size, num_patches, upscale):
    # get all training images name
    img_list = glob.glob(img_path+type) # type = '*.tif'
    # get total number of images being considered
    img_num = len(img_list)
    # initialize number of copies for each image
    # depends on its size
    num_copies_img = np.zeros(img_num)

    # read images and determine number of copies for each image
    # this number is proportional to total number of patches
    for i in np.arange(img_num):
        img = tif.imread(img_list[i])
        num_copies_img[i] = np.prod(img.shape)
    num_copies_img = np.floor(num_copies_img*num_patches/np.sum(num_copies_img)).astype(np.int)

    # initialize output
    X_high = []
    X_low  = []

    for i in np.arange(img_num):
        num_copies = num_copies_img[i]
        img = tif.imread(img_list[i])
        img = normalize_image(img, 256)
        x_high, x_low = sample_patches(img, patch_size, num_copies, upscale)
        X_high.append(x_high)
        X_low.append(x_low)

    # assemble a numpy ndarray
    X_high = np.concatenate(X_high, axis=1)
    X_low  = np.concatenate(X_low, axis=1)

    # save data
    save_path = 'Training/rnd_patches'+str(patch_size)+'_'+str(upscale)+'_'
    savetxt(save_path+'X_high.csv', X_high, delimiter=' ')
    savetxt(save_path+'X_low.csv', X_low, delimiter=' ')

    return X_high, X_low

def patch_pruning(X_high, X_low, threshold):
    vars = np.var(X_high, axis=0)
    idx  = vars > threshold
    X_high = X_high[:,idx]
    X_low  = X_low[:,idx]
    return X_high, X_low


sparse_coding.py

In [49]:
def L1_FeatureSign(gamma, A, b):
    """
    The detail of the algorithm is described in the following paper:
    'Efficient Sparse Coding Algorithms', Honglak Lee, Alexis Battle, Rajat Raina, Andrew Y. Ng,
    Advances in Neural Information Processing Systems (NIPS) 19, 2007

    minimize 0.5*x.T @ A @ x + b.T @ x + gamma * |x|
    """
    A = A.astype(float)
    b = b.astype(float)
    x = np.zeros(A.shape[0])
    eps = 1e-9
    grad = A @ x + b
    ii = np.argmax(np.abs(grad*(x==0)))

    while True:
        if grad[ii] > gamma+eps:
            x[ii] = (gamma-grad[ii])/A[ii,ii]
        elif grad[ii] < -gamma-eps:
            x[ii] = (-gamma-grad[ii])/A[ii,ii]
        else:
            if np.all(x==0):
                break

        while True:
            # consider active set
            activated = x != 0
            AA = A[activated,:][:,activated]
            bb = b[activated]
            xx = x[activated]
            # new b based on unchanged sign
            # Ax + b + gamma * sign(x) = 0
            b_new = -gamma*np.sign(xx)-bb
            # analytical solution
            x_new = np.linalg.inv(AA)@b_new
            idx   = x_new != 0
            cost_new  = (b_new[idx]/2 + bb[idx]).T @ x_new[idx] + \
                                gamma*np.sum(np.abs(x_new[idx]))
            change_signs = np.where(xx*x_new <= 0)[0]
            # if no sign change, x_new is optimum since it's analytical solution
            if len(change_signs) == 0:
                x[activated] = x_new
                loss         = cost_new
                break
            # find the best interpolation solution x_inter between x_new and xx
            # x_inter is improved compared with xx, because of convexity
            x_min    = x_new
            cost_min = cost_new
            d        = x_new-xx
            t        = d/xx
            for pos in change_signs:
                x_inter = xx - d/t[pos] # interpolating at pos-th point
                x_inter[pos] = 0        # make sure it is zero
                idx = x_inter != 0
                # cost of x_inter
                cost_temp = (AA[idx,:][:,idx]@x_inter[idx]/2+bb[idx]).T@x_inter[idx] + \
                        gamma*np.sum(np.abs(x_inter[idx]))
                if cost_temp < cost_min:
                    x_min = x_inter
                    cost_min = cost_temp
            # update x and loss
            x[activated] = x_min
            loss         = cost_min

        grad  = A @ x + b
        ii    = np.argmax(np.abs(grad*(x==0)))
        max_x = np.abs(grad[ii])
        if max_x <= gamma+eps:
            break

    return x



def L1_FeatureSign_Setup(X, D, labda):
    """
    minimize ||X[:,j] - D*Z[:,j]||_2^2 + 2*sigma^2*beta |Z[:,j]|_1
    2*sigma^2*beta = labda
    since z = Z[:,j] could be separately optimized
    """
    M, N = X.shape
    K = D.shape[1]
    Z = np.zeros((K,N))
    A = D.T @ D
    for i in np.arange(N):
        b = -D.T @ X[:,i]
        Z[:,i] = L1_FeatureSign(labda, A, b)
    return Z

def object_func(dual_lambda, ZZt, XZt, X, c, trXXt):
    # objective function at given dual_lambda
    L = XZt.shape[0]
    M = len(dual_lambda)
    ZZt_inv = np.linalg.inv(ZZt+np.diag(dual_lambda))
    if L > M:
        f = -np.trace(ZZt_inv @ (XZt.T @ XZt)) + trXXt - c*np.sum(dual_lambda)
    else:
        f = -np.trace(XZt @ ZZt_inv @ XZt.T) + trXXt - c*np.sum(dual_lambda)
    return f

def gradient_func(dual_lambda, ZZt, XZt, X, c, trXXt):
    L = XZt.shape[0]
    M = len(dual_lambda)
    ZZt_inv = np.linalg.inv(ZZt+np.diag(dual_lambda))
    # gradient of the function at given dual_lambda
    g = np.zeros((M,1))
    temp = XZt @ ZZt_inv
    g = np.sum(temp**2, axis=0)-c
    return g

def hessian_func(dual_lambda, ZZt, XZt, X, c, trXXt):
    L = XZt.shape[0]
    M = len(dual_lambda)
    ZZt_inv = np.linalg.inv(ZZt+np.diag(dual_lambda))
    # Hessian evaluated at given dual_lambda
    temp = XZt @ ZZt_inv
    h = -2 * (temp.T @ temp) * ZZt_inv
    return h

def L2_Lagrange_Dual(X, Z, c):
    M, N = X.shape
    K = Z.shape[0]

    ZZt = Z @ Z.T
    XZt = X @ Z.T

    # arbitrary initialization dual_lambda as Gaussian
    dual_lambda = 10*np.abs(np.random.rand(K))
    trXXt = np.sum(X**2)
    options = {'disp':True, 'maxiter': 100}
    # those three are good, if warning says Desired error not necessarily achieved due to precision loss.
    # try another method
    res = minimize(object_func, dual_lambda, args=(ZZt, XZt, X, c, trXXt), jac = gradient_func, method='CG', options=options)
    #res = minimize(object_func, dual_lambda, args=(ZZt, XZt, X, c, trXXt), jac = gradient_func, hess = hessian_func, method='trust-ncg', options=options)
    #res = minimize(object_func, dual_lambda, args=(ZZt, XZt, X, c, trXXt), jac = gradient_func, hess = hessian_func, method='Newton-CG', options=options)
    dual_lambda = res.x
    Dt = np.linalg.inv(ZZt+np.diag(dual_lambda)) @ XZt.T
    D  = Dt.T
    return D

def sparse_coding(X, num_basis, labda, num_iters=50, batch_size=500, initD=None):
    """
    Regularized Sparse Coding
    X:          preprocessed np.ndarray, dimension: MxN
    num_basis:  number of basis. D.shape[1]
    gamma:      sparsity regularization
    num_iters:  number of iterations
    batch_size: batch size
    initD:      initial dictionary

    D:          learned dictionary, dimension: MxK, where K << N, K=num_basis
    Z:          sparse code, dimension: KxN

    this function solve:
        minimize_{D,Z} ||X-DZ||_2^2 + lambda*|Z|_1
        s.t. column norm of D <= c
    This is not convex in both D and Z, but is convex in one of them with the other fixed.
    """
    M, N = X.shape # M: patch_size, N: num_patches
    K    = num_basis
    X = X.astype(float)
    if batch_size == None: batch_size = N
    if initD == None:
        # Initialize D with a Gaussian random matrix
        D = np.random.rand(M, K)-0.5
        D = D - np.mean(D, axis=0)
        # each column is unit normalized
        D = D/np.sqrt(np.sum(D**2, axis=0))
    else:
        D = initD

    # optimization loop:
    for iter in np.arange(num_iters):
        print('{}-th iterations for sparse coding'.format(iter))
        idx = np.arange(N)
        np.random.shuffle(idx)

        for i in np.arange(N//batch_size):
            batch_idx = idx[np.arange(batch_size)+batch_size*(i-1)]
            X_batch   = X[:,batch_idx]

            # fix D, update Z
            # Z = argmin_Z ||X-DZ||_2^2 + labda*|Z|_1
            # for the paper referenced, labda = sigma^2 * beta
            # this is provided by Honglak Lee. et al (2017), Efficient sparse coding algorithms
            Z = L1_FeatureSign_Setup(X_batch, D, labda)

            # fix Z, update D
            # c = 1 sum_j ||D_i^j|| \leq 1, i = 1,2,...,k, column norm constraints
            D = L2_Lagrange_Dual(X_batch, Z, 1)

    return D, Z


train_coupled_dict.py

In [50]:
def train_coupled_dict(X_high, X_low, num_basis, labda, upscale):
    """
    solve:
    D = argmin ||X-DZ||_2^2 + labda * |Z|_1
    s.t ||D_i||_2^2 <= 1, for i = 1, 2, 3, ..., num_basis
    """
    high_dim = X_high.shape[0]
    low_dim  = X_low.shape[0]
    # should pre-normalize X_high and X_low
    high_norm  = np.sqrt(np.sum(X_high**2, axis=0))
    low_norm   = np.sqrt(np.sum(X_low**2, axis=0))
    nontrivial = np.intersect1d(np.where(high_norm != 0)[0], np.where(low_norm != 0)[0])

    X_high = X_high[:,nontrivial]
    X_low  = X_low[:,nontrivial]

    X_high = X_high/np.sqrt(np.sum(X_high**2, axis=0))
    X_low  = X_low/np.sqrt(np.sum(X_low**2, axis=0))

    # joint learning of the dictionary
    X = np.concatenate([np.sqrt(high_dim)*X_high, np.sqrt(low_dim)*X_low], axis=0)
    X_norm = np.sqrt(np.sum(X**2, axis=0))
    X = X[:,X_norm > 1e-5]
    X = X/np.sqrt(np.sum(X**2, axis=0))

    # dictionary training
    D, Z = sparse_coding(X, num_basis, labda) # X = DZ, num_basis = D.shape[1]
    D_high = D[:high_dim,:]
    D_low  = D[high_dim:,:]

    # normalize the dictionary
    # some column is not useful due to zero.
    high_norm  = np.sqrt(np.sum(D_high**2, axis=0))
    low_norm   = np.sqrt(np.sum(D_low**2, axis=0))
    nontrivial = np.intersect1d(np.where(high_norm != 0)[0], np.where(low_norm != 0)[0])

    D_high = D_high[:,nontrivial]
    D_low  = D_low[:,nontrivial]

    D_high = D_high/np.sqrt(np.sum(D_high**2, axis=0))
    D_low  = D_low/np.sqrt(np.sum(D_low**2, axis=0))

    return D_high, D_low

def train_coupled_dict_before_opt(X_high, X_low, num_basis, labda, upscale):
    """
    Unfortunately, I did not find python equivalent to solve this function
    I have to go to MATLAB and carry the result back.
    solve:
    D = argmin ||X-DZ||_2^2 + labda * |Z|_1
    s.t ||D_i||_2^2 <= 1, for i = 1, 2, 3, ..., num_basis
    """
    high_dim = X_high.shape[0]
    low_dim  = X_low.shape[0]
    # should pre-normalize X_high and X_low
    high_norm  = np.sqrt(np.sum(X_high**2, axis=0))
    low_norm   = np.sqrt(np.sum(X_low**2, axis=0))
    nontrivial = np.intersect1d(np.where(high_norm != 0)[0], np.where(low_norm != 0)[0])

    X_high = X_high[:,nontrivial]
    X_low  = X_low[:,nontrivial]

    X_high = X_high/np.sqrt(np.sum(X_high**2, axis=0))
    X_low  = X_low/np.sqrt(np.sum(X_low**2, axis=0))

    # joint learning of the dictionary
    X = np.concatenate([np.sqrt(high_dim)*X_high, np.sqrt(low_dim)*X_low], axis=0)
    X_norm = np.sqrt(np.sum(X**2, axis=0))
    X = X[:,X_norm > 1e-5]
    X = X/np.sqrt(np.sum(X**2, axis=0))

    return X


def train_coupled_dict_after_opt(D, Z):
    # dictionary training
    D, Z = sparse_coding(X, num_basis, labda) # X = DZ, num_basis = D.shape[1]
    D_high = D[:high_dim,:]
    D_low  = D[high_dim:,:]

    # normalize the dictionary
    # some column is not useful due to zero.
    high_norm  = np.sqrt(np.sum(D_high**2, axis=0))
    low_norm   = np.sqrt(np.sum(D_low**2, axis=0))
    nontrivial = np.intersect1d(np.where(high_norm != 0)[0], np.where(low_norm != 0)[0])

    D_high = D_high[:,nontrivial]
    D_low  = D_low[:,nontrivial]

    D_high = D_high/np.sqrt(np.sum(D_high**2, axis=0))
    D_low  = D_low/np.sqrt(np.sum(D_low**2, axis=0))

    return D_high, D_low


In [52]:
import glob
import numpy as np
from numpy import savetxt
from numpy import loadtxt
import tifffile as tif
from scipy import signal
import cv2
from scipy.optimize import minimize