In [None]:
%matplotlib inline
import csv
import os

import cv2
import numpy as np
import pandas as pd
from matplotlib import pyplot as plt
from skimage import exposure
from skimage.color import rgb2gray
from skimage.io import imread, imsave, imshow
from skimage.measure import block_reduce
plt.rcParams['image.cmap'] = 'gray'

In [None]:
# Presumably the side length (pixels) of the downsampled images; not
# referenced anywhere in the visible cells -- TODO confirm.
FINAL_IMAGE_SIZE = 48

# Unified emotion code -> human readable name.
emotion_dict = {
    0: 'Neutral',
    1: 'Anger',
    2: 'Disgust',
    3: 'Fear',
    4: 'Happy',
    5: 'Sad',
    6: 'Surprise',
}

# Dataset specific label -> unified emotion code (keys of emotion_dict).
cmu_emotion_dict = {
    0: 0,
    1: 1,
    3: 2,
    4: 3,
    5: 4,
    6: 5,
    7: 6,
}
kaggle_emotion_dict = {
    0: 1,
    1: 2,
    2: 3,
    3: 4,
    4: 5,
    5: 6,
    6: 0,
}
kdef_emotion_dict = {
    'AF': 3,
    'AN': 1,
    'DI': 2,
    'HA': 4,
    'NE': 0,
    'SA': 5,
    'SU': 6,
}
jaffe_emotion_dict = {
    'AN': 1,
    'DI': 2,
    'FE': 3,
    'HA': 4,
    'NE': 0,
    'SA': 5,
    'SU': 6,
}


def kdef_code(x):
    """Map a two-letter KDEF emotion code to the unified emotion code (KeyError if unknown)."""
    return kdef_emotion_dict[x]


def jaffe_code(x):
    """Map a two-letter JAFFE emotion code to the unified emotion code (KeyError if unknown)."""
    return jaffe_emotion_dict[x]

