In [17]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
import time
%matplotlib notebook
import warnings
warnings.filterwarnings('ignore')

import cv2

# Import image

In [19]:
import glob

start = time.time()

LR = [cv2.imread(file).astype(np.float64) for file in glob.glob("../data/train_set/LR/*.jpg")]
HR = [cv2.imread(file).astype(np.float64) for file in glob.glob("../data/train_set/HR/*.jpg")]
end = time.time()
print(end - start)

36.62194204330444


# Use traditional method

In [24]:
LR_original = [cv2.imread(file) for file in glob.glob("../data/train_set/LR/*.jpg")]

LR_to_HR = [cv2.resize(lr, (lr.shape[1]*2, lr.shape[0]*2), interpolation = cv2.INTER_CUBIC) for lr in LR_original] 

In [25]:
def mse(imageA, imageB):
    # the 'Mean Squared Error' between the two images is the
    # sum of the squared difference between the two images;
    # NOTE: the two images must have the same dimension
    err = np.sum((imageA - imageB) ** 2)
    err /= float(imageA.shape[0] * imageA.shape[1])
    
    # return the MSE, the lower the error, the more "similar"
    # the two images are
    return err

m = 0
for i in np.arange(1500):
    m += mse(HR[i],LR_to_HR[i])
m/1500

526.14243893245475

# Get Features X and respond y using baseline

### Define basic parameters

In [26]:
seed = 123
channels = np.arange(3)
sample_size = 100

In [27]:
# Get X and y for single pair of LR and HR
def get_X_and_y(LR,HR,n_sample = sample_size,seed = seed):
    # determine seed
    np.random.seed(seed)
    
    # Find neighbor
    def get_neighbor_X(a,i,j):
        return([a[i-1,j-1],a[i-1,j],a[i-1,j+1],a[i,j-1],a[i,j+1],a[i+1,j-1],a[i+1,j],a[i+1,j+1]],a[i,j])

    def get_neighbor_y(a,i,j):
        return([a[i,j],a[i+1,j],a[i,j+1],a[i+1,j+1]])
    
    # padding LR image
    BLACK = [0, 0, 0]
    image_padding = cv2.copyMakeBorder(LR, 1 , 1, 1, 1, cv2.BORDER_CONSTANT, value=BLACK)
    
    y1 = np.zeros((1*n_sample,4))
    y2 = np.zeros((1*n_sample,4))
    y3 = np.zeros((1*n_sample,4))
    
    X1 = np.zeros((1*n_sample,8))
    X2 = np.zeros((1*n_sample,8))
    X3 = np.zeros((1*n_sample,8))
    
    result = [X1,X2,X3]
    Y = [y1,y2,y3]
    
#     height = LR.shape[1]
#     width = LR.shape[0]
    width = LR.shape[1]
    height = LR.shape[0]
    
    # Random pick n_sample point per image
    pts_row = np.random.randint(1, height + 1,size = n_sample)
    pts_col = np.random.randint(1, width + 1,size = n_sample)
    
    for X,y,channel in zip(result,Y,channels):
        index = 0
        for i,j in zip(pts_row,pts_col):
            X_neighbor,central = get_neighbor_X(image_padding[:,:,channel],i,j)
            y_neigbor = get_neighbor_y(HR[:,:,channel],2*(i-1),2*(j-1))
            # Get X
            X[index] = X_neighbor - central
            # Get y
            y[index] = y_neigbor - central
            index +=1
    
    
    # Stack X&Y to 3d
    return np.dstack(result),np.dstack(Y)

In [28]:
def get_Whole_X_and_y(LR,HR,n_sample = sample_size,seed = seed):
    flag = 0
    for lr,hr in zip(LR,HR):
        if (flag == 0):
            X,y = get_X_and_y(lr,hr,n_sample,seed)
            flag = 1
        else:
            X_temp,y_temp = get_X_and_y(lr,hr,n_sample,seed)
            X = np.vstack([X,X_temp])
            y = np.vstack([y,y_temp])
    return X,y

In [29]:
# n_sample should be 1000
start = time.time()
X,y = get_Whole_X_and_y(LR,HR,n_sample = sample_size)
end = time.time()
print(end - start)

