In [9]:
# !pip install nltk
# !pip install --upgrade Pillow
# !pip3 install torch torchvision torchaudio
# !pip install -U "ray[default]"
# !pip install -U "ray[tune]"
# !pip install boto3
# !pip install ipywidgets

In [10]:
import os
import shutil
import nltk
import boto3
import pandas as pd
from pathlib import Path
from tqdm import tqdm
from coco_proc.vocabulary import Vocabulary
from functools import partial
import numpy as np
import os
from ray import tune
from ray.tune import CLIReporter
from ray.tune.schedulers import ASHAScheduler
nltk.download("punkt")

## Defining paths

In [16]:
# Local working directory; every other local path below is derived from it.
ROOT_PATH = "/home/ubuntu/data/"

# NOTE: the train, valid and test splits below all reference the same
# val2017 annotation file and image prefix.

# --- train split ---
train_metadata_url = "s3://cerebro-coco-dataset-ca/annotations/captions_val2017.json"
train_multimedia_url = "s3://cerebro-coco-dataset-ca/coco/val2017"
train_annotations_path = f"{ROOT_PATH}annotations/captions_val2017.json"
train_metadata_download_path = f"{ROOT_PATH}train_metadata.csv"
train_download_path = f"{ROOT_PATH}train/downloaded"
train_output_path = f"{ROOT_PATH}train/post_etl"

# --- validation split ---
valid_metadata_url = "s3://cerebro-coco-dataset-ca/annotations/captions_val2017.json"
valid_multimedia_url = "s3://cerebro-coco-dataset-ca/coco/val2017"
valid_annotations_path = f"{ROOT_PATH}annotations/captions_val2017.json"
valid_metadata_download_path = f"{ROOT_PATH}valid_metadata.csv"
valid_download_path = f"{ROOT_PATH}valid/downloaded"
valid_output_path = f"{ROOT_PATH}valid/post_etl"

# --- test split ---
test_metadata_url = "s3://cerebro-coco-dataset-ca/annotations/captions_val2017.json"
test_multimedia_url = "s3://cerebro-coco-dataset-ca/coco/val2017"
test_annotations_path = f"{ROOT_PATH}annotations/captions_val2017.json"
test_metadata_download_path = f"{ROOT_PATH}test_metadata.csv"
test_download_path = f"{ROOT_PATH}test/downloaded"
test_output_path = f"{ROOT_PATH}test/post_etl"

# Directory created for model checkpoints.
checkpoints_dir = f"{ROOT_PATH}model_checkpoints"

In [17]:
# SECURITY: AWS credentials must never be committed in a notebook. They are
# read from the standard AWS environment variables instead. The key pair that
# was previously hardcoded here should be treated as leaked and rotated.
# When a variable is unset the value is None, and boto3 then falls back to its
# default credential chain (shared config file, instance role, ...).
ACCESS_ID = os.environ.get("AWS_ACCESS_KEY_ID")
ACCESS_KEY = os.environ.get("AWS_SECRET_ACCESS_KEY")

In [18]:
# Create every working directory up front; existing directories are left as-is.
for _directory in (
    ROOT_PATH,
    train_download_path,
    valid_download_path,
    test_download_path,
    train_output_path,
    valid_output_path,
    test_output_path,
    checkpoints_dir,
):
    Path(_directory).mkdir(parents=True, exist_ok=True)

### Data Preprocessing Helper Functions

