In [None]:
import os
import numpy as np
import pandas as pd
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow.keras import Input
from tensorflow.keras import Model, Sequential
from tensorflow.keras.layers import Reshape, Dense, Dropout, Flatten, LeakyReLU, Conv2D, MaxPooling2D
from tensorflow.keras.optimizers import Adam
import PrivacyGAN as pg
import warnings

# Record the TensorFlow version used for this run alongside the notebook output.
print(tf.__version__)

# Force the Keras backend to the "channels_last" image layout for every layer
# created below. Background on why this is set explicitly:
#https://github.com/keras-team/keras/wiki/Keras-2.0-release-notes
#https://stackoverflow.com/questions/60289143/migrating-code-to-tensorflow-2-0-gives-invalid-argument-error-default-maxpoolin
tf.keras.backend.set_image_data_format("channels_last")

In [None]:
# Fetch MNIST, rescale pixel values with (x - 127.5) / 127.5 and flatten every
# image into a single row vector.
(X_train, y_train), (X_test, y_test) = tf.keras.datasets.mnist.load_data()

PIXEL_MIDPOINT = 127.5
X_train = ((X_train.astype(np.float32) - PIXEL_MIDPOINT) / PIXEL_MIDPOINT).reshape(len(X_train), -1)
X_test = ((X_test.astype(np.float32) - PIXEL_MIDPOINT) / PIXEL_MIDPOINT).reshape(len(X_test), -1)

# Show the flattened shapes of both splits.
for split in (X_train, X_test):
    print(split.shape)

# Number of digit classes used for label encoding and per-class GAN training.
NUM_CLASSES = 10

In [None]:
# Generate a synthetic dataset with a plain (non-private) GAN:
# one GAN is trained per digit class, and each trained generator is sampled
# len(X) times so the synthetic set has the same size and class balance as X_train.
X_c = []
y_c = []

for i in range(NUM_CLASSES):
    print(i)
    # Indices of the real training images for digit i. Deliberately NOT named
    # `In`: that name is IPython's input-history list and must not be clobbered.
    class_idx = np.where(y_train == i)
    X = X_train[class_idx]

    # Drop the Keras graph/state left over from the previous class.
    tf.keras.backend.clear_session()

    # `learning_rate` is the supported argument name; `lr` is a deprecated alias.
    optim = Adam(learning_rate=0.0002, beta_1=0.5)
    gen = pg.MNIST_Generator(optim=optim)
    dis = pg.MNIST_Discriminator(optim=optim)

    # Learn a generator for this digit only; the other return values are unused.
    (generator, _, _, _) = pg.SimpGAN(X, generator=gen, discriminator=dis,
                                      optim=optim,
                                      epochs=200, batchSize=256)

    # One noise vector per real sample of this class.
    # NOTE(review): 100 is presumably the generator's latent size - confirm in PrivacyGAN.
    noise = np.random.normal(0, 1, size=[len(X), 100])
    X_c += [generator.predict(noise)]
    y_c += [i] * len(X)

X_c = np.concatenate(X_c)
y_c = np.array(y_c)


## Shuffle samples and labels with the same permutation so they stay aligned
arr = np.arange(len(X_c))
np.random.shuffle(arr)
X_c = X_c[arr]
y_c = y_c[arr]

In [None]:
# Train the CNN classifier on the plain-GAN synthetic images (X_c, y_c) and
# validate it on the real MNIST test set.
tf.keras.backend.clear_session()

# One-hot encode the labels.
y_tr = tf.keras.utils.to_categorical(y_c, NUM_CLASSES)
y_t = tf.keras.utils.to_categorical(y_test, NUM_CLASSES)

# Restore the flat image vectors to 28x28 single-channel images for Conv2D.
x_train = X_c.reshape(X_c.shape[0], 28, 28, 1)
x_test = X_test.reshape(X_test.shape[0], 28, 28, 1)

model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3),
                 activation='relu',
                 input_shape=(28, 28, 1)))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2), data_format='channels_last'))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
