In [1]:
from pathlib import Path
import tensorflow as tf

AUTOTUNE = tf.data.experimental.AUTOTUNE

class PathTF:
    """Hold a list of paths as a ``tf.data.Dataset`` and cut it into shuffled chunks.

    Args:
        path_list: Sequence of paths, handed unchanged to
            ``tf.data.Dataset.from_tensor_slices``.
    """

    def __init__(self, path_list):
        self.path_ds = tf.data.Dataset.from_tensor_slices(path_list)

    def __len__(self):
        """Return the number of paths held by the dataset."""
        return len(self.path_ds)

    def split_iter(self, n_splits=3, seed=None):
        """Shuffle the paths and batch them into at most ``n_splits`` chunks.

        Every chunk holds ``ceil(len(self) / n_splits)`` paths except possibly
        the last one. When there are few paths the number of chunks can be
        smaller than ``n_splits`` (e.g. 4 paths and 3 splits give 2 chunks).

        Args:
            n_splits: Number of chunks to aim for. Defaults to 3.
            seed: Optional seed forwarded to ``Dataset.shuffle`` so the split
                can be reproduced. ``None`` keeps the shuffle unseeded.

        Returns:
            A batched ``tf.data.Dataset``; each element is one chunk of paths.

        Raises:
            ValueError: If there are no paths or ``n_splits`` is below 1.
        """
        n = len(self)
        if n == 0:
            # shuffle(0) / batch(0) would otherwise fail with an opaque TF error.
            raise ValueError('cannot split an empty list of paths')
        if n_splits < 1:
            raise ValueError('n_splits must be at least 1')
        # Integer ceiling division: exact for any n, no float round trip.
        chunk_size = -(-n // n_splits)
        # A buffer as large as the dataset gives a full shuffle.
        shuffled = self.path_ds.shuffle(n, seed=seed)
        return shuffled.batch(chunk_size)

def preprocess_image(image, width=224, height=224):
    """Decode an encoded image, resize it and scale the pixels to [0, 1].

    Args:
        image: Encoded image bytes (the output of ``tf.io.read_file``).
        width: Width of the returned image in pixels.
        height: Height of the returned image in pixels.

    Returns:
        A float tensor of shape ``(height, width, 3)`` with values in [0, 1].
    """
    # NOTE(review): the notebook collects '*png' files; the TF docs state that
    # decode_jpeg also accepts PNG input, but tf.io.decode_image would express
    # the intent more clearly - confirm before switching.
    image = tf.image.decode_jpeg(image, channels=3)
    # tf.image.resize takes the target size as [new_height, new_width].
    image = tf.image.resize(image, [height, width])
    image /= 255.0  # normalize to [0,1] range
    return image

def load_and_preprocess_image(path):
    """Read the file at ``path`` and pass its bytes through ``preprocess_image``.

    Args:
        path: Location of the image file, as accepted by ``tf.io.read_file``.

    Returns:
        Whatever ``preprocess_image`` returns for the file contents, using its
        default width and height.
    """
    return preprocess_image(tf.io.read_file(path))

class Dataset:
    """Build labelled image datasets from two groups of image paths.

    Paths from ``left_paths`` are labelled 0 and paths from ``right_paths`` are
    labelled 1. Each group is shuffled and cut into chunks by
    ``PathTF.split_iter``; ``dataset[i]`` combines the i-th chunk of both groups.

    Args:
        left_paths: Paths of the images that receive label 0.
        right_paths: Paths of the images that receive label 1.
    """

    def __init__(self, left_paths, right_paths):
        # Materialise the chunks once so repeated indexing sees the same split.
        self.left_path_ds = list(PathTF(left_paths).split_iter())
        self.right_path_ds = list(PathTF(right_paths).split_iter())

    def __getitem__(self, index):
        """Return the ``(image, label)`` dataset for chunk ``index``.

        Args:
            index: Position of the chunk in the lists built by ``__init__``.

        Returns:
            A ``tf.data.Dataset`` of ``(image, label)`` pairs: first the left
            images with label 0, then the right images with label 1.

        Raises:
            IndexError: If either group has no chunk at ``index``.
        """
        left_xs = self.left_path_ds[index]
        right_xs = self.right_path_ds[index]
        xs = tf.concat((left_xs, right_xs), axis=0)
        ys = tf.concat(
            (tf.zeros(len(left_xs), dtype=tf.int64),
             tf.ones(len(right_xs), dtype=tf.int64)),
            axis=0,
        )
        # Decode images in parallel; map keeps element order by default, so the
        # images stay aligned with the labels zipped in below.
        image_ds = tf.data.Dataset.from_tensor_slices(xs).map(
            load_and_preprocess_image, num_parallel_calls=AUTOTUNE)
        label_ds = tf.data.Dataset.from_tensor_slices(ys)
        return tf.data.Dataset.zip((image_ds, label_ds))

    def finish(self, image_label_ds, batch_size, repeat=True):
        """Shuffle, optionally repeat, batch and prefetch a dataset.

        Args:
            image_label_ds: Dataset of ``(image, label)`` pairs, e.g. the result
                of indexing this object.
            batch_size: Number of pairs per batch.
            repeat: When True (the default) the dataset repeats indefinitely,
                so the consumer must bound its own iteration. Pass False to get
                a single finite pass.

        Returns:
            The batched, prefetching ``tf.data.Dataset``.
        """
        # Use a shuffle buffer as large as the dataset so that the data is
        # shuffled thoroughly.
        n = len(image_label_ds)
        ds = image_label_ds.shuffle(buffer_size=n)
        if repeat:
            ds = ds.repeat()
        ds = ds.batch(batch_size)
        # `prefetch` lets the dataset fetch batches in the background while the
        # model is training.
        ds = ds.prefetch(buffer_size=AUTOTUNE)
        return ds

In [2]:
# Root folder of the images and the mini-batch size used for every split.
data_root = Path('data/')
batch_size = 32

# Collect the image paths of both classes as POSIX-style strings.
left_paths = [p.as_posix() for p in data_root.rglob('**/left/**/*png')]
right_paths = [p.as_posix() for p in data_root.rglob('**/right/**/*png')]

In [3]:
# Build the loader, then take its first three chunks as train / validation / test.
loader = Dataset(left_paths, right_paths)
train, val, test = (loader[split] for split in range(3))

# Shuffle, repeat, batch and prefetch every split with the shared batch size.
trainset, valset, testset = (
    loader.finish(split_ds, batch_size) for split_ds in (train, val, test)
)

In [4]:
# Print the labels of the first nine batches. `trainset` repeats indefinitely,
# so the iteration has to be bounded explicitly.
for k, batch in enumerate(trainset.take(9)):
    tf.print(batch[1])

[0 1 1 ... 0 1 1]
[1 1 1 ... 1 1 1]
[1 0 1 ... 0 1 0]
[1 0 1 ... 0 1 1]
[0 0 1 ... 0 0 0]
[1 1 1 ... 0 0 1]
[1 0 0 ... 0 0 0]
[1 0 0 ... 0 1 0]
[1 0 1 ... 0 1 1]


In [5]:
# ImageNet-pretrained ResNet50V2 used as a frozen backbone.
# NOTE(review): the summary below shows the backbone emitting (None, 1000), i.e.
# its own classification head is kept and the new Dense layer sits on top of
# 1000 class scores. `include_top=False` with pooling would expose features
# instead - confirm which one is intended.
ResNet50 = tf.keras.applications.resnet_v2.ResNet50V2(
    weights='imagenet',
    input_shape=(224, 224, 3),
)
for layer in ResNet50.layers:
    layer.trainable = False

# Two-class softmax head stacked on the frozen backbone.
net = tf.keras.models.Sequential([
    ResNet50,
    tf.keras.layers.Flatten(),
    tf.keras.layers.Dense(2, activation='softmax'),
])

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/resnet/resnet50v2_weights_tf_dim_ordering_tf_kernels.h5


In [8]:
# Print the layer-by-layer summary; it should show that only the new Dense
# head contributes trainable parameters.
net.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
resnet50v2 (Functional)      (None, 1000)              25613800  
_________________________________________________________________
flatten (Flatten)            (None, 1000)              0         
_________________________________________________________________
dense (Dense)                (None, 2)                 2002      
Total params: 25,615,802
Trainable params: 2,002
Non-trainable params: 25,613,800
_________________________________________________________________


In [9]:
# Adam with its default hyper-parameters.
optimizer = tf.keras.optimizers.Adam()

# The network ends in a softmax layer, so the loss receives probabilities
# (from_logits=False, the default) together with integer class labels.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=False)

In [10]:
# Running means of the per-batch loss.
train_loss = tf.keras.metrics.Mean(name='train_loss')
test_loss = tf.keras.metrics.Mean(name='test_loss')

# Accuracy computed against integer class labels.
train_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='train_accuracy')
test_accuracy = tf.keras.metrics.SparseCategoricalAccuracy(name='test_accuracy')

