## Multi Scale CNN Cascade Network

This notebook implements the multi-scale CNN cascade network proposed in the paper WIDER FACES.

In [120]:
import os
import random
import re
import shutil
import sys
import zipfile
from glob import iglob

import cv2 as cv
import numpy as np
import numpy.random as np_rand
import requests
import tensorflow as tf
from tensorflow.keras import layers, models, regularizers

In [2]:
def download_file_from_google_drive(id, destination):
    '''
    Download a file from Google Drive and write it to `destination`.

    id          -- Google Drive file id, sent as the 'id' query parameter
    destination -- local path the response body is written to (binary mode)

    Raises requests.HTTPError when the final response carries an error status,
    so an error page is never saved in place of the requested file.
    '''
    URL = "https://docs.google.com/uc?export=download"
    CHUNK_SIZE = 32768

    def get_confirm_token(response):
        # the token is the value of the first cookie whose name starts with 'download_warning'
        for key, value in response.cookies.items():
            if key.startswith('download_warning'):
                return value

        return None

    def save_response_content(response, destination):
        with open(destination, "wb") as f:
            for chunk in response.iter_content(CHUNK_SIZE):
                if chunk: # filter out keep-alive new chunks
                    f.write(chunk)

    # the session is closed on exit, releasing its pooled connections
    with requests.Session() as session:
        response = session.get(URL, params = { 'id' : id }, stream = True)
        token = get_confirm_token(response)

        if token:
            # the first streamed response is not consumed; release it before retrying
            response.close()
            params = { 'id' : id, 'confirm' : token }
            response = session.get(URL, params = params, stream = True)

        try:
            response.raise_for_status()
            save_response_content(response, destination)
        finally:
            response.close()

In [3]:
# define paths
current_path = os.getcwd()
data_path = os.path.join(current_path, "data")

# make data directory; exist_ok tolerates re-runs, while real failures
# (e.g. missing permissions) surface instead of being silently swallowed
os.makedirs(data_path, exist_ok = True)

In [4]:
# fetch the training archive unless the zip or the extracted folder is already on disk
if not any(os.path.exists(os.path.join(data_path, entry)) for entry in ("train.zip", 'WIDER_train')):
    print("downloading ... train.zip -- 1.47 GB")
    download_file_from_google_drive("0B6eKvaijfFUDQUUwd21EckhUbWs", os.path.join(data_path, "train.zip"))

In [5]:
# fetch the validation archive unless the zip or the extracted folder is already on disk
if not any(os.path.exists(os.path.join(data_path, entry)) for entry in ("val.zip", 'WIDER_val')):
    print("downloading ... val.zip -- 362.8 MB")
    download_file_from_google_drive("0B6eKvaijfFUDd3dIRmpvSk8tLUk", os.path.join(data_path, "val.zip"))


In [6]:
# download annotations
url = 'http://mmlab.ie.cuhk.edu.hk/projects/WIDERFace/support/bbx_annotation/wider_face_split.zip'

if not os.path.exists(os.path.join(data_path, 'wider_face_split.zip')) and \
    not os.path.exists(os.path.join(data_path, 'wider_face_split')):
    print("downloading ... wider_face_split.zip -- 3.6 MB")
    r = requests.get(url, allow_redirects = True)
    # fail loudly instead of saving an HTTP error page under a .zip name
    r.raise_for_status()
    # the context manager closes the file handle once the archive is written
    with open(os.path.join(data_path, 'wider_face_split.zip'), 'wb') as f:
        f.write(r.content)

In [7]:
# extract the training archive unless its target folder already exists
if not os.path.exists(os.path.join(data_path, "WIDER_train")):
    with zipfile.ZipFile(os.path.join(data_path, "train.zip"), mode = "r") as zip_ref:
        zip_ref.extractall(path = data_path)

