In [1]:
# google: torchvision optical flow

from pathlib import Path
import numpy as np
from skimage import io
from skimage.transform import rescale, resize
from scipy.ndimage import gaussian_filter
from scipy import ndimage

import torch
import matplotlib.pyplot as plt
import torchvision.transforms.functional as F
import torchvision.transforms as T

imginfo = lambda img: print(type(img), img.dtype, img.shape, img.min(), img.max())

from torchvision.models.optical_flow import raft_large

device = "cpu"
model = raft_large(pretrained=True, progress=False).to(device)
model = model.eval()

  warn(


In [2]:
data_dir = Path("data")

# === load first touch coors
import json
with open("data/single_log.json") as f:
    episode_log = json.load(f)
first_x, first_y, first_z = map(int, episode_log["first_touch"])
cropw = 200

def cut_crop(image):
    """
    :param image: [h, w, c]
    """
    return image[first_y-cropw:first_y+cropw, first_x-cropw:first_x+cropw]

In [3]:
# === read numpy image
def read_image(path):
    return io.imread(path).astype(np.float32) / 255

# === prepare batch for RAFT
def to_batch(image):
    batch = torch.from_numpy(image)[None, :, :, :].to(torch.float32) / 255  # [h, w, c] -> [1, h, w, c]
    batch = batch.permute(0, 3, 1, 2)  # [1, h, w, c] -> [1, c, h, w]
    
    transforms = T.Compose(
        [
            T.ConvertImageDtype(torch.float32),
            T.Normalize(mean=0.5, std=0.5),  # map [0, 1] into [-1, 1]
            #T.Resize(size=(520, 960)),
        ]
    )
    batch = transforms(batch)
    return batch

# === compute flow with RAFT
from torchvision.utils import flow_to_image

def compute_flow_picture(img1_batch, img2_batch):
    with torch.no_grad():
        list_of_flows = model(img1_batch.to(device), img2_batch.to(device))
    print(f"length = {len(list_of_flows)} = number of iterations of the model")
    predicted_flows = list_of_flows[-1].detach()

    flow_imgs = flow_to_image(predicted_flows)
    
    # The images have been mapped into [-1, 1] but for plotting we want them in [0, 1]
    img1_batch = [(img1 + 1) / 2 for img1 in img1_batch]
    
    grid = [[img1, flow_img] for (img1, flow_img) in zip(img1_batch, flow_imgs)]
    
    return grid[0][1]

# === save image from RAFT

def save_torch(path, tensor):
    """
    tensor: [c, h, w] cuda float32 [0, 1]
    """
    img = (tensor.to("cpu").permute([1, 2, 0]).numpy() * 255).astype(np.uint8)
    io.imsave(path, img)


In [5]:
# === run flow
file_list = sorted(Path("data/frames").glob("center*jpg"))
i1 = read_image(file_list[2]) # [h, w, 3] float32 [0, 1]
i2 = read_image(data_dir / "frames/grip_image.jpg")

io.imsave("data/image_1.jpg", (i1 * 255).astype(np.uint8))
io.imsave("data/image_2.jpg", (i2 * 255).astype(np.uint8))

img1_batch = to_batch(i2)
img2_batch = to_batch(i1)
flow = compute_flow_picture(img1_batch, img2_batch)
save_torch("data/raft_flow.jpg", flow)

length = 12 = number of iterations of the model


In [6]:
# === segment flow

# idea is to take the moving region closest to the center.
# For that I first threshold the moving part,
# then take histogram of distances from the center.
def compute_flow(img1_batch, img2_batch):
    with torch.no_grad():
        list_of_flows = model(img1_batch.to(device), img2_batch.to(device))
    print(f"length = {len(list_of_flows)} = number of iterations of the model")
    predicted_flows = list_of_flows[-1].detach()

    return predicted_flows

In [7]:
flow = compute_flow(img1_batch, img2_batch)
imginfo(flow) # (u, v) - horizontal, vertical

length = 12 = number of iterations of the model
<class 'torch.Tensor'> torch.float32 torch.Size([1, 2, 720, 1280]) tensor(-36.4142) tensor(5.6763)


In [8]:
# magnitude
mag = (flow[0, 0, :, :] ** 2 + flow[0, 1, :, :] ** 2) ** 0.5
imginfo(mag)

<class 'torch.Tensor'> torch.float32 torch.Size([720, 1280]) tensor(0.0052) tensor(38.0201)


In [9]:
# hint: quantile
q = torch.tensor([0.5, 0.9, 0.95, 0.99])
print(torch.quantile(mag, q)) # distribution from whole image
print(torch.quantile(mag[mag > 1], q)) # distribtuion from moving region
# conclusion: unit of flow is pixels!

tensor([ 0.7376, 16.7610, 22.7723, 30.6017])
tensor([ 1.4278, 25.7486, 29.0186, 32.9353])


In [10]:
# area of the moving region?
_b, _c, h, w = flow.shape
print(h, w)
pixels = torch.sum(mag > 5).numpy()
print(pixels / (h * w))

720 1280
0.131083984375


In [13]:
def to_01(image):
    # image: numpy float32 [a, b]
    low, high = np.quantile(image, 0.01), np.quantile(image, 0.99)
    return np.clip(image / high, 0, 1)

mask = to_01(mag.to("cpu").numpy())
io.imsave("data/mask.jpg", (mask * 255).astype(np.uint8))

0.33602795004844666 30.601709728240966


In [20]:
# === visualize
def overlay_mask(canvas: np.ndarray, mask: np.ndarray):
    # canvas: [h, w, 3] float32 [0, 1]
    # mask: [h, w] int [0, 1]
    mask = np.tile(mask[:, :, None], (1, 1, 3))  # 1 channel to 3 channels
    mask[:, :, 1] = 0
    mask[:, :, 2] = 0
    
    c = np.copy(canvas)
    c =  c * 0.4 + mask * 0.6
    return c

overlay = overlay_mask(i2, mask)
io.imsave("data/overlay.jpg", (overlay * 255).astype(np.uint8))