164.4905881881714


In [30]:
print(X.shape,y.shape)

(150000, 8, 3) (150000, 4, 3)


# Train & Test split

In [31]:
from sklearn.model_selection import train_test_split

X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2,shuffle = True ,random_state=seed)

In [32]:
print(X_train.shape,y_train.shape)

(120000, 8, 3) (120000, 4, 3)


In [33]:
print(X_test.shape,y_test.shape)

(30000, 8, 3) (30000, 4, 3)


# Prepare functions

###### Prediction

In [34]:
# test paramter == True means you want to predict X_test, False mean you want to predict X_train
def Prediction(X_tr,y_tr,X_ts,model1,model2,model3,model4,test = True):
    
    if (test == False):
        X_test = X_tr
        
    else:
        X_test = X_ts
    
    prediction1 = np.zeros((X_test.shape[0],1,3))
    prediction2 = np.zeros((X_test.shape[0],1,3))
    prediction3 = np.zeros((X_test.shape[0],1,3))
    prediction4 = np.zeros((X_test.shape[0],1,3))

    for channel in channels:
        model1.fit(X_tr[:,:,channel],y_tr[:,0,channel].reshape(-1,1))
        model2.fit(X_tr[:,:,channel],y_tr[:,1,channel].reshape(-1,1))
        model3.fit(X_tr[:,:,channel],y_tr[:,2,channel].reshape(-1,1))
        model4.fit(X_tr[:,:,channel],y_tr[:,3,channel].reshape(-1,1))

        prediction1[:,:,channel] = model1.predict(X_test[:,:,channel]).reshape(-1,1)
        prediction2[:,:,channel] = model2.predict(X_test[:,:,channel]).reshape(-1,1)
        prediction3[:,:,channel] = model3.predict(X_test[:,:,channel]).reshape(-1,1)
        prediction4[:,:,channel] = model4.predict(X_test[:,:,channel]).reshape(-1,1)
    
    return np.concatenate([prediction1,prediction2,prediction3,prediction4],axis = 1)


###### Metric

In [35]:
import math

def MSE(y_true,y_predict):
    temp = (y_true - y_predict)**2
    return sum(temp.reshape(-1))/(temp.shape[0]*temp.shape[1]*temp.shape[2])

def psnr(y_true,y_predict):
    temp = (y_true - y_predict)**2
    mse = sum(temp.reshape(-1))/(temp.shape[0]*temp.shape[1]*temp.shape[2])
    MAXI = 225
    return 20*math.log10(MAXI)-10*math.log10(mse)

###### CV

In [36]:
from sklearn.model_selection import KFold

# Output testing error and training error
def cross_validation(X_train,y_train,model1,model2,model3,model4,folds= 3):
    kf = KFold(n_splits=folds)
    val_error = []
    tr_error = []
    for train_index, val_index in kf.split(X_train):
        X_tr,y_tr = X_train[train_index],y_train[train_index]
        X_val,y_val = X_train[val_index],y_train[val_index]
        
        y_predict_val = Prediction(X_tr,y_tr,X_val,model1,model2,model3,model4,test = True)
        y_predict_tr = Prediction(X_tr,y_tr,X_val,model1,model2,model3,model4,test = False)
        # Metric 
        val_error.append(psnr(y_val,y_predict_val)) # change metric here
        tr_error.append(psnr(y_tr,y_predict_tr)) # change metric here
        
    return val_error,tr_error
        

###### Grid Search

In [37]:
from sklearn.metrics import make_scorer
from sklearn.model_selection import GridSearchCV

# scorer can be MSE or PSNR

def my_grid_search(X_train,y_train,scorer,model,parameters,cv = 3):
    my_scorer = make_scorer(scorer, greater_is_better=False)
    reg = GridSearchCV(model,
                       scoring = my_scorer,
                       cv=cv,
                      param_grid = parameters)
    reg.fit(X_train, y_train)
    
    print('Best params',reg.best_params_)
    print('Best score:',reg.score(X_train, y_train))
    
    return reg.best_params_

# Train using base model (gradient boosting)

###### Grid Search

In [41]:
 from sklearn.ensemble import GradientBoostingRegressor

