In [1]:
import boto3
import io
import re
import numpy as np
import os
import xml.etree.ElementTree as ET 
from google.cloud import vision
import requests
from nltk.corpus import stopwords as stpword
from nltk.stem import WordNetLemmatizer
import json
from nltk.corpus import stopwords as stpword
from nltk.stem import WordNetLemmatizer
from utilities.image_description_scoring import matching_label_score
from time import time
from utilities.pythonDB import writeToDB_label, recordsExists_label

In [2]:
stopwords = set(stpword.words('english'))
wordNetLemma = WordNetLemmatizer()

In [3]:
def extract_clean_list(original_list):
    cleaned = list(set([wordNetLemma.lemmatize(obj_item.lower().strip()) for item in original_list for obj_item in item.split() if obj_item not in stopwords]))
    return [item.strip() for item in [re.sub('[^a-zA-Z0-9]+', ' ', item, flags=re.UNICODE) for item in cleaned]]

def get_mapped(path):
    key_map, dict_idx = {}, 0
    for r, _, f in os.walk(path):
        for file in f:
            if '.JPG' in file or '.jpg' in file:
                key_map[dict_idx] = file
                dict_idx += 1
    idxs = list(key_map.keys())          
    shuffled_idxs = np.random.randint(0, len(idxs), len(idxs))
    return key_map, shuffled_idxs

def get_captions(base_path, annotation_file):
    with open(os.path.join(base_path, annotation_file)) as json_file:
        caption_file = json.load(json_file)
    captions, mapped_annotation = {}, {}

    for annotation in caption_file['annotations']:
        mapped_annotation[annotation['image_id']] = annotation['caption']

    for image in caption_file['images']:
        captions[image['file_name']] = mapped_annotation.get(image['id'])
    return captions

In [6]:
class DetectLabels():
    def __init__(self, base, file, max_labels=30):
        
        with io.open(os.path.join(base, file), 'rb') as image_file:
            content = (image_file.read())
        self.content = content
        self.base, self.actual_str, self.detected_str = base, '', '' 
        self.file = file.split('_')[-1]
        
        self.label = ''
        self.start = time()
        self.max_labels = max_labels
        
    def return_function(self, name):
        dataset = self.base.split('/')[1]
        if recordsExists_label(self.file, dataset, name):
            return
        
        status, score =  getattr(self, 'if_' + name)()
        compute_time = time() - self.start
        self.label = name + '-noisy'
        
        score1, score2, score3 = score[0], score[1], score[2]
        
        bag = (self.file, dataset, self.label, status, score1, score2, score3, compute_time, self.actual_str, self.detected_str)
        print (bag)
        writeToDB_label(bag)
        
    def compute_ground_truth(self):
        ''' Status = {-1:'API Error', -2: 'Ground Truth Empty', 0: 'All Correct'}, Return = Ground Truth, Status
        '''
        ground_truth_str = captions.get(self.file)
        
        if ground_truth_str is None or len(ground_truth_str) == 0:
            return '', -2
        else:
            ground_truth_str = extract_clean_list(ground_truth_str.lower().split(' '))
            self.actual_str = ground_truth_str
            return ground_truth_str, 0
            
    def if_aws(self):
        ground_truth, xml_status = self.compute_ground_truth()
        if xml_status != 0: return xml_status, 0
        
        try:
            imgobj = {'Bytes': self.content}
            client=boto3.client('rekognition', region_name='us-east-1')
            response=client.detect_labels(Image=imgobj, MaxLabels=self.max_labels)
            detected_label = [label['Name'] for label in response['Labels']]
            score1 = matching_label_score(detected_label, ground_truth, 1)
            score2 = matching_label_score(detected_label, ground_truth, 2)
            score3 = matching_label_score(detected_label, ground_truth, 3)
            score = [score1, score2, score3]
            
            self.detected_str = detected_label
            return 0, score
        except:
            return -1, 0

In [8]:
annotation_dir_path = 'datasets/image_labeling/annotations'
train_dir_path = 'datasets/image_labeling/noises'
captions_validation = get_captions(annotation_dir_path, 'captions_val2017.json')
captions = get_captions(annotation_dir_path, 'captions_train2017.json')
captions.update(captions_validation)
del captions_validation
dict_files, files_idx = get_mapped(train_dir_path)
shuffled_idx = np.random.randint(0, len(files_idx), len(files_idx))[:1]

for idx in shuffled_idx:
    file_name = dict_files.get(idx)
    print ("Iterating for File Name = {}".format(file_name))
    DetectLabels(train_dir_path, file_name).return_function('aws')

Iterating for File Name = add_impulse_noise_2_000000405530.jpg
('000000405530.jpg', 'image_labeling', 'aws-noisy', 0, 1, 1, 1, 0.7973294258117676, ['man', 'sitting', 'statue', 'bench'], ['Water', 'Outdoors', 'Nature', 'Human', 'Person', 'Waterfront', 'Sundial', 'Pier', 'Port', 'Dock', 'Hook', 'Sea', 'Ocean', 'Landscape'])
