In [None]:
import os
import shutil
import numpy as np
import time
import cv2
import random
import matplotlib.image as imgplt
import matplotlib.pyplot as plt

In [None]:
### our definition about LM3L classifier 

class LM3L_classifier:
    def __init__(self, s_factor=1, rate=0.001, p=2, lamda=0.1, th=5, mg=1, T=10, err=0.001):
        self.s_factor = s_factor
        self.rate = rate
        self.p = p
        self.lamda = lamda
        self.th = th
        self.mg = mg
        self.T = T
        self.err = err
    def training(self, samples, labels, K, epsilon = 1e-5):
        ### step 1
        ### Initialization(fin)
        self.K = K
        s_factor = self.s_factor
        rate = self.rate
        p = self.p
        lamda = self.lamda
        th = self.th
        mg = self.mg
        T = self.T
        err = self.err
        n = len(samples[0][0])
        J0 = 0
        J1 = 0
        L = list()
        I = np.zeros((K,1))/K
        w = np.ones((K,1))/K
        for k in range(K):
            d = np.array(samples[k]).shape[0]
            s = int(d/s_factor)
            L.append(np.eye(s,d))


        ### step 2
        ### Alternating optimization
        ## compute the initial loss function J0
        # compute Ik
        for k in range(K):
            samplek = np.transpose(samples[k])
            I[k] = 0
            for i in range(int(n/2)):
                vec = np.dot(L[k], np.transpose(samplek[2*i-1]-samplek[2*i-2]))
                I[k] += hinge(mg-labels[i]*(th - np.linalg.norm(vec)**2))
            print("I[%d] = %f"%(k, I[k]))
        # compute regulization term
        R = 0
        for i in range(int(n/2)):
            for k in range(K):
                samplek = np.transpose(samples[k])
                for l in range(k):
                    samplel = np.transpose(samples[l])
                    vec1 = np.dot(L[k], np.transpose(samplek[2*i-1]-samplek[2*i-2]))
                    vec2 = np.dot(L[l], np.transpose(samplel[2*i-1]-samplel[2*i-2]))
                    dist =  (np.linalg.norm(vec1)- np.linalg.norm(vec2))**2
                    R += dist
        R = lamda*R
        J0 = R
        for k in range(K):
            J0 += I[k]*w[k]**p
        print("Initialized:\n w = %s\n J = %f"%(w, J0))
        # start optimization
        for t in range(T):
            for k in range(K):
                samplek = np.transpose(np.array(samples[k]))
                ## gradient descent for updating L
                # compute the gradient use past L
                d = np.array(samples[k]).shape[0]
                C = np.zeros((d,d))
                for i in range(int(n/2)):
                    # we use adjustified sigmoid to approximate the derivation of hinge function
                    veck = np.dot(L[k], np.transpose(samplek[2*i-1]-samplek[2*i-2]))
                    distk = np.linalg.norm(veck)
                    z = mg-labels[i]*(th - distk**2)
                    coe = (w[k]**p)*labels[i]*sigmoid(z, 5)
                    for l in range(K):  
                        if l==k:
                            continue
                        samplel = np.transpose(samples[l])
                        vecl = np.dot(L[l], np.transpose(samplel[2*i-1]-samplel[2*i-2]))
                        distl = np.linalg.norm(vecl)
                        coe += lamda*(1-np.sqrt(distl**2+epsilon)/np.sqrt(distk**2+epsilon))
                    if i==0:
                        C = coe*np.outer(samplek[2*i-1]-samplek[2*i-2],samplek[2*i-1]-samplek[2*i-2])
                    else:
                        C += coe*np.outer(samplek[2*i-1]-samplek[2*i-2],samplek[2*i-1]-samplek[2*i-2])
                # compute new Lk ...
                D = 2*np.dot(L[k], C)
                L[k] = L[k]-rate*D
                # ... and updated Ik using new Lk(fin)
                I[k] = 0
                for i in range(int(n/2)):
                    vec = np.dot(L[k], np.transpose(samplek[2*i-1]-samplek[2*i-2]))
                    I[k] += hinge(mg-labels[i]*(th - np.linalg.norm(vec)**2))
            ## lagrange method for updating w using I[k]
            w = (np.sqrt(I**2+epsilon))**(-1/(p-1))
            w = w/sum(w) 
            ## compute new J, verify terminal condition(fin)
            # compute new J1, just need to compute the regulization term(fin)
            R = 0
            for i in range(int(n/2)):
                for k in range(K):
                    samplek = np.transpose(samples[k])
                    for l in range(k):
                        samplel = np.transpose(samples[l])
                        vec1 = np.dot(L[k], np.transpose(samplek[2*i-1]-samplek[2*i-2]))
                        vec2 = np.dot(L[l], np.transpose(samplel[2*i-1]-samplel[2*i-2]))
                        dist =  (np.linalg.norm(vec1)- np.linalg.norm(vec2))**2
                        R += dist
            R = lamda*R
            J1 = R
            for k in range(K):
                J1 += I[k]*w[k]**p
            print(J1)
            # verify terminal condition(fin)
            if abs(J1-J0) < err:
                print("reach terminal condition")
                break;
            else:
                J0 = J1
            times = t+1
            print("Iteration %d times:\n w = %s\n J = %f"%(times, w, J0))


        ### step 3
        ### Output distance metrics and weights(fin)
        M = list()
        for k in range(K):
            M.append(np.dot(np.transpose(L[k]), L[k]))
        self.L = L
        self.M = M
        self.w = w
        print("algorithm terminated after %d times iteration"%(times))
        return (M, w)
    def testing(self,samples):
        th = self.th
        w = self.w
        L = self.L
        p = self.p
        K = self.K
        result = list()
        dists = list()
        ### Initialization
        n = np.shape(samples[0])[1]
        th *= np.sum(w**p)
        dist = 0
        print(th)
        for i in range(int(n/2)):
            ### compute weighted distance
            dist = 0
            for k in range(K):
                vec = np.dot(L[k],  np.transpose(samples[k][:,2*i-1]-samples[k][:,2*i-2]))
                dist += (w[k]**p)*(np.linalg.norm(vec)**2)
            #print(dist)
            dists.append(dist)
            if dist < th:
                result.append(1)
            else:
                result.append(-1)
        return dists, result
        
    def save(self, filename):
        print("gaoshi")
    
