In [1]:
import os
import pathlib
from glob import glob

import pandas as pd
import numpy as np
from matplotlib import pyplot as plt
from PIL import Image
import pickle

import tensorflow as tf
import tensorflow_datasets as tfdata
import tensorflow_docs as tfdocs
import tensorflow_docs.plots
from tensorflow.python.saved_model import signature_constants

from tensorflow.keras import Model
from tensorflow.keras.models import *
from tensorflow.keras.layers import *
from tensorflow.keras.optimizers import RMSprop
from tensorflow.keras.preprocessing.image import *
from tensorflow.keras.losses import SparseCategoricalCrossentropy

import cv2


import mlflow
from mlflow.tracking import MlflowClient
from mlflow.exceptions import MlflowException
from mlflow.models.signature import ModelSignature
from mlflow.types.schema import Schema, TensorSpec

from prefect import flow, task
from prefect.task_runners import SequentialTaskRunner

# Record the library versions this notebook ran with, then hide GPUs from CUDA so the
# run is CPU-only.
print('Pandas:{}, Numpy:{}, Tensorflow:{}'.format(pd.__version__, np.__version__, tf.__version__))

# NOTE(review): this runs after `import tensorflow`; it only takes effect as long as
# CUDA has not been initialized yet — consider moving it above the imports.
os.environ.update({"CUDA_VISIBLE_DEVICES": "-1"})

2023-04-01 09:29:24.106313: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2023-04-01 09:29:24.862484: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer.so.7'; dlerror: libnvinfer.so.7: cannot open shared object file: No such file or directory
2023-04-01 09:29:24.862757: W tensorflow/compiler/xla/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libnvinfer_plugin.so.7'; dlerror: libnvinfer_plugin.so.7: cannot open shared object file: No such file or directory
  from .autonotebook import tqdm as notebook_tqdm


Pandas:1.3.4, Numpy:1.21.4, Tensorflow:2.11.0


In [2]:
def mlflow_setup(tracking_uri="sqlite:////home/pmspraju/tracking-server/mlflow.db"):
    """Point the MLflow fluent API at a tracking store and return a client for it.

    Args:
        tracking_uri: MLflow tracking URI. Defaults to the local SQLite store this
            notebook was written against; pass another URI to reuse the notebook
            on a different machine or server.

    Returns:
        An ``MlflowClient`` bound to ``tracking_uri``.
    """
    # Global side effect: every later mlflow.* call in this process uses this store.
    mlflow.set_tracking_uri(tracking_uri)
    return MlflowClient(tracking_uri=tracking_uri)

In [3]:
@task
def create_mlflow_experiment(experiment_name,
                             artifact_location='//home/pmspraju/tracking-server/mlruns/'):
    """Create the MLflow experiment if it does not exist yet, and make it active.

    Args:
        experiment_name: Name of the experiment to create or reuse.
        artifact_location: Artifact root used only when the experiment is created;
            an existing experiment keeps the location it was created with.

    Returns:
        The id of the (now active) experiment.
    """
    try:
        mlflow.create_experiment(
            experiment_name,
            artifact_location=artifact_location,
            tags={"version": "v1", "priority": "P1"},
        )
    except MlflowException:
        # Only MLflow's own error is treated as "already exists"; unrelated failures
        # (e.g. an unreachable tracking store) propagate instead of being hidden.
        print("Experiment exists")

    # Activate the experiment on both paths so runs started afterwards are recorded
    # in it — including right after it has just been created.
    experiment = mlflow.set_experiment(experiment_name)
    # Examine the experiment details.
    print("Experiment_id: {}".format(experiment.experiment_id))
    print("Name: {}".format(experiment.name))
    print("Artifact Location: {}".format(experiment.artifact_location))
    print("Tags: {}".format(experiment.tags))
    print("Lifecycle_stage: {}".format(experiment.lifecycle_stage))
    print("Last Updated timestamp: {}".format(experiment.last_update_time))
    return experiment.experiment_id

In [4]:
def normalize(input_image, input_mask):
    """Scale image pixels to [0, 1] and shift mask labels to start at zero.

    The dataset's masks number their classes 1 through 3; subtracting one maps
    them to 0 through 2, the range a 3-class sparse loss expects.

    Args:
        input_image: Image tensor with pixel values in [0, 255].
        input_mask: Mask tensor with class labels 1..3.

    Returns:
        (image as float32 in [0, 1], mask with labels 0..2).
    """
    scaled_image = tf.cast(input_image, tf.float32) / 255.0
    input_mask -= 1
    return scaled_image, input_mask

