In [None]:
import sys
sys.path.append("../")
from sgan.data.loader import data_loader


In [None]:
import os

import cv2
import matplotlib.pyplot as plt
import numpy as np
import torch

In [None]:
class Config:
    """Plain attribute bag handed to ``data_loader`` as its ``args`` object.

    Each constructor argument is stored unchanged under an attribute of the
    same name. How the values are interpreted is decided by the loader in
    ``sgan.data.loader``, which is not part of this notebook.
    """

    def __init__(self, obs_len=12, pred_len=8, delim="tab", skip=1, metric="meter", batch_size=16, loader_num_workers=1):
        # Sequence-length settings.
        self.obs_len, self.pred_len = obs_len, pred_len
        # File parsing / sampling settings.
        self.delim, self.skip, self.metric = delim, skip, metric
        # Batching settings.
        self.batch_size, self.loader_num_workers = batch_size, loader_num_workers

In [None]:
# Location of the training split. The original machine-specific path is kept as
# the default so the existing setup keeps working; set NBA_TRAIN_PATH to point
# the notebook at the data on another machine without editing this cell.
train_path = os.environ.get(
    "NBA_TRAIN_PATH",
    "/mnt/c/Users/ktwzj/code/data/nba/nba-cross.s025/train_sample",
)
args = Config()
# data_loader hands back two objects: the dataset and the loader iterated below.
train_dset, train_loader = data_loader(args, train_path)

In [None]:
# Only the first batch is needed for the experiments in the following cells.
batch = next(iter(train_loader), None)
if batch is None:
    # Fail here rather than leaving `obs_traj` undefined (or stale from an
    # earlier kernel run), which would surface as a confusing error later on.
    raise ValueError("train_loader produced no batches; check the dataset path")
# The first element of the batch is used as the observed trajectories.
obs_traj = batch[0]
print(obs_traj.shape)

In [None]:
# Canvas setup for the drawing experiment in this cell.
sampling_scale_factor = 50  # canvas pixels per unit of trajectory coordinate
field_width, field_height = 28.7, 15.2  # presumably metres (cf. Config.metric) — TODO confirm

# Keep the first 11 entries along axis 1 of the observed batch.
# NOTE(review): presumably the agents of the first scene — confirm against the loader.
scene = obs_traj[:, :11, :]

# Canvas size in pixels as (width, height); NumPy arrays below use (rows, cols),
# hence the reversed tuple.
sampling_resolution = tuple(
    int(sampling_scale_factor * extent) for extent in (field_width, field_height)
)
image_grid = np.zeros((11,) + sampling_resolution[::-1])
image = np.zeros(sampling_resolution[::-1], np.uint8)
def build_map_function(scale_factor):
    """Return a callable that scales an ``(x, y)`` pair by ``scale_factor``.

    The returned function takes ``x`` and ``y`` and returns the tuple
    ``(x * scale_factor, y * scale_factor)``. No rounding or clipping is done.
    """
    def scale_point(x, y):
        scaled_x = x * scale_factor
        scaled_y = y * scale_factor
        return scaled_x, scaled_y

    return scale_point
sampling_map_func = build_map_function(sampling_scale_factor)

# Marker drawn for every trajectory point. `image` is a single-channel uint8
# canvas, so `color` is a gray level rather than a BGR triple.
radius = 3
color = 128
thickness = -1  # a negative thickness makes cv2.circle draw a filled disc

# Trajectory of the first agent in the scene; columns 0 and 1 are read as x and y.
# Presumably one row per timestep — TODO confirm against the loader.
agent = scene[:, 0, :]
for i in range(agent.shape[0]):
    x, y = sampling_map_func(agent[i, 0].item(), agent[i, 1].item())
    center_coordinates = (int(x), int(y))
    image = cv2.circle(image, center_coordinates, radius, color, thickness)

# Soften the markers with eight successive 5x5 Gaussian passes.
blur = image
for _ in range(8):
    blur = cv2.GaussianBlur(blur, (5, 5), 0)

kernel = np.ones((3, 3), np.uint8)
dilation = cv2.dilate(blur, kernel, iterations=5)

