### Mask Detection with Deep Learning

### Loading necessary libraries

In [0]:
from pyspark.sql.functions import col
from petastorm.spark import SparkDatasetConverter, make_spark_converter
import io
import numpy as np
import torch
import torchvision
from PIL import Image
from functools import partial 
from petastorm import TransformSpec
from torchvision import transforms
from hyperopt import fmin, tpe, hp, SparkTrials, STATUS_OK
import horovod.torch as hvd
from sparkdl import HorovodRunner
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
import numpy as np
import matplotlib.pyplot as plt
import PIL
from pyspark.context import *

from time import time


### Initiating SparkSession and setting appName as Mask Detection

In [0]:
spark = SparkSession.builder \
            .appName("mask_detection") \
            .getOrCreate()

### Setting Spark Configuration for Spark and Connection Configuration for AWS and MongoDB

In [0]:
aws_access_key = 'Insert_Access_Key'
aws_secret_key = 'Insert_Secret_Key'
spark._jsc.hadoopConfiguration().set('spark.jars.packages', 'org.apache.hadoop:hadoop-aws:3.3.1')
spark._jsc.hadoopConfiguration().set('spark.jars.packages', 'org.mongodb.spark:mongo-spark-connector_2.12:3.0.1') 
spark._jsc.hadoopConfiguration().set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
spark._jsc.hadoopConfiguration().set("fs.s3a.access.key", aws_access_key)
spark._jsc.hadoopConfiguration().set("fs.s3a.secret.key", aws_secret_key)
spark._jsc.hadoopConfiguration().set("spark.executor.heartbeatInterval","1200s")
spark._jsc.hadoopConfiguration().set("spark.network.timeout","7200s")
spark._jsc.hadoopConfiguration().set("spark.executor.memory","8g")
spark._jsc.hadoopConfiguration().set("spark.driver.memory", "12g")
spark._jsc.hadoopConfiguration().set("spark.executor.instances", "4")
spark._jsc.hadoopConfiguration().set("spark.executor.cores", "3")

database = 'group7_db'
collection = 'test_images'
user_name = 'group7'
password = 'Karotan207!'
address = 'group7cluster.y0pkn.mongodb.net'
connection_string = f"mongodb+srv://{user_name}:{password}@{address}/{database}.{collection}"
connection_string

### Reading Pre-Processed Binaries Mask Dataset from MongoDB

In [0]:
df_mask_read = spark.read.format("mongo").option("uri",connection_string).load()
#df_mask_read.cache()

In [0]:
### Displaying the sample records from the dataframe
df_mask_read.show(2)

In [0]:
# mongo_data.repartition(1).write.parquet("mask_read_training")

In [0]:
# df_mask_read = spark.read.parquet("/mask_read_training")
# df_mask_read.show(5)

### Creating the train and validation dataset

In [0]:
df = df_mask_read.select('content', 'label').withColumnRenamed('label', 'label_index') #.sampleBy("label_index", {0: 0.1, 1: 0.1})
df_train, df_val = df.randomSplit([0.8, 0.2], seed=7)
num_classes = 2
# Make sure the number of partitions is at least the number of workers which is required for distributed training.
df_train = df_train.repartition(2)
df_val = df_val.repartition(2)

### Caching training and validation dataset using petaStorm

In [0]:
# Set a cache directory on DBFS FUSE for intermediate data.
spark.conf.set(SparkDatasetConverter.PARENT_CACHE_DIR_URL_CONF, "file:///dbfs/tmp/petastorm/cache5")

converter_train = make_spark_converter(df_train)
converter_val = make_spark_converter(df_val)

In [0]:
### Checking the length of petaStorm converter train and validation dataset
print(f"train: {len(converter_train)}, val: {len(converter_val)}")

### Creating a generic get_model function for different deep learning models

In [0]:
def get_model(model, lr=0.001):
    # Load a model

    # Freeze parameters in the feature extraction layers
    for param in model.parameters():
        param.requires_grad = False

    # Add a new classifier layer for transfer learning
    if 'classifier' in dir(model):
        num_ftrs = model.classifier[-1].in_features
        model.classifier[-1] = torch.nn.Linear(num_ftrs, 2)
    else:
        num_ftrs = model.fc.in_features
        model.fc = torch.nn.Linear(num_ftrs, 2)
    #num_ftrs = model.classifier[1].in_features
    # Parameters of newly constructed modules have requires_grad=True by default

    return model

