# METRIC DEPTH TO SLOPE

METRIC DEPTH ESTIMATION BY DEPTH ANYTHING PRETRAINED MODEL

In [1]:
import argparse
import os
import glob

import numpy as np
from numpy.linalg import inv

from PIL import Image
import torchvision.transforms as transforms
import open3d as o3d
from tqdm import tqdm
from zoedepth.models.builder import build_model
from zoedepth.utils.config import get_config
import cv2
from model import LFD_RoadSeg, UNet
import torch
import torch.nn as nn
import torch.nn.functional as F

import time

from math import pi,atan2,sqrt
from matplotlib import pyplot as plt


# Global settings
FL = 715.0873
FY = 256 * 0.6
FX = 256 * 0.6
NYU_DATA = False
FINAL_HEIGHT = 392*2
FINAL_WIDTH = 512*2
INPUT_DIR = './my_test/input'
OUTPUT_DIR = './my_test/output'
DATASET = 'kitti' # Lets not pick a fight with the model's dataloader

Jupyter environment detected. Enabling Open3D WebVisualizer.
[Open3D INFO] WebRTC GUI backend enabled.
[Open3D INFO] WebRTCWindowSystem: HTTP handshake server disabled.


### Kalman Filter Code for Result revision

In [2]:
A = 1
H = 1
Q = 0
R = 4

def kalman_filter(z_meas, x_esti, P):
    """Kalman Filter Algorithm for One Variable."""
    # (1) Prediction.
    x_pred = A * x_esti
    P_pred = A * P * A + Q

    # (2) Kalman Gain.
    K = P_pred * H / (H * P_pred * H + R)

    # (3) Estimation.
    x_esti = x_pred + K * (z_meas - H * x_pred)

    # (4) Error Covariance.
    P = P_pred - K * H * P_pred

    return x_esti, P

### Slope Estimation by using Camera

In [17]:
# Set Parameters of Camera

h = 0.141 # camera heigts
Y_c = 0.141 # camera heigts
f = 390 # Focal Length


h = 0.6
Y_c = 0.6
f = 1000
#f = 1500

In [18]:
# Function of Convolution for Finding Slope's Edge
d = np.array([[0,1,0],[0,0,0],[0,-1,0]])
def conv(input_data, filters = d, stride = 1, pad = 1):
    H,W = input_data.shape
    f_H,f_W = filters.shape
    output_H = ( H + 2 * pad - f_H) // stride + 1
    output_W = ( W + 2 * pad - f_W) // stride + 1

    pad_data = np.pad(input_data, [(pad,pad),(pad,pad)],'constant')

    output = np.zeros((output_H,output_W))

    for h in range(output_H):
        h_start = h * stride
        h_end = h_start + f_H
        for w in range(output_W):
            w_start = w * stride
            w_end = w_start + f_W
            output[h,w] = np.sum(pad_data[h_start:h_end, w_start:w_end] * filters)
    
    return output

In [19]:
# Function of getting the lane center
def getCenter(road):
    center = []
    up = 0
    for i in range(100,390):
        arr = np.where(road[i,:] == 1)
        
        if len(arr[0]) == 0:
            pass
            #print(arr)
        else:
            center.append([i,int(np.mean(arr[0]))])
            
    return center


In [20]:
# Function of Getting Slope from Depth Map using Geometric method

