# Deep Learning Project

## **1.** Environment setup

In [None]:
!pip install torch torchvision --extra-index-url https://download.pytorch.org/whl/cu116
!pip install pyspark
!pip install sparktorch 
!pip install gdown 
!pip install torchvision
!pip install pyarrow

In [2]:
import os
import time

import torch
import torch.optim as optim
from torch.nn import TripletMarginLoss
from torch.optim.lr_scheduler import MultiStepLR

import pyspark
import pyspark.sql.functions as F
from pyspark.sql import *
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark import SparkContext, SparkConf

from sparktorch import (SparkTorch, serialize_torch_obj,
                        serialize_torch_obj_lazy)

from models.utils import *
from models.loss import ChanferLoss, ChanferLoss3d
from models.autoencoder import PointcloudAutoencoder
from models.spark_model import SparkPointcloudAutoencoder

### **1.1** Parameters

In [4]:
# --- Compute device ---
# Request CUDA; the device cell falls back to CPU when CUDA is unavailable.
USE_GPU = True

# --- Optimisation hyperparameters ---
LEARNING_RATE = 1e-4
WEIGHT_DECAY = 1e-3
NUM_POINTS = 2048   # passed to the autoencoder constructor
NUM_EPOCHS = 200
BATCH_SIZE = 32
NUM_CLASSES = 10

# --- Reproducibility ---
# NOTE(review): no cell shown in this notebook applies this seed — confirm where it is consumed.
RANDOM_SEED = 42

# --- Spark ---
# The two limits are written as floats and cast with int() where the Spark conf is built.
SPARK_MAX_RECORDS_PER_BATCH = 1e3
SPARK_MAX_PARTITION_BYTES = 1e8
SPARK_NUM_CORES = 4

# --- Dataset ---
DATASET_FOLDER = "data"

# --- Model ---
USE_TRAINED_MODEL = True          # True: load a saved state instead of training
USE_CONTRASTIVE_LEARNING = True   # True: add the triplet term to the loss

In [5]:
# Use the first CUDA device when GPU use is requested and available; otherwise run on CPU.
if USE_GPU and torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

In [6]:
print('Using device:', device)
print()

# Release cached allocator memory before reporting usage
torch.cuda.empty_cache()

# The remaining diagnostics only apply when running on CUDA
if device.type == 'cuda':
    _gib = 1024**3  # bytes per GiB, used to scale the byte counts below
    print('Device:', torch.cuda.get_device_name(0))
    print('Memory Usage:')
    print('Allocated:', torch.cuda.memory_allocated(0)/_gib, 'GB')
    print('Cached:   ', torch.cuda.memory_reserved(0)/_gib, 'GB')

Using device: cuda:0

Device: NVIDIA GeForce GTX 1660 SUPER
Memory Usage:
Allocated: 0.0 GB
Cached:    0.0 GB


### **1.2** Reproducibility

### **1.3** Create Spark context

In [7]:
# Build the Spark configuration.
# The master URL is set on the conf itself: once a SparkContext exists,
# SparkSession.builder.master(...).getOrCreate() reuses that context and the
# requested master is ignored, so SPARK_NUM_CORES would never take effect.
conf = SparkConf() \
    .setMaster("local[{}]".format(SPARK_NUM_CORES)) \
    .set("spark.ui.port", "4050") \
    .set('spark.executor.memory', '10G') \
    .set('spark.driver.memory', '10G') \
    .set('spark.driver.maxResultSize', '10G') \
    .set("spark.sql.execution.arrow.enabled", "true") \
    .set("spark.sql.execution.arrow.maxRecordsPerBatch", int(SPARK_MAX_RECORDS_PER_BATCH)) \
    .set("spark.sql.files.maxPartitionBytes", int(SPARK_MAX_PARTITION_BYTES))

# create the context; getOrCreate keeps the cell re-runnable, whereas constructing
# a second SparkContext in the same kernel raises an error
sc = SparkContext.getOrCreate(conf=conf)
sc.setLogLevel("ERROR")

# create the session on top of the context configured above
spark = SparkSession.builder.config(conf=conf).getOrCreate()

RuntimeError: Java gateway process exited before sending its port number

In [None]:
# Echo the SparkSession object as the cell output.
spark

In [None]:
# List every (key, value) pair of the active SparkContext configuration.
# Uses the public accessor instead of reaching into the private `_conf` attribute.
sc.getConf().getAll()

### **1.3** Data retrieval