def split_features(features, train_number):
    train=[[] for i in features]
    test=[[] for i in features]
    randnum=random.sample(range(1600),train_number)
    for i in range(1600):
        if i in randnum:
            for j in range(len(features)):
                train[j].extend([features[j][2*i],features[j][2*i+1]])
        else:
            for j in range(len(features)):
                test[j].extend([features[j][2*i],features[j][2*i+1]])
    randnum=random.sample(range(1600),train_number)
    for i in range(1600):
        if i in randnum:
            for j in range(len(features)):
                train[j].extend([features[j][3200+2*i],features[j][3200+2*i+1]])
        else:
            for j in range(len(features)):
                test[j].extend([features[j][3200+2*i],features[j][3200+2*i+1]])
    print("finish")
    train_label = np.concatenate([np.ones(train_number),np.zeros(train_number)-1],axis=0)
    test_label = np.concatenate([np.ones(1600-train_number),np.zeros(1600-train_number)-1],axis=0)
    return train,test,train_label,test_label
    
    
def evaluate(result:np.ndarray,label:np.ndarray):
    result1=result==1
    label1=label==1
    tp=(result[result1]==label[result1]).sum()
    fp=result1.sum()-tp
    tn=(result[~result1]==label[~result1]).sum()
    fn=label1.sum()-tp
    print(tp,fp,tn,fn)
    accuracy = (tp+tn)/result.shape[0]
    presicion = tp/(tp+fp)
    recall = tp/(tp+fn)
    f1 = 2/(1/presicion+1/recall)
    print(f"accuracy:{accuracy}\npresicion:{presicion}\nrecall:{recall}\nf1 score:{f1}")
    return accuracy,presicion,recall,f1