In [2]:
class FileCleaner:
    """
    Collects face images from the CMU, JAFFE, KDEF and Kaggle sources,
    writes their emotion labels to a csv file, crops them to squares and
    downsamples them.
    """

    # File name suffixes that get_image_filenames treats as images.
    IMAGE_SUFFIXES = ('.JPG', '.png', '.jpg', '.tiff')
    # Per source: pixels trimmed from every border and block size used
    # by downsample_image.
    DOWNSAMPLE_FACTORS = {'CMU': 5, 'JAFFE': 4, 'KDEF': 8}

    def __init__(self):
        pass

    @staticmethod
    def _scale_to_unit(image):
        """Divide image by its maximum; an image whose maximum is not positive is returned unchanged."""
        peak = np.max(image)
        if peak > 0:
            return image / peak
        return image

    @staticmethod
    def _to_uint8(image):
        """Convert a float image with values in [0, 1] to uint8 in [0, 255] so it can be written as jpg."""
        return np.round(np.clip(image, 0.0, 1.0) * 255).astype(np.uint8)

    @staticmethod
    def _ensure_parent_dir(filename):
        """Create the directory that will contain filename if it does not exist yet."""
        parent = os.path.dirname(filename)
        if parent:
            os.makedirs(parent, exist_ok=True)

    def get_image_filenames(self, root_dir='CMU Data/'):
        """
        Traverse directory structure to retrieve filenames of images
        Collects all images contained in all subdirectories of root_dir.
        For root_dir == 'CMU Data/' only the first four and last four
        images (in sorted order) of every directory are kept; for
        root_dir == 'KDEF/' only images whose name ends in 'S' before the
        extension are kept.
        root_dir: The folder in which to begin the walk
        returns: a list of image file names (directory + '/' + file name)
        """
        image_filenames = []
        for dirName, subdirList, fileList in os.walk(root_dir):
            # os.walk gives no ordering guarantee; sort so that the selection
            # below and the numbering done by label_images are reproducible
            subdirList.sort()
            names = sorted(fname for fname in fileList
                           if fname.endswith(self.IMAGE_SUFFIXES))
            if root_dir == 'CMU Data/':
                # first four + last four; slicing the remainder avoids
                # duplicating frames when a directory holds fewer than eight
                names = names[:4] + names[4:][-4:]
            for fname in names:
                stem = os.path.splitext(fname)[0]
                if root_dir != 'KDEF/' or stem.endswith('S'):
                    image_filenames.append(dirName + '/' + fname)
        return image_filenames

    def square_image(self, image, new_size=128, source='CMU'):
        """
        Transforms images from rectangles into squares
        image: the image to be squared (3-dimensional input is converted
               to grayscale first)
        new_size: the resulting size of the image
        source: the source of the image (for determining offset from center)
        returns: a squared image of the desired size
        raises: ValueError if the image is smaller than new_size
        """
        if len(image.shape) == 3:
            image = rgb2gray(image)
        height, width = image.shape[0], image.shape[1]
        if new_size > height or new_size > width:
            raise ValueError(
                f'cannot crop a {new_size}x{new_size} square from a {height}x{width} image')
        height_start = (height - new_size)//2
        width_start = (width - new_size)//2

        if source == 'CMU':
            width_start += 20
        if source == 'KDEF':
            height_start += 50
        # keep the shifted window inside the image so the crop is always square
        height_start = min(height_start, height - new_size)
        width_start = min(width_start, width - new_size)
        return image[height_start:height_start+new_size, width_start:width_start+new_size]

    def downsample_image(self, image, source='CMU'):
        """
        Reduce granularity of image
        Trims a border and then averages square blocks; border width and
        block size both come from DOWNSAMPLE_FACTORS[source].
        image: the image to downsample
        source: the source of the image
        returns: a downsampled image (48x48 for the crop sizes used in
                 this notebook: 250 for CMU, 200 for JAFFE, 400 for KDEF)
        raises: ValueError for a source without a known factor
        """
        try:
            reshape = self.DOWNSAMPLE_FACTORS[source]
        except KeyError:
            raise ValueError(f'unknown image source: {source!r}') from None
        image = image[reshape:-reshape, reshape:-reshape]
        block_size = (reshape, reshape)
        return block_reduce(image, block_size, func=np.mean)

    def generate_num(self, i):
        """
        Generate an 8 digit string number from an int
        (left-padded with zeros; longer numbers are returned unchanged)
        """
        return str(i).rjust(8, '0')

    def process_images(self, image_links, new_size=300, source='CMU',
                       root_dir='CMU Data/', file_extension='CMU Images/',
                       max_images=None, show=False):
        """
        Normalise, crop and save a batch of images under
        'All Images/Raw Images/' + file_extension.
        image_links: list of (original path, final file name) tuples
        new_size: new size of the images
        source: the source of the images
        root_dir: the directory containing the original images (unused,
                  kept for backward compatibility)
        file_extension: extension directory to save images
        max_images: stop after this many images (None processes all)
        show: display each image before and after saving (debugging aid)
        """
        for i, paths in enumerate(image_links):
            raw_path, final_path = paths[0], paths[1]
            if max_images is not None and i >= max_images:
                break
            try:
                if i % 100 == 0:
                    print(f'Processing image {i}')

                im = imread(raw_path).astype(np.float32)
                im = self._scale_to_unit(im)
                im = exposure.rescale_intensity(im)
                new_im = self.square_image(im, new_size=new_size, source=source)
                filename = 'All Images/Raw Images/' + file_extension + final_path
                self._ensure_parent_dir(filename)
                imsave(fname=filename, arr=self._to_uint8(new_im))
                if show:
                    imshow(new_im)
                    plt.show()
                    saved = imread(filename)
                    print(np.max(saved))
                    imshow(saved)
                    plt.show()
            except Exception as e:
                # best effort: report the failure and carry on with the next image
                print(f'failed on iteration {i}')
                print(f'filename: {raw_path}')
                print(f'error: {e!r}')

    def convert_tiff_to_jpg(self, root_dir='jaffe/', end_dir='JPG Images/'):
        """
        Converts images in tiff format to jpg format.
        root_dir: the directory to find images
        end_dir: the directory (inside root_dir) to save images
        """
        # only '.tiff' files are converted; this also skips jpgs written to
        # root_dir + end_dir by an earlier run
        image_paths = [path for path in self.get_image_filenames(root_dir)
                       if path.endswith('.tiff')]
        for i, path in enumerate(image_paths):
            if i % 100 == 0:
                print(f'iteration {i}')
            try:
                im = imread(path).astype(np.float32)
                if len(im.shape) > 2:
                    im = im[:, :, 0]
                im = self._scale_to_unit(im)

                stem = os.path.splitext(path.split('/')[-1])[0]
                filename = root_dir + end_dir + stem + '.jpg'
                self._ensure_parent_dir(filename)
                imsave(filename, self._to_uint8(im))
            except Exception as e:
                print(f'iteration {i}')
                print(f'path: {path}')
                print(f'error: {e!r}')

    def read_images_from_csv(self, file_paths, filename='Kaggle Data/fer2013.csv'):
        """
        Reads images from a csv file and saves each one under
        'All Images/Downsampled Images/'.
        file_paths: list of output file names, one per csv data row
        filename: the filename of the csv file containing
                  pixel intensity values (second column, space separated,
                  48*48 values per row; the first row is skipped as header)
        """
        saved = 0
        with open(filename, newline='') as csvfile:
            reader = csv.reader(csvfile)
            next(reader, None)
            for i, (row, final_path) in enumerate(zip(reader, file_paths)):
                if i % 1000 == 0:
                    print(f'iteration {i}')
                image = np.array(row[1].strip().split(), dtype=np.float32)
                image = image.reshape((48, 48))
                image /= 255
                # all-black rows would otherwise divide by zero
                image = self._scale_to_unit(image)
                out_path = 'All Images/Downsampled Images/' + final_path
                self._ensure_parent_dir(out_path)
                imsave(fname=out_path, arr=self._to_uint8(image))
                saved += 1
        print(saved)

    def downsample_images(self, image_paths, base_dir='All Images/Raw Images/',
                          extension_dir='CMU Images/', source='CMU'):
        """
        Downsample a batch of images. Saves downsampled images under
        'All Images/Downsampled Images/'.
        image_paths: list of (original path, final file name) tuples
        base_dir: the base directory from which the cropped images are read
        extension_dir: extension directory holding the cropped images
        source: the source of the images
        """
        for i, path in enumerate(image_paths):
            path = path[1]
            if i % 1000 == 0:
                print(f'iteration {i}')
            try:
                im = imread(base_dir + extension_dir + path).astype(np.float32)
                im /= 255.
                filename = 'All Images/Downsampled Images/' + path
                self._ensure_parent_dir(filename)
                small = self.downsample_image(im, source=source)
                imsave(fname=filename, arr=self._to_uint8(small))
            except Exception as e:
                print(f'iteration {i}')
                print(f'filename: {path}')
                print(f'error: {e!r}')

    def find_cmu_image_labels(self, path):
        """
        Determine the label for CMU image
        Images numbered below 5 are labelled 0. For the others the label
        is read from the first file of the matching directory under
        'Emotion' (second path component replaced).
        path: image path ('/' separated)
        returns: the emotion label associated with the image, or 'NONE'
                 when no usable label is found
        """
        parts = path.split('/')
        # get image number
        number = int(parts[-1][-9:-4])
        if number < 5:
            return 0

        parts[1] = 'Emotion'
        emo_file_dir = '/'.join(parts[:-1])
        if not os.path.isdir(emo_file_dir):
            return 'NONE'
        # find file containing emotion label (hidden files are ignored)
        label_files = sorted(
            name for name in os.listdir(emo_file_dir)
            if not name.startswith('.') and os.path.isfile(emo_file_dir + '/' + name)
        )
        if not label_files:
            return 'NONE'
        with open(emo_file_dir + '/' + label_files[0]) as f:
            try:
                label = int(float(f.readline().strip()))
            except ValueError:
                return 'NONE'
        # label 2 has no entry in cmu_emotion_dict and is reported as 'NONE'
        return cmu_emotion_dict.get(label, 'NONE')

    def find_image_labels(self, image_paths, source='CMU'):
        """
        Locate the emotion label for each image
        image_paths: a list of paths to images
        source: the source of the image
        returns: list of emotion labels for the image paths ('NONE' where
                 no label can be determined)
        """
        emotion_labels = []
        for path in image_paths:
            fname = path.split('/')[-1]
            if source == 'CMU':
                label = self.find_cmu_image_labels(path)
            elif source == 'KDEF':
                try:
                    label = kdef_code(fname[4:6])
                except KeyError:
                    label = 'NONE'
            elif source == 'JAFFE':
                try:
                    label = jaffe_code(fname[3:5])
                except KeyError:
                    label = 'NONE'
            else:
                label = 'NONE'
            emotion_labels.append(label)
        return emotion_labels

    def label_images(self, image_paths, source='CMU', filename='All Images/emotion_labels.csv'):
        """
        Label images with their corresponding emotions
        Appends one (final file name, label) row per labelled image to
        filename; images labelled 'NONE' are skipped.
        image_paths: a list of paths to original images
        source: the source of the images
        filename: the file to write out the emotion labels
        return: a list of (original image path, updated image path) tuples
        """
        labeled_paths = []
        emotion_labels = self.find_image_labels(image_paths, source)
        self._ensure_parent_dir(filename)
        # newline='' is what the csv module expects for files it writes
        with open(filename, 'a', newline='') as f:
            writer = csv.writer(f)
            i = 0
            for path, label in zip(image_paths, emotion_labels):
                if label == 'NONE':
                    continue
                final_path = source + '_' + self.generate_num(i) + '.jpg'
                writer.writerow([final_path, label])
                labeled_paths.append((path, final_path))
                i += 1
        return labeled_paths

    def label_csv_images(self, data_file='Kaggle Data/fer2013.csv',
                         label_file='All Images/emotion_labels.csv'):
        """
        Label images originally in csv format
        data_file: the file containing the labels for the csv images
                   (first column; the first row is skipped as header)
        label_file: the file to which the labels are appended
        returns: list of final file names ('KAGGLE_<number>.jpg'), one per
                 labelled row
        """
        labels = []
        labeled_paths = []
        with open(data_file, 'r', newline='') as csvfile:
            reader = csv.reader(csvfile)
            next(reader, None)
            for row in reader:
                labels.append(kaggle_emotion_dict[int(row[0])])

        self._ensure_parent_dir(label_file)
        with open(label_file, 'a', newline='') as f:
            writer = csv.writer(f)
            i = 0
            for label in labels:
                if label == 'NONE':
                    continue
                final_path = 'KAGGLE_' + self.generate_num(i) + '.jpg'
                writer.writerow([final_path, label])
                labeled_paths.append(final_path)
                i += 1
        return labeled_paths