In [8]:
# Fetch the dataset files into DATASET_FOLDER via the project helper.
# The helper call is the last expression, so its return value becomes the cell output.
print(f"Downloading dataset into {DATASET_FOLDER} folder...")
download_dataset(DATASET_FOLDER)

Downloading dataset into data folder...


['data\\part_0000.parquet',
 'data\\part_0001.parquet',
 'data\\part_0002.parquet',
 'data\\part_0003.parquet',
 'data\\part_0004.parquet',
 'data\\part_0005.parquet']

In [9]:
# Load the dataset through the project helper, using the Spark session created above.
# NOTE(review): presumably a Spark DataFrame (later cells call .show/.groupBy on it) — confirm.
df = get_dataset(spark)

NameError: name 'spark' is not defined

In [10]:
# Preview the first five rows.
df.show(n=5)

NameError: name 'df' is not defined

In [11]:
# Print the schema (column names and types) of the loaded frame.
df.printSchema()

NameError: name 'df' is not defined

In [None]:
# Alternative load path: read every parquet part directly from the dataset folder.
# DATASET_FOLDER is used instead of a second hard-coded "data" literal so the
# folder is configured in exactly one place.
df2 = spark.read.parquet(*glob(os.path.join(DATASET_FOLDER, "*.parquet")))

# Add a zero-based row index inside each split.
# NOTE(review): the window orders by the partition column itself, so all rows in a
# partition tie and the numbering is not deterministic — confirm whether a stable
# ordering key should be used instead.
window = Window.partitionBy("split").orderBy("split")
df2 = df2.withColumn("index", row_number().over(window) - 1)

print("The shape of the dataset is {:d} rows by {:d} columns".format(df2.count(), len(df2.columns)))

In [None]:
# Report the row count and column count of `df`.
print("The shape of the dataset is {:d} rows by {:d} columns".format(df.count(), len(df.columns)))

In [None]:
# Number of rows for each value of the "split" column.
df.groupBy("split").count().show()

In [None]:
# Class distribution of the frame as loaded: one bar per label
unbalanced_df = df.groupBy("label").count().collect()
labels = list(map(lambda r: r['label'], unbalanced_df))
count = list(map(lambda r: r['count'], unbalanced_df))

fig = plt.figure()
ax = fig.add_axes([0, 0, 1, 1])  # axes span the whole figure area
ax.bar(x=labels, height=count)
plt.show()

In [None]:
# Balance the dataset with the project helper `undersample`.
# NOTE(review): `df` is rebound to its own output, so re-running this cell feeds the
# already-undersampled frame back into the helper — confirm that this is harmless.
df = undersample(df)

In [None]:
# Rows per split again, now on the frame returned by `undersample`.
df.groupBy("split").count().show()

In [None]:
# Class distribution after undersampling: one bar per label
balanced_df = df.groupBy("label").count().collect()
labels = list(map(lambda r: r['label'], balanced_df))
count = list(map(lambda r: r['count'], balanced_df))

fig = plt.figure()
ax = fig.add_axes([0, 0, 1, 1])  # axes span the whole figure area
ax.bar(x=labels, height=count)
plt.show()

In [12]:
# One dataset object per split, all built from the same frame (train, test, val in that order)
train_set, test_set, val_set = (
    PointCloudData(df, num_classes=NUM_CLASSES, split=split_name)
    for split_name in ('train', 'test', 'val')
)

# Only the training loader shuffles; the evaluation loaders keep the dataset order
train_loader = DataLoader(
    dataset=train_set, batch_size=BATCH_SIZE, shuffle=True,
    num_workers=0, pin_memory=True,
)
test_loader = DataLoader(
    dataset=test_set, batch_size=BATCH_SIZE,
    num_workers=0, pin_memory=True,
)
val_loader = DataLoader(
    dataset=val_set, batch_size=BATCH_SIZE,
    num_workers=0, pin_memory=True,
)

NameError: name 'df' is not defined

In [None]:
# Report how many samples back each loader (length of the wrapped dataset, not the batch count).
print("No. of training samples:", len(train_loader.dataset))
print("No. of testing samples:", len(test_loader.dataset))
print("No. of val samples:", len(val_loader.dataset))

### **1.4** Data Visualization

In [None]:
# Visualise the training set with the project helper; NUM_CLASSES is passed through to it.
show_dataset(train_set, NUM_CLASSES)

