In [1]:
def create_dataset():
    """Load MNIST from the project's ``Resources`` folder and prepare it for Keras.

    ``Resources/mnist.npz`` is copied from HDFS to the local disk and read with
    ``tf.keras.datasets.mnist.load_data``. Every image is reshaped to carry an
    explicit single-channel axis (placed according to the Keras backend's image
    data format), converted to ``float32`` and divided by 255. The labels are
    one-hot encoded over 10 classes.

    Returns:
        tuple: ``(x_train, y_train, x_test, y_test)``.
    """
    from hops import hdfs
    import tensorflow as tf

    num_classes = 10
    # Input image dimensions
    img_rows, img_cols = 28, 28

    local_path = hdfs.copy_to_local('Resources/mnist.npz')
    (x_train, y_train), (x_test, y_test) = tf.keras.datasets.mnist.load_data(path=local_path)

    # Put the channel axis where the backend expects it.
    if tf.keras.backend.image_data_format() == 'channels_first':
        image_shape = (1, img_rows, img_cols)
    else:
        image_shape = (img_rows, img_cols, 1)
    x_train = x_train.reshape((x_train.shape[0],) + image_shape)
    x_test = x_test.reshape((x_test.shape[0],) + image_shape)

    # Convert to float32 and divide by 255.
    x_train = x_train.astype('float32') / 255
    x_test = x_test.astype('float32') / 255
    print('x_train shape:', x_train.shape)
    print(x_train.shape[0], 'train samples')
    print(x_test.shape[0], 'test samples')

    # Convert class vectors to binary class matrices
    y_train = tf.keras.utils.to_categorical(y_train, num_classes)
    y_test = tf.keras.utils.to_categorical(y_test, num_classes)

    return x_train, y_train, x_test, y_test

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log
96,application_1596125182098_0100,pyspark,idle,Link,Link


SparkSession available as 'spark'.


In [2]:
def base_model_generator():
    """Build the uncompiled base CNN used as the starting point of the ablation study.

    Layer stack, in order: Conv2D(32) -> Conv2D(64, 'second_conv') ->
    MaxPooling2D -> Dropout('first_dropout') -> Flatten ->
    Dense(128, 'dense_layer') -> Dropout('second_dropout') ->
    Dense(10, softmax). The explicit layer names are the ones this notebook
    passes to ``ablation_study.model.layers.include(...)``, so they must stay
    in sync with that call.

    Returns:
        A Keras ``Sequential`` model expecting inputs of shape (28, 28, 1).
    """
    import tensorflow as tf

    layers = tf.keras.layers

    kernel = 4
    pool = 7
    # NOTE(review): Keras `Dropout(rate)` treats `rate` as the fraction of inputs
    # to drop, so 0.85 drops 85% of the activations — confirm this is intended.
    dropout = 0.85
    num_classes = 10

    model = tf.keras.models.Sequential()
    model.add(layers.Conv2D(32, kernel_size=(kernel, kernel),
                            activation='relu',
                            input_shape=(28, 28, 1)))
    model.add(layers.Conv2D(64, (kernel, kernel), activation='relu', name='second_conv'))
    model.add(layers.MaxPooling2D(pool_size=(pool, pool)))
    model.add(layers.Dropout(dropout, name='first_dropout'))
    model.add(layers.Flatten())
    model.add(layers.Dense(128, activation='relu', name='dense_layer'))
    model.add(layers.Dropout(dropout, name='second_dropout'))
    model.add(layers.Dense(num_classes, activation='softmax'))

    return model

In [3]:
from maggy.ablation import AblationStudy

# Configure the ablation study. The positional arguments are forwarded as-is;
# presumably dataset name, version and label name — confirm against maggy's
# AblationStudy signature.
ablation_study = AblationStudy("mnist", 1, "number")

# Data and base model are supplied through the generator functions defined in
# the earlier cells of this notebook.
ablation_study.set_dataset_generator(create_dataset)
ablation_study.model.set_base_model_generator(base_model_generator)