In [None]:
# Sanity check on one cropped CMU image: downsample_image is a FileCleaner
# method, so it has to be called on an instance. A 250x250 crop is expected
# to come out as (48, 48).
sample = 'All Images/Raw Images/CMU Images/CMU_00003169.jpg'
im = imread(sample)
down_im = FileCleaner().downsample_image(im, source='CMU')
print(down_im.shape)



In [3]:
class FeatureEngineer():
    """
    Derives per-image features (edge images, corner coordinates, source and
    emotion name) for the images listed in the emotion label file.
    """

    def __init__(self):
        pass

    @staticmethod
    def _normalize(image):
        """Min-max scale image to [0, 1]; a constant image becomes all zeros instead of NaN."""
        low = np.min(image)
        span = np.max(image) - low
        if span == 0:
            return np.zeros_like(image, dtype=np.float64)
        return (image - low) / span

    @staticmethod
    def _to_uint8(image):
        """Convert a float image with values in [0, 1] to uint8 in [0, 255] so it can be written as jpg."""
        return np.round(np.clip(image, 0.0, 1.0) * 255).astype(np.uint8)

    def load_dataframe(self, filename='All Images/emotion_labels.csv'):
        """
        Load dataframe from file
        filename: csv containing image information (two columns, no
                  header row -- that is how FileCleaner.label_images and
                  label_csv_images write it)
        returns: dataframe with columns 'Filename' and 'Emotion'
        """
        # header=None keeps the first data row from being consumed as a header
        image_features = pd.read_csv(filename, header=None,
                                     names=['Filename', 'Emotion'])
        return image_features

    def edge_detection(self, image_filenames):
        """
        Detects edges in images with 4 methods
        Reads every image from 'All Images/Downsampled Images/', writes the
        normalised edge images under 'All Images/<method> Images/'.
        image_filenames: list of image filenames
        returns: laplacian, sobel_x, sobel_y and sobel_comb lists holding
                 the relative path of each saved edge image (None for
                 images that could not be read)
        """
        laplacian = []
        sobel_x = []
        sobel_y = []
        sobel_comb = []
        for i, filename in enumerate(image_filenames):
            if i % 1000 == 0:
                print(f'iteration {i}')
            img = cv2.imread('All Images/Downsampled Images/' + filename, 0)
            if img is None:
                # cv2.imread returns None for unreadable files; keep the
                # four lists aligned with image_filenames
                print(f'could not read {filename}')
                for column in (laplacian, sobel_x, sobel_y, sobel_comb):
                    column.append(None)
                continue

            sobel_x_img = cv2.Sobel(img, cv2.CV_64F, 1, 0, ksize=5)
            sobel_y_img = cv2.Sobel(img, cv2.CV_64F, 0, 1, ksize=5)
            edge_images = (
                ('Laplacian Images/', laplacian, cv2.Laplacian(img, cv2.CV_64F)),
                ('SobelX Images/', sobel_x, sobel_x_img),
                ('SobelY Images/', sobel_y, sobel_y_img),
                ('SobelComb Images/', sobel_comb, (sobel_x_img + sobel_y_img)/2.),
            )
            for folder, column, edge_img in edge_images:
                os.makedirs('All Images/' + folder, exist_ok=True)
                imsave('All Images/' + folder + filename,
                       arr=self._to_uint8(self._normalize(edge_img)))
                column.append(folder + filename)
        return laplacian, sobel_x, sobel_y, sobel_comb

    def feature_detection(self, image_filenames):
        """
        Detects strong corners in images
        image_filenames: list of image filenames
        returns: list with, per image, the string form of an (N, 2) integer
                 array of corner coordinates, or 'NA' when no corner is
                 found or the image cannot be read
        """
        corner_coordinates = []
        for i, filename in enumerate(image_filenames):
            if i % 1000 == 0:
                print(f'iteration {i}')
            img = cv2.imread('All Images/Downsampled Images/' + filename, 0)
            corners = None
            if img is not None:
                corners = cv2.goodFeaturesToTrack(img, minDistance=10, qualityLevel=0.01, maxCorners=10)

            if corners is None:
                print(filename)
                corner_coordinates.append('NA')
            else:
                # reshape returns a new array, so its result must be kept;
                # np.intp replaces the np.int0 alias removed in NumPy 2.0
                corners = corners.astype(np.intp).reshape(corners.shape[0], 2)
                corner_coordinates.append(str(corners))
        return corner_coordinates

    def get_image_sources(self, df):
        """
        Get source of images
        df: dataframe containing image filenames (column 'Filename')
        returns: list of image sources, one per row; None where the
                 filename contains none of the known source names
        """
        image_sources = []
        image_types = ['CMU', 'JAFFE', 'KDEF', 'KAGGLE']
        for file in df['Filename']:
            for image_type in image_types:
                if image_type in file:
                    image_sources.append(image_type)
                    break
            else:
                # exactly one entry per row keeps the list aligned with df
                image_sources.append(None)
        return image_sources

    def get_image_emotion_str(self, df):
        """
        Get string representation of image emotions
        df: dataframe containing integer representation of emotions
            (column 'Emotion') and the filenames (column 'Filename')
        returns: list of emotion names, one per row; None where the
                 emotion value cannot be translated (the filename is
                 printed and the image shown when it can be read)
        """
        emotion_strs = []
        for emotion, file in zip(df['Emotion'], df['Filename']):
            try:
                emotion_strs.append(emotion_dict[int(emotion)])
            except (KeyError, TypeError, ValueError, OverflowError):
                print(file)
                # placeholder keeps the list the same length as df
                emotion_strs.append(None)
                try:
                    im = imread('All Images/Downsampled Images/' + file)
                    imshow(im)
                    plt.show()
                except Exception as e:
                    print(f'could not display {file}: {e!r}')
        return emotion_strs