def hinge(x):
    if x >= 0:
        return x
    return 0

def sigmoid(x, a):
    y = np.exp(-a*x)
    y = 1/(1+y)
    return y



In [None]:
### some functions about input, output and feature extraction

def get_all_path(dirpath, *suffix):
    PathArray = []
    for r, ds, fs in os.walk(dirpath):
        for fn in fs:
            if os.path.splitext(fn)[1] in suffix:
                fname = os.path.join(r, fn)
                PathArray.append(fname)
    return PathArray

sourcePath = r'C:\Users\Yunfeiman\Desktop\courses\19q\Project\LFW'


def get_imgs(dirpath):
    ### initialization
    images = list()
    ### ge all pictures into arrays
    ## get paths
    imagepaths = get_all_path(dirpath, '.jpg')
    ## translate into gray
    for path in imagepaths:
        img = cv2.imread(path)
        img = img[50:200,85:165]
        img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
        images.append(img)
    return images


def get_lbp_pattern(block,block_size):
    pattern = np.zeros(8)
    feature = []
    for i in range(1, block_size[0] - 1):
        for j in range(1, block_size[1] - 1):
            pattern[0] = block[i - 1, j - 1] > block[i, j]
            pattern[1] = block[i - 1, j] > block[i, j]
            pattern[2] = block[i - 1, j + 1] > block[i, j]
            pattern[3] = block[i, j + 1] > block[i, j]
            pattern[4] = block[i + 1, j + 1] > block[i, j]
            pattern[5] = block[i + 1, j ] > block[i, j]
            pattern[6] = block[i + 1, j - 1] > block[i, j]
            pattern[7] = block[i, j - 1] > block[i, j]
            diff = np.array([pattern[i-1]!=pattern[i] for i in range(8)]).sum()
            if diff<=2:
                feature.append(np.dot(pattern,[2**i for i in range(8)]))
            else:
                feature.append(-1)
    return np.array(feature)

def lbp(input:np.ndarray,block_size=(15,16)):#int8 input with dim=2,edge around blocks is ignored
    uniform=[0, 1, 2, 3, 4, 6, 7, 8, 12, 14, 15, 16, 24, 28, 30, 31, 32, 48, 56, 60, 62, 63, 64, 96, 112, 120, 124, 126, 127, 128, 129, 131, 135, 143, 159, 191, 192, 193, 195, 199, 207, 223, 224, 225, 227, 231, 239, 240, 241, 243, 247, 248, 249, 251, 252, 253, 254, 255]
    if input.ndim!=2:
        print("input dim!=2")
    else:
        row_index = 0
        feature = []
        block_feature = []
        while row_index+block_size[0]<=input.shape[0]:
            column_index = 0
            while column_index+block_size[1]<=input.shape[1]:
                block_feature = []
                block = input[row_index:row_index+block_size[0],column_index:column_index+block_size[1]]
                pattern = get_lbp_pattern(block,block_size)
                block_feature.append(np.sum(pattern==-1))
                block_feature.extend([np.sum(pattern==i) for i in uniform])
                block_feature=np.array(block_feature)
                feature.append(block_feature/block_feature.sum())
                column_index+=block_size[1]
            row_index += block_size[0]
        return np.concatenate(feature,axis=0)
    

def extract_lbp(images):
    lbp_features = list()
    for image in images:
        lbp_features.append(lbp(np.array(image)))
    return lbp_features