In [8]:
# extract the validation archive unless its target folder already exists
if not os.path.exists(os.path.join(data_path, "WIDER_val")):
    with zipfile.ZipFile(os.path.join(data_path, "val.zip"), mode = "r") as zip_ref:
        zip_ref.extractall(path = data_path)

In [9]:
# extract the annotation archive unless its target folder already exists
if not os.path.exists(os.path.join(data_path, "wider_face_split")):
    with zipfile.ZipFile(os.path.join(data_path, "wider_face_split.zip"), mode = "r") as zip_ref:
        zip_ref.extractall(path = data_path)

In [10]:
# delete whichever downloaded archives are still present in the data directory
for archive_name in ("wider_face_split.zip", "train.zip", "val.zip"):
    archive_path = os.path.join(data_path, archive_name)
    if os.path.exists(archive_path):
        os.remove(archive_path)

In [11]:
# locations of the train / validation bounding-box annotation files
train_annotations, val_annotations = [
    os.path.join(data_path, relative_path)
    for relative_path in ("wider_face_split/wider_face_train_bbx_gt.txt",
                          "wider_face_split/wider_face_val_bbx_gt.txt")
]

In [93]:
def read_annotations(path, size = -1):
    '''
    Parse a bounding-box annotation text file into a list of picture dicts.

    The file is read as repeated groups of: one image path ending in '.jpg',
    one line with the number of faces, then one line of space-separated
    integers per face ('x y w h blur expression illumination occlusion pose invalid').

    path -- annotation file to read
    size -- stop after this many pictures; values <= 0 read the whole file

    Returns a list of {'path': str, 'faces': [face, ...]} where every face has
    'bb' = [x, y, x + w, y + h] plus the integer attributes 'blur', 'expression',
    'illumination', 'occlusion', 'pose' and 'invalid'.

    Raises AssertionError when a line expected to hold an image path does not
    end in '.jpg'.
    '''
    # the context manager closes the file; blank lines are dropped so a trailing
    # empty line at the end of the file does not trip the path check below
    with open(path, 'r') as f:
        lines = [line.strip() for line in f if line.strip()]

    training_data = []
    i = 0
    while i < len(lines):
        # return if enough samples have been found
        if size > 0 and len(training_data) >= size:
            return training_data

        # picture must start with file path
        assert lines[i].endswith(".jpg"), "read fault " + lines[i]
        picture = {'path': lines[i]}

        # next line contains number of faces; at least one row is always consumed
        # (presumably zero-face entries still carry one placeholder row -- confirm
        # against the annotation file)
        i += 1
        number_of_faces = max(1, int(lines[i]))
        i += 1

        faces = []
        for j in range(i, i + number_of_faces):
            features = lines[j].split()
            x, y, w, h = [int(value) for value in features[:4]]
            faces.append({
                'bb': [x, y, x + w, y + h],
                'blur': int(features[4]),
                'expression': int(features[5]),
                'illumination': int(features[6]),
                'occlusion': int(features[7]),
                'pose': int(features[8]),
                'invalid': int(features[9]),
            })

        # skip past the face rows that were just read
        i += number_of_faces
        picture['faces'] = faces
        training_data.append(picture)

    return training_data

In [94]:
# parse both annotation files (training first, then validation)
train_data, val_data = [
    read_annotations(annotation_file)
    for annotation_file in (train_annotations, val_annotations)
]

In [95]:
# report how many pictures each split contains
for caption, samples in (('Train Samples :', train_data), ('Validation Samples: ', val_data)):
    print(caption, len(samples))

Train Samples : 12880
Validation Samples:  3226