In [None]:
# Pick one random training sample.
# The class id is bound to `label_id` rather than `id`, which would shadow the builtin.
features, label_id = train_set[np.random.choice(len(train_set))]
label = train_set.id2label[label_id]

# Convert from the flattened layout to a NUM_POINTS x 3 point cloud
features = features.view(NUM_POINTS, 3)

# Show the sample: class name first, then the cloud (one argument per coordinate axis)
print(label)
pcshow(*features.T)

### **1.5** Setup network

In [None]:
# Build the autoencoder for NUM_POINTS-point clouds and move it to the selected device
model = PointcloudAutoencoder(NUM_POINTS).to(device)
model

In [None]:
# Adam over every model parameter, with the configured learning rate and weight decay
optimizer = optim.Adam(
    params=model.parameters(),
    lr=LEARNING_RATE,
    weight_decay=WEIGHT_DECAY,
)

In [None]:
# Multiply the learning rate by `gamma` at each milestone epoch.
# With NUM_EPOCHS = 200 and one scheduler step per epoch, only the milestones
# at 100 and 175 are ever reached.
lr_milestones = [100, 175, 250, 400, 800]
scheduler = MultiStepLR(
    optimizer,
    milestones=lr_milestones,
    gamma=0.1,
    verbose=True,
)

## Train Autoencoder

In [None]:
def get_triplet(features, labels):
    """Build (anchor, positive, negative) sample lists for triplet training.

    Every sample in the batch is used once as an anchor. Its positive is a random
    other sample with the same label, and its negative is a random sample with a
    different label. When no such sample exists in the batch, the anchor itself
    is used as the fallback.

    Candidates are drawn with `random.randrange` on an explicit index instead of
    `random.choice` on a tensor: `random.choice` evaluates the truthiness of its
    argument on Python 3.11, which raises for tensors with more than one element.
    The number and order of random draws is otherwise unchanged.

    Args:
        features: Batch of samples, indexable along the first dimension.
        labels: 1-D tensor of class ids aligned with `features`.

    Returns:
        Three lists (anchor, positive, negative), each holding one entry of
        `features` per sample in the batch.
    """
    anchor = []
    positive = []
    negative = []

    for index, sample in enumerate(features):
        anchor.append(sample)
        label = labels[index]

        # Indices sharing the anchor's label, excluding the anchor itself
        same_label = (labels == label).nonzero().flatten()
        pos = same_label[same_label != index]
        if pos.numel() > 0:
            pos_pick = pos[random.randrange(pos.numel())].item()
        else:
            # No other sample of this class in the batch: reuse the anchor
            pos_pick = index
        positive.append(features[pos_pick])

        # Indices with any other label
        neg = (labels != label).nonzero().flatten()
        if neg.numel() > 0:
            negative_pick = neg[random.randrange(neg.numel())].item()
        else:
            # Single-class batch: reuse the anchor
            negative_pick = index
        negative.append(features[negative_pick])

    return anchor, positive, negative

In [None]:
def compute_loss(model, features, labels):
    """Return the training loss for one batch.

    With USE_CONTRASTIVE_LEARNING enabled, the batch is expanded into
    (anchor, positive, negative) triplets. The loss is then a triplet-margin term
    on the embeddings plus the mean of the summed Chamfer reconstruction losses of
    the three roles. Otherwise only the Chamfer loss between the batch and its
    reconstruction is returned.

    Tensors are moved to the device the model's parameters live on (the Chamfer
    variant is already chosen from the model), and the point count comes from
    NUM_POINTS rather than a repeated literal.

    Args:
        model: Autoencoder exposing `embed`, `reconstruct` and `__call__`.
        features: Batch of point clouds, flattened or already (batch, NUM_POINTS, 3).
        labels: Class ids aligned with `features`; only used to pick triplets.

    Returns:
        The loss produced by the criteria; callers back-propagate through it.
    """
    first_param = next(model.parameters())
    model_device = first_param.device
    chamfer_loss = ChanferLoss3d() if first_param.is_cuda else ChanferLoss()

    def _as_clouds(samples):
        # Stack a list of samples and reshape from the flat layout to (batch, NUM_POINTS, 3)
        return torch.stack(samples).to(model_device).float().view(-1, NUM_POINTS, 3)

    def _encode_decode(clouds):
        # The encoder is fed channels-first input: (batch, 3, NUM_POINTS)
        embedding = model.embed(clouds.permute(0, 2, 1))
        return embedding, model.reconstruct(embedding)

    if USE_CONTRASTIVE_LEARNING:
        anchor, positive, negative = get_triplet(features, labels)

        anchor = _as_clouds(anchor)
        positive = _as_clouds(positive)
        negative = _as_clouds(negative)

        # Same call order as before: anchor, positive, negative
        embed_anchor, decode_anchor = _encode_decode(anchor)
        embed_positive, decode_positive = _encode_decode(positive)
        embed_negative, decode_negative = _encode_decode(negative)

        criterion = TripletMarginLoss(margin=0.5)
        triplet_loss = criterion(embed_anchor, embed_positive, embed_negative)

        anchor_ch = chamfer_loss(anchor, decode_anchor)
        pos_ch = chamfer_loss(positive, decode_positive)
        negative_ch = chamfer_loss(negative, decode_negative)

        chamfer_losses = pos_ch + negative_ch + anchor_ch
        chamfer_losses_mean = torch.mean(chamfer_losses)
        total_loss = triplet_loss + chamfer_losses_mean

        return total_loss

    else:
        y_train = features.to(model_device).float().view(-1, NUM_POINTS, 3)
        y_pred = model(y_train.permute(0, 2, 1))
        return chamfer_loss(y_train, y_pred)