In [5]:
@tf.function
def load_image(dataset_element, train=True):
    """Resize, optionally augment, and normalize one dataset example.

    Args:
        dataset_element: Dict-like element with 'image' and 'segmentation_mask'.
        train: When True, mirror image and mask together with probability 0.5.

    Returns:
        (image, mask) resized to 256x256 and passed through ``normalize``.
    """
    input_image = tf.image.resize(dataset_element['image'], (256, 256))
    # Nearest-neighbour keeps the mask's integer class labels intact; the default
    # bilinear method blends neighbouring labels into fractional, invalid classes
    # along object borders. Nearest-neighbour resizing keeps the input dtype, so
    # cast to the float32 that the bilinear resize used to produce.
    input_mask = tf.image.resize(dataset_element['segmentation_mask'], (256, 256),
                                 method='nearest')
    input_mask = tf.cast(input_mask, tf.float32)

    if train:
        # Draw the coin flip with TensorFlow, not NumPy: a np.random call runs only
        # once, while the function is traced, which froze one flip decision for every
        # element. tf.cond applies the same decision to the image and its mask.
        input_image, input_mask = tf.cond(
            tf.random.uniform(()) > 0.5,
            lambda: (tf.image.flip_left_right(input_image),
                     tf.image.flip_left_right(input_mask)),
            lambda: (input_image, input_mask),
        )

    input_image, input_mask = normalize(input_image, input_mask)

    return input_image, input_mask

In [23]:
def log_mlflow_image(name, remove_file=False, work_dir='/home/pmspraju/MLOps/imageResolution'):
    """Log the image file ``work_dir/name`` to the active MLflow run as ``name``.

    Args:
        name: File name relative to ``work_dir``; also used as the artifact path.
        remove_file: Delete the file after it has been logged.
        work_dir: Directory containing the image. Defaults to the directory this
            notebook was originally run from.
    """
    img_path = pathlib.Path(work_dir) / name
    # Image.open opens lazily and keeps the file handle open; the context manager
    # releases it once the image has been logged, before the optional delete.
    with Image.open(img_path) as im:
        mlflow.log_image(im, name)

    if remove_file:
        os.remove(img_path)

