In [1]:
#-----------------------------------------------------------------------------------------------------------------
# Script to build and evaluate the custom classification model HybridA described in
# Towards Asteroid Detection in Microlensing Surveys with Deep Learning by Cowan et al.
#-----------------------------------------------------------------------------------------------------------------
import tensorflow as tf
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Input, GlobalAveragePooling2D
from tensorflow.keras.layers import Activation, Dropout, Flatten, Dense, add
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.constraints import max_norm

In [2]:
# Directory paths (the values below look like placeholders; point them at real locations before running)

# Each data directory is expected to contain 2 sub-directories:
#  "Yes" containing tracklet images and
#  "No" containing no-tracklet images

TRAIN_DIR = r"/path_to_training_set/"
VALID_DIR = r"/path_to_validation_set/"
TEST_DIR = r"/path_to_test_set/"

# Path to the saved model weights; this value is handed to model.load_weights() further down
WEIGHTS_DIR = r"/path_to_weights/"

In [None]:
# Hyper-parameters and input geometry
epochs = 50
batch_size = 32
image_size = 128
image_shape = (image_size, image_size, 3)

# One image generator per data split; each multiplies pixel values by 1/255
# (which maps 8-bit intensities into the range [0, 1]).
train_datagen = ImageDataGenerator(rescale=1./255)
valid_datagen = ImageDataGenerator(rescale=1./255)
test_datagen = ImageDataGenerator(rescale=1./255)

# Keyword arguments shared by every split when reading images from disk
flow_args = dict(
    batch_size=batch_size,
    target_size=(image_size, image_size),
    class_mode='binary',
)

train_generator = train_datagen.flow_from_directory(TRAIN_DIR, **flow_args)

valid_generator = valid_datagen.flow_from_directory(VALID_DIR, **flow_args)

# Shuffling is switched off for the test split only
test_generator = test_datagen.flow_from_directory(TEST_DIR, shuffle=False, **flow_args)

In [3]:
def hybrid_module(layer_in, f1, bname):
    """Build one hybrid block: three parallel conv branches summed with a shortcut.

    The branches are
      * a single 1x1 convolution,
      * a 1x1 convolution followed by a 3x3 convolution,
      * a 1x1 convolution followed by two 3x3 convolutions
        (standing in for a 5x5 convolution).
    Their outputs are added element-wise to a shortcut branch and passed
    through a final ReLU.

    Args:
        layer_in: Input tensor; its last dimension is compared against ``f1``.
        f1: Number of filters used by every convolution in the block.
        bname: Suffix appended to each convolution's layer name.

    Returns:
        The output tensor of the block's final ReLU activation.
    """

    def constrained_conv(kernel, layer_name, tensor):
        # All convolutions in the block share one configuration; only the
        # kernel size, the layer name and the input tensor differ. Fresh
        # max_norm objects are created for every layer.
        return Conv2D(
            f1,
            kernel,
            padding='same',
            activation='relu',
            kernel_constraint=max_norm(3),
            bias_constraint=max_norm(3),
            name=layer_name,
        )(tensor)

    # Shortcut branch: the input is reused as-is when its last dimension
    # already equals f1, otherwise it goes through a 1x1 convolution
    # (which, like the others, applies a ReLU).
    if layer_in.shape[-1] != f1:
        shortcut = constrained_conv((1, 1), "identity_" + bname, layer_in)
    else:
        shortcut = layer_in

    # Branch A: 1x1
    branch_1x1 = constrained_conv((1, 1), "hybrid_1_" + bname, layer_in)

    # Branch B: 1x1 -> 3x3
    branch_3x3 = constrained_conv((1, 1), "hybrid_3in_" + bname, layer_in)
    branch_3x3 = constrained_conv((3, 3), "hybrid_3out_" + bname, branch_3x3)

    # Branch C: 1x1 -> 3x3 -> 3x3 (in place of a 5x5)
    branch_5x5 = constrained_conv((1, 1), "hybrid_5in_" + bname, layer_in)
    branch_5x5 = constrained_conv((3, 3), "hybrid_5a_out_" + bname, branch_5x5)
    branch_5x5 = constrained_conv((3, 3), "hybrid_5b_out_" + bname, branch_5x5)

    # Element-wise sum of the three branches and the shortcut, then ReLU
    summed = add([branch_1x1, branch_3x3, branch_5x5, shortcut])
    return Activation('relu')(summed)

In [4]:
# Model input
visible = Input(shape=image_shape, name="input")

# Stem: 3x3 convolution -> batch normalisation -> 2x2 max pooling
layer0 = Conv2D(
    32,
    (3, 3),
    activation='relu',
    kernel_constraint=max_norm(3),
    bias_constraint=max_norm(3),
    name="conv1",
)(visible)
layer0 = BatchNormalization(name="norm1")(layer0)
layer0 = MaxPooling2D(pool_size=(2, 2), name="maxpool1")(layer0)

# Hybrid blocks b1..b8: the filter count grows by 32 per block, and a 2x2
# max pooling follows blocks 2, 4 and 6.
hybrid_filters = (32, 64, 96, 128, 160, 192, 224, 256)
pooled_blocks = (2, 4, 6)

layer = layer0
for block_idx, n_filters in enumerate(hybrid_filters, start=1):
    block_name = "b" + str(block_idx)
    layer = hybrid_module(layer, n_filters, block_name)
    if block_idx in pooled_blocks:
        layer = MaxPooling2D(pool_size=(2, 2), name="maxpool_" + block_name)(layer)

# Classification head: global average pooling -> flatten -> dense(128) ->
# dropout -> single sigmoid unit
avgPool = GlobalAveragePooling2D()(layer)
flatten = Flatten(name="flatten")(avgPool)

output = Dense(128, activation='relu', name="fc1")(flatten)
output = Dropout(0.4)(output)
output = Dense(1, activation='sigmoid', name="output")(output)

# Assemble the model and print its layer summary
model = Model(inputs=visible, outputs=output, name="HybridA")
model.summary()

Model: "HybridA"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input (InputLayer)              [(None, 128, 128, 3) 0                                            
__________________________________________________________________________________________________
conv1 (Conv2D)                  (None, 126, 126, 32) 896         input[0][0]                      
__________________________________________________________________________________________________
norm1 (BatchNormalization)      (None, 126, 126, 32) 128         conv1[0][0]                      
__________________________________________________________________________________________________
maxpool1 (MaxPooling2D)         (None, 63, 63, 32)   0           norm1[0][0]                      
____________________________________________________________________________________________

In [5]:
# Load the saved weights into the model.
# The build shape is derived from image_shape (the same tuple the Input layer
# uses) rather than a hard-coded 128x128x3, so changing image_size in the
# parameters cell cannot leave the two out of sync. The batch dimension is
# left unspecified.
model.build(input_shape=(None, *image_shape))
model.load_weights(WEIGHTS_DIR)

# Compile the model: evaluate() needs the loss and metrics configured here
# in order to report them.
model.compile(loss='binary_crossentropy', optimizer='adam', metrics=['accuracy'])

# Evaluate on the test set; the result is [loss, accuracy]
model.evaluate(test_generator)



[0.6899896860122681, 0.9546789526939392]