# Layers included in the study, referenced by the `name=` each one is given in
# base_model_generator.
ablation_study.model.layers.include(
    'second_conv',
    'first_dropout',
    'dense_layer',
    'second_dropout',
)

ablation_study.model.layers.print_all()

Included single layers are: 

dense_layer
first_dropout
second_dropout
second_conv

In [4]:
def training_fn(dataset_function, model_function):
    """Train and evaluate one model variant and return its test accuracy.

    Args:
        dataset_function: Zero-argument callable returning
            ``(x_train, y_train, x_test, y_test)``.
        model_function: Zero-argument callable returning an uncompiled Keras
            model.

    Returns:
        The second entry of ``model.evaluate(...)`` on the test split, i.e. the
        value of the ``'accuracy'`` metric the model is compiled with.
    """
    import tensorflow as tf
    from tensorflow.python import keras

    #### enable GPU support for tf v1
    tf.enable_eager_execution()
    physical_gpus = tf.config.experimental.list_physical_devices('GPU')
    if physical_gpus:
        # Restrict TensorFlow to only use the first GPU
        first_gpu = physical_gpus[0]
        try:
            tf.config.experimental.set_visible_devices(first_gpu, 'GPU')
            tf.config.experimental.set_memory_growth(first_gpu, True)
            logical_gpus = tf.config.experimental.list_logical_devices('GPU')
            print(len(physical_gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU")
        except RuntimeError as err:
            # Visible devices must be set before GPUs have been initialized
            print(err)
    #####

    x_train, y_train, x_test, y_test = dataset_function()
    model = model_function()

    optimizer = keras.optimizers.Adadelta(1.0)
    model.compile(
        loss=keras.losses.categorical_crossentropy,
        optimizer=optimizer,
        metrics=['accuracy'],
    )

    model.fit(x_train, y_train, batch_size=512, epochs=10, verbose=1)

    # evaluate() yields [loss, accuracy] for the compile configuration above.
    evaluation = model.evaluate(x_test, y_test, verbose=1)
    test_loss, test_accuracy = evaluation[0], evaluation[1]
    print('Test loss:', test_loss)
    print('Test accuracy:', test_accuracy)

    return test_accuracy

In [5]:
from maggy import experiment

# Launch the Maggy ablation experiment: `training_fn` is passed as the function
# to run, `ablation_study` as the study configuration, with the 'loco' ablator.
result = experiment.lagom(
    map_fun=training_fn,
    experiment_type='ablation',
    ablation_study=ablation_study,
    ablator='loco',
    name='MNIST_LOCO_10_epochs',
)

HBox(children=(FloatProgress(value=0.0, description='Maggy experiment', max=5.0, style=ProgressStyle(descripti…

0: 1 Physical GPUs, 1 Logical GPU
0: Started copying hdfs://rpc.namenode.service.consul:8020/Projects/mnist_experiment/Resources/mnist.npz to local disk on path /srv/hops/hopsdata/tmp/nm-local-dir/usercache/uFyNu0LHDiOecDnqtljDXsHQTvTCgSHB68VNua-PzPk/appcache/application_1596125182098_0100/container_e01_1596125182098_0100_01_000002/

0: Finished copying

0: x_train shape: (60000, 28, 28, 1)
0: 60000 train samples
0: 10000 test samples
0: Train on 60000 samples
0: Epoch 1/10
0: Epoch 2/10
0: Epoch 3/10
0: Epoch 4/10
0: Epoch 5/10
0: Epoch 6/10
0: Epoch 7/10
0: Epoch 8/10
0: Epoch 9/10
0: Epoch 10/10
0: Test loss: 0.5315898368835449
0: Test accuracy: 0.9193
0: 1 Physical GPUs, 1 Logical GPU
0: File hdfs://rpc.namenode.service.consul:8020/Projects/mnist_experiment/Resources/mnist.npz is already localized, skipping download...
0: x_train shape: (60000, 28, 28, 1)
0: 60000 train samples
0: 10000 test samples
0: Train on 60000 samples
0: Epoch 1/10
0: Epoch 2/10
0: Epoch 3/10
0: Epoch 4/10
0