In [20]:
def get_metadata(annotations_path, mode="train"):
    """
    Build a metadata dataframe from a COCO-style captions annotation file.

    Parameters
    ----------
    annotations_path : str
        Path to a JSON file with an "images" list (entries providing id,
        file_name, height, width, date_captured) and an "annotations" list
        (entries providing image_id and caption).
    mode : str
        "train" emits one row per (image, caption) pair. Any other value
        emits one row per image, using only that image's first caption.

    Returns
    -------
    pd_df : pandas.DataFrame
        Columns: id, file_name, height, width, captions, date_captured.
    largest_caption_len : int
        Token count of the longest emitted caption (lower-cased, tokenized
        with nltk.word_tokenize); -1 when no row was emitted.

    Notes
    -----
    Images that have no caption in the annotation file are skipped.
    """
    import json
    import pandas as pd

    with open(annotations_path) as f:
        data_json = json.load(f)

    columns = ("id", "file_name", "height", "width", "captions", "date_captured")
    dataset = {column: [] for column in columns}

    # Group captions by the image they describe, preserving file order.
    annotations = {}
    for annotation in data_json["annotations"]:
        annotations.setdefault(annotation["image_id"], []).append(annotation["caption"])

    largest_caption_len = -1
    for image in data_json["images"]:
        captions = annotations.get(image["id"], [])
        if mode != "train":
            # Evaluation splits keep a single caption per image.
            captions = captions[:1]

        for caption in captions:
            tokens = nltk.tokenize.word_tokenize(str(caption).lower())
            largest_caption_len = max(largest_caption_len, len(tokens))
            dataset["id"].append(image["id"])
            dataset["file_name"].append(image["file_name"])
            dataset["height"].append(image["height"])
            dataset["width"].append(image["width"])
            dataset["captions"].append(caption)
            dataset["date_captured"].append(image["date_captured"])

    pd_df = pd.DataFrame(dataset)
    return pd_df, largest_caption_len

In [21]:
def row_preprocessing_routine(mode, row, to_root_path, kwargs):
    """
    Turn one metadata row into an (id, image tensor, caption tensor) triple.

    This is the per-row map step of the ETL: it is invoked once for every
    row of the metadata dataframe.

    Parameters
    ----------
    mode : str
        "train" selects the augmenting image pipeline (random crop and
        random horizontal flip); any other value selects the center-crop one.
    row : pandas.core.series.Series
        Metadata row; the "file_name", "captions" and "id" entries are read.
    to_root_path : str
        Directory that holds the downloaded image named by row["file_name"].
    kwargs : dict
        Must provide "max_caption_len" (int) and "vocab" (callable mapping a
        token to its id, exposing start_word and end_word).

    Returns
    -------
    id
        row["id"], identifying this row.
    input_tensor : torch.Tensor
        The transformed RGB image.
    output_tensor : torch.Tensor
        Caption ids as a long tensor: start id, token ids, end id, followed
        by one extra end id per token the caption falls short of
        max_caption_len.
    """
    import nltk
    from PIL import Image
    from torchvision import transforms
    import torch

    max_caption_len = kwargs["max_caption_len"]
    vocab = kwargs["vocab"]

    # Only the cropping stage differs between the two pipelines.
    if mode == "train":
        crop_steps = [
            transforms.RandomCrop(224),            # 224x224 crop at a random location
            transforms.RandomHorizontalFlip(),     # flip with probability 0.5
        ]
    else:
        crop_steps = [transforms.CenterCrop(224)]  # 224x224 crop from the center

    img_transform = transforms.Compose(
        [transforms.Resize(256)]                   # smaller edge resized to 256
        + crop_steps
        + [
            transforms.ToTensor(),                 # PIL image -> tensor
            transforms.Normalize((0.485, 0.456, 0.406),   # normalization for the pre-trained model
                                 (0.229, 0.224, 0.225)),
        ]
    )

    # Input feature: load the image and run it through the pipeline.
    input_image_path = "/".join([to_root_path, str(row["file_name"])])
    image_tensor = img_transform(Image.open(input_image_path).convert("RGB"))

    # Output feature: tokenize the caption and map tokens to vocabulary ids.
    tokens = nltk.tokenize.word_tokenize(str(row["captions"]).lower())
    caption = [vocab(vocab.start_word)]
    caption += [vocab(token) for token in tokens]
    caption.append(vocab(vocab.end_word))

    # Pad with the end-word id; a non-positive shortfall adds nothing.
    caption.extend(vocab(vocab.end_word) for _ in range(max_caption_len - len(tokens)))

    caption_tensor = torch.Tensor(caption).long()

    return row["id"], image_tensor, caption_tensor