In [14]:
model = net


@tf.function
def train_step(images, labels):
    """Run one optimisation step on a batch and update the training metrics.

    Args:
        images: Batch of input images passed to ``model``.
        labels: Integer class labels for ``images``.
    """
    with tf.GradientTape() as tape:
        # State the mode explicitly so layers that behave differently during
        # training (e.g. Dropout) do the right thing if they are ever added.
        predictions = model(images, training=True)
        loss = loss_object(labels, predictions)
    # Only variables still marked trainable receive gradients.
    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

    train_loss(loss)
    train_accuracy(labels, predictions)

In [15]:
@tf.function
def test_step(images, labels):
    """Evaluate one batch and update the evaluation metrics.

    Args:
        images: Batch of input images passed to ``model``.
        labels: Integer class labels for ``images``.
    """
    # Evaluation always runs the model in inference mode.
    predictions = model(images, training=False)
    t_loss = loss_object(labels, predictions)

    test_loss(t_loss)
    test_accuracy(labels, predictions)

In [None]:
EPOCHS = 5

# `trainset` and `valset` repeat indefinitely (Dataset.finish calls repeat()),
# so iterating them directly would never finish the first epoch. Bound each
# epoch to roughly one pass over the data: ceil(samples / batch_size) batches.
train_steps = -(-len(train) // batch_size)
val_steps = -(-len(val) // batch_size)

template = 'Epoch {}, Loss: {}, Accuracy: {}, Test Loss: {}, Test Accuracy: {}'

for epoch in range(EPOCHS):
    # Reset the metrics at the start of every epoch.
    train_loss.reset_states()
    train_accuracy.reset_states()
    test_loss.reset_states()
    test_accuracy.reset_states()

    for images, labels in trainset.take(train_steps):
        train_step(images, labels)

    # The `test_*` metrics are fed from the validation split here.
    for test_images, test_labels in valset.take(val_steps):
        test_step(test_images, test_labels)

    print(template.format(epoch + 1,
                          train_loss.result(),
                          train_accuracy.result() * 100,
                          test_loss.result(),
                          test_accuracy.result() * 100))