In [None]:
def compute_epoch_loss_autoencoder(model, data_loader, loss_fn, device):
    """Evaluate `compute_loss` over a whole loader with gradients disabled.

    The model is switched to eval mode, the per-batch losses are summed, and the
    sum is divided by the number of samples seen.

    NOTE(review): `loss_fn` is accepted but never used. The divisor is the sample
    count while each summand is one value per batch; if `compute_loss` returns a
    batch mean, the result is scaled down by the batch size — confirm intent.

    Args:
        model: Model handed to `compute_loss`.
        data_loader: Iterable yielding (features, labels) batches.
        loss_fn: Unused.
        device: Device the features are moved to before the loss is computed.

    Returns:
        Accumulated loss divided by the number of samples.
    """
    model.eval()
    running_loss = 0.
    samples_seen = 0

    with torch.no_grad():
        for batch, batch_labels in data_loader:
            batch = batch.to(device).view(-1, 2048, 3)
            batch_loss = compute_loss(model, batch, batch_labels)
            samples_seen += batch.size(0)
            running_loss += batch_loss

        return running_loss / samples_seen

In [None]:
def train_autoencoder(num_epochs, model, optimizer, scheduler, device, train_loader, test_loader,
                         logging_interval=100, skip_epoch_stats=False, save_model=None):
    """Train the autoencoder and collect loss statistics.

    Each epoch runs one pass over `train_loader`, optionally evaluates the epoch
    loss on both loaders and refreshes the loss plot on disk, optionally saves
    the training state, and finally steps the scheduler once.

    Args:
        num_epochs: Number of epochs to run.
        model: Model to optimise; passed to `compute_loss`.
        optimizer: Optimiser stepped after every batch.
        scheduler: Learning-rate scheduler stepped once per epoch.
        device: Device the batches are moved to.
        train_loader: Loader yielding (features, labels) training batches.
        test_loader: Loader used for the per-epoch test loss.
        logging_interval: Print the batch loss every this many batches.
        skip_epoch_stats: When true, skip the per-epoch evaluation and plot.
        save_model: When truthy, save the training state after every epoch.
            Previously any non-None value (including False) triggered saving.

    Returns:
        Dict with 'train_loss_per_batch', 'train_loss_per_epoch' and
        'test_loss_per_epoch' lists.
    """
    log_dict = {'train_loss_per_batch': [],
                'train_loss_per_epoch': [],
                'test_loss_per_epoch': []}

    start_time = time.time()
    for epoch in range(num_epochs):
        model.train()

        for batch_idx, (features, labels) in enumerate(train_loader):
            optimizer.zero_grad()
            features, labels = features.to(device).float(), labels.to(device)

            # forward and back propagation
            loss = compute_loss(model, features, labels)
            loss.backward()

            # update model parameters
            optimizer.step()

            # logging loss (as a plain float, detached from the graph)
            batch_loss = loss.item()
            log_dict['train_loss_per_batch'].append(batch_loss)

            if not batch_idx % logging_interval:
                print('Epoch: %03d/%03d | Batch %04d/%04d | Loss: %.4f'
                      % (epoch+1, num_epochs, batch_idx,
                          len(train_loader), batch_loss))

        if not skip_epoch_stats:
            model.eval()

            with torch.set_grad_enabled(False):  # save memory during inference
                # The third argument is unused by the helper; pass the loss function
                # rather than the last batch's loss tensor.
                train_loss = compute_epoch_loss_autoencoder(model, train_loader, compute_loss, device).item()
                test_loss = compute_epoch_loss_autoencoder(model, test_loader, compute_loss, device).item()
                log_dict['train_loss_per_epoch'].append(train_loss)
                log_dict['test_loss_per_epoch'].append(test_loss)

                print('***Epoch: %03d/%03d | Train Loss: %.3f | Test Loss: %.3f' % (epoch+1, num_epochs, train_loss, test_loss))

                # plot train/test loss graph
                plt.plot(log_dict['train_loss_per_epoch'], label="Train")
                plt.plot(log_dict['test_loss_per_epoch'], label="Test")
                plt.legend()

                # save loss; make sure the target folder exists so savefig cannot fail on it
                os.makedirs("output", exist_ok=True)
                plt.savefig("output/autoencoder_loss.png")
                plt.close()

        print('Time elapsed: %.2f min' % ((time.time() - start_time)/60))

        if save_model:
            save_autoencoder_state(model, NUM_CLASSES, scheduler, optimizer, log_dict)

        scheduler.step()

    print('Total Training Time: %.2f min' % ((time.time() - start_time)/60))
    return log_dict