# One output unit per class; must match the width of the one-hot labels above.
model.add(Dense(NUM_CLASSES, activation='softmax'))

model.compile(loss=tf.keras.losses.categorical_crossentropy,
              optimizer=tf.keras.optimizers.Adadelta(learning_rate=1.0, rho=0.95, epsilon=1e-06),
              metrics=['accuracy'])

model.fit(x_train, y_tr,
          batch_size=256,
          epochs=25,
          verbose=1,
          validation_data=(x_test, y_t))

# Final loss / accuracy on the real test set.
score = model.evaluate(x_test, y_t, verbose=0)
print('Test loss:', score[0])
print('Test accuracy:', score[1])

In [None]:
# Score the current model on the real test set and keep [loss, accuracy] as
# r_0 (plotted later under the 'GAN' label).
score = model.evaluate(x_test, y_t, verbose=0)
test_loss, test_accuracy = score[0], score[1]
for label, value in (('Test loss:', test_loss), ('Test accuracy:', test_accuracy)):
    print(label, value)
r_0 = [test_loss, test_accuracy]

In [None]:
# Train the same CNN classifier on the REAL MNIST training images as the
# baseline, validating on the real MNIST test set.
tf.keras.backend.clear_session()

# One-hot encode the labels.
y_tr = tf.keras.utils.to_categorical(y_train, NUM_CLASSES)
y_t = tf.keras.utils.to_categorical(y_test, NUM_CLASSES)

# Reshape using X_train's own sample count. The previous code used
# X_c.shape[0], which only worked when the synthetic set happened to have
# exactly as many rows as X_train and made this cell depend on the GAN cell.
x_train = X_train.reshape(X_train.shape[0], 28, 28, 1)
x_test = X_test.reshape(X_test.shape[0], 28, 28, 1)

model = Sequential()
model.add(Conv2D(32, kernel_size=(3, 3),
                 activation='relu',
                 input_shape=(28, 28, 1)))
model.add(Conv2D(64, (3, 3), activation='relu'))
model.add(MaxPooling2D(pool_size=(2, 2)))
model.add(Dropout(0.25))
model.add(Flatten())
model.add(Dense(128, activation='relu'))
model.add(Dropout(0.5))
# One output unit per class; must match the width of the one-hot labels above.
model.add(Dense(NUM_CLASSES, activation='softmax'))

model.compile(loss=tf.keras.losses.categorical_crossentropy,
              optimizer=tf.keras.optimizers.Adadelta(learning_rate=1.0, rho=0.95, epsilon=1e-06),
              metrics=['accuracy'])

model.fit(x_train, y_tr,
          batch_size=256,
          epochs=25,
          verbose=1,
          validation_data=(x_test, y_t))

In [None]:
# Score the current model on the real test set and keep [loss, accuracy] as
# r_1 (plotted later under the 'Real' label).
score = model.evaluate(x_test, y_t, verbose=0)
test_loss, test_accuracy = score[0], score[1]
for label, value in (('Test loss:', test_loss), ('Test accuracy:', test_accuracy)):
    print(label, value)
r_1 = [test_loss, test_accuracy]

In [None]:
# Generate a synthetic dataset with privGAN (two generator/discriminator pairs
# plus a private discriminator): one privGAN is trained per digit class, and
# the two trained generators together are sampled len(X) times so the synthetic
# set has the same size and class balance as X_train.
X_c2 = []
y_c2 = []


def _make_adam():
    """Return a fresh Adam optimizer with the settings used for every GAN model here."""
    # `learning_rate` is the supported argument name; `lr` is a deprecated alias.
    return Adam(learning_rate=0.0002, beta_1=0.5)


