In [1]:
import numpy as np
import matplotlib.pyplot as plt
import cv2
import pywt
from scipy.signal import wiener
import skimage as skimg 

from sklearn.preprocessing import (MinMaxScaler, StandardScaler)
# from sklearn.model_selection import (train_test_split,)
from sklearn.model_selection import (train_test_split, StratifiedKFold,)
# from sklearn.ensemble import GradientBoostingClassifier
from sklearn.svm import (SVC)

from sklearn.base import (BaseEstimator, TransformerMixin)
from sklearn.pipeline import (make_pipeline, make_union,)
from sklearn.metrics import (classification_report, accuracy_score)

import os
from tqdm import tqdm

In [2]:
class CLASS:
    """Integer class labels used as classification targets."""
    FAKE, REAL = 0, 1

### Load image

In [3]:
# Root folder of the ICL image set. Set the ICL_DATA_DIR environment variable to
# point somewhere else instead of editing this machine-specific default.
PATH_PREFIX = os.environ.get('ICL_DATA_DIR', '/home/thienn17/Documents/ICL/')
# Displayed as the cell output: confirms the folder is reachable before loading.
os.path.exists(PATH_PREFIX)

True

In [4]:
class LoadImage():
    '''
    Loads the images stored below `path_prefix`, one directory per call.

    Only the top-level entry named 'dummy' is used as a class folder. Every call
    to `next_batch` walks on to the next directory that contains files and loads
    all readable images found there.

    NOTE(review): the `== 'dummy'` filter looks like a debugging leftover
    (LoadImageCable uses `!= 'dummy'`), and with it the 'S' prefix test used for
    labelling can never be true - confirm which folders are meant to be loaded.
    '''

    def __init__(self, path_prefix, colorcvt=None):
        '''
        path_prefix: root directory that holds the class folders
        colorcvt: optional conversion code handed to cv2.cvtColor for every image
        '''
        assert os.path.exists(path_prefix), "LoadImage, Path does not exist"
        self.path_prefix = path_prefix
        self.colorcvt = colorcvt

        # Sorted so the visiting order does not depend on the file system.
        self.cls_folders = sorted(f for f in os.listdir(path_prefix) if f == 'dummy')
        self.cls_id = 0
        # Without any class folder there is nothing to walk: use an empty iterator
        # so next_batch() returns None instead of __init__ raising IndexError.
        self.path_walk = self._walk(0) if self.cls_folders else iter(())

        self.images = []
        self.labels = []

    def _walk(self, cls_id):
        '''Start a directory walk over the class folder with index `cls_id`.'''
        return os.walk(os.path.join(self.path_prefix, self.cls_folders[cls_id]))

    def reset(self):
        '''Forget the images of the previous batch. Labels are kept on purpose.'''
        self.images = []

    def _load(self, abspath, files):
        '''Read every image in `files` (names inside `abspath`) into a new batch.'''
        self.reset()
        print(abspath)
        is_real = self.cls_folders[self.cls_id][0] == 'S'
        for file in tqdm(files):
            img = cv2.imread(os.path.join(abspath, file))
            if img is None:
                # cv2.imread could not decode the file: skip it, no label is added.
                continue
            if self.colorcvt is not None:
                img = cv2.cvtColor(img, self.colorcvt)
            self.images.append(img)
            self.labels.append(CLASS.REAL if is_real else CLASS.FAKE)
        return self.images, self.labels

    def next_batch(self):
        '''
        Load the next directory that contains files.

        Returns (images, labels), or None once every class folder is exhausted.
        `images` holds only the current batch, while `labels` keeps growing and
        covers every image returned since the loader was created.
        '''
        while True:
            for abspath, dirs, files in self.path_walk:
                # os.walk honours in-place edits of `dirs`: descend in sorted order.
                dirs.sort()
                if len(files) > 0:
                    return self._load(abspath, sorted(files))

            # Current class folder is exhausted: move to the next one, if any.
            if self.cls_id >= len(self.cls_folders) - 1:
                return None
            self.cls_id += 1
            self.path_walk = self._walk(self.cls_id)

    def next_batch_new(self):
        '''Placeholder, not implemented: always returns None.'''
        pass

### Feature modules

In [5]:
class WaveletTransformer(BaseEstimator, TransformerMixin):
    '''
    Stateless transformer turning images into wavelet statistics.

    Every channel of an image is decomposed with a 2-D multilevel wavelet
    transform; the mean and the variance of each resulting sub-band are the
    features. One image therefore yields
    channels * 2 * (3 * level + 1) values.
    '''

    def __init__(self, wtname = 'haar', level = 3):
        '''
        wtname: wavelet name, e.g. 'haar', 'db3', 'db5', 'sym2', 'bior5.5', etc.
        level: total number of decomposition levels
        '''
        self.wtname = wtname
        self.level = level

    def fit(self, X, y=None):
        '''Nothing to learn. `y` is optional so that fit_transform(X) works.'''
        return self

    def transform(self, X):
        '''
        X: iterable of images, either (H, W, C) with channels last or (H, W).

        Returns a list with one feature list per image. Per channel the order is
        [mean, var] of the approximation band, then [mean, var] of every detail
        band in the order returned by pywt.wavedec2.
        '''
        features = []
        for image in tqdm(X):
            image = np.asarray(image)
            # A 2-D image is one channel; moving its last axis would split it
            # column by column instead.
            if image.ndim == 2:
                channels = image[np.newaxis]
            else:
                channels = np.moveaxis(image, -1, 0)

            img_features = []
            for channel in channels:
                approx, *details = pywt.wavedec2(data=channel, wavelet=self.wtname, level=self.level)
                subbands = [approx] + [band for bands in details for band in bands]
                for band in subbands:
                    img_features.append(np.mean(band))
                    img_features.append(np.var(band))
            features.append(img_features)
        return features

In [6]:
class LBPTransformer(BaseEstimator, TransformerMixin):
    '''
    Stateless transformer turning images into local-binary-pattern histograms.

    The LBP codes use the "nri_uniform" method; the feature vector of an image
    is the normalised code histogram averaged over its channels.
    '''

    def __init__(self, num_points = 8, radius = 1, gray=False, noise=False):
        '''
        num_points: number of neighbours sampled around every pixel
        radius: radius of the sampling circle
        gray: convert the (BGR) image to grayscale and use that single channel
        noise: with gray=True, use the residual after Wiener filtering instead
               of the image itself; ignored when gray is False
        '''
        self.num_points = num_points
        self.radius = radius
        self.gray = gray
        self.noise = noise

    def fit(self, X, y=None):
        '''Nothing to learn. `y` is optional so that fit_transform(X) works.'''
        return self

    def transform(self, X):
        '''Return a list with one averaged LBP histogram per image in X.'''
        features = []
        for img in tqdm(X):
            if self.gray:
                img = cv2.cvtColor(img, cv2.COLOR_BGR2GRAY)
                if self.noise:
                    # Keep only what the Wiener filter (window size 5) removed.
                    img = img - wiener(img, 5)
                channels = (img,)
            else:
                channels = np.moveaxis(img, -1, 0)
            features.append(self.get_lbp_features(channels))
        return features

    def _n_bins(self):
        '''Number of distinct "nri_uniform" codes: P*(P-1)+3, i.e. 59 for P=8.'''
        return self.num_points * (self.num_points - 1) + 3

    def local_binary_pattern(self, img, normalized=True):
        '''
        Histogram of the LBP codes of one 2-D image.

        normalized: divide the counts by the number of pixels unless False.
        '''
        lbp = skimg.feature.local_binary_pattern(
            img, self.num_points, self.radius, method="nri_uniform").ravel()
        n_bins = self._n_bins()
        # Fixed range: bin k always counts code k. Without it the bin edges follow
        # the min/max code present in each image, so histograms of different
        # images would not be comparable.
        hist, _ = np.histogram(lbp, bins=n_bins, range=(0, n_bins))

        if normalized is False:
            return hist
        return hist / len(lbp)

    def get_lbp_features(self, img_channels):
        '''Average the normalised LBP histograms of the given channels.'''
        lbp_features = np.zeros(self._n_bins())
        for img in img_channels:
            lbp_features += self.local_binary_pattern(img)
        return lbp_features / len(img_channels)

### Extract & Dump

In [7]:
# --- Wavelet features ---
LEVEL = 3
extractor = WaveletTransformer(level=LEVEL, wtname='db5')
# A starts as a single uninitialised placeholder row so that feature batches can
# be concatenated below it.
# Width: channels * #feature * (#high_img * levels + #low-img)
A = np.empty(shape=(1, 3 * 2 * (1 + 3 * LEVEL)))
# A = np.empty((1,3*2*3*LEVEL))

# --- LBP features (alternative: uncomment to use instead of the wavelet setup) ---
# extractor = LBPTransformer()
# A = np.empty((1, 59))

In [8]:
# Pull batches from the loader until it reports exhaustion (None) and stack the
# extracted features below the rows already in A.
b = None
loader = LoadImage(PATH_PREFIX, colorcvt=None)
batch = 1
res = ()
while res is not None:
    print("Batch {}".format(batch))
    batch += 1

    res = loader.next_batch()
    if res is not None:
        # b is replaced by the label list returned with each batch.
        images, b = res
        A = np.concatenate((A, extractor.transform(images)), axis=0)

Batch 1
/home/thienn17/Documents/ICL/dummy


100%|██████████| 2/2 [00:00<00:00,  8.60it/s]
100%|██████████| 2/2 [00:00<00:00,  2.14it/s]

Batch 2





In [9]:
# Remove the first row of A and turn the label list into an array.
# Caution: running this cell again removes one more row.
A, b = A[1:, :], np.array(b)
A.shape

(2345, 60)

In [10]:
# Folder that receives the feature / label arrays as .npy files.
PATH_DUMP = "./object dump/no union"
# Displayed as the cell output: check that the folder exists before saving into it.
os.path.exists(PATH_DUMP)

True

In [18]:
# Store the features as <FILE>.npy and the labels as <FILE>_label.npy.
FILE = "a"
np.save(file=os.path.join(PATH_DUMP, FILE + ".npy"), arr=A)
np.save(file=os.path.join(PATH_DUMP, FILE + "_label.npy"), arr=b)

### Load

In [25]:
# Folder the stored feature / label .npy files are read from.
PATH_DUMP = "./object dump/no union"
# Displayed as the cell output: check that the folder exists before loading from it.
os.path.exists(PATH_DUMP)

True

In [26]:
# Read one stored feature set: <FILE>.npy holds A, <FILE>_label.npy holds b.
FILE = "wt_YCC_3lv_db5"
A, b = (np.load(os.path.join(PATH_DUMP, FILE + suffix))
        for suffix in (".npy", "_label.npy"))
A.shape

(2345, 60)

In [118]:
# Join two stored feature sets column-wise into one feature matrix.
FILE = "wt_BGR_3lv_db5"
FILE2 = "lbp_HSV"

# Rows are paired by position, so both dumps must list the samples in the same
# order. The label files are compared as a sanity check. np.array_equal also
# fails cleanly when the shapes differ, where `==` would broadcast or raise.
# Caveat: equal labels do not prove that the images are in the same order.
b = np.load(os.path.join(PATH_DUMP, FILE+"_label.npy"))
assert np.array_equal(b, np.load(os.path.join(PATH_DUMP, FILE2+"_label.npy"))), "Different order"

A = np.concatenate((np.load(os.path.join(PATH_DUMP, FILE+".npy")),
                    np.load(os.path.join(PATH_DUMP, FILE2+".npy"))),
                   axis=1)
A.shape

(2345, 119)

## Train

### Single fit

In [20]:
# Hold-out split: 80% of the samples for training, class proportions preserved,
# fixed seed for repeatability.
(A_train, A_test,
 b_train, b_test) = train_test_split(
    A, b, stratify=b, random_state=42, train_size=0.8)

In [23]:
# Standardise the features, then fit an SVC with default settings.
pipe = make_pipeline(StandardScaler(), SVC()).fit(A_train, b_train)

In [25]:
# Score the fitted pipeline on the held-out part.
b_pred = pipe.predict(A_test)
print(classification_report(y_true=b_test, y_pred=b_pred))

              precision    recall  f1-score   support

           0       0.25      0.00      0.01       246
           1       0.50      0.99      0.66       246

    accuracy                           0.50       492
   macro avg       0.37      0.50      0.34       492
weighted avg       0.37      0.50      0.34       492



### KFold

In [19]:
def kfold(_estimator, X=None, y=None, n_splits=5, random_state=42):
    '''
    Print the accuracy of `_estimator` for every fold of a shuffled, stratified
    K-fold split, followed by the mean accuracy.

    _estimator: object with fit(X, y) and predict(X); it is refitted in place
                for every fold and keeps the last fold's fit afterwards
    X, y: feature matrix and labels (NumPy arrays); when omitted, the notebook
          globals `A` and `b` are used, as before
    n_splits: number of folds
    random_state: seed of the shuffling

    Returns None; the scores are only printed.
    '''
    X = A if X is None else X
    y = b if y is None else y

    kf = StratifiedKFold(n_splits=n_splits, shuffle=True, random_state=random_state)
    fold_scores = []
    for k, (i_train, i_test) in enumerate(kf.split(X, y)):
        model = _estimator.fit(X[i_train], y[i_train])

        y_pred = model.predict(X[i_test])
        acc = accuracy_score(y_true=y[i_test], y_pred=y_pred)
        fold_scores.append(acc)

        print("[Fold {}] Acc: {:.3f}".format(k+1, acc))

    print("Mean acc: {:.3f}\n".format(sum(fold_scores)/len(fold_scores)))

In [20]:
# Cross-validate a fresh scaler + SVC pipeline.
pipe = make_pipeline(
    StandardScaler(),
    SVC(),
)
kfold(_estimator=pipe)

[Fold 1] Acc: 1.000
[Fold 2] Acc: 1.000
[Fold 3] Acc: 1.000
[Fold 4] Acc: 0.990
[Fold 5] Acc: 1.000
Mean acc: 0.998



In [None]:
# Fold accuracies kept as a note for comparison; the notebook does not record
# which feature set or model produced them.
'''
[Fold 1] Acc: 0.987
[Fold 2] Acc: 0.972
[Fold 3] Acc: 0.977
[Fold 4] Acc: 0.985
[Fold 5] Acc: 0.983
Mean acc: 0.981
'''

## Cable

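Ignore this section: it repeats the wavelet feature extraction above on a separate image folder (`Cable/`) and is not part of the main experiment.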

In [8]:
# Root folder of the Cable image set. Set the CABLE_DATA_DIR environment variable
# to point somewhere else instead of editing this machine-specific default.
PATH_PREFIX = os.environ.get('CABLE_DATA_DIR', '/home/thienn17/Documents/Cable/')
# Displayed as the cell output: confirms the folder is reachable before loading.
os.path.exists(PATH_PREFIX)

True

In [9]:
class LoadImageCable():
    '''
    Loads the images stored below `path_prefix`, one directory per call.

    Every top-level entry except 'dummy' is used as a class folder; images from
    the folder named 'real' are labelled CLASS.REAL, all others CLASS.FAKE.
    Every call to `next_batch` walks on to the next directory that contains
    files and loads all readable images found there.
    '''

    def __init__(self, path_prefix, colorcvt=None):
        '''
        path_prefix: root directory that holds the class folders
        colorcvt: optional conversion code handed to cv2.cvtColor for every image
        '''
        assert os.path.exists(path_prefix), "LoadImage, Path does not exist"
        self.path_prefix = path_prefix
        self.colorcvt = colorcvt

        # Sorted so the visiting order does not depend on the file system.
        self.cls_folders = sorted(f for f in os.listdir(path_prefix) if f != 'dummy')
        self.cls_id = 0
        # Without any class folder there is nothing to walk: use an empty iterator
        # so next_batch() returns None instead of __init__ raising IndexError.
        self.path_walk = self._walk(0) if self.cls_folders else iter(())

        self.images = []
        self.labels = []

    def _walk(self, cls_id):
        '''Start a directory walk over the class folder with index `cls_id`.'''
        return os.walk(os.path.join(self.path_prefix, self.cls_folders[cls_id]))

    def reset(self):
        '''Forget the images of the previous batch. Labels are kept on purpose.'''
        self.images = []

    def _load(self, abspath, files):
        '''Read every image in `files` (names inside `abspath`) into a new batch.'''
        self.reset()
        print(abspath)
        is_real = self.cls_folders[self.cls_id] == 'real'
        for file in tqdm(files):
            img = cv2.imread(os.path.join(abspath, file))
            if img is None:
                # cv2.imread could not decode the file: skip it, no label is added.
                continue
            if self.colorcvt is not None:
                img = cv2.cvtColor(img, self.colorcvt)
            self.images.append(img)
            self.labels.append(CLASS.REAL if is_real else CLASS.FAKE)
        return self.images, self.labels

    def next_batch(self):
        '''
        Load the next directory that contains files.

        Returns (images, labels), or None once every class folder is exhausted.
        `images` holds only the current batch, while `labels` keeps growing and
        covers every image returned since the loader was created.
        '''
        while True:
            for abspath, dirs, files in self.path_walk:
                # os.walk honours in-place edits of `dirs`: descend in sorted order.
                dirs.sort()
                if len(files) > 0:
                    return self._load(abspath, sorted(files))

            # Current class folder is exhausted: move to the next one, if any.
            if self.cls_id >= len(self.cls_folders) - 1:
                return None
            self.cls_id += 1
            self.path_walk = self._walk(self.cls_id)

In [10]:
# --- Wavelet features ---
LEVEL = 3
extractor = WaveletTransformer(level=LEVEL, wtname='db5')
# A starts as a single uninitialised placeholder row so that feature batches can
# be concatenated below it.
# Width: channels * #feature * (#high_img * levels + #low-img)
A = np.empty(shape=(1, 3 * 2 * (1 + 3 * LEVEL)))
# A = np.empty((1,3*2*3*LEVEL))

# --- LBP features (alternative: uncomment to use instead of the wavelet setup) ---
# extractor = LBPTransformer()
# A = np.empty((1, 59))

In [11]:
# Pull batches from the loader until it reports exhaustion (None) and stack the
# extracted features below the rows already in A.
b = None
loader = LoadImageCable(PATH_PREFIX, colorcvt=cv2.COLOR_BGR2YCrCb)
batch = 1
res = ()
while res is not None:
    print("Batch {}".format(batch))
    batch += 1

    res = loader.next_batch()
    if res is not None:
        # b is replaced by the label list returned with each batch.
        images, b = res
        A = np.concatenate((A, extractor.transform(images)), axis=0)

Batch 1
/home/thienn17/Documents/Cable/recap


100%|██████████| 246/246 [01:12<00:00,  3.39it/s]
100%|██████████| 246/246 [05:42<00:00,  1.39s/it]


Batch 2
/home/thienn17/Documents/Cable/real


100%|██████████| 246/246 [00:01<00:00, 208.68it/s]
100%|██████████| 246/246 [00:04<00:00, 51.58it/s]

Batch 3





In [12]:
# Remove the first row of A and turn the label list into an array.
# Caution: running this cell again removes one more row.
A, b = A[1:, :], np.array(b)
A.shape

(492, 60)