In [None]:
def plot_training_loss(minibatch_losses, num_epochs, averaging_iterations=100, custom_label='',
                       epoch_tick_step=50):
    """Plot the per-minibatch loss, its running average and a secondary epoch axis.

    Args:
        minibatch_losses: Sequence of per-batch loss values in training order.
        num_epochs: Number of epochs the losses span; used to place the epoch ticks.
        averaging_iterations: Window length of the running average.
        custom_label: Suffix appended to both legend entries.
        epoch_tick_step: Draw one epoch tick every this many epochs. The default of
            50 matches the spacing that used to be hard-coded; lower it for runs
            shorter than 50 epochs, which otherwise only show the tick for epoch 0.
    """
    iter_per_epoch = len(minibatch_losses) // num_epochs

    plt.figure()
    ax1 = plt.subplot(1, 1, 1)
    ax1.plot(range(len(minibatch_losses)),
             (minibatch_losses), label=f'Minibatch Loss{custom_label}')
    ax1.set_xlabel('Iterations')
    ax1.set_ylabel('Loss')

    # Skip the earliest losses when choosing the y-range: the first 1000 values,
    # or the first half when fewer than 1000 were recorded.
    if len(minibatch_losses) < 1000:
        num_losses = len(minibatch_losses) // 2
    else:
        num_losses = 1000

    ax1.set_ylim([0, np.max(minibatch_losses[num_losses:])*1.5])
    ax1.plot(np.convolve(minibatch_losses,
                         np.ones(averaging_iterations,)/averaging_iterations,
                         mode='valid'),
             label=f'Running Average{custom_label}')

    ax1.legend()

    ###################
    # Set second x-axis (epochs), drawn below the iteration axis
    ax2 = ax1.twiny()
    newlabel = list(range(num_epochs+1))

    # Iteration index at which each epoch starts
    newpos = [e*iter_per_epoch for e in newlabel]

    ax2.set_xticks(newpos[::epoch_tick_step])
    ax2.set_xticklabels(newlabel[::epoch_tick_step])

    ax2.xaxis.set_ticks_position('bottom')
    ax2.xaxis.set_label_position('bottom')
    ax2.spines['bottom'].set_position(('outward', 45))
    ax2.set_xlabel('Epochs')
    ax2.set_xlim(ax1.get_xlim())
    ###################

    plt.tight_layout()

In [None]:
def plot_generated_images(data_loader, model, device, n_images=1):
    """Show original point clouds from the first batch next to their reconstructions.

    Only the first batch of `data_loader` is fetched, so the loader is not consumed
    end to end. The point count is inferred from the batch instead of being fixed.

    Args:
        data_loader: Iterable yielding (features, labels) batches.
        model: Autoencoder called on channels-first input (batch, 3, points).
        device: Device the batch is moved to before the forward pass.
        n_images: Number of samples from the batch to display.
    """
    # The labels of the batch are not needed for plotting
    features, _ = next(iter(data_loader))
    features = features.to(device).float()

    with torch.no_grad():
        model.eval()
        # From the flat layout to (batch, points, 3)
        features = features.view(features.size(0), -1, 3)
        decoded_images = model(features.permute(0, 2, 1))

    orig_images = features[:n_images]

    # zip stops at the shorter sequence, so only n_images reconstructions are shown
    for orig, decoded in zip(orig_images, decoded_images):
        pcshow(*orig.cpu().T)
        pcshow(*decoded.cpu().T)

