In [83]:
'''
Built on fMoW Baseline: https://github.com/fMoW/baseline
'''
import hops.hdfs as hdfs
import cv2
import json
import os
import errno
import numpy as np
import string
import dateutil.parser as dparser
#from PIL import Image
#from sklearn.utils import class_weight
from keras.preprocessing import image
from concurrent.futures import ProcessPoolExecutor
from functools import partial
from tqdm import tqdm
import warnings

In [88]:
import os
from multiprocessing import cpu_count

# Global configuration for the preprocessing/training cells below.
params = {
    "num_workers":  cpu_count(),

    "use_metadata":  True,

    "batch_size_cnn":  128,
    "batch_size_lstm":  512,
    "batch_size_eval":  128,
    "metadata_length":  45,
    "num_channels":  3,
    "cnn_last_layer_length":  4096,
    "cnn_lstm_layer_length":  2208,

    "target_img_size":  (224,224),

    "image_format":  'jpg',

    "train_cnn":  False,
    "generate_cnn_codes":  False,
    "train_lstm":  False,
    "test_cnn":  False,
    "test_lstm":  False,

    # LEARNING PARAMS
    "cnn_adam_learning_rate":  1e-4,
    "cnn_adam_loss":  'categorical_crossentropy',
    "cnn_epochs":  50,

    "lstm_adam_learning_rate":  1e-4,
    "lstm_epochs":  100,
    "lstm_loss":  'categorical_crossentropy',
}

# DIRECTORIES AND FILES
params["directories"] = dict()
params["directories"]['dataset'] = '../../fmow_dataset'
params["directories"]['input'] = os.path.join('..', 'data', 'input')
params["directories"]['output'] = os.path.join('..', 'data', 'output')
params["directories"]['working'] = os.path.join('..', 'data', 'working')
params["directories"]['train_data'] = os.path.join(params["directories"]['input'], 'train_data')
params["directories"]['test_data'] = os.path.join(params["directories"]['input'], 'test_data')
params["directories"]['cnn_models'] = os.path.join(params["directories"]['working'], 'cnn_models')
params["directories"]['lstm_models'] = os.path.join(params["directories"]['working'], 'lstm_models')
params["directories"]['predictions'] = os.path.join(params["directories"]['output'], 'predictions')
params["directories"]['cnn_checkpoint_weights'] = os.path.join(params["directories"]['working'], 'cnn_checkpoint_weights')
params["directories"]['lstm_checkpoint_weights'] = os.path.join(params["directories"]['working'], 'lstm_checkpoint_weights')
params["directories"]['cnn_codes'] = os.path.join(params["directories"]['working'], 'cnn_codes')

params["files"] = {}
params["files"]['training_struct'] = os.path.join(params["directories"]['working'], 'training_struct.json')
params["files"]['test_struct'] = os.path.join(params["directories"]['working'], 'test_struct.json')
params["files"]['dataset_stats'] = os.path.join(params["directories"]['working'], 'dataset_stats.json')
params["files"]['class_weight'] = os.path.join(params["directories"]['working'], 'class_weights.json')

# Class labels; a label's position in this list is its integer category id.
# No trailing comma after the list: `x = [...],` would make x a one-element tuple.
params["category_names"] = [
    'false_detection', 'airport', 'airport_hangar', 'airport_terminal', 'amusement_park',
    'aquaculture', 'archaeological_site', 'barn', 'border_checkpoint', 'burial_site',
    'car_dealership', 'construction_site', 'crop_field', 'dam', 'debris_or_rubble',
    'educational_institution', 'electric_substation', 'factory_or_powerplant', 'fire_station',
    'flooded_road', 'fountain', 'gas_station', 'golf_course', 'ground_transportation_station',
    'helipad', 'hospital', 'interchange', 'lake_or_pond', 'lighthouse', 'military_facility',
    'multi-unit_residential', 'nuclear_powerplant', 'office_building', 'oil_or_gas_facility',
    'park', 'parking_lot_or_garage', 'place_of_worship', 'police_station', 'port', 'prison',
    'race_track', 'railway_bridge', 'recreational_facility', 'impoverished_settlement',
    'road_bridge', 'runway', 'shipyard', 'shopping_mall', 'single-unit_residential',
    'smokestack', 'solar_farm', 'space_facility', 'stadium', 'storage_tank', 'surface_mine',
    'swimming_pool', 'toll_booth', 'tower', 'tunnel_opening', 'waste_disposal',
    'water_treatment_facility', 'wind_farm', 'zoo',
]