### Defining function for training one epoch

In [0]:
def train_one_epoch(model, criterion, optimizer, scheduler, train_dataloader_iter, steps_per_epoch, epoch, device):
    
    model.train()  # Set model to training mode

    # statistics
    running_loss = 0.0
    running_corrects = 0

    # Iterate over the data for one epoch.
    for step in range(steps_per_epoch):
        pd_batch = next(train_dataloader_iter)
        inputs, labels = pd_batch['features'].to(device), pd_batch['label_index'].to(device)

        # Track history in training
        with torch.set_grad_enabled(True):
            # zero the parameter gradients
            optimizer.zero_grad()

            # forward
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            outputs = outputs.squeeze(1)
            labels = labels.type(torch.LongTensor)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

        # statistics
        running_loss += loss.item() * inputs.size(0)
        running_corrects += torch.sum(preds == labels.data)
  
    scheduler.step()

    epoch_loss = running_loss / (steps_per_epoch * BATCH_SIZE)
    epoch_acc = running_corrects.double() / (steps_per_epoch * BATCH_SIZE)

    print('Train Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))
    return epoch_loss, epoch_acc

def evaluate(model, criterion, val_dataloader_iter, validation_steps, device, metric_agg_fn=None):
    model.eval()  # Set model to evaluate mode

    # statistics
    running_loss = 0.0
    running_corrects = 0

    # Iterate over all the validation data.
    for step in range(validation_steps):
        pd_batch = next(val_dataloader_iter)
        inputs, labels = pd_batch['features'].to(device), pd_batch['label_index'].to(device)

        # Do not track history in evaluation to save memory
        with torch.set_grad_enabled(False):
            # forward
            outputs = model(inputs)
            _, preds = torch.max(outputs, 1)
            outputs = outputs.squeeze(1)
            labels = labels.type(torch.LongTensor)
            loss = criterion(outputs, labels)

        # statistics
        running_loss += loss.item()
        running_corrects += torch.sum(preds == labels.data)

    # The losses are averaged across observations for each minibatch.
    epoch_loss = running_loss / validation_steps
    epoch_acc = running_corrects.double() / (validation_steps * BATCH_SIZE)
  
    # metric_agg_fn is used in the distributed training to aggregate the metrics on all workers
    if metric_agg_fn is not None:
        epoch_loss = metric_agg_fn(epoch_loss, 'avg_loss')
        epoch_acc = metric_agg_fn(epoch_acc, 'avg_acc')

    print('Validation Loss: {:.4f} Acc: {:.4f}'.format(epoch_loss, epoch_acc))
    return epoch_loss, epoch_acc

### Applying Transformation to input train and validation images

In [0]:
def transform_row(is_train, pd_batch):
  """
  The input and output of this function must be pandas dataframes.
  Do data augmentation for the training dataset only.
  """
  transformers = [transforms.Lambda(lambda x: Image.open(io.BytesIO(x)).convert('RGB'))]
  #transformers = [transforms.Lambda(lambda x: Image.fromarray(x))]
  if is_train:
    transformers.extend([
      transforms.RandomResizedCrop(224),
      transforms.RandomHorizontalFlip(),
    ])
  else:
    transformers.extend([
      transforms.Resize(256),
      transforms.CenterCrop(224),
    ])
  transformers.extend([
    transforms.ToTensor(),
    transforms.Normalize([0.485, 0.456, 0.406], [0.229, 0.224, 0.225]),
  ])
  
  trans = transforms.Compose(transformers)
  #print (pd_batch)
  
  pd_batch['features'] = pd_batch['content'].map(lambda x: trans(x).numpy())
  #print (pd_batch)
  pd_batch = pd_batch.drop(labels=['content'], axis=1)
  return pd_batch

def get_transform_spec(is_train=True):
  # Note that the output shape of the `TransformSpec` is not automatically known by petastorm, 
  # so we need to specify the shape for new columns in `edit_fields` and specify the order of 
  # the output columns in `selected_fields`.
  return TransformSpec(partial(transform_row, is_train), 
                       edit_fields=[('features', np.float32, (3, 224, 224), False)], 
                       selected_fields=['features', 'label_index'])

### Global Variables for BATCH_SIZE and NUM_EPOCHS

In [0]:
BATCH_SIZE = 256
NUM_EPOCHS = 10

### Creating Train and Evaluate Function

In [0]:
def train_and_evaluate(model, lr=1e-6):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    device = 'cpu'
    model = get_model(model, lr=lr)
    model = model.to(device)

    criterion = torch.nn.functional.cross_entropy #torch.nn.functional.binary_cross_entropy_with_logits#torch.nn.CrossEntropyLoss()

    # Only parameters of final layer are being optimized.
    optimizer = torch.optim.AdamW(model.classifier[-1].parameters(), lr=lr)

    # Decay LR by a factor of 0.1 every 7 epochs
    exp_lr_scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=7, gamma=0.1)

    with converter_train.make_torch_dataloader(transform_spec=get_transform_spec(is_train=True), batch_size=BATCH_SIZE) as train_dataloader,converter_val.make_torch_dataloader(transform_spec=get_transform_spec(is_train=False), batch_size=BATCH_SIZE) as val_dataloader:

        train_dataloader_iter = iter(train_dataloader)
        steps_per_epoch = len(converter_train) // BATCH_SIZE

        val_dataloader_iter = iter(val_dataloader)
        #print (len(converter_val), BATCH_SIZE, len(converter_val))
        validation_steps = (len(converter_val) // BATCH_SIZE)
    
        for epoch in range(NUM_EPOCHS):
            print('Epoch {}/{}'.format(epoch + 1, NUM_EPOCHS))
            print('-' * 10)

            train_loss, train_acc = train_one_epoch(model, criterion, optimizer, exp_lr_scheduler, train_dataloader_iter, steps_per_epoch, epoch, device)
            val_loss, val_acc = evaluate(model, criterion, val_dataloader_iter, validation_steps, device)

    return val_loss


### Exploring different Classification Architecture
1. MobileNetV2
2. MobileNetV3_small
3. MnasNet
4. EfficientNet

### Model 1: MobileNetV2 
##### Abdus Khan

In [0]:
tick = time()
model = torchvision.models.mobilenet_v2(pretrained=True)
loss = train_and_evaluate(model, lr=1e-6)
print(f'Time taken: {time() - tick} seconds')

In [0]:
# def train_fn(lr):
#     loss = train_and_evaluate(lr)
#     return {'loss': loss, 'status': STATUS_OK}

# search_space = hp.loguniform('lr', -10, -4)

# argmin = fmin(
#   fn=train_fn,\
#   space=search_space,\
#   algo=tpe.suggest,\
#   max_evals=2,\
#   trials=SparkTrials(parallelism=2))

### Model 2: MobileNetV3_small 
##### Ankush Gupta

In [0]:
BATCH_SIZE = 256
NUM_EPOCHS = 5
tick = time()
model = torchvision.models.mobilenet_v3_small(pretrained=True)
loss = train_and_evaluate(model, lr=1e-6)
print(f'Time taken: {time() - tick} seconds')

### Model 3: MnasNet1_0 
##### Ronica Gupta

In [0]:
BATCH_SIZE = 256
NUM_EPOCHS = 5
tick = time()
model = torchvision.models.mnasnet1_0(pretrained=True)
loss = train_and_evaluate(model, lr=1e-6)
print(f'Time taken: {time() - tick} seconds')

### Model 4: EfficientNet_b0 
###### Tanjin Sharma

In [0]:
BATCH_SIZE = 256
NUM_EPOCHS = 5
tick = time()
model = torchvision.models.efficientnet_b0(pretrained=True)
loss = train_and_evaluate(model, lr=1e-6)
print(f'Time taken: {time() - tick} seconds')

##### While Training large models, we were facing memory issues, that's why we chose models which have comparatively lesser parameters

### References
1. <a href= "https://arxiv.org/pdf/1801.04381.pdf"> MobileNetV2 </a>
2. <a href= "https://arxiv.org/pdf/1905.02244.pdf"> MobileNetV3 </a>
3. <a href= "https://arxiv.org/pdf/1807.11626.pdf"> MnasNet </a>
4. <a href= "https://github.com/uber/petastorm"> PetaStorm </a>
5. <a href= "https://spark.apache.org"> Apache Spark </a>
6. <a href= "https://www.mongodb.com/"> MongoDB </a>
7. <a href= "https://aws.amazon.com"> Amazon AWS </a>