# One explicit figure with three panels. Drawing onto the implicit current axes
# before calling plt.subplot() leaves an extra full-figure axes that can overlap
# the panels.
fig, axes = plt.subplots(1, 3)
panels = [(image, 'Original'), (blur, 'Blurred'), (dilation, 'Dilation')]
for ax, (panel, title) in zip(axes, panels):
    ax.imshow(panel, cmap="gray")
    ax.set_title(title)
    ax.set_xticks([])
    ax.set_yticks([])
plt.show()

dilation

In [None]:

class TrajectoryDrawer:
    """Rasterise agent trajectories into one image channel per agent.

    Each trajectory is drawn as filled discs on its own single-channel uint8
    canvas, blurred, and resized to ``target_size``. The canvases of one scene
    are stacked as channels, and scenes are stacked into a batch tensor.

    Args:
        sampling_scale_factor: Canvas pixels per unit of trajectory coordinate.
        target_size: ``(width, height)`` passed to ``cv2.resize`` as the output size.
        drawing_mode: Stored on the instance; no method consults it at present.
        agent_num: Number of consecutive entries along axis 1 of a trajectory
            batch that form one scene.
        field_size: ``(width, height)`` of the drawable area, in the same units
            as the trajectory coordinates. Together with
            ``sampling_scale_factor`` it determines the canvas size.
    """

    # Marker and smoothing settings shared by every trajectory.
    POINT_RADIUS = 3
    POINT_COLOR = 128       # gray level on the single-channel canvas
    POINT_THICKNESS = -1    # negative thickness => filled disc
    BLUR_KERNEL = (5, 5)
    BLUR_PASSES = 6

    def __init__(self, sampling_scale_factor=50, target_size=(572, 572), drawing_mode="blur",
                 agent_num=11, field_size=(28.7, 15.2)):
        self.sampling_scale_factor = sampling_scale_factor
        self.drawing_mode = drawing_mode
        # Stored as a tuple: it is used as the cv2.resize output size, and a
        # tuple cannot be mutated through a shared default.
        self.target_size = tuple(target_size)
        self.agent_num = agent_num
        # Canvas size in pixels as (width, height). Derived from this instance's
        # own scale factor so the canvas always matches the scaled coordinates.
        field_width, field_height = field_size
        self.sampling_resolution = (
            int(sampling_scale_factor * field_width),
            int(sampling_scale_factor * field_height),
        )

    def transform_sampling_coords(self, x, y):
        """Scale an ``(x, y)`` pair by ``sampling_scale_factor``; returns a tuple."""
        return x * self.sampling_scale_factor, y * self.sampling_scale_factor

    def generate_trajectory_image(self, agent, image):
        """Draw one trajectory onto ``image`` and return it resized to ``target_size``.

        Args:
            agent: Tensor indexed as ``agent[i, 0]`` (x) and ``agent[i, 1]`` (y),
                one row per point.
            image: Single-channel uint8 canvas to draw on.

        Returns:
            The resized canvas. For an empty trajectory this is the resized,
            untouched input canvas.
        """
        for step in range(agent.size(0)):
            x, y = self.transform_sampling_coords(agent[step, 0].item(), agent[step, 1].item())
            image = cv2.circle(
                image, (int(x), int(y)), self.POINT_RADIUS, self.POINT_COLOR, self.POINT_THICKNESS
            )
            # The blur runs after every point, so points drawn earlier are
            # smoothed more often than later ones.
            # NOTE(review): kept as-is because it determines the output; if a
            # single uniform blur was intended, move this loop below the point loop.
            for _ in range(self.BLUR_PASSES):
                image = cv2.GaussianBlur(image, self.BLUR_KERNEL, 0)

        # Resizing does not feed back into the drawing, so do it once at the end.
        return cv2.resize(image, self.target_size, interpolation=cv2.INTER_CUBIC)

    def generate_scene_image(self, scene):
        """Rasterise every agent of one scene.

        Args:
            scene: Tensor sliced as ``scene[:, i, :]`` to obtain agent ``i``.

        Returns:
            NumPy array with one channel per agent stacked along the last axis.
        """
        width, height = self.sampling_resolution
        channels = []
        for agent_idx in range(scene.size(1)):
            canvas = np.zeros((height, width), np.uint8)
            channel = self.generate_trajectory_image(scene[:, agent_idx, :], canvas)
            channels.append(np.asarray(channel))
        return np.stack(channels, axis=2)

    def generate_batch_images(self, traj_batch):
        """Rasterise a trajectory batch into a float tensor of images.

        Axis 1 of ``traj_batch`` is split into consecutive groups of
        ``agent_num`` entries, one group per scene. Entries left over after the
        last full group are ignored.

        Returns:
            Float tensor laid out as (scene, channel, row, column).
        """
        scene_count = traj_batch.size(1) // self.agent_num
        scene_images = [
            self.generate_scene_image(
                traj_batch[:, k * self.agent_num:(k + 1) * self.agent_num, :]
            )
            for k in range(scene_count)
        ]
        # uint8 (scene, row, column, channel) -> float32 (scene, channel, row, column)
        batch_images = torch.from_numpy(np.stack(scene_images)).float()
        return batch_images.permute(0, 3, 1, 2)