In [22]:
def download_metadata():
    """
    Download the train/valid/test annotation files from S3 into ROOT_PATH.

    Each object is stored under ROOT_PATH at the same relative key it has in
    its bucket; missing parent directories are created. URLs that appear more
    than once are downloaded only once.
    """
    # One client is enough for all downloads.
    s3 = boto3.client('s3', aws_access_key_id=ACCESS_ID, aws_secret_access_key=ACCESS_KEY)

    # dict.fromkeys drops duplicate URLs while keeping their original order.
    remote_urls = list(dict.fromkeys([
        train_metadata_url,
        valid_metadata_url,
        test_metadata_url,
    ]))

    for remote_url in remote_urls:
        # "s3://<bucket>/<key...>" -> ["s3:", "", "<bucket>", "<key>", ...]
        url_parts = remote_url.split("/")
        bucket_name = url_parts[2]
        from_path = "/".join(url_parts[3:])
        to_path = os.path.join(ROOT_PATH, from_path)

        # create the destination directory if it doesn't exist
        Path(to_path).parent.mkdir(parents=True, exist_ok=True)

        s3.download_file(bucket_name, from_path, to_path)

        print("Downloaded metadata")

In [23]:
def load_data(metadata_file_path, fraction=1, random_state=None):
    """
    Read a metadata CSV and return a shuffled random subset of its rows.

    NOTE: a later cell in this notebook defines another ``load_data`` with a
    different signature; once that cell has run, this function is shadowed.

    Parameters
    ----------
    metadata_file_path : str
        Path of the CSV file to read.
    fraction : float
        Fraction of rows to keep (1 keeps every row).
    random_state : int or None
        Optional seed forwarded to ``DataFrame.sample`` so the subset and its
        order are reproducible. None (the default) draws a fresh sample.

    Returns
    -------
    pandas.DataFrame
        The sampled rows in random order, keeping their original index labels.
    """
    pandas_df = pd.read_csv(metadata_file_path)
    # sample() already returns rows in random order, so one call both
    # subsamples and shuffles.
    shuffled_df = pandas_df.sample(frac=fraction, random_state=random_state)
    print("Loaded {} metadata to Pandas".format(metadata_file_path))
    return shuffled_df

In [24]:
def download_file(mode, filepath, s3, download_type, multimedia_url, multimedia_download_path):
    """
    Fetch one multimedia file into the local download directory.

    Parameters
    ----------
    mode : str
        train/valid/test; accepted for signature parity, not used here.
    filepath : str
        Path of the file relative to multimedia_url.
    s3 : object or None
        S3 client exposing download_file(bucket, key, destination); only used
        when download_type is "url".
    download_type : str
        "url" downloads from S3; anything else copies from a local directory.
    multimedia_url : str
        "s3://<bucket>/<prefix>" for S3 downloads, otherwise a source directory.
    multimedia_download_path : str
        Local destination directory.

    Notes
    -----
    S3 failures are reported on stdout and swallowed (best effort), so a
    missing object does not abort the caller's loop.
    """
    # Imported locally: the module-level imports do not provide traceback,
    # and without it the error handler below would itself raise NameError.
    import traceback

    if download_type == "url":
        # "s3://<bucket>/<prefix...>" -> ["s3:", "", "<bucket>", "<prefix>", ...]
        url_parts = multimedia_url.split("/")
        bucket_name = url_parts[2]
        from_path_prefix = "/".join(url_parts[3:])
        from_path = os.path.join(from_path_prefix, filepath)

        to_path = os.path.join(multimedia_download_path, filepath)
        to_path_dir = os.path.dirname(to_path)

        try:
            Path(to_path_dir).mkdir(parents=True, exist_ok=True)
            s3.download_file(bucket_name, from_path, to_path)
        except Exception as e:
            print("Error in copying file from {} to {}".format(from_path, to_path))
            print(str(e))
            print(str(traceback.format_exc()))

    else:
        shutil.copy(multimedia_url + "/" + filepath, multimedia_download_path)