In [26]:
# UNet() contains all the logic needed to build, train, and evaluate the U-Net, and
# logs its artifacts (summary, architecture, model, curves, sample predictions) to MLflow.
class UNet(object):
    """U-Net segmentation model with MLflow logging.

    Args:
        input_size: Shape of one input image, (height, width, channels).
        output_channels: Number of per-pixel classes. Defaults to 3 because each
            pixel is categorized into one of three classes.
    """

    def __init__(self, input_size=(256, 256, 3), output_channels=3):
        self.input_size = input_size
        self.output_channels = output_channels
        self.model = self._create_model()
        # The final layer has no activation, so the loss treats outputs as logits.
        loss = SparseCategoricalCrossentropy(from_logits=True)
        self.model.compile(optimizer=RMSprop(), loss=loss, metrics=['accuracy'])

    @staticmethod
    def _downsample(filters, size, batch_norm=True):
        """Encoder block: stride-2 Conv2D -> optional BatchNorm -> LeakyReLU."""
        initializer = tf.random_normal_initializer(0.0, 0.02)
        block = Sequential()
        block.add(Conv2D(filters=filters,
                         kernel_size=size,
                         strides=2,
                         padding='same',
                         kernel_initializer=initializer,
                         use_bias=False))
        if batch_norm:
            block.add(BatchNormalization())
        block.add(LeakyReLU())
        return block

    @staticmethod
    def _upsample(filters, size, drop_out=False):
        """Decoder block: stride-2 Conv2DTranspose -> BatchNorm -> optional Dropout -> ReLU."""
        init = tf.random_normal_initializer(0.0, 0.02)
        block = Sequential()
        block.add(Conv2DTranspose(filters=filters,
                                  kernel_size=size,
                                  strides=2,
                                  padding='same',
                                  kernel_initializer=init,
                                  use_bias=False))
        block.add(BatchNormalization())
        if drop_out:
            # Dropout in the deepest decoder blocks to reduce overfitting.
            block.add(Dropout(rate=0.5))
        block.add(ReLU())
        return block

    def _create_model(self):
        """Assemble the encoder/decoder with skip connections as a functional Model."""
        # Encoder: eight stride-2 blocks; the first one skips BatchNorm.
        down_stack = [self._downsample(64, 4, batch_norm=False)]
        down_stack += [self._downsample(filters, 4)
                       for filters in (128, 256, 512, 512, 512, 512, 512)]

        # Decoder: seven stride-2 blocks; the first three use dropout.
        up_stack = [self._upsample(512, 4, drop_out=True) for _ in range(3)]
        up_stack += [self._upsample(filters, 4) for filters in (512, 256, 128, 64)]

        inputs = Input(shape=self.input_size)
        x = inputs
        skips = []
        for down in down_stack:
            x = down(x)
            skips.append(x)

        # The deepest encoder output is the bottleneck, not a skip; the remaining
        # ones are concatenated deepest-first onto the decoder outputs.
        for up, skip in zip(up_stack, reversed(skips[:-1])):
            x = up(x)
            x = Concatenate()([x, skip])

        # One more stride-2 upsampling (the decoder has one block fewer than the
        # encoder). No activation: the outputs are logits.
        init = tf.random_normal_initializer(0.0, 0.02)
        output = Conv2DTranspose(filters=self.output_channels,
                                 kernel_size=3,
                                 strides=2,
                                 padding='same',
                                 kernel_initializer=init)(x)
        return Model(inputs, outputs=output)

    @staticmethod
    def _plot_model_history(model_history, metric, ylim=None):
        """Plot train/validation curves for ``metric`` and log them to MLflow as ``<metric>.png``.

        Args:
            model_history: Keras ``History`` returned by ``fit``.
            metric: Metric key in the history (its ``val_`` twin is plotted too).
            ylim: y-axis limits; defaults to [0, 1].
        """
        # 'seaborn-darkgrid' was renamed 'seaborn-v0_8-darkgrid' in Matplotlib 3.6
        # (the old name warns, and is removed in later releases).
        style = ('seaborn-v0_8-darkgrid'
                 if 'seaborn-v0_8-darkgrid' in plt.style.available
                 else 'seaborn-darkgrid')
        with plt.style.context(style):
            fig = plt.figure()
            plotter = tfdocs.plots.HistoryPlotter()
            plotter.plot({'Model': model_history}, metric=metric)
            plt.title(f'{metric.upper()}')
            plt.ylim([0, 1] if ylim is None else ylim)
            # Log straight from memory: no temporary file whose location must match
            # a hard-coded read path.
            mlflow.log_figure(fig, f'{metric}.png')
        plt.close(fig)

    def train(self, train_dataset, epochs, steps_per_epoch, validation_dataset, validation_steps):
        """Fit the model, then log its summary, JSON architecture, model and curves to MLflow.

        Returns:
            The Keras ``History`` object returned by ``fit``.
        """
        hist = self.model.fit(train_dataset,
                              epochs=epochs,
                              steps_per_epoch=steps_per_epoch,
                              validation_steps=validation_steps,
                              validation_data=validation_dataset)

        # Text artifacts are logged directly, instead of via temp files written
        # into an unrelated hard-coded directory.
        summary_lines = []
        self.model.summary(print_fn=summary_lines.append)
        mlflow.log_text("\n".join(summary_lines), "model_summary.txt")
        mlflow.log_text(self.model.to_json(), "model.json")

        # The model consumes float32 images scaled to [0, 1] and emits float32
        # per-pixel logits, so the signature must describe float32, not uint8.
        height, width = self.input_size[:2]
        input_schema = Schema([TensorSpec(np.dtype(np.float32), (-1, *self.input_size))])
        output_schema = Schema([TensorSpec(np.dtype(np.float32),
                                           (-1, height, width, self.output_channels))])
        signature = ModelSignature(inputs=input_schema, outputs=output_schema)

        mlflow.tensorflow.log_model(model=self.model, signature=signature,
                                    artifact_path="tf-models")

        self._plot_model_history(hist, 'loss', [0., 2.0])
        self._plot_model_history(hist, 'accuracy')
        return hist

    @staticmethod
    def _process_mask(mask):
        """Map class labels 0/1/2 to grey levels 0/127/255 and expand to 3 RGB channels."""
        mask = (mask.numpy() * 127.5).astype('uint8')
        mask = cv2.cvtColor(mask, cv2.COLOR_GRAY2RGB)
        return mask

    def _save_image_and_masks(self, image, ground_truth_mask, prediction_mask, image_id):
        """Log an [image | ground truth | prediction] mosaic as ``mosaic_<image_id>.jpg``."""
        image = (image.numpy() * 255.0).astype('uint8')
        gt_mask = self._process_mask(ground_truth_mask)
        pred_mask = self._process_mask(prediction_mask)
        mosaic = np.hstack([image, gt_mask, pred_mask])
        # mlflow.log_image accepts an RGB uint8 array directly, so no BGR round-trip
        # through a temporary JPEG on disk is needed.
        mlflow.log_image(mosaic, f'mosaic_{image_id}.jpg')

    @staticmethod
    def _create_mask(prediction_mask):
        """Turn batched logits into the class-index mask of the first batch element."""
        prediction_mask = tf.argmax(prediction_mask, axis=-1)
        prediction_mask = prediction_mask[..., tf.newaxis]
        return prediction_mask[0]

    def _save_predictions(self, dataset, sample_size=1):
        """Log a mosaic for the first image of each of the first ``sample_size`` batches."""
        for image_id, (images, masks) in enumerate(dataset.take(sample_size), start=1):
            pred_mask = self._create_mask(self.model.predict(images))
            self._save_image_and_masks(images[0], masks[0], pred_mask, image_id=image_id)

    def evaluate(self, test_dataset, sample_size=5):
        """Evaluate on ``test_dataset``, log accuracy (percent) and sample mosaics to MLflow.

        Returns:
            ``[loss, accuracy]`` as returned by ``Model.evaluate``.
        """
        result = self.model.evaluate(test_dataset)
        # result[1] is the 'accuracy' metric passed to compile().
        mlflow.log_metric("Accuracy", result[1] * 100)
        self._save_predictions(test_dataset, sample_size)
        return result