params["num_labels"] = len(params["category_names"])
#for directory in params["directories"].values():
#    if not os.path.isdir(directory):
#        os.makedirs(directory)

In [68]:
def _param(params, key):
    """Return field ``key`` of ``params``, which may be a dict (as built in the
    configuration cell) or an attribute-style object such as argparse.Namespace."""
    if isinstance(params, dict):
        return params[key]
    return getattr(params, key)


def _iter_dataset_dirs(top):
    """Yield ``(root, fileNames)`` for every directory below ``top`` that contains files.

    ``hdfs://`` URIs are listed with the Hops HDFS client and its flat ``walk`` output is
    grouped by parent directory; any other path is walked locally with ``os.walk``.
    """
    if not top.startswith('hdfs://'):
        for root, _subDirs, fileNames in os.walk(top):
            if fileNames:
                yield root, fileNames
        return

    from collections import OrderedDict
    grouped = OrderedDict()  # preserve walk order so the output JSON order is stable
    for entry in hdfs.get_fs().walk(top):
        # entry is a path-info dict whose 'name' is the full path.
        # NOTE(review): assumes directories are reported with kind == 'directory' -- confirm.
        if entry.get('kind') == 'directory':
            continue
        root, fileName = entry['name'].rsplit('/', 1)
        grouped.setdefault(root, []).append(fileName)
    for root, fileNames in grouped.items():
        yield root, fileNames


def _metadata_stats(allTrainFeatures, metadataLength):
    """Per-dimension mean and max |feature - mean| over the training feature vectors.

    Dimensions whose max deviation is 0 are set to 1.0. With no training features the
    mean and max are all zeros/ones instead of the NaNs a 0/0 division would produce.
    """
    if allTrainFeatures:
        # assumes every vector has metadataLength entries -- TODO confirm
        featureMatrix = np.vstack(allTrainFeatures)
        metadataMean = featureMatrix.mean(axis=0)
        metadataMax = np.abs(featureMatrix - metadataMean).max(axis=0)
    else:
        metadataMean = np.zeros(metadataLength)
        metadataMax = np.zeros(metadataLength)
    metadataMax[metadataMax == 0] = 1.0
    return {'metadata_mean': metadataMean.tolist(), 'metadata_max': metadataMax.tolist()}


def _dump_json(obj, path):
    """Write ``obj`` as JSON to ``path``, creating the parent directory if it is missing."""
    parent = os.path.dirname(path)
    if parent and not os.path.isdir(parent):
        try:
            os.makedirs(parent)
        except OSError as e:
            if e.errno != errno.EEXIST:
                raise
    with open(path, 'w') as outFile:
        json.dump(obj, outFile)