def getSlopeWithRoad(data,road):
    
    H,W = data.shape

    # Get higher slope edge by using road segmentation
    realRoad = data*road
    midArr = getCenter(road)
    for h,w in midArr:
        h2,w2 = h,w
        if 150 < w2 < 400:
            break
    
    # Get gradient for getting lower slope edge
    gradx,grady = np.gradient(realRoad)
    # Get lower slope edge
    slopeEdge1 = np.where(grady[h2+40:,w2] == np.max(grady[h2+40:H-10,w2]))
    h1 = slopeEdge1[0][-1] + h2 + 40 + 15
    if h1 > 390:
        h1 = 390
    w1 = 262

    # Get the Slope of camera image
    v_b = -(int(h2)- 392//2)
    Z_tot = np.mean(data[h2,w2-10:w2+10])
    Z_b = Z_tot - np.mean(data[h1,w1-10:w1+10])
    tanb = (v_b*Z_tot/f + Y_c)/Z_b
    theta = atan2(tanb,1)*180/pi
    
    return theta, h2, h1

In [24]:
fn_tonumpy = lambda x : x.to('cpu').detach().numpy().transpose(0,2,3,1) # device 위에 올라간 텐서를 detach 한 뒤 numpy로 변환
fn_denorm = lambda x, mean, std : (x * std) + mean
fn_classifier = lambda x :  1.0 * (x > 0.5)  # threshold 0.5 기준으로 indicator function으로 classifier 구현

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
# 변환 (Transform) 설정
transform = transforms.Compose([
    transforms.ToTensor(),
    # 여기에 필요한 추가 변환을 넣으세요
])

def process_images(model,net):
        
        if torch.cuda.is_available():
            torch.cuda.empty_cache() # GPU 캐시 데이터 삭
    
        try:
            # Load file
            filename = './my_test/input/501015deg.avi'
            #filename = './my_test/input/10degree.avi'
            raw_video = cv2.VideoCapture(filename)
            filename = os.path.basename(filename)
            
                
            # 동영상 저장용 코드
            file_path = './my_test/output/result.mp4'
            fps = 25.40
            fourcc = cv2.VideoWriter_fourcc(*'DIVX')            # 인코딩 포맷 문자
            width = 518*2
            height = 392*2
            size = (int(width), int (height))                   # 프레임 크기
            out = cv2.VideoWriter(file_path, fourcc, fps, size)

            # Kalman Filter 
            # Initialization for system model.
            global A,H,Q,R,x,P
            A = 1
            H = 1
            Q = 0
            R = 4
            # Initialization for estimation.
            x_0 = 0
            P_0 = 6
            x,P = x_0, P_0
            
            prev_time = 0

            
            while raw_video.isOpened():

                    # get frame
                    ret, frame = raw_video.read()
                    frame_road_result = cv2.resize(frame, (518, 392))
                    current_time = time.time() - prev_time

                    if not ret :
                        break
                    if ret and current_time > 1/30:
                        prev_time = time.time()
                        #cv2.imshow('image',frame_road)
                        if cv2.waitKey(30) == 27:
                            break
                    
                    if current_time >= 1/29:
                        continue            
                

                    # road segmentation
                    frame_road = cv2.resize(frame, (224,224))
                    frame_road = cv2.cvtColor(frame_road, cv2.COLOR_BGR2RGB)
                    frame_road = transform(frame_road).unsqueeze(0)  # 배치 차원 추가
                    frame_road = torch.tensor(frame_road).to(device)
                    
                    with torch.no_grad():
                        output_road = net(frame_road)
                    output_road = fn_tonumpy(fn_classifier(output_road))
                    output_road = output_road.squeeze()
                    
                    # 0,1 reverse
                    road = np.zeros_like(output_road)
                    road[output_road == 0] = 1

                    # visualize of roadseg
                    road_img = (road * 255).astype(np.uint8)
                    output_road_img = Image.fromarray(road_img)

                    background = Image.fromarray(frame).resize((224,224)).convert('RGBA')
                    output_road_img = output_road_img.convert('RGBA')
                    output_road_img = Image.blend(background, output_road_img, alpha=0.5)
                    numpy_image = np.array(output_road_img)
                    opencv_image = cv2.cvtColor(numpy_image, cv2.COLOR_RGB2BGR)
                    opencv_image = cv2.resize(opencv_image,(518,392))
                    
                    # equalizing the resolution of depth map
                    road = Image.fromarray(road).resize((518,392))
                    road = np.array(road)

                    # Depth Estimation
                    image_tensor = transforms.ToTensor()(frame).unsqueeze(0).to('cuda' if torch.cuda.is_available() else 'cpu')
                    pred = model(image_tensor, dataset=DATASET)
                    
                    if isinstance(pred, dict):
                        pred = pred.get('metric_depth', pred.get('out'))
                    elif isinstance(pred, (list, tuple)):
                        pred = pred[-1]
                    
                    pred = pred.squeeze().detach().cpu().numpy()

                    # Visualize the depth map
                    depth = pred
                    depth = (depth - depth.min()) / (depth.max() - depth.min()) * 255.0
                    depth = depth.astype(np.uint8)
                    depth = cv2.applyColorMap(depth, cv2.COLORMAP_INFERNO)
                    
                    # Calculate the slope
                    theta, h2, h1= getSlopeWithRoad(pred,road)
                    
                    # Using Kalman Filiter
                    
                    # Initailize when the slope is ended
                    if -0.5 < theta < 0.5:
                        # Initialization for system model.
                        A = 1
                        H = 1
                        Q = 0
                        R = 4
                        # Initialization for estimation.
                        x_0 = 0
                        P_0 = 6
                        x,P = x_0, P_0

                    x,P = kalman_filter(theta,x,P)
                    x = round(x, 3)
                    theta = round(theta, 3)

                    # Display the Result
                    #text = "The slope is " + str(x)
                    text = "Theta " + str(theta) + " Kalman " + str(x)

                    org=(100,100)
                    result = np.full(shape=(392,518,3),fill_value=255,dtype=np.uint8)
                    font=cv2.FONT_HERSHEY_SIMPLEX
                    cv2.putText(result, text, org, font, 1,(0,0,0),2)

                    # Draw line of slope edge
                    cv2.line(frame_road_result, (10, h2), (500, h2),(255,0,0))
                    cv2.line(frame_road_result, (10, h1), (500, h1),(0,255,0))
                    
                    resultShow = np.hstack((frame_road_result,opencv_image))
                    resultShow2 = np.hstack((depth,result))
                    result_final = np.concatenate((resultShow,resultShow2), axis=0)

                    # Store the result 
                    out.write(result_final)

                    # Show the result
                    cv2.imshow('Result',result_final)
                    if cv2.waitKey(30) == 27:
                        break
                    
            out.release()
            raw_video.release()
            cv2.destroyAllWindows()

        except Exception as e:
            print(f"Error processing {filename}: {e}")

        return 


In [8]:
# For metric depth, using zoedepth model
model_name = 'zoedepth'
pretrained_resource = 'local::./checkpoints/depth_anything_metric_depth_outdoor.pt'
config = get_config(model_name, "eval", DATASET)
config.pretrained_resource = pretrained_resource
model = build_model(config).to('cuda' if torch.cuda.is_available() else 'cpu')
model.eval()

INFO - 2024-05-13 00:07:38,582 - vision_transformer - using MLP layer as FFN


Params passed to Resize transform:
	width:  518
	height:  392
	resize_target:  True
	keep_aspect_ratio:  False
	ensure_multiple_of:  14
	resize_method:  minimal
Using pretrained resource local::./checkpoints/depth_anything_metric_depth_outdoor.pt
Loaded successfully


ZoeDepth(
  (core): DepthAnythingCore(
    (core): DPT_DINOv2(
      (pretrained): DinoVisionTransformer(
        (patch_embed): PatchEmbed(
          (proj): Conv2d(3, 1024, kernel_size=(14, 14), stride=(14, 14))
          (norm): Identity()
        )
        (blocks): ModuleList(
          (0-23): 24 x NestedTensorBlock(
            (norm1): LayerNorm((1024,), eps=1e-06, elementwise_affine=True)
            (attn): MemEffAttention(
              (qkv): Linear(in_features=1024, out_features=3072, bias=True)
              (attn_drop): Dropout(p=0.0, inplace=False)
              (proj): Linear(in_features=1024, out_features=1024, bias=True)
              (proj_drop): Dropout(p=0.0, inplace=False)
            )
            (ls1): LayerScale()
            (drop_path1): Identity()
            (norm2): LayerNorm((1024,), eps=1e-06, elementwise_affine=True)
            (mlp): Mlp(
              (fc1): Linear(in_features=1024, out_features=4096, bias=True)
              (act): GELU(approximat

In [9]:
# 모델 불러오기
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

ckpt_dir = './checkpoint_lfd'  # 모델 파일 경로
net = UNet().to(device)  # 모델 인스턴스 생성
optim = torch.optim.Adam(net.parameters(), lr = 1e-3 )

# 네트워크 불러오기
def load(ckpt_dir,net,optim):
    if not os.path.exists(ckpt_dir): # 저장된 네트워크가 없다면 인풋을 그대로 반환
        epoch = 0
        return net, optim, epoch

    ckpt_lst = os.listdir(ckpt_dir) # ckpt_dir 아래 있는 모든 파일 리스트를 받아온다
    #print(ckpt_lst)
    ckpt_lst.sort(key = lambda f : int(''.join(filter(str.isdigit,f)))) # filter(str.isdigit, f) : f 에서 숫자만 뽑음 -> 체크포인트 모델을 epoch 순서대로 sort해서 최신꺼를 인덱싱 할 수 있게 한다.

    if torch.cuda.is_available():
        dict_model = torch.load('%s/%s' % (ckpt_dir,ckpt_lst[-1])) # 저장된 모델 체크 포인트 중 가장 최신꺼를 가져온다
        #print(dict_model)
    else:
      device = torch.device('cpu')
      dict_model = torch.load('%s/%s' % (ckpt_dir,ckpt_lst[0]),map_location=device)



    net.load_state_dict(dict_model['net'])
    optim.load_state_dict(dict_model['optim'])
    epoch = int(ckpt_lst[-1].split('epoch')[1].split('.pth')[0])

    return net,optim,epoch

net, optim, start_epoch = load(ckpt_dir = ckpt_dir, net = net, optim = optim)

In [25]:
process_images(model,net)

OpenCV: FFMPEG: tag 0x58564944/'DIVX' is not supported with codec id 12 and format 'mp4 / MP4 (MPEG-4 Part 14)'
OpenCV: FFMPEG: fallback to use tag 0x7634706d/'mp4v'
  frame_road = torch.tensor(frame_road).to(device)