In [28]:
def load_data(AUTOTUNE, BUFFER_SIZE, BATCH_SIZE):
    """Build the Oxford-IIIT Pet training and validation pipelines.

    Uses the first 80% of the 'train' split for training and the first 80% of the
    'test' split for validation/evaluation.

    Args:
        AUTOTUNE: Value for ``num_parallel_calls`` / prefetch buffer size.
        BUFFER_SIZE: Shuffle buffer size for the training set.
        BATCH_SIZE: Batch size for both pipelines.

    Returns:
        (train_dataset, test_dataset, TRAIN_SIZE, VALIDATION_SIZE); the training
        pipeline repeats indefinitely.
    """
    dataset, info = tfdata.load('oxford_iiit_pet', with_info=True)

    TRAIN_SIZE = info.splits['train[:80%]'].num_examples
    VALIDATION_SIZE = info.splits['test[:80%]'].num_examples

    def _random_flip(image, mask):
        # Mirror the image and its mask together with probability 0.5.
        return tf.cond(tf.random.uniform(()) > 0.5,
                       lambda: (tf.image.flip_left_right(image), tf.image.flip_left_right(mask)),
                       lambda: (image, mask))

    # Cache only the deterministic preprocessing. Random augmentation has to run
    # after .cache(); otherwise every image keeps the flip drawn in the first epoch.
    train_dataset = (dataset['train'].take(TRAIN_SIZE)
                     .map(lambda d: load_image(d, train=False), num_parallel_calls=AUTOTUNE)
                     .cache()
                     .shuffle(BUFFER_SIZE)
                     .map(_random_flip, num_parallel_calls=AUTOTUNE)
                     .batch(BATCH_SIZE)
                     .repeat()
                     .prefetch(buffer_size=AUTOTUNE))

    test_dataset = (dataset['test'].take(VALIDATION_SIZE)
                    .map(lambda d: load_image(d, train=False), num_parallel_calls=AUTOTUNE)
                    .batch(BATCH_SIZE)
                    .prefetch(buffer_size=AUTOTUNE))

    return train_dataset, test_dataset, TRAIN_SIZE, VALIDATION_SIZE

In [8]:
def train_model(train_dataset, test_dataset, EPOCHS, STEPS_PER_EPOCH, VALIDATION_STEPS):
    """Build a fresh UNet, train it (validating on ``test_dataset``), then evaluate it.

    Logging to MLflow happens inside ``UNet.train`` / ``UNet.evaluate``.
    """
    fit_options = dict(
        epochs=EPOCHS,
        steps_per_epoch=STEPS_PER_EPOCH,
        validation_steps=VALIDATION_STEPS,
        validation_dataset=test_dataset,
    )
    segmenter = UNet()
    segmenter.train(train_dataset, **fit_options)
    segmenter.evaluate(test_dataset)