def prepare_data(params):
    """
    Saves sub images, converts metadata to feature vectors and saves in JSON files,
    calculates dataset statistics, and keeps track of saved files so they can be loaded as batches
    while training the CNN.

    One `_process_file` task per `*_rgb.json` metadata file found under the `train`, `val`
    and `test` sub-directories of the dataset directory is run on a ProcessPoolExecutor.

    NOTE(review): `_process_file` reads images/JSON with local-filesystem APIs
    (os.path.isfile, open); confirm the dataset location is reachable that way when it is
    an hdfs:// URI.

    :param params: global parameters, either the dict built in the configuration cell or an
        attribute-style object with the same fields (num_workers, directories, files,
        image_format, target_img_size, metadata_length, category_names)
    :return: None. Writes the JSON files at params files 'test_struct', 'training_struct'
        and 'dataset_stats'.
    """
    # Suppress Pillow's decompression-bomb warnings for very large images. Matched by
    # message text because PIL.Image is not imported in this notebook.
    warnings.filterwarnings('ignore', message='.*decompression bomb')

    directories = _param(params, 'directories')
    outputFiles = _param(params, 'files')
    metadataLength = _param(params, 'metadata_length')

    # Only the fields _process_file needs are shipped to the worker processes.
    keysToKeep = ['image_format', 'target_img_size', 'metadata_length', 'category_names']
    paramsDict = {key: _param(params, key) for key in keysToKeep}

    results = []
    # The context manager shuts the pool down (waiting for workers) even if a task raises.
    with ProcessPoolExecutor(max_workers=_param(params, 'num_workers')) as executor:
        futures = []
        for currDir in ['train', 'val', 'test']:
            isTrain = currDir in ('train', 'val')
            outDir = directories['train_data'] if isTrain else directories['test_data']

            print('Queuing sequences in: ' + currDir)
            splitRoot = os.path.join(directories['dataset'], currDir)
            for root, fileNames in tqdm(_iter_dataset_dirs(splitRoot)):
                # positions of '/' in root; _process_file slices root with them to
                # rebuild the output sub-path
                slashes = [i for i, ltr in enumerate(root) if ltr == '/']
                for fileName in fileNames:
                    if fileName.endswith('_rgb.json'):  # skip _msrgb images
                        task = partial(_process_file, fileName, slashes, root, isTrain, outDir, paramsDict)
                        futures.append(executor.submit(task))

        print('Wait for all preprocessing tasks to complete...')
        for future in tqdm(futures):
            # result() re-raises any exception from the worker instead of losing it
            results.extend(future.result())

    allTrainFeatures = [np.array(r[0]) for r in results if r[0] is not None]
    trainingData = [r[1] for r in results if r[1] is not None]
    testData = [r[2] for r in results if r[2] is not None]

    _dump_json(testData, outputFiles['test_struct'])
    _dump_json(trainingData, outputFiles['training_struct'])
    _dump_json(_metadata_stats(allTrainFeatures, metadataLength), outputFiles['dataset_stats'])

Queuing sequences in: train


NameError: global name 'params' is not defined

In [None]:


# prepare_data requires the params dict from the configuration cell.
# NOTE: the worker helper _process_file is defined in a later cell; run that cell
# before this one, otherwise the worker tasks fail with NameError.
prepare_data(params)