In [114]:
def intersection_over_union(box, faces):
    '''
    Intersection-over-union between one box and every face bounding box.

    box   -- sequence [x1, y1, x2, y2]
    faces -- list of dicts, each with a 'bb' entry [x1, y1, x2, y2]; may be empty

    Widths and heights are computed inclusively (x2 - x1 + 1).
    Returns a 1-d array with one IoU value per face, empty when `faces` is empty.
    '''
    # reshape keeps the (n, 4) layout even for an empty list, so the column
    # slices below stay valid and an empty result is returned
    bbs = np.array([face['bb'] for face in faces], dtype = np.float32).reshape(-1, 4)
    box_area = (box[2] - box[0] + 1) * (box[3] - box[1] + 1)
    area = (bbs[:, 2] - bbs[:, 0] + 1) * (bbs[:, 3] - bbs[:, 1] + 1)

    # corners of the overlap rectangle
    xx1 = np.maximum(box[0], bbs[:, 0])
    yy1 = np.maximum(box[1], bbs[:, 1])
    xx2 = np.minimum(box[2], bbs[:, 2])
    yy2 = np.minimum(box[3], bbs[:, 3])

    # width and height of the overlap, clamped to 0 for disjoint boxes
    w = np.maximum(0, xx2 - xx1 + 1)
    h = np.maximum(0, yy2 - yy1 + 1)

    inter = w * h
    ovr = inter / (box_area + area - inter)
    return ovr