In [30]:
@flow(task_runner=SequentialTaskRunner())
def main():
    """Prefect flow: configure MLflow, load data once, then train and evaluate a UNet in one run."""
    # Called for its side effect (the global tracking URI); the client is not needed here.
    mlflow_setup()
    experiment_name = 'IMAGE-SEGMENTATION'
    # If the task returns an id, pass it to start_run so the run is filed under this
    # experiment; None makes start_run fall back to the active experiment.
    experiment_id = create_mlflow_experiment(experiment_name)

    BATCH_SIZE = 32
    BUFFER_SIZE = 1000
    AUTOTUNE = tf.data.experimental.AUTOTUNE

    # Load the dataset and build the pipelines exactly once.
    train_dataset, test_dataset, TRAIN_SIZE, VALIDATION_SIZE = load_data(AUTOTUNE, BUFFER_SIZE, BATCH_SIZE)

    EPOCHS = 50
    STEPS_PER_EPOCH = max(1, TRAIN_SIZE // BATCH_SIZE)
    VALIDATION_SUBSPLITS = 5
    # Validate on about 1/VALIDATION_SUBSPLITS of the held-out batches each epoch,
    # never fewer than one step.
    VALIDATION_STEPS = max(1, VALIDATION_SIZE // BATCH_SIZE // VALIDATION_SUBSPLITS)

    with mlflow.start_run(experiment_id=experiment_id) as run:

        print("MLflow:")
        print("  run_id:", run.info.run_id)
        print("  experiment_id:", run.info.experiment_id)

        mlflow.set_tag("version.mlflow", mlflow.__version__)
        mlflow.set_tag("version.tensorflow", tf.__version__)

        mlflow.log_param("epochs", EPOCHS)
        mlflow.log_param("batch_size", BATCH_SIZE)
        mlflow.log_param("Train_size", TRAIN_SIZE)
        mlflow.log_param("Test_size", VALIDATION_SIZE)

        train_model(train_dataset, test_dataset, EPOCHS, STEPS_PER_EPOCH, VALIDATION_STEPS)

In [31]:
if __name__ == "__main__":
    main()

13:38:18.389 | INFO    | prefect.engine - Created flow run 'magenta-python' for flow 'main'
13:38:18.390 | INFO    | Flow run 'magenta-python' - Using task runner 'SequentialTaskRunner'
13:38:18.492 | INFO    | Flow run 'magenta-python' - Created task run 'create_mlflow_experiment-863ae521-7' for task 'create_mlflow_experiment'
13:38:18.578 | INFO    | Task run 'create_mlflow_experiment-863ae521-7' - Finished in state Completed()


Experiment exists
Experiment_id: 4
Name: IMAGE-SEGMENTATION
Artifact Location: //home/pmspraju/tracking-server/mlruns/
Tags: {'version': 'v1', 'priority': 'P1'}
Lifecycle_stage: active
Last Updated timestamp: 1679840590072
MLflow:
  run_id: c4b23c0aaa50484d94ebcf11dde3a37e
  experiment_id: 4
Epoch 1/50


Corrupt JPEG data: 240 extraneous bytes before marker 0xd9
Corrupt JPEG data: premature end of data segment


Epoch 2/50
Epoch 3/50
Epoch 4/50
Epoch 5/50
Epoch 6/50
Epoch 7/50
Epoch 8/50
Epoch 9/50
Epoch 10/50
Epoch 11/50
Epoch 12/50
Epoch 13/50
Epoch 14/50
Epoch 15/50
Epoch 16/50
Epoch 17/50
Epoch 18/50
Epoch 19/50
Epoch 20/50
Epoch 21/50
Epoch 22/50
Epoch 23/50
Epoch 24/50
Epoch 25/50
Epoch 26/50
Epoch 27/50
Epoch 28/50
Epoch 29/50
Epoch 30/50
Epoch 31/50
Epoch 32/50
Epoch 33/50
Epoch 34/50
Epoch 35/50
Epoch 36/50
Epoch 37/50
Epoch 38/50
Epoch 39/50
Epoch 40/50
Epoch 41/50
Epoch 42/50
Epoch 43/50
Epoch 44/50
Epoch 45/50
Epoch 46/50
Epoch 47/50
Epoch 48/50
Epoch 49/50
Epoch 50/50




INFO:tensorflow:Assets written to: /tmp/tmpbsoofv8w/model/data/model/assets


03:04:14.570 | INFO    | tensorflow - Assets written to: /tmp/tmpbsoofv8w/model/data/model/assets
  plt.style.use('seaborn-darkgrid')
  plt.style.use('seaborn-darkgrid')




03:09:43.220 | INFO    | Flow run 'magenta-python' - Finished in state Completed('All states completed.')