In [None]:
file_cleaner = FileCleaner()

# Create file with all labels
# NOTE: label_images / label_csv_images append to the label file, so running
# this cell twice duplicates its rows.
file_cleaner.convert_tiff_to_jpg(root_dir='jaffe/', end_dir='JPG Images/')
cmu_final_paths = file_cleaner.label_images(
    file_cleaner.get_image_filenames(root_dir='CMU Data/'), source='CMU')
jaffe_final_paths = file_cleaner.label_images(
    file_cleaner.get_image_filenames(root_dir='jaffe/JPG Images/'), source='JAFFE')
kdef_final_paths = file_cleaner.label_images(
    file_cleaner.get_image_filenames(root_dir='KDEF/'), source='KDEF')
kaggle_final_paths = file_cleaner.label_csv_images()

# Normalise and crop the labelled images to squares; the Kaggle images are
# read straight from the csv as 48x48 arrays
file_cleaner.process_images(cmu_final_paths, new_size=250, source='CMU', file_extension='CMU Images/')
file_cleaner.process_images(jaffe_final_paths, new_size=200, source='JAFFE', root_dir='jaffe/JPG Images', file_extension='JAFFE Images/')
file_cleaner.process_images(kdef_final_paths, new_size=400, source='KDEF', root_dir='KDEF/', file_extension='KDEF Images/')
file_cleaner.read_images_from_csv(kaggle_final_paths, filename='Kaggle Data/fer2013.csv')