In [None]:
# Either restore a previously saved training state or train from scratch;
# both paths leave the loss history in `log_dict`.
if USE_TRAINED_MODEL:
    log_dict = load_autoencoder_state(model, NUM_CLASSES, USE_CONTRASTIVE_LEARNING, scheduler, optimizer)
else:
    log_dict = train_autoencoder(
        num_epochs=NUM_EPOCHS,
        model=model,
        optimizer=optimizer,
        scheduler=scheduler,
        device=device,
        train_loader=train_loader,
        test_loader=test_loader,
        logging_interval=10,
        skip_epoch_stats=False,
        save_model=True,
    )

In [None]:
# Per-batch training loss; the running average spans one epoch's worth of batches
plot_training_loss(
    minibatch_losses=log_dict['train_loss_per_batch'],
    num_epochs=NUM_EPOCHS,
    averaging_iterations=len(train_loader),
)
plt.show()

In [None]:
# Show a training sample next to the autoencoder's reconstruction of it
plot_generated_images(train_loader, model, device)

# SparkTorch Training

## Vectorize Features Column

In [None]:
# Wrap each row's `features` sequence in a dense ML vector.
seqAsVector = udf(lambda x: Vectors.dense(x), VectorUDT())

# withColumn appends the column on the first run and replaces it on a re-run.
# Selecting `*df.columns` plus the alias would produce a second, identically
# named column each time the cell is executed again.
df = df.withColumn("vectorized_features", seqAsVector(F.col("features")))

## Build the PyTorch object

In [None]:
# Describe the network, criterion and optimiser by class plus constructor arguments;
# the `_lazy` serializer receives the classes themselves, not instances.
torch_obj = serialize_torch_obj_lazy(
    model=SparkPointcloudAutoencoder,
    model_parameters=dict(num_points=NUM_POINTS),
    criterion=ChanferLoss,
    optimizer=optim.Adam,
    optimizer_params=dict(lr=LEARNING_RATE),
)

In [None]:
# Assemble the vectorised point cloud into the single feature column SparkTorch reads
vector_assembler = VectorAssembler(
    inputCols=["vectorized_features"],
    outputCol="assembler_features",
)

In [None]:
# Create the SparkTorch estimator.
# Input and label are the same column: the autoencoder is trained to reproduce its input.
# The mini-batch size comes from BATCH_SIZE so both training paths share one setting.
# NOTE(review): earlyStopPatience is set while validationPct is 0 — confirm that early
# stopping can trigger without a validation share.
spark_model = SparkTorch(
    inputCol='assembler_features',
    labelCol='assembler_features',
    predictionCol='predictions',
    torchObj=torch_obj,
    iters=50,
    verbose=1,
    miniBatch=BATCH_SIZE,
    partitions=SPARK_NUM_CORES,
    earlyStopPatience=20,
    validationPct=0,
    useVectorOut=True
)

In [None]:
# Keep only the rows of the training split
dataset = df.filter(F.col('split') == 'train')

# Assemble the feature column, drop everything else and cache the result,
# since it is read here and again during fitting
spark_dataset = vector_assembler.transform(dataset).select("assembler_features")
spark_dataset.cache()
spark_dataset.show()

In [None]:
# Fit the SparkTorch estimator on the training frame and extract the PyTorch model from the result.
pymodel = spark_model.fit(spark_dataset).getPytorchModel()

In [None]:
# Take the first training row and run it through the fitted model on the CPU.
first = spark_dataset.first()
# Original cloud reshaped to 2048 x 3 for plotting.
# NOTE(review): `input` shadows the builtin; the next cell reads this name, so both
# would have to be renamed together.
input = np.array(first.assembler_features.toArray()).reshape(2048, 3)
# The model receives a batch of one: the vector wrapped in an extra leading dimension.
output = pymodel(torch.from_numpy(np.array([ np.array(first.assembler_features) ])).to("cpu").float())

In [None]:
# Plot the original cloud, then the first (only) item of the model output.
# `.detach()` drops the autograd graph so the tensor can be converted to NumPy.
pcshow(*input.T)
pcshow(*output[0].detach().numpy().T)