In [25]:
def preprocess_data(df, mode, feature_names, is_feature_download, download_type, multimedia_url, multimedia_download_path, output_path, **kwargs):
    """
    Download the multimedia for every row, tensorize the rows and pickle them.

    Parameters
    ----------
    df : pandas.DataFrame
        Metadata rows to process.
    mode : str
        train/valid/test; forwarded to the helpers and used in the output
        file name.
    feature_names : list of str
        Column names of df.
    is_feature_download : list of bool
        Parallel to feature_names; True marks a column whose value is a file
        that must be downloaded. Pairs are matched positionally; extra
        entries in the longer list are ignored.
    download_type : str
        "url" to download from S3, anything else to copy from a local path.
    multimedia_url : str
        Remote location (or source directory) of the multimedia files.
    multimedia_download_path : str
        Local directory the files are downloaded to and read back from.
    output_path : str
        Directory in which "<mode>_data.pkl" is written (created if missing).
    **kwargs
        Passed through as a dict to row_preprocessing_routine.
    """
    download_columns = [
        name for name, needs_download in zip(feature_names, is_feature_download) if needs_download
    ]

    # Build the S3 client once instead of once per row and feature.
    s3 = None
    if download_type == "url" and download_columns:
        s3 = boto3.client('s3', aws_access_key_id=ACCESS_ID, aws_secret_access_key=ACCESS_KEY)

    res_partition = []
    # total= lets tqdm render a real progress bar; iterrows() has no length.
    for _, row in tqdm(df.iterrows(), total=len(df)):
        for feature_name in download_columns:
            download_file(mode, row[feature_name], s3, download_type, multimedia_url, multimedia_download_path)

        row_id, input_tensor, output_tensor = row_preprocessing_routine(mode, row, multimedia_download_path, kwargs)
        res_partition.append([row_id, input_tensor, output_tensor])

    result = pd.DataFrame(res_partition, columns=["id", "input_tensor", "output_tensor"])
    # Size as reported by pandas' deep memory accounting of the dataframe.
    df_size_mb = result.memory_usage(index=True, deep=True).sum() / (1024.0 * 1024.0)
    print("{} -- Size of data saved : {} MB".format(str(mode), str(df_size_mb)))
    print("{} -- Number of rows in the Worker dataframe: {}".format(str(mode), str(len(result.index))))

    Path(output_path).mkdir(parents=True, exist_ok=True)
    path = os.path.join(output_path, str(mode) + "_" + "data" + ".pkl")
    result.to_pickle(path)

In [None]:
# Share of each metadata CSV that is kept for the ETL.
fraction = 0.3

# Metadata columns; only "file_name" refers to a file that has to be downloaded.
feature_names = ["id", "file_name", "height", "width", "captions", "date_captured"]
is_feature_download = [name == "file_name" for name in feature_names]

# Vocabulary built from the training annotations file.
# NOTE(review): presumably vocab_threshold is a minimum word count - confirm in coco_proc.
vocab_threshold = 5
train_vocab = Vocabulary(vocab_threshold, annotations_file=train_annotations_path, vocab_from_file=False)

In [None]:
# 1) Fetch the annotation files.
download_metadata()

# 2) Flatten each annotation file into a dataframe. Only mode == "train"
#    expands to one row per caption; every other mode keeps one caption per image.
train_meta_df, largest_caption_len_train = get_metadata(train_annotations_path, mode="train")
val_meta_df, largest_caption_len_val = get_metadata(valid_annotations_path, mode="val")
test_meta_df, largest_caption_len_test = get_metadata(test_annotations_path, mode="test")

# 3) Persist the dataframes as CSV.
train_df_path = f"{ROOT_PATH}train_metadata.csv"
valid_df_path = f"{ROOT_PATH}valid_metadata.csv"
test_df_path = f"{ROOT_PATH}test_metadata.csv"

train_meta_df.to_csv(train_df_path, index=False)
val_meta_df.to_csv(valid_df_path, index=False)
test_meta_df.to_csv(test_df_path, index=False)

# 4) Reload a shuffled fraction of every split.
train_df = load_data(train_df_path, fraction)
valid_df = load_data(valid_df_path, fraction)
test_df = load_data(test_df_path, fraction)

In [30]:
# Run the ETL for each split. Every split is encoded with the training
# vocabulary and padded to its own longest caption length.
preprocess_data(
    train_df, "train", feature_names, is_feature_download, "url",
    train_multimedia_url, train_download_path, train_output_path,
    max_caption_len=largest_caption_len_train, vocab=train_vocab,
)
preprocess_data(
    valid_df, "valid", feature_names, is_feature_download, "url",
    valid_multimedia_url, valid_download_path, valid_output_path,
    max_caption_len=largest_caption_len_val, vocab=train_vocab,
)
preprocess_data(
    test_df, "test", feature_names, is_feature_download, "url",
    test_multimedia_url, test_download_path, test_output_path,
    max_caption_len=largest_caption_len_test, vocab=train_vocab,
)