# Downsample the cropped images and save them under 'All Images/Downsampled Images/'
file_cleaner.downsample_images(cmu_final_paths, extension_dir='CMU Images/', source='CMU')
file_cleaner.downsample_images(jaffe_final_paths, extension_dir='JAFFE Images/', source='JAFFE')
file_cleaner.downsample_images(kdef_final_paths, extension_dir='KDEF Images/', source='KDEF')

In [None]:
feature_engineer = FeatureEngineer()

# enhance image feature dataframe with new columns
# (the former pd.DataFrame.from_csv('image_features.csv') call was dropped:
# that API no longer exists in pandas and its result was overwritten by the
# next line anyway)
image_features = feature_engineer.load_dataframe()
laplacian, sobel_x, sobel_y, sobel_comb = feature_engineer.edge_detection(image_features['Filename'])
corners = feature_engineer.feature_detection(image_features['Filename'])
sources = feature_engineer.get_image_sources(image_features)
emotion_strs = feature_engineer.get_image_emotion_str(image_features)

image_features['Source'] = sources
image_features['EmotionStr'] = emotion_strs
image_features['Laplacian'] = laplacian
image_features['SobelX'] = sobel_x
image_features['SobelY'] = sobel_y
image_features['SobelComb'] = sobel_comb
image_features['Corners'] = corners

# rows with a missing value in any column are discarded before saving
image_features = image_features.dropna()
image_features.to_csv(path_or_buf='image_features.csv')