[Angelica] Decouple feature extraction from data processing class
aperez-rai committed Dec 20, 2017
1 parent d3126de commit 053e4fa
Showing 6 changed files with 104 additions and 138 deletions.
127 changes: 41 additions & 86 deletions data/dataProcessor.py → data/imageprocessor.py
@@ -1,142 +1,97 @@
 import os, csv, cv2, math, datetime

+from skimage import color, io
 import numpy as np
-from skimage import io
 from matplotlib import pyplot as plt
 from keras.preprocessing.image import ImageDataGenerator

-from feature import Feature
-
-# TODO: Do we need this global?
-EMOTION_DIMENSION_COUNT = 4 # emotional dimensions: arousal, valence, expectation, power
-
-class DataProcessor:
-    """
-    Class containing all necessary data preprocessing methods.
-    """
-
-    def __init__(self):
-        self.feature_parameters = dict()
-        self.possible_features = ['hog', 'lbp']
-        self.required_feature_parameters = dict()
-        self.required_feature_parameters['hog'] = ['orientations', 'pixels_per_cell', 'cells_per_block']
-        self.required_feature_parameters['lbp'] = ['radius', 'n_points']
-
-    def add_feature(self, feature_type, params):
-        if feature_type not in self.possible_features:
-            raise ValueError('Cannot extract specified feature. Use one of: ' + ', '.join(self.possible_features))
-
-        if set(params.keys()) != set(self.required_feature_parameters[feature_type]):
-            raise ValueError(('Expected %s parameters: ' + ', '.join(self.required_feature_parameters[feature_type])) % feature_type)
-
-        self.feature_parameters[feature_type] = params
+class ImageProcessor:

-    def get_training_data(self, from_csv, dataset_location, target_image_dims, initial_image_dims=None, label_index=None, image_index=None, vector=True, time_series=True, test_data_percentage=0.20):
+    def __init__(self, from_csv, datapath, target_dimensions, raw_dimensions, csv_label_col=None, csv_image_col=None):
+        self.from_csv = from_csv
+        self.datapath = datapath
+        self.target_dimensions = target_dimensions
+        self.raw_dimensions = raw_dimensions
+        self.csv_label_col = csv_label_col
+        self.csv_image_col = csv_image_col

-        if from_csv:
-            return self.get_training_data_from_csv(dataset_location, initial_image_dims, target_image_dims, label_index, image_index, vector, test_data_percentage)
+    def get_training_data(self):
+        if self.from_csv:
+            return self.get_training_data_from_csv()
         else:
-            if time_series:
-                return self.get_time_series_image_feature_array_from_directory(dataset_location, target_image_dims, vector)
-            else:
-                return self.get_image_feature_array_from_directory(dataset_location, target_image_dims, vector)
-
-
-    def get_image_feature_array_from_directory(self, root_directory, target_image_dims, vector=True):
-        """
-        Extracts feature vectors of all images found in root_directory.
-        :param root_directory: location of image data
-        :param vector: if true returns features as vectors, otherwise as 2D arrays
-        :return: numpy array of extracted feature vectors
-        """
-        feature_type_index = 0 if vector else 1
-        feature = Feature()
-        features = list()
-        for sub_directory in os.listdir(root_directory):
+            images = self.get_image_feature_array_from_directory()
+            labels = self.get_training_label_array()
+            return images, labels
+
+    def get_image_feature_array_from_directory(self):
+        images = list()
+        for sub_directory in os.listdir(self.datapath):
             if not sub_directory.startswith('.'):
-                sub_directory_path = root_directory + '/' + sub_directory
+                sub_directory_path = self.datapath + '/' + sub_directory
                 for image_file in os.listdir(sub_directory_path):
                     if not image_file.startswith('.'):
                         image_file_path = sub_directory_path + '/' + image_file
-                        features.append(feature.extract_features(target_image_dims, self.feature_parameters, feature_type_index=feature_type_index, image_file=image_file_path))
+                        image = io.imread(image_file_path)
+                        image.resize(self.target_dimensions)
+                        image = color.rgb2gray(image)
+                        images.append(image)
+        return np.array(images)
-
-        return np.array(features)

-    def get_time_series_image_feature_array_from_directory(self, root_directory, target_image_dims, vector=True):
-        """
-        Extracts feature vectors of images found in root_directory and groups them
-        by time_series batch. Subdirectories of root_directory must contain a single
-        time series batch.
-        :param root_directory: location of image data
-        :param vector: if true returns features as vectors, otherwise as 2D arrays
-        :return: numpy array of arrays which contain time series batch features
-        """
+    def get_time_series_image_feature_array_from_directory(self, datapath, target_image_dims, vector=True):
         feature_type_index = 0 if vector else 1
         feature = Feature()
         features = list()
-        for sub_directory in os.listdir(root_directory):
+        for sub_directory in os.listdir(datapath):
             if not sub_directory.startswith('.'):
-                sub_directory_path = root_directory + '/' + sub_directory
+                sub_directory_path = datapath + '/' + sub_directory
                 feature_batch = list()
                 for image_file in os.listdir(sub_directory_path):
                     if not image_file.startswith('.'):
                         image_file_path = sub_directory_path + '/' + image_file
-                        feature_batch.append(feature.extract_features(target_image_dims, self.feature_parameters, feature_type_index=feature_type_index, image_file=image_file_path))
+                        feature_batch.append(feature.extract(target_image_dims, self.feature_parameters, feature_type_index=feature_type_index, image_file=image_file_path))

                 features.append(feature_batch)

         return np.array(features)

-    def get_training_data_from_csv(self, csv_file_path, image_dims, target_image_dims, label_index=0, image_index=1, vector=True, test_data_percentage=0.20):
-        """
-        Extracts feature vectors of all images found in specified csv file.
-        :param csv_file_path: location of dataset csv file
-        :param image_dims: dimensions of image
-        :param label_index: column index of label value
-        :param image_index: column index of image values
-        :param vector: if true returns features as vectors, otherwise as 2D arrays
-        :return: numpy array of extracted feature vectors
-        """
+    def get_training_data_from_csv(self, test_data_percentage=0.2):

        print('Extracting training data from csv...')
        start = datetime.datetime.now()

-        feature_type_index = 0 if vector else 1
-
-        feature = Feature()
-        features = list()
+        images = list()
         labels = list()
-        with open(csv_file_path) as csv_file:
+        with open(self.datapath) as csv_file:
             reader = csv.reader(csv_file, delimiter=',', quotechar='"')

             tempCount = 0

             for row in reader:
-                if row[label_index] == 'emotion': continue
+                if row[self.csv_label_col] == 'emotion': continue

                 label = [0]*7
-                label[int(row[label_index])] = 1.0
+                label[int(row[self.csv_label_col])] = 1.0
                 labels.append(np.array(label))

-                image = np.asarray([int(pixel) for pixel in row[image_index].split(' ')], dtype=np.uint8).reshape(image_dims)
-                image = cv2.resize(image, target_image_dims, interpolation=cv2.INTER_LINEAR)
-                image_3d = np.array([image, image, image]).reshape((target_image_dims[0], target_image_dims[1], 3))
-
-                # image = np.array(feature.extract_features(target_image_dims, self.feature_parameters, feature_type_index=feature_type_index, image_array=image))
+                image = np.asarray([int(pixel) for pixel in row[self.csv_image_col].split(' ')], dtype=np.uint8).reshape(self.raw_dimensions)
+                image = cv2.resize(image, self.target_dimensions, interpolation=cv2.INTER_LINEAR)
+                image_3d = np.array([image, image, image]).reshape((self.target_dimensions[0], self.target_dimensions[1], 3))

                 # io.imshow(image)
                 # plt.show()

-                # image_3d = np.array([image, image, image]).reshape((target_image_dims[0], target_image_dims[1], 3))
-                features.append(image_3d)
+                images.append(image_3d)

                 if tempCount == 9: break # for now only processing 10 images, o/w training will take too long
                 tempCount += 1

-        X_test = np.array(features[int(math.ceil(len(features)*(1-test_data_percentage))):len(features)])
-        X_train = np.array(features[0:int(math.ceil(len(features)*(1-test_data_percentage)))])
+        X_test = np.array(images[int(math.ceil(len(images)*(1-test_data_percentage))):len(images)])
+        X_train = np.array(images[0:int(math.ceil(len(images)*(1-test_data_percentage)))])
         y_test = np.array(labels[int(math.ceil(len(labels)*(1-test_data_percentage))):len(labels)])
         y_train = np.array(labels[0:int(math.ceil(len(labels)*(1-test_data_percentage)))])
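For orientation, here is a minimal usage sketch of the new ImageProcessor, following the constructor above and the call sites in temp.py and neuralnets/main.py below; the variable names and dataset paths are illustrative, not part of this commit:

    from imageprocessor import ImageProcessor

    # Directory-backed data (from_csv=False): returns raw images plus labels
    imageProcessor = ImageProcessor(from_csv=False, datapath='../data/cohn_kanade_images',
                                    target_dimensions=(64, 64), raw_dimensions=None)
    images, labels = imageProcessor.get_training_data()

    # CSV-backed data (from_csv=True): returns a train/test split instead
    csvProcessor = ImageProcessor(from_csv=True, datapath='../data/fer2013/fer2013.csv',
                                  target_dimensions=(128, 128), raw_dimensions=(48, 48),
                                  csv_label_col=0, csv_image_col=1)
    X_train, y_train, X_test, y_test = csvProcessor.get_training_data()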
File renamed without changes.
4 changes: 2 additions & 2 deletions data/temp.py
@@ -1,10 +1,10 @@
 import sys
 sys.path.append('../feature')
-from dataProcessor import DataProcessor
+from imageprocessor import ImageProcessor

 target_image_dims = (64,64)

-d = DataProcessor()
+d = ImageProcessor()
 root_directory = "../data/cohn_kanade_images"
 csv_file_path = "../data/fer2013/fer2013.csv"
33 changes: 0 additions & 33 deletions feature/feature.py

This file was deleted.

42 changes: 42 additions & 0 deletions feature/featureextractor.py
@@ -0,0 +1,42 @@
+from skimage import color, io
+from skimage.feature import hog, local_binary_pattern
+import numpy as np
+
+
+class FeatureExtractor:
+
+    def __init__(self, images, return_array=True):
+        # user-supplied parameters
+        self.images = images
+        self.return_array = return_array
+        self.feature_params = dict()
+
+        # feature requirements
+        self.possible_features = ['hog', 'lbp']
+        self.required_feature_parameters = dict()
+        self.required_feature_parameters['hog'] = ['orientations', 'pixels_per_cell', 'cells_per_block']
+        self.required_feature_parameters['lbp'] = ['radius', 'n_points']
+
+    def add_feature(self, feature_type, params):
+        if feature_type not in self.possible_features:
+            raise ValueError('Cannot extract specified feature. Use one of: ' + ', '.join(self.possible_features))
+        if set(params.keys()) != set(self.required_feature_parameters[feature_type]):
+            raise ValueError(('Expected %s parameters: ' + ', '.join(self.required_feature_parameters[feature_type])) % feature_type)
+        self.feature_params[feature_type] = params
+
+    def extract(self):
+        features = list()
+        for image in self.images:
+            feature = list()
+            for feature_type in self.feature_params.keys():
+                feature += list(getattr(self, 'extract_%s_feature' % feature_type)(self.feature_params[feature_type], image)[self.return_array])
+            features.append(feature)
+        return np.array(features)
+
+    def extract_hog_feature(self, params, image):
+        feature_vector, hog_image = hog(image, orientations=params['orientations'], pixels_per_cell=params['pixels_per_cell'], cells_per_block=params['cells_per_block'], visualise=True)
+        return feature_vector, hog_image
+
+    def extract_lbp_feature(self, params, image):
+        feature_image = local_binary_pattern(image, params['n_points'], params['radius'])
+        return feature_image.flatten(), feature_image
36 changes: 19 additions & 17 deletions neuralnets/main.py
@@ -2,16 +2,19 @@
 sys.path.append('../feature')
 sys.path.append('../data')
 sys.path.append('../svr_plus_tdnn') # TODO: temporary
-from dataProcessor import DataProcessor
+from imageprocessor import ImageProcessor
 from transfer_model import TransferModel
 from tdnn import TDNN
 from regressionModel import RegressionModel
+from featureextractor import FeatureExtractor

 runInceptionV3 = False
 runRegressionPlusTDNN = True
 runConvLSTM = False

 verbose = True
+target_dimensions = (128, 128)
+raw_dimensions = (48, 48)

 if runInceptionV3:
     print('--------------- Inception-V3 Model -------------------')
@@ -20,15 +23,14 @@

     print('Extracting training data...')

-    target_image_dims = (128,128)
-
-    d = DataProcessor()
-    root_directory = "../data/cohn_kanade_images"
     csv_file_path = "../data/fer2013/fer2013.csv"
+    root_directory = "../data/cohn_kanade_images"
+    imageProcessor = ImageProcessor(from_csv=True, datapath=csv_file_path, target_dimensions=target_dimensions, raw_dimensions=raw_dimensions, csv_label_col=0, csv_image_col=1)

-    d.add_feature('hog', {'orientations': 8, 'pixels_per_cell': (4, 4), 'cells_per_block': (1, 1)})
+    # imageProcessor.add_feature('hog', {'orientations': 8, 'pixels_per_cell': (4, 4), 'cells_per_block': (1, 1)})

-    X_train, y_train, X_test, y_test = d.get_training_data(from_csv=True, dataset_location=csv_file_path, target_image_dims=target_image_dims, initial_image_dims=(48, 48), label_index=0, image_index=1, vector=False, time_series=False)
+    X_train, y_train, X_test, y_test = imageProcessor.get_training_data()

     print('X_train shape: ' + str(X_train.shape))
     print('y_train shape: ' + str(y_train.shape))
@@ -47,28 +49,28 @@
 if runRegressionPlusTDNN:

     print('--------------- Regression + TDNN Model -------------------')
-    print('Extracting features...')
-    d = DataProcessor()
+    print('Collecting data...')
+    root_directory = '../data/cohn_kanade_images'
+    imageProcessor = ImageProcessor(from_csv=False, datapath=root_directory, target_dimensions=target_dimensions, raw_dimensions=None)
+    images, labels = imageProcessor.get_training_data()

-    d.add_feature('hog', {'orientations': 8, 'pixels_per_cell': (16, 16), 'cells_per_block': (1, 1)})
-    # d.add_feature('lbp', {'n_points': 24, 'radius': 3})
-
-    features = d.get_training_data(from_csv=False, dataset_location=root_directory, initial_image_dims=None, target_image_dims=(64, 64), vector=True, time_series=False)
-
+    print ('images shape: ' + str(images.shape))
+    print('Extracting features...')

     # if False:
+    featureExtractor = FeatureExtractor(images, return_array=False)
+    featureExtractor.add_feature('hog', {'orientations': 8, 'pixels_per_cell': (16, 16), 'cells_per_block': (1, 1)})
+    # featureExtractor.add_feature('lbp', {'n_points': 24, 'radius': 3})

-    # TODO: Add label processing to DataProcessor class
-    labels = d.get_training_label_array()
+    features = featureExtractor.extract()
     print ("features shape: " + str(features.shape))

     print('Training regression model...')
     model = RegressionModel(features, labels)
     model.fit()
     predictions = model.predict()

     print('Applying time-delay to regression output...')
-    X_train, y_train, X_test, y_test = d.get_time_delay_training_data(predictions, predictions)
+    X_train, y_train, X_test, y_test = imageProcessor.get_time_delay_training_data(predictions, predictions)

     if verbose:
         print ('X_train: ' + str(X_train.shape))