In [119]:
def generate_training_data(pixels, annotations, data_path, im_dir,
                           negatives_per_image = 50, crops_per_face = 20):
    '''
    Cut square training crops out of the annotated images and save them resized
    to pixels x pixels under '<data_path>/raw_<pixels>/{pos,neg,part}'.

    Example arguments:
    pixels = 12
    annotations = train_data
    im_dir = 'WIDER_train/images/'

    pixels              -- side length of the saved crops
    annotations         -- list of {'path': ..., 'faces': [{'bb': [x1, y1, x2, y2]}, ...]}
    data_path           -- root directory holding the images and receiving the crops
    im_dir              -- image directory relative to data_path
    negatives_per_image -- number of negative crops attempted per image
    crops_per_face      -- number of random crops tried around every usable face

    Each sub-directory gets an index file '<pixels>.txt' with one line per crop:
    negatives (IoU < 0.3 with every face)  -> '<path> 0'
    positives (IoU >= 0.65 with the face)  -> '<path> 1 dx1 dy1 dx2 dy2'
    partials  (0.4 <= IoU < 0.65)          -> '<path> -1 dx1 dy1 dx2 dy2'
    where the d* values are the face corners relative to the crop, divided by
    the crop size. Index files are rewritten on every call.
    '''

    directory = os.path.join(data_path, 'raw_' + str(pixels))
    positives = os.path.join(directory, 'pos')
    negatives = os.path.join(directory, 'neg')
    partials = os.path.join(directory, 'part')

    for folder in [positives, negatives, partials]:
        os.makedirs(folder, exist_ok = True)

    index_name = str(pixels) + '.txt'
    p_idx, n_idx, d_idx = 0, 0, 0

    # crop numbering restarts at 0 on every call and overwrites older crops, so
    # the index files are always rewritten too; 'with' closes them on errors
    with open(os.path.join(positives, index_name), 'w') as pos_file, \
         open(os.path.join(negatives, index_name), 'w') as neg_file, \
         open(os.path.join(partials, index_name), 'w') as part_file:

        for idx, anno in enumerate(annotations):

            if idx % 1000 == 0:
                print(idx, ' images done')

            faces = anno['faces']
            img = cv.imread(os.path.join(data_path, im_dir, anno['path']))

            # cv.imread returns None for a missing or undecodable file
            if img is None:
                print('skipping unreadable image', anno['path'])
                continue

            height, width = img.shape[:2]

            # generate negatives
            largest_neg = min(width, height) // 2
            # randint(40, largest_neg) needs a non-empty range; smaller images
            # cannot supply a negative crop of at least 40 px
            if largest_neg > 40:
                num_negs = 0
                attempts = 0
                # bounded so an image almost fully covered by faces cannot loop forever
                max_attempts = 100 * negatives_per_image
                while num_negs < negatives_per_image and attempts < max_attempts:
                    attempts += 1
                    size = np_rand.randint(40, largest_neg)
                    x = np_rand.randint(0, width - size)
                    y = np_rand.randint(0, height - size)
                    box = np.array([x, y, x + size, y + size])

                    if len(faces) > 0 and np.max(intersection_over_union(box, faces)) >= 0.3:
                        continue

                    # crop and resize only the boxes that are actually kept
                    cropped_im = img[y : y + size, x : x + size]
                    resized_im = cv.resize(cropped_im, (pixels, pixels), interpolation = cv.INTER_LINEAR)
                    save_path = os.path.join(negatives, '%s.jpg' % n_idx)
                    neg_file.write(save_path + ' 0\n')
                    cv.imwrite(save_path, resized_im)
                    n_idx += 1
                    num_negs += 1

            # generate positives and partials
            for face in faces:
                x1, y1, x2, y2 = face['bb']
                w = x2 - x1 + 1
                h = y2 - y1 + 1

                if max(w, h) < 40 or x1 < 0 or y1 < 0:
                    continue

                # explicit integer bounds: the crop is at least 1 px wide and the
                # shift range is never empty, even for very thin faces
                smallest = max(1, int(min(w, h) * 0.8))
                largest = int(np.ceil(1.25 * max(w, h)))
                shift_x = int(w * 0.2)
                shift_y = int(h * 0.2)

                for _ in range(crops_per_face):
                    size = np_rand.randint(smallest, largest)
                    delta_x = np_rand.randint(-shift_x, max(shift_x, 1))
                    delta_y = np_rand.randint(-shift_y, max(shift_y, 1))

                    # square crop centred on the shifted face centre, clamped to the top-left border
                    nx1 = int(max(x1 + w / 2 + delta_x - size / 2, 0))
                    ny1 = int(max(y1 + h / 2 + delta_y - size / 2, 0))
                    nx2 = nx1 + size
                    ny2 = ny1 + size

                    if nx2 > width or ny2 > height:
                        continue
                    box = np.array([nx1, ny1, nx2, ny2])

                    i_o_u = intersection_over_union(box, [face])[0]
                    if i_o_u >= 0.65:
                        folder, index_file, label, number = positives, pos_file, '1', p_idx
                        p_idx += 1
                    elif i_o_u >= 0.4:
                        folder, index_file, label, number = partials, part_file, '-1', d_idx
                        d_idx += 1
                    else:
                        continue

                    offset_x1 = (x1 - nx1) / float(size)
                    offset_y1 = (y1 - ny1) / float(size)
                    offset_x2 = (x2 - nx2) / float(size)
                    offset_y2 = (y2 - ny2) / float(size)

                    cropped_im = img[ny1 : ny2, nx1 : nx2, :]
                    resized_im = cv.resize(cropped_im, (pixels, pixels), interpolation = cv.INTER_LINEAR)

                    save_path = os.path.join(folder, '%s.jpg' % number)
                    index_file.write(save_path +
                        ' %s %.2f %.2f %.2f %.2f\n' % \
                        (label, offset_x1, offset_y1, offset_x2, offset_y2))
                    cv.imwrite(save_path, resized_im)

In [121]:
def bytes_features(value):
    '''
    Wrap one byte string in a tf.train.Feature.

    value -- bytes object to store
    Returns a tf.train.Feature whose bytes_list holds `value` as its only element.
    '''
    # the Feature keyword is 'bytes_list'
    return tf.train.Feature(bytes_list = tf.train.BytesList(value = [value]))

def view_bar(num, total):
    '''
    Redraw a 100-character text progress bar on the current stdout line.

    num   -- number of items handled so far
    total -- number of items overall; 0 is rendered as a finished bar

    The displayed percentage is int(num / total * 100) + 1, limited to 0..100.
    '''
    # an empty job counts as finished instead of dividing by zero
    rate = float(num) / total if total else 1.0
    # clamp so that num == total shows 100% rather than 101%
    rate_num = min(max(int(rate * 100) + 1, 0), 100)
    r = '\r[%s%s]%d%%' % ("#" * rate_num, " " * (100 - rate_num), rate_num, )
    sys.stdout.write(r)
    sys.stdout.flush()