In [55]:
def _process_file(file, slashes, root, isTrain, outDir, params):
    """
    Helper for prepare_data that actually loads and resizes each image and computes
    feature vectors. This function is designed to be called in parallel for each file
    :param file: file to process
    :param slashes: location of slashes from root walk path
    :param root: root walk path
    :param isTrain: flag on whether or not the current file is from the train set
    :param outDir: output directory for processed data
    :param params: dict of the global parameters with only the necessary fields
    :return (allFeatures, allTrainResults, allTestResults)
    """
    noResult = [(None, None, None)]
    baseName = file[:-5]

    imgFile = baseName + '.' + params['image_format']

    if not os.path.isfile(os.path.join(root, imgFile)):
        return noResult

    try:
        img = image.load_img(os.path.join(root, imgFile))
        img = image.img_to_array(img)
    except:
        return noResult

    jsonData = json.load(open(os.path.join(root, file)))
    if not isinstance(jsonData['bounding_boxes'], list):
        jsonData['bounding_boxes'] = [jsonData['bounding_boxes']]

    allResults = []
    for bb in jsonData['bounding_boxes']:
        if isTrain:
            category = bb['category']
        box = bb['box']

        outBaseName = '%d' % bb['ID']
        if isTrain:
            outBaseName = ('%s_' % category) + outBaseName

        if isTrain:
            currOut = os.path.join(outDir, root[slashes[-3] + 1:], outBaseName)
        else:
            currOut = os.path.join(outDir, root[slashes[-2] + 1:], outBaseName)

        if not os.path.isdir(currOut):
            try:
                os.makedirs(currOut)
            except OSError as e:
                if e.errno == errno.EEXIST:
                    pass

        featuresPath = os.path.join(currOut, baseName + '_features.json')
        imgPath = os.path.join(currOut, imgFile)

        # don't train on tiny boxes
        if box[2] <= 2 or box[3] <= 2:
            continue

        # train with context around box
        
        contextMultWidth = 0.15
        contextMultHeight = 0.15
        
        wRatio = float(box[2]) / img.shape[0]
        hRatio = float(box[3]) / img.shape[1]
        
        if wRatio < 0.5 and wRatio >= 0.4:
            contextMultWidth = 0.2
        if wRatio < 0.4 and wRatio >= 0.3:
            contextMultWidth = 0.3
        if wRatio < 0.3 and wRatio >= 0.2:
            contextMultWidth = 0.5
        if wRatio < 0.2 and wRatio >= 0.1:
            contextMultWidth = 1
        if wRatio < 0.1:
            contextMultWidth = 2
            
        if hRatio < 0.5 and hRatio >= 0.4:
            contextMultHeight = 0.2
        if hRatio < 0.4 and hRatio >= 0.3:
            contextMultHeight = 0.3
        if hRatio < 0.3 and hRatio >= 0.2:
            contextMultHeight = 0.5
        if hRatio < 0.2 and hRatio >= 0.1:
            contextMultHeight = 1
        if hRatio < 0.1:
            contextMultHeight = 2
        
        
        widthBuffer = int((box[2] * contextMultWidth) / 2.0)
        heightBuffer = int((box[3] * contextMultHeight) / 2.0)

        r1 = box[1] - heightBuffer
        r2 = box[1] + box[3] + heightBuffer
        c1 = box[0] - widthBuffer
        c2 = box[0] + box[2] + widthBuffer

        if r1 < 0:
            r1 = 0
        if r2 > img.shape[0]:
            r2 = img.shape[0]
        if c1 < 0:
            c1 = 0
        if c2 > img.shape[1]:
            c2 = img.shape[1]

        if r1 >= r2 or c1 >= c2:
            continue

        subImg = img[r1:r2, c1:c2, :]
        subImg = image.array_to_img(subImg)
        subImg = subImg.resize(params['target_img_size'])
        subImg.save(imgPath)

        features = json_to_feature_vector(params, jsonData, bb)
        features = features.tolist()

        json.dump(features, open(featuresPath, 'w'))
        

        if isTrain:
            allResults.append((features, {"features_path": featuresPath, "img_path": imgPath, "category": params['category_names'].index(category)}, None))
        else:
            allResults.append((None, None, {"features_path": featuresPath, "img_path": imgPath}))

    return allResults

In [None]:
# Scratch check: count the RGB training images on HDFS and decode one of them with OpenCV.
train_folder = "hdfs:///Projects/labs/fMoW/train"

count = 0
temp_img = None  # path of the last *_rgb.jpg seen; used as the decode sample below

for file_info in hdfs.get_fs().walk(train_folder):
    if file_info['name'].endswith('_rgb.jpg'):
        count += 1
        temp_img = file_info['name']
        if count % 1000 == 0:
            print(count)

print(count)

if temp_img is None:
    print('No _rgb.jpg files found under ' + train_folder)
else:
    with hdfs.hdfs.open(temp_img, "r") as curr_file:
        img_data = curr_file.read()
    # flag -1: load the image unchanged
    img = cv2.imdecode(np.fromstring(img_data, dtype=np.uint8), -1)
    if img is None:
        # cv2.imdecode returns None instead of raising when the bytes cannot be decoded
        print('cv2 could not decode ' + temp_img)
    else:
        print(img.shape)