for i in range(NUM_CLASSES):
    print(i)
    # Indices of the real training images for digit i. Deliberately NOT named
    # `In`: that name is IPython's input-history list and must not be clobbered.
    class_idx = np.where(y_train == i)
    X = X_train[class_idx]

    # Drop the Keras graph/state left over from the previous class.
    tf.keras.backend.clear_session()

    optim = _make_adam()
    # Every sub-model gets its own optimizer instance.
    generators = [pg.MNIST_Generator(optim=_make_adam()),
                  pg.MNIST_Generator(optim=_make_adam())]
    discriminators = [pg.MNIST_Discriminator(optim=_make_adam()),
                      pg.MNIST_Discriminator(optim=_make_adam())]
    pDisc = pg.MNIST_DiscriminatorPrivate(OutSize=2,
                                          optim=_make_adam())

    # Only the trained generators are needed; the other return values are unused.
    (generators, _, _, _, _, _) = pg.privGAN(X, epochs=200,
                                             disc_epochs=50,
                                             batchSize=256,
                                             generators=generators,
                                             discriminators=discriminators,
                                             pDisc=pDisc,
                                             optim=optim,
                                             privacy_ratio=1.0)

    # Split the len(X) samples between the two generators. The second one takes
    # the remainder so odd-sized classes are not short by one sample
    # (len(X)//2 from each would yield len(X) - 1).
    n_first = len(X) // 2
    n_second = len(X) - n_first
    # NOTE(review): 100 is presumably the generator's latent size - confirm in PrivacyGAN.
    noise1 = np.random.normal(0, 1, size=[n_first, 100])
    noise2 = np.random.normal(0, 1, size=[n_second, 100])
    X_c2 += [generators[0].predict(noise1)]
    X_c2 += [generators[1].predict(noise2)]
    y_c2 += [i] * (len(noise1) + len(noise2))

X_c2 = np.concatenate(X_c2)
y_c2 = np.array(y_c2)

In [None]:
## Apply one random permutation to both images and labels so they stay aligned
arr = np.random.permutation(len(X_c2))
X_c2, y_c2 = X_c2[arr], y_c2[arr]

In [None]:
# Fit the CNN classifier on the privGAN synthetic images (X_c2, y_c2),
# validating against the real MNIST test set.
tf.keras.backend.clear_session()

# One-hot label matrices for the training and test sets.
y_tr = tf.keras.utils.to_categorical(y_c2, NUM_CLASSES)
y_t = tf.keras.utils.to_categorical(y_test, NUM_CLASSES)

# Flat vectors -> 28x28 single-channel images.
x_train = X_c2.reshape(len(X_c2), 28, 28, 1)
x_test = X_test.reshape(len(X_test), 28, 28, 1)

# Two conv layers, max-pooling and dropout, then a dense softmax head.
model = Sequential([
    Conv2D(32, kernel_size=(3, 3), activation='relu', input_shape=(28, 28, 1)),
    Conv2D(64, (3, 3), activation='relu'),
    MaxPooling2D(pool_size=(2, 2)),
    Dropout(0.25),
    Flatten(),
    Dense(128, activation='relu'),
    Dropout(0.5),
    Dense(10, activation='softmax'),
])

adadelta = tf.keras.optimizers.Adadelta(learning_rate=1.0, rho=0.95, epsilon=1e-06)
model.compile(optimizer=adadelta,
              loss=tf.keras.losses.categorical_crossentropy,
              metrics=['accuracy'])

model.fit(x_train, y_tr,
          validation_data=(x_test, y_t),
          epochs=25,
          batch_size=256,
          verbose=1)

In [None]:
# Score the current model on the real test set and keep [loss, accuracy] as
# r_2 (plotted later under the 'privGAN (1.0)' label).
score = model.evaluate(x_test, y_t, verbose=0)
test_loss, test_accuracy = score[0], score[1]
for label, value in (('Test loss:', test_loss), ('Test accuracy:', test_accuracy)):
    print(label, value)
r_2 = [test_loss, test_accuracy]

In [None]:
# Bar chart comparing test accuracy of the three classifiers
# (index 1 of each r_* list is the accuracy).
bar_positions = [0, 1, 2]
bar_labels = ['Real', 'GAN', 'privGAN (1.0)']
accuracies = [r_1[1], r_0[1], r_2[1]]

plt.bar(bar_positions, accuracies)
plt.xticks(bar_positions, bar_labels, rotation=45)
plt.ylabel('Accuracy')