# Rasterise the observed batch with the drawer's default settings.
drawer = TrajectoryDrawer()
# NOTE(review): this rebinds `image` (a NumPy canvas in the earlier drawing
# cell) to the tensor returned by generate_batch_images.
image = drawer.generate_batch_images(obs_traj)
# Cell output: the size of the rasterised batch tensor.
image.size()
# image.shape
# plt.imshow(image[:,:,:1], cmap="gray")
# plt.show()

In [None]:
import torch
import torch.nn as nn
import torch.nn.functional as F


class DoubleConv(nn.Module):
    """Two consecutive 3x3 convolution -> BatchNorm -> ReLU stages.

    ``mid_channels`` is the channel width between the two stages; when it is
    omitted (or falsy) ``out_channels`` is used. Both convolutions use
    ``padding=1``, so the spatial size of the input is preserved.
    """

    def __init__(self, in_channels, out_channels, mid_channels=None):
        super().__init__()
        hidden = mid_channels or out_channels
        stages = []
        for width_in, width_out in ((in_channels, hidden), (hidden, out_channels)):
            stages += [
                nn.Conv2d(width_in, width_out, kernel_size=3, padding=1),
                nn.BatchNorm2d(width_out),
                nn.ReLU(inplace=True),
            ]
        self.double_conv = nn.Sequential(*stages)

    def forward(self, x):
        """Apply both stages to ``x``."""
        return self.double_conv(x)