In [None]:
def generate_tf_records_pnet(data_path, max_negatives = 1000000, max_partials = 300000):
    '''
    Pack the 12 x 12 crops listed in the index files under '<data_path>/raw_12'
    into two shuffled TFRecord files in the current working directory.

    pnet_data_for_cls.tfrecords -- every positive (label [0, 1]) plus a random
                                   subset of the negatives (label [1, 0])
    pnet_data_for_bbx.tfrecords -- every positive plus a random subset of the
                                   partials, labelled with the four offsets
                                   stored in the index line

    Each example has two bytes features: 'label_raw' (float32 label) and
    'image_raw' (uint8 image).

    data_path     -- directory containing 'raw_12/{pos,neg,part}/12.txt'
    max_negatives -- upper bound on sampled negatives; fewer are used if fewer exist
    max_partials  -- upper bound on sampled partials; fewer are used if fewer exist

    Raises FileNotFoundError when an indexed image cannot be read.
    '''

    pixels = 12
    directory = os.path.join(data_path, 'raw_' + str(pixels))

    def read_index(kind):
        # generate_training_data stores each index as '<kind>/<pixels>.txt'
        with open(os.path.join(directory, kind, '%s.txt' % pixels), 'r') as f:
            return [line for line in f if line.strip()]

    def subsample(lines, limit):
        # a truncated permutation draws without replacement and also copes with
        # fewer than `limit` (or zero) available lines
        keep = np_rand.permutation(len(lines))[:limit]
        return [lines[i] for i in keep]

    def offsets_record(line):
        # rows look like '<path> <label> <dx1> <dy1> <dx2> <dy2>'; splitting from
        # the right keeps paths that contain spaces intact
        words = line.rsplit(None, 5)
        return words[0], np.array([float(word) for word in words[2:]], dtype = 'float32')

    def load_image(image_file_name):
        im = cv.imread(image_file_name)
        # cv.imread returns None for a missing or undecodable file
        if im is None:
            raise FileNotFoundError('could not read image ' + image_file_name)
        h, w = im.shape[:2]
        if h != pixels or w != pixels:
            im = cv.resize(im, (pixels, pixels))
        return im.astype('uint8')

    def write_records(filename, records):
        # shuffle the lightweight (path, label) pairs and stream the examples to
        # disk, instead of keeping every serialized image in memory at once
        random.shuffle(records)
        sum_ = len(records)
        print('Writing')
        # the context manager closes the writer even if an image fails to load
        with tf.io.TFRecordWriter(filename) as writer:
            for cur_, (image_file_name, label) in enumerate(records):
                view_bar(cur_, sum_)
                im = load_image(image_file_name)
                example = tf.train.Example(features = tf.train.Features(feature = {
                    'label_raw' : bytes_features(label.tobytes()),
                    'image_raw' : bytes_features(im.tobytes())}))
                writer.write(example.SerializeToString())
        print('\n%s examples written to %s' % (sum_, filename))

    pos = read_index('pos')
    neg = subsample(read_index('neg'), max_negatives)
    part = subsample(read_index('part'), max_partials)

    # classification set: positives vs. negatives
    face_label = np.array([0, 1], dtype = 'float32')
    background_label = np.array([1, 0], dtype = 'float32')
    cls_records = [(line.rsplit(None, 5)[0], face_label) for line in pos]
    # negative rows look like '<path> 0'
    cls_records += [(line.rsplit(None, 1)[0], background_label) for line in neg]
    write_records('pnet_data_for_cls.tfrecords', cls_records)

    # box regression set: positives and partials with their offsets
    bbx_records = [offsets_record(line) for line in pos + part]
    write_records('pnet_data_for_bbx.tfrecords', bbx_records)
