In [0]:
# http://pytorch.org/
from os.path import exists
from wheel.pep425tags import get_abbr_impl, get_impl_ver, get_abi_tag
platform = '{}{}-{}'.format(get_abbr_impl(), get_impl_ver(), get_abi_tag())
cuda_output = !ldconfig -p|grep cudart.so|sed -e 's/.*\.\([0-9]*\)\.\([0-9]*\)$/cu\1\2/'
accelerator = cuda_output[0] if exists('/dev/nvidia0') else 'cpu'

!pip install -q http://download.pytorch.org/whl/{accelerator}/torch-0.4.1-{platform}-linux_x86_64.whl torchvision
import torch

In [0]:
import numpy as np
import matplotlib.pyplot as plt
import torch 
import torch.utils.data
import torch.nn as nn

In [0]:
from google.colab import files
uploaded = files.upload()
!ls

In [0]:
!ls

In [0]:
#now access your file uploaded in your google drive folder "/temp"
image_appearance = np.load("img_warped.npy")
image_appearance.shape

In [0]:
class Flatten(nn.Module):
    def __init__(self):
        super(Flatten, self).__init__()

    def forward(self, x):
        return x.view(x.size(0), -1)
    
class Reshape(nn.Module):
    def __init__(self, *args):
        super(Reshape, self).__init__()
        self.shape = args

    def forward(self, x):
        return x.view(self.shape)