class Down(nn.Module):
    """Encoder stage: 2x2 max pooling followed by a ``DoubleConv``."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        pool = nn.MaxPool2d(2)
        convs = DoubleConv(in_channels, out_channels)
        self.maxpool_conv = nn.Sequential(pool, convs)

    def forward(self, x):
        """Pool ``x``, then run it through the convolution pair."""
        return self.maxpool_conv(x)


class Up(nn.Module):
    """Decoder stage: upsample, align with the skip tensor, concatenate, ``DoubleConv``.

    With ``bilinear=True`` the upsampling is a parameter-free bilinear
    interpolation and the convolution pair narrows through ``in_channels // 2``.
    Otherwise a stride-2 transposed convolution halves the channel count while
    upsampling.
    """

    def __init__(self, in_channels, out_channels, bilinear=True):
        super().__init__()
        half = in_channels // 2
        if not bilinear:
            self.up = nn.ConvTranspose2d(in_channels, half, kernel_size=2, stride=2)
            self.conv = DoubleConv(in_channels, out_channels)
        else:
            # Interpolation keeps the channel count, so the convolution pair
            # does the channel reduction instead.
            self.up = nn.Upsample(scale_factor=2, mode='bilinear', align_corners=True)
            self.conv = DoubleConv(in_channels, out_channels, half)

    def forward(self, x1, x2):
        """Upsample ``x1``, pad it to the spatial size of ``x2`` and fuse the two.

        Tensors are indexed as (N, C, H, W): axis 2 is height, axis 3 is width.
        """
        upsampled = self.up(x1)
        gap_h = x2.size(2) - upsampled.size(2)
        gap_w = x2.size(3) - upsampled.size(3)
        left, top = gap_w // 2, gap_h // 2
        # F.pad takes (left, right, top, bottom) for the last two axes.
        # Background on padding issues:
        # https://github.com/HaiyongJiang/U-Net-Pytorch-Unstructured-Buggy/commit/0e854509c2cea854e247a9c615f175f76fbb2e3a
        # https://github.com/xiaopeng-liao/Pytorch-UNet/commit/8ebac70e633bac59fc22bb5195e513d5832fb3bd
        upsampled = F.pad(upsampled, [left, gap_w - left, top, gap_h - top])
        fused = torch.cat([x2, upsampled], dim=1)
        return self.conv(fused)


class OutConv(nn.Module):
    """Output head: a single 1x1 convolution from ``in_channels`` to ``out_channels``."""

    def __init__(self, in_channels, out_channels):
        super().__init__()
        self.conv = nn.Conv2d(in_channels, out_channels, kernel_size=1)

    def forward(self, x):
        """Project ``x`` to ``out_channels`` channels; spatial size is unchanged."""
        projected = self.conv(x)
        return projected

class LNet(nn.Module):
    """Encoder-only network: an input ``DoubleConv`` followed by four ``Down`` stages.

    The final stage outputs 512 channels when ``bilinear`` is true and 1024
    otherwise. ``forward`` returns only the deepest feature map.
    """

    def __init__(self, n_channels, bilinear=True):
        super().__init__()
        self.n_channels = n_channels
        self.bilinear = bilinear

        bottleneck_channels = 1024 // (2 if bilinear else 1)
        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, bottleneck_channels)

    def forward(self, x):
        """Run ``x`` through the input block and all four downsampling stages."""
        features = self.inc(x)
        for stage in (self.down1, self.down2, self.down3, self.down4):
            features = stage(features)
        return features
 
    
class UNet(nn.Module):
    """U-shaped network: four ``Down`` stages, four ``Up`` stages with skips, 1x1 head.

    Args:
        n_channels: Channel count of the input tensor.
        n_classes: Channel count of the returned logits.
        bilinear: Passed to every ``Up`` stage; when true the channel widths at
            the bottleneck and in the decoder are halved.
    """

    def __init__(self, n_channels, n_classes, bilinear=True):
        super().__init__()
        self.n_channels = n_channels
        self.n_classes = n_classes
        self.bilinear = bilinear
        shrink = 2 if bilinear else 1

        # Encoder
        self.inc = DoubleConv(n_channels, 64)
        self.down1 = Down(64, 128)
        self.down2 = Down(128, 256)
        self.down3 = Down(256, 512)
        self.down4 = Down(512, 1024 // shrink)
        # Decoder
        self.up1 = Up(1024, 512 // shrink, bilinear)
        self.up2 = Up(512, 256 // shrink, bilinear)
        self.up3 = Up(256, 128 // shrink, bilinear)
        self.up4 = Up(128, 64, bilinear)
        self.outc = OutConv(64, n_classes)

    def forward(self, x):
        """Return the logits for ``x``."""
        # Collect every encoder output; the last one is the bottleneck and the
        # rest serve as skip connections, consumed deepest-first.
        skips = [self.inc(x)]
        for down in (self.down1, self.down2, self.down3, self.down4):
            skips.append(down(skips[-1]))

        decoded = skips.pop()
        for up in (self.up1, self.up2, self.up3, self.up4):
            decoded = up(decoded, skips.pop())
        return self.outc(decoded)


In [None]:
# Rasterise the observed batch; the result is laid out as (scene, channel, row, column).
drawer = TrajectoryDrawer()
image = drawer.generate_batch_images(obs_traj)
# Size the encoder input from the tensor itself, so it cannot drift from the
# number of per-agent channels the drawer actually produced.
model = LNet(n_channels=image.size(1))

In [None]:
# torch.cuda.empty_cache()  # was `torch.empty_cache()`, which does not exist in torch
# Forward pass of the encoder on the rasterised batch.
# NOTE(review): no torch.no_grad() / model.eval() is visible in this notebook, so
# this presumably runs with autograd enabled and BatchNorm in training mode —
# confirm that is intended if `r` is only inspected.
r = model(image)