def get_hog_pattern(cell,cellSize = (8,8)):
    gx = np.zeros((cellSize[0]-2,cellSize[1]-2))
    gy = np.zeros((cellSize[0]-2,cellSize[1]-2))
    for i in range(1, cellSize[0] - 1):
        for j in range(1, cellSize[1] - 1):
            gx[i-1,j-1] = float(cell[i,j+1])-float(cell[i,j-1])
            gy[i-1,j-1] = float(cell[i+1,j])-float(cell[i-1,j])

    gra_mag = np.sqrt(gx ** 2 + gy ** 2)
    gra_dir = np.arctan2(gy,gx)/np.pi
    return gra_dir.flatten(),gra_mag.flatten()

def hog(input:np.ndarray,blockSize = (2,2),blockStride = (1,1),cellSize = (8,8),nbins = 9):
    if input.ndim!=2:
        print("input dim!=2")
    else:
        cell_feature = np.zeros((int(input.shape[0]/cellSize[0]),int(input.shape[1]/cellSize[1]),9))
        row_index = 0
        row=0
        while row_index + cellSize[0] <= input.shape[0]:
            column_index = 0
            column=0
            while column_index+cellSize[1] <=input.shape[1]:
                one_cell_feature = np.zeros(11)
                cell = input[row_index:row_index + cellSize[0], column_index:column_index + cellSize[1]]
                gradient_dir,gradient_mag =get_hog_pattern(cell,cellSize)
                gradient_dir = gradient_dir*nbins
                for i in range(gradient_dir.shape[0]):
                    low = int(np.floor(gradient_dir[i]))
                    high = low+1
                    one_cell_feature[low]+=(high-gradient_dir[i])*gradient_mag[i]
                    one_cell_feature[high]+=(gradient_dir[i]-low)*gradient_mag[i]
                one_cell_feature[0]+=one_cell_feature[9]
                cell_feature[row,column] = one_cell_feature[:9]
                column_index+=cellSize[1]
                column+=1
            row_index+=cellSize[0]
            row+=1
        print((cell_feature<0).sum())
        row_index = 0
        block_feature = []
        while row_index + blockSize[0] <= cell_feature.shape[0]:
            column_index = 0
            while column_index+blockSize[1] <=cell_feature.shape[1]:
                one_block_feature=cell_feature[row_index:row_index+blockSize[0],column_index:column_index+blockSize[1]].flatten()
                block_feature.append(one_block_feature/(np.linalg.norm(one_block_feature)))
                column_index+=blockStride[1]
            row_index+=blockStride[0]
        return np.concatenate(block_feature, axis=0)
    
def extract_hog(images):
    hog_features = list()
    for image in images:
        hog_features.append(hog(np.array(image)))
    return np.concatenate(hog_features,axis=1)

def get_cslbp_pattern(block,block_size,threshold):
    pattern = np.zeros(4)
    feature = []
    for i in range(1, block_size[0] - 1):
        for j in range(1, block_size[1] - 1):
            pattern[0] = block[i - 1, j - 1] > block[i + 1, j + 1]+threshold
            pattern[1] = block[i - 1, j] > block[i + 1, j ]+threshold
            pattern[2] = block[i - 1, j + 1] > block[i + 1, j - 1]+threshold
            pattern[3] = block[i, j + 1] > block[i, j - 1]+threshold
            feature.append(np.dot(pattern,[2**i for i in range(4)]))
    return np.array(feature)

def cslbp(input:np.ndarray,block_size=(10,10),threshold=0):
    if input.ndim!=2:
        print("input dim!=2")
    else:
        row_index = 0
        feature = []
        while row_index + block_size[0] <= input.shape[0]:
            column_index = 0
            while column_index + block_size[1] <= input.shape[1]:
                block = input[row_index:row_index + block_size[0], column_index:column_index + block_size[1]]
                pattern = get_cslbp_pattern(block, block_size,threshold)
                block_feature = np.array([np.sum(pattern == i) for i in range(16)])
                feature.append(block_feature / block_feature.sum())
                column_index += block_size[1]
            row_index += block_size[0]
        return np.concatenate(feature, axis=0)

def extract_cslbp(images):
    cslbp_features = list()
    for image in images:
        cslbp_features.append(cslbp(np.array(image)))
    return cslbp_features
    