Loaded /home/ubuntu/data/train_metadata.csv metadata to Pandas
Loaded /home/ubuntu/data/valid_metadata.csv metadata to Pandas
Loaded /home/ubuntu/data/test_metadata.csv metadata to Pandas


## Defining the PyTorch Dataset

In [89]:
import os
import gc
import glob
import pandas as pd
from torch.utils.data import IterableDataset

class GeneralPytorchDataset(IterableDataset):
    """
    Streams (input_tensor, output_tensor, id) rows from pickled dataframes.

    Every "*.pkl" file in ``pickled_data_path`` is expected to hold a pandas
    dataframe with "id", "input_tensor" and "output_tensor" columns. Files are
    visited in sorted name order and only one dataframe is held at a time.

    The dataset can be iterated repeatedly: each call to ``iter()`` restarts
    from the first file. When used by a ``DataLoader`` with several worker
    processes, rows are split round-robin between the workers so that each
    row is produced exactly once per pass.

    SECURITY: files are read with ``pd.read_pickle``, which can execute
    arbitrary code; only point this at files you produced yourself.
    """

    def __init__(self, mode, pickled_data_path):
        # sorted() makes the visiting order deterministic (glob order is not).
        self.remaining_files = sorted(glob.glob(os.path.join(pickled_data_path, "*.pkl")))
        self.completed_files = []
        self.local_index = -1      # position of the last row read in current_df
        self.current_df = None
        self.mode = mode
        self._rows_seen = 0        # rows visited in this pass, across files
        self._shard_id = 0         # index of this DataLoader worker
        self._num_shards = 1       # number of DataLoader workers

    def __iter__(self):
        """Restart the stream from the first file and return the iterator."""
        from torch.utils.data import get_worker_info

        # Put already-visited files back in front so a new pass sees them all.
        self.remaining_files = self.completed_files + self.remaining_files
        self.completed_files = []
        self.current_df = None
        self.local_index = -1
        self._rows_seen = 0

        # Inside a DataLoader worker, only yield this worker's share of rows.
        worker = get_worker_info()
        if worker is None:
            self._shard_id, self._num_shards = 0, 1
        else:
            self._shard_id, self._num_shards = worker.id, worker.num_workers

        return self

    def _load_next_file(self):
        """Replace the current dataframe with the next pending pickle file."""
        f = self.remaining_files.pop(0)
        self.completed_files.append(f)
        # Release the previous dataframe before loading the next one.
        self.current_df = None
        gc.collect()
        self.current_df = pd.read_pickle(f).reset_index(drop=True)
        self.local_index = 0

    def __next__(self):
        """Return the next (input_tensor, output_tensor, id) row of this shard."""
        while True:
            self.local_index += 1

            # Advance to the next file once the current one is exhausted;
            # looping also skips over empty dataframes.
            while self.current_df is None or self.local_index >= len(self.current_df.index):
                if not self.remaining_files:
                    raise StopIteration
                self._load_next_file()

            position = self._rows_seen
            self._rows_seen += 1
            if position % self._num_shards != self._shard_id:
                continue  # this row belongs to another worker

            input_tensor = self.current_df["input_tensor"][self.local_index]
            output_tensor = self.current_df["output_tensor"][self.local_index]
            row_id = self.current_df["id"][self.local_index]

            return input_tensor, output_tensor, row_id

    def __len__(self):
        """Total number of rows over all files (reads every file to count)."""
        total_length = 0
        all_files = self.remaining_files + self.completed_files

        for f in all_files:
            df = pd.read_pickle(f)
            total_length += len(df.index)

        return total_length

In [92]:
def load_data(train_data_path, valid_data_path, test_data_path):
    """
    Wrap the three post-ETL directories in GeneralPytorchDataset objects.

    NOTE: this definition shadows the earlier ``load_data(metadata_file_path,
    fraction)`` helper defined in this notebook.

    Returns
    -------
    tuple
        (trainset, validset, testset)
    """
    return (
        GeneralPytorchDataset("train", train_data_path),
        GeneralPytorchDataset("valid", valid_data_path),
        GeneralPytorchDataset("test", test_data_path),
    )