# parameters = {
#     'learning_rate':0.1, # tuning (started with higher learning rate), hight value --> overfit
#     'n_estimators' : 100, # tuning with learning_rate
#     'criterion' : 'mse',
#     'min_samples_split' : 2, # tuning,high values --> under-fitting
#     'min_samples_leaf' : 1, # tuning, similar to min_samples_split
#     'max_depth' : 3, # tuning, high value --> overfitting
#     'max_features' : 3, # ROT: = sqrt(# of features) 
#     'subsample' : 0.8,
#     'random_state' : seed
# }

In [42]:

parameters = {
    'learning_rate':0.1, # tuning (started with higher learning rate), hight value --> overfit
    'n_estimators' : 100, # tuning with learning_rate
    'criterion' : 'mse',
    'min_samples_split' : 2, # tuning,high values --> under-fitting
    'min_samples_leaf' : 1, # tuning, similar to min_samples_split
    'max_depth' : 3, # tuning, high value --> overfitting
    'max_features' : 3, # ROT: = sqrt(# of features) 
    'subsample' : 0.8,
    'random_state' : seed
}

In [45]:
start = time.time()

gbm1 = GradientBoostingRegressor(**parameters)
gbm2 = GradientBoostingRegressor(**parameters)
gbm3 = GradientBoostingRegressor(**parameters)
gbm4 = GradientBoostingRegressor(**parameters)

# prediction = Prediction(X_train,y_train,X_test,gbm1,gbm2,gbm3,gbm4,test = True)

end = time.time()
print(end - start)

68.67245697975159


### Do CV

In [46]:
start = time.time()

val_error,tr_error = cross_validation(X_train,y_train,gbm1,gbm2,gbm3,gbm4,folds= 3)
print('Validation error:',val_error,'Train error:',tr_error)

end = time.time()
print(end - start)

Validation error: [24.836008464329385, 24.86166645095201, 24.80210526428031] Train error: [25.09859840409154, 25.088562991093152, 25.105711596423657]
396.8438630104065


In [47]:
MSE(prediction , y_test)

161.25034664262898

In [48]:
MSE(prediction , y_train)

ValueError: operands could not be broadcast together with shapes (30000,4,3) (120000,4,3) 

In [49]:
psnr(m)

TypeError: psnr() missing 1 required positional argument: 'y_predict'

In [None]:
# def get_X_and_y(LR,HR):
    
#     def get_neighbor_X(a,i,j):
#         return([a[i-1,j-1],a[i-1,j],a[i-1,j+1],a[i,j-1],a[i,j+1],a[i+1,j-1],a[i+1,j],a[i+1,j+1]],a[i,j])

#     def get_neighbor_y(a,i,j):
#         return([a[i,j],a[i+1,j],a[i,j+1],a[i+1,j+1]],a[i,j])
    
#     BLACK = [0, 0, 0]
#     image_padding = cv2.copyMakeBorder(LR, 1 , 1, 1, 1, cv2.BORDER_CONSTANT, value=BLACK)
    
#     height = LR.shape[1]
#     width = LR.shape[0]
    
#     y1 = np.zeros((height*width,4))
#     y2 = np.zeros((height*width,4))
#     y3 = np.zeros((height*width,4))
    
#     X1 = np.zeros((height*width,8))
#     X2 = np.zeros((height*width,8))
#     X3 = np.zeros((height*width,8))
    
#     result = [X1,X2,X3]
#     Y = [y1,y2,y3]
    
#     for X,y,channel in zip(result,Y,channels):
#         index = 0
#         for i,iy in zip(np.arange(1,width + 1),np.arange(0,width_HR,2)):
#             for j,jy in zip(np.arange(1,height + 1),np.arange(0,height_HR,2)):
#                 neighbor,central = get_neighbor_X(image_padding[:,:,channel],i,j)
#                 y_neigbor,y_central = get_neighbor_y(HR[:,:,channel],iy,jy)
#                 # Get X
#                 X[index] = neighbor - central
#                 # Get y
#                 y[index] = y_neigbor - y_central
#                 index +=1
    
    
#     # Stack X&Y to 3d
#     return np.dstack(result),np.dstack(Y)