def WPCA(features, dimension=200, epsilon=1e-5):
    ### compute cov matrix
    print("start computing cov matrix")
    mean = np.mean(features, axis=0)
    features-= mean
    cov = np.dot(np.transpose(features), features)
    print(np.linalg.matrix_rank(cov))
    print(np.shape(cov))
    print("start eigen-decomposition")
    ### eigen-decomposition
    evalue, evector = np.linalg.eig(cov)
    print(evalue)
    ### return the reduce matrix and reduced features
    revector = evector[:,:dimension]
    reduced_feature = np.dot(features, revector)
    print("start whiten")
    ### whiten
    n = len(features)
    stds = list()
    for i in range(dimension):
        std = np.linalg.norm(reduced_feature[:,i])
        stds.append(std)
        reduced_feature[:,i] = reduced_feature[:,i]/np.sqrt(std**2+epsilon)
    return revector, reduced_feature, mean, stds

def test_WPCA(features, revector, mean, stds, epsilon=1e-5, dimension=200):
    features -= mean
    reduced_feature = np.dot(features, revector)
    for i in range(dimension):
        reduced_feature[:, i] /= np.sqrt(stds[i]**2+epsilon)
    return reduced_feature

In [None]:
### train and test

In [None]:
labels = list()
for i in range(1600):
    labels.append(1)
for i in range(1600):
    labels.append(-1)

In [None]:
images = get_imgs(sourcePath)

In [None]:
flbp = extract_lbp(images)
fcslbp = extract_cslbp(images)
fhog = extract_hog(images)
fhog = np.transpose(fhog)

In [None]:
train, test, train_label, test_label = split_features([flbp, fhog, fcslbp], 1280)

In [None]:
reduce_matrix_flbp, reduced_flbp, mean_flbp, stds_flbp = WPCA(train[0])
reduce_matrix_fhog, reduced_fhog, mean_fhog, stds_fhog = WPCA(train[1])
reduce_matrix_fcslbp, reduced_fcslbp, mean_fcslbp, stds_fcslbp = WPCA(train[2])

In [None]:
### train
feature_hog = np.transpose(reduced_fhog)
feature_lbp = np.transpose(reduced_flbp)
feature_cslbp = np.transpose(reduced_fcslbp)
features = [feature_lbp,feature_hog, feature_cslbp]
LM3L = LM3L_classifier(rate=2, T=300)
M, w = LM3L.training(features, train_label, len(features))

In [None]:
### test
test[0] = test_WPCA(test[0], reduce_matrix_flbp, mean_flbp, stds_flbp)
test[1] = test_WPCA(test[1], reduce_matrix_fhog, mean_fhog, stds_fhog)
test[2] = test_WPCA(test[2], reduce_matrix_fcslbp, mean_fcslbp, stds_fcslbp)
tf_lbp = np.transpose(test[0])
tf_hog = np.transpose(test[1])
tf_cslbp = np.transpose(test[1])

In [None]:
test_feature = [tf_lbp, tf_hog, tf_cslbp]
dists, result = LM3L.testing(test_feature)
evaluate(np.array(result),np.array(test_label))

In [None]:
### for comming test
test_source_path = ""
test_images = get_imgs(sourcePath)

In [None]:
test_flbp = extract_lbp(test_images)
test_fhog = extract_hog(test_images)
test_fhog = np.transpose(test_fhog)

In [None]:
test_flbp = np.transpose(test_WPCA(test_flbp, reduce_matrix_flbp, mean_flbp, stds_flbp))
test_fhog = np.transpose(test_WPCA(test_fhog, reduce_matrix_fhog, mean_fhog, stds_fhog))

In [None]:
dists, result = LM3L.testing(test_feature)

In [None]:
f = open("results.txt", 'w')
for i in range(len(result)):
    if(result[i]==1):
        f.write('1\n')
    else:
        f.write('0\n')