## Training and Validation

In [171]:
def train_coco(val_annotations_path, train_data_path, valid_data_path, test_data_path, vocab, checkpoint_dir, config):
    """
    Ray Tune trainable: train the CNN encoder / RNN decoder captioning model
    and report validation loss and BLEU-4 after every epoch.

    Parameters
    ----------
    val_annotations_path : str
        COCO captions JSON used to look up all reference captions of a
        validation image when computing BLEU.
    train_data_path, valid_data_path, test_data_path : str
        Directories holding the pickled post-ETL dataframes. The test set is
        passed to load_data but not used here.
    vocab : Vocabulary
        Provides len(), idx2word, start_word and end_word.
    checkpoint_dir : str
        Currently unused; checkpoints are written to the directory handed out
        by tune.checkpoint_dir().
    config : dict
        Requires "learning_rate", "batch_size", "embed_size", "hidden_size".
        Optional "num_epochs" (default 1).

    Reports
    -------
    tune.report(loss=<mean validation loss>, bleu=<mean validation BLEU-4>)
    once per epoch. If validation produced no batch, loss is reported as
    +inf and bleu as 0.0.
    """
    import json
    import os
    import sys

    import nltk
    import torch
    import torch.nn as nn
    from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
    from ray import tune

    from coco_proc.model import EncoderCNN, DecoderRNN

    def get_actual_annotations(annotations_path):
        """Map image_id -> list of all its reference captions."""
        with open(annotations_path) as f:
            data_json = json.load(f)
        annotations = {}
        for entry in data_json['annotations']:
            annotations.setdefault(entry["image_id"], []).append(entry["caption"])
        return annotations

    def word_list(word_idx_list, vocab):
        """Convert vocabulary ids to words, dropping start words and stopping at the first end word."""
        words = []
        for vocab_id in word_idx_list:
            word = vocab.idx2word[vocab_id]
            if word == vocab.end_word:
                break
            if word != vocab.start_word:
                words.append(word)
        return words

    annotations_valid = get_actual_annotations(val_annotations_path)
    smoothing = SmoothingFunction()

    vocab_size = len(vocab)
    learning_rate = config["learning_rate"]
    batch_size = int(config["batch_size"])
    embed_size = config["embed_size"]
    hidden_size = config["hidden_size"]
    num_epochs = int(config.get("num_epochs", 1))

    encoder = EncoderCNN(embed_size)
    decoder = DecoderRNN(embed_size, hidden_size, vocab_size)

    # Collect the learnable parameters BEFORE any DataParallel wrapping:
    # the wrapper does not expose .embed / .bn, only .module does.
    # Module.to() moves parameters in place, so these objects stay valid.
    params = list(decoder.parameters()) + list(encoder.embed.parameters()) + list(encoder.bn.parameters())

    device = "cpu"
    if torch.cuda.is_available():
        device = "cuda:0"
        if torch.cuda.device_count() > 1:
            encoder = nn.DataParallel(encoder)
            decoder = nn.DataParallel(decoder)
    encoder.to(device)
    decoder.to(device)

    criterion = nn.CrossEntropyLoss().to(device)
    optimizer = torch.optim.Adam(params=params, lr=learning_rate)

    trainset, validset, _ = load_data(train_data_path, valid_data_path, test_data_path)
    print("trainset length:" + str(len(trainset)))
    print("valset length:" + str(len(validset)))

    # NOTE: with an IterableDataset and num_workers > 0, every worker iterates
    # its own copy of the dataset; the dataset itself is responsible for
    # splitting rows between workers.
    train_loader = torch.utils.data.DataLoader(
        trainset,
        batch_size=batch_size,
        num_workers=8, drop_last=True)
    val_loader = torch.utils.data.DataLoader(
        validset,
        batch_size=batch_size,
        num_workers=8, drop_last=True)

    print(config)
    for epoch in range(num_epochs):
        total_train_loss = 0.0
        total_val_loss = 0.0
        epoch_steps = 0
        val_steps = 0
        total_bleu_4 = 0.0

        # ---- training ----
        encoder.train()
        decoder.train()
        for batch in train_loader:
            images, captions = batch[0].to(device), batch[1].to(device)
            features = encoder(images)
            outputs = decoder(features, captions)
            loss = criterion(outputs.view(-1, vocab_size), captions.view(-1))
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_train_loss += loss.item()
            epoch_steps += 1

        # ---- validation ----
        # eval() so validation uses inference behavior (e.g. batch-norm
        # running statistics) and does not alter the model's statistics.
        encoder.eval()
        decoder.eval()
        with torch.no_grad():
            print("started validation")
            sys.stdout.flush()
            for batch in val_loader:
                images, captions, row_ids = batch[0].to(device), batch[1].to(device), batch[2]

                # Pass the inputs through the CNN-RNN model, then move the
                # results to the CPU for scoring.
                features = encoder(images)
                outputs = decoder(features, captions).to("cpu")
                captions = captions.to("cpu")

                # outputs[i, j, k] is the score of vocabulary token k at
                # position j of the i-th caption in the batch.
                batch_bleu_4 = 0.0
                for i in range(len(outputs)):
                    # Highest-scoring token at every position.
                    predicted_ids = outputs[i].argmax(dim=-1).tolist()
                    predicted_word_list = word_list(predicted_ids, vocab)
                    # BLEU-4 against every reference caption of the image.
                    tokenized_references = [nltk.tokenize.word_tokenize(str(caption).lower())
                                            for caption in annotations_valid[row_ids[i].item()]]
                    batch_bleu_4 += sentence_bleu(tokenized_references,
                                                  predicted_word_list,
                                                  smoothing_function=smoothing.method1)
                total_bleu_4 += batch_bleu_4 / len(outputs)
                val_steps += 1

                loss = criterion(outputs.view(-1, vocab_size), captions.view(-1))
                total_val_loss += loss.item()

        # The context-manager target gets its own name so it does not
        # overwrite the checkpoint_dir argument.
        with tune.checkpoint_dir(epoch) as trial_checkpoint_dir:
            path = os.path.join(trial_checkpoint_dir, "checkpoint")
            torch.save({"encoder": encoder.state_dict(),
                        "decoder": decoder.state_dict(),
                        "optimizer": optimizer.state_dict()
                        }, path)

        # drop_last=True can leave zero validation batches on small data;
        # report the worst possible score instead of dividing by zero.
        if val_steps:
            mean_val_loss = total_val_loss / val_steps
            mean_bleu_4 = total_bleu_4 / val_steps
        else:
            mean_val_loss = float("inf")
            mean_bleu_4 = 0.0
        tune.report(loss=mean_val_loss, bleu=mean_bleu_4)
    print("Finished training")
                