In [0]:
class AutoEncoder_Appearance(nn.Module):
    def __init__(self):
        super(AutoEncoder_Appearance, self).__init__()
        
        self.encoder = nn.Sequential(
            # TODO: Fill in the encoder structure
            nn.Conv2d(3, 16, kernel_size=5, stride=2, padding=2),
            nn.LeakyReLU(),
            nn.Conv2d(16, 32, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(),
            nn.Conv2d(64, 128, kernel_size=3, stride=2, padding=1),
            nn.LeakyReLU(),          
        )

        self.fc1 = nn.Sequential(                    
            # TODO: Fill in the FC layer structure
            Flatten(),
            nn.Linear(128 * 8 * 8, 50),
            nn.LeakyReLU(),
        )

        self.decoder = nn.Sequential(
            # TODO: Fill in the decoder structure
            # Hint: De-Conv in PyTorch: ConvTranspose2d 
            Reshape(-1,50, 1, 1),
            nn.ConvTranspose2d(50, 128, kernel_size=8, stride=1),
            nn.LeakyReLU(),
            nn.ConvTranspose2d(128, 64, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.LeakyReLU(),
            nn.ConvTranspose2d(64, 32, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.LeakyReLU(),
            nn.ConvTranspose2d(32, 16, kernel_size=3, stride=2, padding=1, output_padding=1),
            nn.LeakyReLU(),
            nn.ConvTranspose2d(16, 3, kernel_size=5, stride=2, padding=2, output_padding=1),
            nn.Sigmoid(),
        )

    def forward(self, x):
            self.code50 = self.encoder(x)
            self.fcd = self.fc1(self.code50)  
            return self.decoder(self.fcd) 

In [0]:
image_appearance = image_appearance / 255
training_data = image_appearance[:800]
testing_data = image_appearance[800:]

In [0]:
training_data = training_data.astype(np.float32)
testing_data = testing_data.astype(np.float32)

In [0]:
#show an example
random_idx = np.random.randint(len(training_data))
print(random_idx)
plt.imshow(training_data[random_idx])
plt.show()

In [0]:
train_loader = torch.utils.data.DataLoader(training_data.transpose((0,3,1,2)), batch_size=100, shuffle=True, num_workers=0)

In [0]:
print("GPU?: ", torch.cuda.is_available())

In [0]:
learning_rate = 7e-4
num_epochs = 500

In [0]:
model_appearance = AutoEncoder_Appearance()

if torch.cuda.is_available():
      model_appearance = model_appearance.cuda()

criterion = nn.MSELoss()
optimizer = torch.optim.Adam(model_appearance.parameters(), lr=learning_rate)
                             #weight_decay=1e-5)

In [0]:
for epoch in range(num_epochs):
    for batch_idx, x in enumerate(train_loader):
        if torch.cuda.is_available():
            x = x.cuda()
        # ===================forward=====================
        output = model_appearance(x)
        loss = criterion(output, x)
        # ===================backward====================
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # ===================log========================
        if batch_idx % 10 == 0 and (epoch+1)%50==0:
            print('epoch [{}/{}]'
                  .format(epoch+1, num_epochs),"batch: ", batch_idx, " loss: ",loss.data)
            pic = output.cpu().data.numpy()[0]
            plt.subplot(1,2,1)
            plt.imshow(pic.transpose(1,2,0))
            
            
            pic = x.cpu().data.numpy()[0]
            plt.subplot(1,2,2)
            plt.imshow(pic.transpose(1,2,0))
    
            plt.show()

torch.save(model_appearance.state_dict(), "autoencoder_appearance.pt")

In [0]:
num_epoch=np.array([50, 100, 150, 200, 250, 300, 350, 400, 450,  500])
rc_err_appearance=np.array([0.0032,0.0029,0.0024,0.0025,0.0022,0.0021,0.0021,0.0020,0.0017,0.0017])
plt.figure()
plt.plot(num_epoch,rc_err_appearance)
plt.xlabel("Number of epoches")
plt.ylabel("Reconstruction  Error)")
plt.savefig('ae_1_1.png')

In [0]:
image_landmarks = np.load("image_landmarks.npy")

In [0]:
###Define AutoEncoder for landmarks
class AutoEncoder_Landmarks(nn.Module):
    def __init__(self):
        super(AutoEncoder_Landmarks, self).__init__()
        
        self.encoder = nn.Sequential(
            nn.Linear(68*2, 100),
            nn.LeakyReLU(),
            nn.Linear(100, 10),
            nn.LeakyReLU()
        )
        
        self.decoder = nn.Sequential(
            nn.Linear(10, 100),
            nn.LeakyReLU(),
            nn.Linear(100, 68*2),
            nn.Sigmoid()
        )
        
    def forward(self, x):
        self.code10 = self.encoder(x)
        return self.decoder(self.code10)

In [0]:
training_landmarks = image_landmarks[:800]
testing_landmarks = image_landmarks[800:]

training_landmarks_flatten = training_landmarks.reshape(-1, 136)
testing_landmarks_flatten = testing_landmarks.reshape(-1, 136)


In [0]:
from sklearn.preprocessing import StandardScaler

scaler_landmark_mean = StandardScaler(with_std = False)

scaler_landmark_mean.fit(training_landmarks_flatten)

mean_landmark = scaler_landmark_mean.mean_

In [0]:
mean_landmark = mean_landmark.reshape(68, 2)

In [0]:
from sklearn.preprocessing import MinMaxScaler

scaler_landmark = MinMaxScaler()
training_landmarks_flatten_scaled = scaler_landmark.fit_transform(training_landmarks_flatten)
testing_landmarks_flatten_scaled = scaler_landmark.transform(testing_landmarks_flatten)


training_landmarks_flatten_scaled = training_landmarks_flatten_scaled.astype(np.float32)
testing_landmarks_flatten_scaled = testing_landmarks_flatten_scaled.astype(np.float32)

In [0]:
landmark_train_loader = \
    torch.utils.data.DataLoader(training_landmarks_flatten_scaled, batch_size=100, shuffle=True, num_workers=0)

In [0]:
learning_rate = 7e-4
num_epochs = 500
weight_decay = 1e-6
batch_size = 100

In [0]:
model_landmark = AutoEncoder_Landmarks()

if torch.cuda.is_available():
      model_landmark = model_landmark.cuda()

criterion_landmark = nn.MSELoss()
optimizer_landmark = torch.optim.Adam(model_landmark.parameters(), lr=learning_rate,
                             weight_decay= weight_decay)

In [0]:

for epoch in range(num_epochs):
    for batch_idx, x in enumerate(landmark_train_loader):
        if torch.cuda.is_available():
            x = x.cuda()
        
        # ===================forward=====================
        output = model_landmark(x)
        loss = criterion_landmark(output, x)
        # ===================backward====================
        optimizer_landmark.zero_grad()
        loss.backward()
        optimizer_landmark.step()
        # ===================log========================
        if batch_idx % 10 == 0 and (epoch+1)%50==0:
            print('epoch [{}/{}]'
                  .format(epoch+1, num_epochs),"batch: ", batch_idx, " loss: ",loss.data)
      
      # if epoch % 500 == 0:
     #torch.save(model.state_dict(), "/content/drive/My Drive/temp/autoencoder_landmark.pt")
torch.save(model_landmark.state_dict(), "autoencoder_landmark.pt")

In [0]:
#num_epoch=np.array([50, 100, 150, 200, 250, 300, 350, 400, 450,  500])
#rc_err_lanmarks=np.array([0.0004,0.00049,0.0005,0.0003,0.0004,0.0021,0.0021,0.0020,0.0017,0.0017])
#plt.figure()
#plt.plot(num_epoch,rc_err_appearance)
#plt.xlabel("Number of epoches")
#plt.ylabel("Reconstruction  Error)")
#plt.savefig('ae_1_1.png')

In [0]:
model_landmark.load_state_dict(torch.load('autoencoder_landmark.pt'))
model_landmark = model_landmark.cpu()

In [0]:
# #generate random landmarks
# random_landmark_code10 = torch.from_numpy(np.random.rand(50, 10))
# random_landmark_code10 = random_landmark_code10.float()
# generated_landmarks = model_landmark.decoder(random_landmark_code10)

In [0]:
model_appearance.load_state_dict(torch.load("autoencoder_appearance.pt"))
model_appearance = model_appearance.cpu()

In [0]:
#generate images
random_image_code50 = torch.from_numpy(np.random.rand(50, 50))
random_image_code50 = random_image_code50.float()


generated_images = model_appearance.decoder(random_image_code50)

generated_images = generated_images.data.numpy().transpose(0,2,3,1)

In [0]:
generated_images.shape

In [0]:
#show 50 synthesized images
for i in range(50):
    plt.subplot(5, 10, i+1)
    plt.imshow(generated_images[i])
    plt.axis("off")
    
plt.show()

In [0]:
sample_testing_images = model_appearance(torch.from_numpy(testing_data[:20].transpose((0,3,1,2))))

sample_testing_images = sample_testing_images.data.numpy().transpose((0,2,3,1))

In [0]:
sample_testing_images.shape

In [0]:
sample_testing_landmarks = model_landmark(torch.from_numpy(testing_landmarks_flatten_scaled[:20]))

In [0]:
sample_testing_landmarks = sample_testing_landmarks.data.numpy()
sample_testing_landmarks = scaler_landmark.inverse_transform(sample_testing_landmarks)

In [0]:
sample_testing_landmarks = sample_testing_landmarks.reshape(-1, 68, 2)

In [0]:
!pip install imageio

In [0]:
# -*- coding: utf-8 -*-

import numpy as np
import cv2
import matplotlib.pyplot as plt
import matplotlib.gridspec as gridspec
import sys
import datetime
import imageio

##################################
# Apply affine transform calculated using srcTri and dstTri to src and
# output an image of size.
def applyAffineTransform(src, srcTri, dstTri, size) :
    
    # Given a pair of triangles, find the affine transform.
    warpMat = cv2.getAffineTransform( np.float32(srcTri), np.float32(dstTri) )
    
    # Apply the Affine Transform just found to the src image
    dst = cv2.warpAffine( src, warpMat, (size[0], size[1]), None, flags=cv2.INTER_LINEAR, borderMode=cv2.BORDER_REFLECT_101 )

    return dst


# Check if a point is inside a rectangle
def rectContains(rect, point) :
    if point[0] < rect[0] :
        return False
    elif point[1] < rect[1] :
        return False
    elif point[0] > rect[0] + rect[2] :
        return False
    elif point[1] > rect[1] + rect[3] :
        return False
    return True

#calculate delanauy triangle
def calculateDelaunayTriangles(rect, points):
    #create subdiv
    subdiv = cv2.Subdiv2D(rect);
    
    # Insert points into subdiv
    for p in points:
        p1=(int(p[0]),int(p[1]))
        if p1[1]<=rect[2]-1 and p1[0]<=rect[2]-1 and p1[1]>=rect[0] and p1[0]>=rect[0]:
            subdiv.insert(p1) 
    
    triangleList = subdiv.getTriangleList();
    
    delaunayTri = []
    
    pt = []    
        
    for t in triangleList:        
        pt.append((t[0], t[1]))
        pt.append((t[2], t[3]))
        pt.append((t[4], t[5]))
        
        pt1 = (t[0], t[1])
        pt2 = (t[2], t[3])
        pt3 = (t[4], t[5])        
        
        if rectContains(rect, pt1) and rectContains(rect, pt2) and rectContains(rect, pt3):
            ind = []
            #Get face-points (from 68 face detector) by coordinates
            for j in range(0, 3):
                for k in range(0, len(points)):                    
                    if(abs(pt[j][0] - points[k][0]) < 1.0 and abs(pt[j][1] - points[k][1]) < 1.0):
                        ind.append(k)    
            # Three points form a triangle. Triangle array corresponds to the file tri.txt in FaceMorph 
            if len(ind) == 3:                                                
                delaunayTri.append((ind[0], ind[1], ind[2]))
        
        pt = []        
            
    
    return delaunayTri

# Warps and alpha blends triangular regions from img1 and img2 to img
def warpTriangle(img1, img2, t1, t2) :

    # Find bounding rectangle for each triangle
    r1 = cv2.boundingRect(np.float32([t1]))
    r2 = cv2.boundingRect(np.float32([t2]))

    # Offset points by left top corner of the respective rectangles
    t1Rect = [] 
    t2Rect = []
    t2RectInt = []

    for i in range(0, 3):
        t1Rect.append(((t1[i][0] - r1[0]),(t1[i][1] - r1[1])))
        t2Rect.append(((t2[i][0] - r2[0]),(t2[i][1] - r2[1])))
        t2RectInt.append(((t2[i][0] - r2[0]),(t2[i][1] - r2[1])))

    w,h,num_chans = img1.shape
    # Get mask by filling triangle
    mask = np.zeros((r2[3], r2[2], num_chans), dtype = np.float32)
    cv2.fillConvexPoly(mask, np.int32(t2RectInt), (1.0, 1.0, 1.0), 16, 0);

    # Apply warpImage to small rectangular patches
    img1Rect = img1[r1[1]:r1[1] + r1[3], r1[0]:r1[0] + r1[2]]
    #img2Rect = np.zeros((r2[3], r2[2]), dtype = img1Rect.dtype)
    
    size = (r2[2], r2[3])

    img2Rect = applyAffineTransform(img1Rect, t1Rect, t2Rect, size)
    if num_chans==1:
        img2Rect=np.reshape(img2Rect,(r2[3], r2[2], num_chans))
    
    img2Rect = img2Rect * mask

    # Copy triangular region of the rectangular patch to the output image
    if num_chans==1:
        img2[r2[1]:r2[1]+r2[3], r2[0]:r2[0]+r2[2]] = img2[r2[1]:r2[1]+r2[3], r2[0]:r2[0]+r2[2]] * ( 1.0 - mask )
     
    else:
        img2[r2[1]:r2[1]+r2[3], r2[0]:r2[0]+r2[2]] = img2[r2[1]:r2[1]+r2[3], r2[0]:r2[0]+r2[2]] * ( (1.0, 1.0, 1.0) - mask )
     
    img2[r2[1]:r2[1]+r2[3], r2[0]:r2[0]+r2[2]] = img2[r2[1]:r2[1]+r2[3], r2[0]:r2[0]+r2[2]] + img2Rect 

###################################
def warp(Image,sc,tc):
    '''
    Image: the image to be warped
    sc: original landmarks
    tc: warped landmarks
    '''
    HW,_,_=Image.shape
    cornerps=[[0,0],[0,HW-1],[HW-1,0],[HW-1,HW-1]]
    #cornerps=[[0,0],[0,HW-1],[HW-1,0],[HW-1,HW-1],[0,np.floor(HW/2)],[np.floor(HW/2),0],[HW-1,np.floor(HW/2)],[np.floor(HW/2),HW-1]]

    scl=sc.astype(np.int64).tolist()+cornerps
    tcl=tc.astype(np.int64).tolist()+cornerps
    imgWarped = np.copy(Image);    
    rect = (0, 0, HW, HW)
    dt = calculateDelaunayTriangles(rect,tcl)
# Apply affine transformation to Delaunay triangles
    for i in range(0, len(dt)):
        t1 = []
        t2 = []
        
        #get points for img1, img2 corresponding to the triangles
        for j in range(0, 3):
            t1.append(scl[dt[i][j]])
            t2.append(tcl[dt[i][j]])
        
        warpTriangle(Image, imgWarped, t1, t2)
    return imgWarped

#########################################
def plot(samples,Nh,Nc,channel,IMG_HEIGHT, IMG_WIDTH):
    fig = plt.figure(figsize=(Nc, Nh))
    plt.clf()
    gs = gridspec.GridSpec(Nh, Nc)
    gs.update(wspace=0.05, hspace=0.05)

    for i, sample in enumerate(samples[0:Nh*Nc,:,:,:]):
        ax = plt.subplot(gs[i])
        plt.axis('off')
        ax.set_xticklabels([])
        ax.set_yticklabels([])
        ax.set_aspect('equal')
        if channel==1:
            image=sample.reshape(IMG_HEIGHT, IMG_WIDTH)
            immin=(image[:,:]).min()
            immax=(image[:,:]).max()
            image=(image-immin)/(immax-immin+1e-8)
            plt.imshow(image,cmap ='gray')
        else:
            image=sample.reshape(IMG_HEIGHT, IMG_WIDTH,channel)
            immin=(image[:,:,:]).min()
            immax=(image[:,:,:]).max()
            image=(image-immin)/(immax-immin+1e-8)
            plt.imshow(image)
    return fig 






In [0]:
def get_warped_images(images, landmarks, to_landmarks):
    warped_images = np.zeros(images.shape)
    H = warped_images.shape[0]
    for i in range(H):
        if i % int(H / 5) == 0:
            print("Running at...", i,"/", H)
        warped_images[i] = warp(images[i], landmarks[i], to_landmarks[i])
        
    print("Finished")
    return warped_images

In [0]:
warped_back_images = get_warped_images(sample_testing_images, [mean_landmark for _ in range(20)], sample_testing_landmarks)
warped_real_images = get_warped_images(testing_data[:20], [mean_landmark for _ in range(20)], testing_landmarks[:20])

In [0]:
#plot top 10 reconstructed faces
for i in range(20):
    plt.subplot(4,5,i+1)
    pic = warped_back_images[i]
    plt.imshow(pic)
    plt.scatter(sample_testing_landmarks[i,:,0], sample_testing_landmarks[i,:,1],s =0.5)
    plt.axis("off")
    
plt.show()

for i in range(20):
    plt.subplot(4,5,i+1)
    pic = warped_real_images[i]
    plt.imshow(pic)
    plt.scatter(testing_landmarks[i,:,0], testing_landmarks[i,:,1],s =0.5)
    plt.axis("off")
    
plt.show()

In [0]:
def calculate_variance_rank(model, images):
    scaler_variance = StandardScaler()
    images_encode50 = model.encoder(torch.from_numpy(images)).data.numpy()
    scaler_variance.fit(images_encode50)
    
    def rank_simple(vector):
        return sorted(range(len(vector)), key=vector.__getitem__)

    return rank_simple(scaler_variance.var_)

In [0]:
ranks = calculate_variance_rank(model_appearance, training_data.transpose((0,3,1,2)))

In [0]:
print("the first 4 dimensions of the latent variables of appearance that have the maximal variance: ", ranks[-5:-1])

In [0]:
#get a random train image and show the interpolation along those four encoders
random_idx = np.random.randint(800)
sample_image = training_data[random_idx]
sample_landmark = training_landmarks[random_idx]

In [0]:
original_image = warp(sample_image, mean_landmark, sample_landmark)

In [0]:
#original image
plt.imshow(original_image)
plt.axis("off")
plt.show()

In [0]:
#generate encoded parts
sample_torch = torch.from_numpy(sample_image).unsqueeze(0).permute(0,3,1,2)
sample_code = model_appearance.encoder(sample_torch).data.numpy()[0]

In [0]:
interpolation_encodes = np.array([sample_code]*10)

#interpolate along four ranks with largest variance
ranks_first_four = ranks[::-1][:4]

for rank in ranks_first_four:
    for i in range(10):
        interpolation_encodes[i][rank] = 0.3*i

    interpolation_decodes = model.decoder(torch.from_numpy(interpolation_encodes))

    plt.figure(figsize = (12,12)) 

    for i in range(10):
        plt.subplot(1,10,i+1)
        pic = interpolation_decodes[i].data.numpy().transpose(1,2,0)
        warp_back_pic = warp(pic, mean_landmark, sample_landmark)
        plt.scatter(sample_landmark[:,0], sample_landmark[:,1], s=0.5)
        plt.imshow(warp_back_pic)
        plt.axis("off")
    plt.show()

In [0]:
interpolation_encodes = np.array([sample_code]*25)

#interpolate along two ranks with largest variance
rank_first, rank_second = ranks[-1], ranks[-2]

for i in range(5):
    for j in range(5):
        interpolation_encodes[5 * i + j][rank_first] = 0.5*i
        interpolation_encodes[5 * i + j][rank_second] = 0.5*j
        

interpolation_decodes = model.decoder(torch.from_numpy(interpolation_encodes))

plt.figure(figsize = (6,6)) 

for i in range(25):
    plt.subplot(5,5,i+1)
    pic = interpolation_decodes[i].data.numpy().transpose(1,2,0)
    warp_back_pic = warp(pic, mean_landmark, sample_landmark)
    plt.scatter(sample_landmark[:,0], sample_landmark[:,1], s=0.5)
    plt.imshow(warp_back_pic)
    plt.axis("off")
plt.show()