In [172]:
# The training stage reads the pickles written by the ETL stage.
train_data_path, valid_data_path, test_data_path = (
    train_output_path,
    valid_output_path,
    test_output_path,
)

In [173]:
# Upper bound on training iterations given to the scheduler.
max_num_epochs = 1

# Grid-search space. Only embed_size has more than one candidate here; wider
# grids for learning_rate, hidden_size and batch_size were tried previously.
config = {
    "learning_rate": tune.grid_search([1e-3]),
    "embed_size": tune.grid_search([256, 512]),
    "hidden_size": tune.grid_search([256]),
    "batch_size": tune.grid_search([2]),
}

# Command-line progress table showing the metrics reported by train_coco.
reporter = CLIReporter(metric_columns=["loss", "bleu", "training_iteration"])

# ASHA scheduler configured to minimize the reported validation loss.
scheduler = ASHAScheduler(
    metric="loss",
    mode="min",
    max_t=max_num_epochs,
    grace_period=1,
    reduction_factor=2,
)

# partial() binds every argument except the per-trial config supplied by Tune.
result = tune.run(
    partial(
        train_coco,
        valid_annotations_path,
        train_data_path,
        valid_data_path,
        test_data_path,
        train_vocab,
        checkpoints_dir,
    ),
    resources_per_trial={"cpu": 2, "gpu": 0},
    config=config,
    scheduler=scheduler,
    progress_reporter=reporter,
)

In [None]:
# https://pytorch.org/tutorials/beginner/hyperparameter_tuning_tutorial.html
# implement this for hyperparameter tuning (reference)