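# ch8 Visual Odometry 2
How does optical flow track feature points?
How does the direct method estimate the camera pose?
How is the multi-level (pyramid) direct method implemented?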

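How optical flow tracks feature points:
Optical flow rests on the brightness constancy assumption: the same scene point keeps the same intensity across consecutive frames taken a short time apart. The Lucas-Kanade variant additionally assumes that all pixels in a small window around the feature share the same motion. The core steps are:

1. Compute the image gradients in a local window around each feature point.
2. Use a first-order Taylor expansion to relate the feature's displacement to the change in image brightness.
3. Solve the resulting overdetermined system of equations, typically by least squares, to estimate the feature's position in the next frame.

Common methods include Lucas-Kanade optical flow and its pyramidal (coarse-to-fine) variant.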
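
The least-squares solve at the heart of Lucas-Kanade can be sketched in a few lines of NumPy. This is a minimal single-level, single-iteration illustration on a synthetic Gaussian blob, not what `cv2.calcOpticalFlowPyrLK` does internally; the function name `lk_step`, the window size, and the test image are all assumptions made for the example.

```python
import numpy as np

def lk_step(img1, img2, x, y, half_win=7):
    """One Lucas-Kanade least-squares solve for the flow (u, v) at integer pixel (x, y)."""
    # Spatial gradients of the first image; np.gradient returns d/d(axis 0)=y first, then d/d(axis 1)=x
    Iy, Ix = np.gradient(img1)
    # Temporal difference between the two frames
    It = img2 - img1
    win = (slice(y - half_win, y + half_win + 1),
           slice(x - half_win, x + half_win + 1))
    # Linearized brightness constancy: Ix * u + Iy * v = -It for every pixel in the window
    A = np.stack([Ix[win].ravel(), Iy[win].ravel()], axis=1)
    b = -It[win].ravel()
    flow, *_ = np.linalg.lstsq(A, b, rcond=None)
    return flow

def blob(cx, cy, size=64, sigma=5.0):
    """Synthetic image: a Gaussian blob centered at (cx, cy)."""
    ys, xs = np.mgrid[0:size, 0:size].astype(np.float64)
    return np.exp(-((xs - cx) ** 2 + (ys - cy) ** 2) / (2.0 * sigma ** 2))

true_u, true_v = 0.3, -0.2                 # sub-pixel motion, small enough for the linearization
img1 = blob(32.0, 32.0)
img2 = blob(32.0 + true_u, 32.0 + true_v)  # same blob, shifted by the true flow
u, v = lk_step(img1, img2, 32, 32)
print(f"estimated flow: ({u:.3f}, {v:.3f}), true flow: ({true_u}, {true_v})")
```

Because the model is a first-order approximation, a single solve is only accurate for small displacements; practical implementations iterate the solve and wrap it in an image pyramid.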

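How the direct method estimates the camera pose:
The direct method works on pixel intensities directly and needs no feature descriptors or matching. Its basic steps are:

1. Start from an initial guess of the camera pose.
2. Using the current pose estimate and the known depth of each pixel, project pixels from the previous frame into the current frame.
3. Compute the photometric error between the intensity at each projected location and the intensity originally observed.
4. Update the pose to reduce the photometric error.
5. Repeat until convergence.

The minimization is usually solved with Gauss-Newton or Levenberg-Marquardt; plain gradient descent also works but typically converges more slowly.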
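
Written out, the photometric objective looks roughly as follows. The notation is an assumption chosen to match the pinhole model used in the code cells of this notebook: $I_1$ and $I_2$ are the previous and current grayscale images, $(x_i, y_i)$ a pixel in $I_1$ with depth $d_i$, $K$ the intrinsic matrix, and $(R, \mathbf{t})$ the pose being estimated.

```latex
\begin{aligned}
\mathbf{p}_i &= d_i\,K^{-1}\begin{bmatrix} x_i \\ y_i \\ 1 \end{bmatrix}
  && \text{(back-project pixel } (x_i, y_i) \text{ using its depth } d_i\text{)} \\
\begin{bmatrix} u_i \\ v_i \end{bmatrix} &= \pi\!\left(R\,\mathbf{p}_i + \mathbf{t}\right),
  \qquad \pi(X, Y, Z) = \begin{bmatrix} f_x X / Z + c_x \\ f_y Y / Z + c_y \end{bmatrix}
  && \text{(project into the current frame)} \\
e_i(R, \mathbf{t}) &= I_2(u_i, v_i) - I_1(x_i, y_i)
  && \text{(photometric error)} \\
(R^*, \mathbf{t}^*) &= \arg\min_{R,\,\mathbf{t}} \sum_i e_i(R, \mathbf{t})^2
\end{aligned}
```

Since $(u_i, v_i)$ is generally not an integer location, $I_2$ has to be sampled with interpolation, for example bilinear.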

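Implementing the multi-level direct method:
The multi-level direct method improves the robustness and convergence of the direct method. Its steps are:

1. Build an image pyramid with several levels from coarse to fine, scaling the camera intrinsics along with each level.
2. Estimate the pose at the coarsest level first.
3. Use that estimate as the initial value for the optimization at the next finer level.
4. Repeat down to the finest level.
5. At every level, use the same optimization as the single-level direct method.

Because a given motion spans fewer pixels at a coarse level, this approach handles larger motions and is less likely to get stuck in a local minimum, although it cannot rule one out.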
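
The coarse-to-fine loop can be sketched as below. This is an outline under stated assumptions rather than a full implementation: `build_pyramid`, `coarse_to_fine`, and the `align_fn` callback are hypothetical names, downsampling uses a simple 2x2 box filter instead of a Gaussian pyramid, and the intrinsics are simply halved per level (which ignores the half-pixel offset of the pixel-center convention).

```python
import numpy as np

def build_pyramid(gray, K, levels=4):
    """Return a list of (image, K) pairs, finest level first; each level halves the resolution."""
    pyramid = [(gray.astype(np.float64), K.astype(np.float64))]
    for _ in range(1, levels):
        img, K_prev = pyramid[-1]
        h, w = (img.shape[0] // 2) * 2, (img.shape[1] // 2) * 2
        # Average each 2x2 block (a simple stand-in for Gaussian blur + subsampling)
        small = img[:h, :w].reshape(h // 2, 2, w // 2, 2).mean(axis=(1, 3))
        K_small = K_prev.copy()
        K_small[:2, :] *= 0.5  # fx, fy, cx, cy shrink together with the image
        pyramid.append((small, K_small))
    return pyramid

def coarse_to_fine(pyramid, align_fn, T_init=None):
    """Run align_fn(image, K, T) from the coarsest to the finest level, passing T along."""
    T = np.eye(4) if T_init is None else T_init
    for img, K_level in reversed(pyramid):  # the pyramid is stored finest first
        T = align_fn(img, K_level, T)
    return T

# Usage with the intrinsics from this notebook and a placeholder aligner that leaves T unchanged
K = np.array([[518.0, 0.0, 325.5], [0.0, 519.0, 253.5], [0.0, 0.0, 1.0]])
pyramid = build_pyramid(np.zeros((480, 640)), K, levels=4)
T = coarse_to_fine(pyramid, lambda img, K_level, T: T)
for img, K_level in pyramid:
    print(img.shape, K_level[0, 0], K_level[0, 2])
```

In a real system `align_fn` would be a single-level direct alignment that takes the incoming `T` as its initial guess instead of starting from the identity.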


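Feature-based and direct methods are both widely used in SLAM visual odometry today, each with its own strengths and suitable scenarios. The main approaches are:

Feature-based methods:

- Still the mainstream choice in many SLAM systems
- Representative work: the ORB-SLAM series
- Strengths: mature and efficient, and comparatively robust to illumination changes and large inter-frame motion
- Best suited to texture-rich environments

Direct methods:

- Have attracted growing attention in recent years
- Representative work: DSO (Direct Sparse Odometry)
- Strengths: use more of the image information and cope better in low-texture environments
- Dense and semi-dense variants are better suited to scenarios with ample computing resources

Hybrid methods:

- Combine the strengths of feature-based and direct methods
- Example: SVO (Semi-direct Visual Odometry)
- Perform very well in some applications

Deep learning methods:

- Developing rapidly in recent years, e.g. DeepVO and ESP-VO
- Considerable potential, but real-time performance and generalization still need to improve

Overall, feature-based methods remain the first choice in many practical applications because of their maturity and efficiency. Direct and hybrid methods show advantages in specific scenarios and are receiving increasing attention. Deep learning methods are not yet mainstream, but they are a direction worth watching.

Which method to choose usually depends on the application scenario, the hardware constraints, and the accuracy requirements. In real projects these methods are often selected, or combined, according to the specific needs.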


# Lucas-Kanade optical flow


In [None]:
import numpy as np
import cv2
import sys
import time

def find_feature_matches(img1, img2):
    orb = cv2.ORB_create()
    kp1 = orb.detect(img1, None)
    return kp1

def main(path_to_dataset):
    associate_file = path_to_dataset + "/associate.txt"
    
    try:
        with open(associate_file, 'r') as f:
            lines = f.readlines()
    except IOError:
        print("Cannot find associate.txt!")
        return

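    keypoints = []  # Stored as a list because points that fail to track are removed
    
    for index, line in enumerate(lines[:100]):  # Process only the first 100 frames
        time_rgb, rgb_file, time_depth, depth_file = line.strip().split()
        color = cv2.imread(path_to_dataset + "/" + rgb_file)
        depth = cv2.imread(path_to_dataset + "/" + depth_file, -1)
        
        # Skip frames whose images could not be read (checked first so that
        # the detector is never called on a missing first frame)
        if color is None or depth is None:
            continue
        
        if index == 0:
            # Detect ORB keypoints in the first frame (ORB's detector is an oriented FAST)
            kps = find_feature_matches(color, None)
            keypoints = [kp.pt for kp in kps]
            last_color = color
            continue
        
        # Track the keypoints into every later frame with pyramidal LK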
        if len(keypoints) > 0:
            prev_keypoints = np.array(keypoints, dtype=np.float32)
            start_time = time.time()
            next_keypoints, status, error = cv2.calcOpticalFlowPyrLK(last_color, color, prev_keypoints, None)
            end_time = time.time()
            print(f"LK Flow use time: {end_time - start_time:.4f} seconds.")
            
            # Drop the points that were lost during tracking
            keypoints = [pt for pt, st in zip(next_keypoints, status) if st[0] == 1]
            
            print(f"tracked keypoints: {len(keypoints)}")
            if len(keypoints) == 0:
                print("all keypoints are lost.")
                break
            
            # Draw the keypoints
            img_show = color.copy()
            for kp in keypoints:
                cv2.circle(img_show, tuple(map(int, kp)), 10, (0, 240, 0), 1)
            cv2.imshow("corners", img_show)
            cv2.waitKey(1)  # Wait 1 ms
        
        last_color = color

if __name__ == "__main__":
    # if len(sys.argv) != 2:
    #     print("usage: python useLK.py path_to_dataset")
    #     sys.exit(1)
    # main(sys.argv[1])
    main("/Users/bytedance/Desktop/test/slambook_python/ch8/data")


LK Flow use time: 0.0031 seconds.
tracked keypoints: 500
LK Flow use time: 0.0024 seconds.
tracked keypoints: 500
LK Flow use time: 0.0021 seconds.
tracked keypoints: 500
LK Flow use time: 0.0027 seconds.
tracked keypoints: 500
LK Flow use time: 0.0022 seconds.
tracked keypoints: 500
LK Flow use time: 0.0028 seconds.
tracked keypoints: 500
LK Flow use time: 0.0022 seconds.
tracked keypoints: 500
LK Flow use time: 0.0022 seconds.
tracked keypoints: 500


[ WARN:0@30.831] global loadsave.cpp:241 findDecoder imread_('/Users/bytedance/Desktop/test/slambook_python/ch8/data/rgb/1305031453.659600.png'): can't open/read file: check file path/integrity
[ WARN:0@30.831] global loadsave.cpp:241 findDecoder imread_('/Users/bytedance/Desktop/test/slambook_python/ch8/data/depth/1305031453.673185.png'): can't open/read file: check file path/integrity
[ WARN:0@30.832] global loadsave.cpp:241 findDecoder imread_('/Users/bytedance/Desktop/test/slambook_python/ch8/data/rgb/1305031453.691678.png'): can't open/read file: check file path/integrity
[ WARN:0@30.832] global loadsave.cpp:241 findDecoder imread_('/Users/bytedance/Desktop/test/slambook_python/ch8/data/depth/1305031453.705487.png'): can't open/read file: check file path/integrity
[ WARN:0@30.832] global loadsave.cpp:241 findDecoder imread_('/Users/bytedance/Desktop/test/slambook_python/ch8/data/rgb/1305031453.727652.png'): can't open/read file: check file path/integrity
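
As a side note, the list comprehension that drops lost points can also be written with a boolean mask. The sketch below uses small stand-in arrays shaped like the `next_keypoints` and `status` outputs of `cv2.calcOpticalFlowPyrLK`; the helper name `keep_tracked` is hypothetical.

```python
import numpy as np

def keep_tracked(points, status):
    """Return only the rows of `points` whose status flag equals 1."""
    mask = np.asarray(status).reshape(-1) == 1
    # reshape(-1, 2) accepts both (N, 2) and (N, 1, 2) point layouts
    return np.asarray(points).reshape(-1, 2)[mask]

# Stand-in data: three points, the second one lost
next_pts = np.array([[10.0, 20.0], [30.5, 40.5], [50.0, 60.0]], dtype=np.float32)
status = np.array([[1], [0], [1]], dtype=np.uint8)
tracked = keep_tracked(next_pts, status)
print(tracked)
```

The result stays a NumPy array, so it can be passed straight back in as the previous points for the next frame.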

In [None]:
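# The dataset bundled with this project is incomplete (later frames are missing), so we move on to the direct method below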

# Direct method

In [None]:
import numpy as np
import cv2
import sys
import time
from scipy.optimize import least_squares

class Measurement:
    def __init__(self, pos_world, grayscale):
        self.pos_world = pos_world
        self.grayscale = grayscale

def project2Dto3D(x, y, d, fx, fy, cx, cy, scale):
    zz = float(d) / scale
    xx = zz * (x - cx) / fx
    yy = zz * (y - cy) / fy
    return np.array([xx, yy, zz])

def project3Dto2D(x, y, z, fx, fy, cx, cy):
    u = fx * x / z + cx
    v = fy * y / z + cy
    return np.array([u, v])

def getPixelValue(img, x, y):
    if x < 0 or x >= img.shape[1] - 1 or y < 0 or y >= img.shape[0] - 1:
        return 0
    xx = x - int(x)
    yy = y - int(y)
    return (1 - xx) * (1 - yy) * img[int(y), int(x)] + \
           xx * (1 - yy) * img[int(y), int(x) + 1] + \
           (1 - xx) * yy * img[int(y) + 1, int(x)] + \
           xx * yy * img[int(y) + 1, int(x) + 1]

def poseEstimationDirect(measurements, gray, K):
    def cost_func(pose):
        T = np.eye(4)
        T[:3, :3] = cv2.Rodrigues(pose[:3])[0]
        T[:3, 3] = pose[3:]
        
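        residuals = []
        for m in measurements:
            p = np.dot(T, np.append(m.pos_world, 1))[:3]
            if p[2] <= 0:
                # Point is behind the camera: contribute a zero residual
                residuals.append(0.0)
                continue
            pixel = project3Dto2D(p[0], p[1], p[2], K[0, 0], K[1, 1], K[0, 2], K[1, 2])
            if 0 <= pixel[0] < gray.shape[1] - 1 and 0 <= pixel[1] < gray.shape[0] - 1:
                # Cast to float so the uint8 reference intensity cannot wrap around
                error = float(getPixelValue(gray, pixel[0], pixel[1])) - float(m.grayscale)
                residuals.append(error)
            else:
                # least_squares needs a residual vector of fixed length,
                # so out-of-image points contribute zero instead of being skipped
                residuals.append(0.0)
        
        return np.array(residuals)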

    pose_init = np.zeros(6)
    result = least_squares(cost_func, pose_init)
    
    T = np.eye(4)
    T[:3, :3] = cv2.Rodrigues(result.x[:3])[0]
    T[:3, 3] = result.x[3:]
    
    return T

def main(path_to_dataset):
    associate_file = path_to_dataset + "/associate.txt"
    
    try:
        with open(associate_file, 'r') as f:
            lines = f.readlines()
    except IOError:
        print("Cannot find associate.txt!")
        return

    # Camera intrinsics
    cx, cy = 325.5, 253.5
    fx, fy = 518.0, 519.0
    depth_scale = 1000.0
    K = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]])

    measurements = []
    for index, line in enumerate(lines[:10]):  # Process only the first 10 frames
        time_rgb, rgb_file, time_depth, depth_file = line.strip().split()
        color = cv2.imread(path_to_dataset + "/" + rgb_file)
        depth = cv2.imread(path_to_dataset + "/" + depth_file, -1)
        
        if color is None or depth is None:
            continue
        
        gray = cv2.cvtColor(color, cv2.COLOR_BGR2GRAY)
        
        if index == 0:
            # Extract FAST keypoints from the first frame
            kps = cv2.FastFeatureDetector_create().detect(color)
            for kp in kps:
                if kp.pt[0] < 20 or kp.pt[0] > color.shape[1] - 20 or \
                   kp.pt[1] < 20 or kp.pt[1] > color.shape[0] - 20:
                    continue
                
                d = depth[int(kp.pt[1]), int(kp.pt[0])]
                if d == 0:
                    continue
                
                p3d = project2Dto3D(kp.pt[0], kp.pt[1], d, fx, fy, cx, cy, depth_scale)
                grayscale = gray[int(kp.pt[1]), int(kp.pt[0])]
                measurements.append(Measurement(p3d, grayscale))
            
            prev_color = color
            continue
        
        # Estimate the camera motion with the direct method
        start_time = time.time()
        T = poseEstimationDirect(measurements, gray, K)
        end_time = time.time()
        print(f"Direct method costs time: {end_time - start_time:.4f} seconds.")
        print(f"T=\n{T}")

        # Draw the keypoints: first frame on top, current frame below
        img_show = np.vstack([prev_color, color])
        for m in measurements:
            if np.random.rand() > 0.2:
                continue
            p = np.dot(T, np.append(m.pos_world, 1))[:3]
            pixel_prev = project3Dto2D(m.pos_world[0], m.pos_world[1], m.pos_world[2], fx, fy, cx, cy)
            pixel_now = project3Dto2D(p[0], p[1], p[2], fx, fy, cx, cy)
            
            if 0 <= pixel_now[0] < color.shape[1] and 0 <= pixel_now[1] < color.shape[0]:
                cv2.circle(img_show, tuple(map(int, pixel_prev)), 8, (0, 255, 0), 2)
                cv2.circle(img_show, tuple(map(int, (pixel_now[0], pixel_now[1] + color.shape[0]))), 8, (0, 255, 0), 2)
                cv2.line(img_show, tuple(map(int, pixel_prev)), 
                         tuple(map(int, (pixel_now[0], pixel_now[1] + color.shape[0]))), (0, 255, 0), 1)
        
        cv2.imshow("result", img_show)
        cv2.waitKey(0)
        
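        # prev_color deliberately stays as the first frame: the measurements
        # (and therefore pixel_prev) are defined in that frame's coordinates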

if __name__ == "__main__":
    # if len(sys.argv) != 2:
    #     print("usage: python direct_sparse.py path_to_dataset")
    #     sys.exit(1)
    # main(sys.argv[1])
    main("/Users/bytedance/Desktop/test/slambook_python/ch8/data")


Direct method costs time: 1.6195 seconds.
T=
[[ 0.99917199  0.01394554  0.03822108 -0.25496688]
 [-0.01522319  0.99932802  0.03334316 -0.19760591]
 [-0.03773041 -0.0338974   0.99871286 -0.08872468]
 [ 0.          0.          0.          1.        ]]
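
To judge whether an estimated `T` is plausible, it helps to reduce it to a rotation angle and a translation length. The sketch below does that for the matrix printed above (values copied from that output); the helper name `motion_magnitude` is an assumption, and the translation is in metres only if `depth_scale` correctly converts the raw depth values to metres.

```python
import numpy as np

def motion_magnitude(T):
    """Return (rotation angle in degrees, translation norm) of a 4x4 rigid transform."""
    R, t = T[:3, :3], T[:3, 3]
    # For a rotation matrix, trace(R) = 1 + 2 * cos(angle); clip guards against rounding
    cos_angle = np.clip((np.trace(R) - 1.0) / 2.0, -1.0, 1.0)
    return np.degrees(np.arccos(cos_angle)), np.linalg.norm(t)

# Pose printed by the sparse direct method above
T = np.array([[ 0.99917199,  0.01394554,  0.03822108, -0.25496688],
              [-0.01522319,  0.99932802,  0.03334316, -0.19760591],
              [-0.03773041, -0.0338974 ,  0.99871286, -0.08872468],
              [ 0.        ,  0.        ,  0.        ,  1.        ]])
angle_deg, dist = motion_magnitude(T)
print(f"rotation: {angle_deg:.2f} deg, translation: {dist:.3f}")
```

For this matrix the result is roughly 3 degrees of rotation and a translation of about 0.33. A translation that large between two adjacent frames may indicate that the sparse optimization did not converge well, so the number is worth cross-checking against other estimates.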




In [None]:
import numpy as np
import cv2
import sys
from scipy.optimize import least_squares

class Measurement:
    def __init__(self, pos_world, grayscale):
        self.pos_world = pos_world
        self.grayscale = grayscale

def project2Dto3D(x, y, d, fx, fy, cx, cy, scale):
    zz = float(d) / scale
    xx = zz * (x - cx) / fx
    yy = zz * (y - cy) / fy
    return np.array([xx, yy, zz])

def project3Dto2D(x, y, z, fx, fy, cx, cy):
    u = fx * x / z + cx
    v = fy * y / z + cy
    return np.array([u, v])

def getPixelValue(img, x, y):
    if x < 0 or x >= img.shape[1] - 1 or y < 0 or y >= img.shape[0] - 1:
        return 0
    xx = x - int(x)
    yy = y - int(y)
    return (1 - xx) * (1 - yy) * img[int(y), int(x)] + \
           xx * (1 - yy) * img[int(y), int(x) + 1] + \
           (1 - xx) * yy * img[int(y) + 1, int(x)] + \
           xx * yy * img[int(y) + 1, int(x) + 1]

def poseEstimationDirect(measurements, gray, K):
    def cost_func(pose):
        T = np.eye(4)
        T[:3, :3] = cv2.Rodrigues(pose[:3])[0]
        T[:3, 3] = pose[3:]
        error = []
        for m in measurements:
            p = np.dot(T, np.append(m.pos_world, 1))[:3]
            if p[2] <= 0:  # behind or on the camera plane; also avoids dividing by zero
                error.append(0)
                continue
            pixel = project3Dto2D(p[0], p[1], p[2], K[0, 0], K[1, 1], K[0, 2], K[1, 2])
            if pixel[0] < 0 or pixel[0] >= gray.shape[1] - 1 or pixel[1] < 0 or pixel[1] >= gray.shape[0] - 1:
                error.append(0)
                continue
            error.append(getPixelValue(gray, pixel[0], pixel[1]) - m.grayscale)
        return error

    pose_init = np.zeros(6)
    result = least_squares(cost_func, pose_init)
    T = np.eye(4)
    T[:3, :3] = cv2.Rodrigues(result.x[:3])[0]
    T[:3, 3] = result.x[3:]
    return T

def main(path_to_dataset):
    associate_file = path_to_dataset + "/associate.txt"
    
    try:
        with open(associate_file, 'r') as f:
            lines = f.readlines()
    except IOError:
        print("Cannot find associate.txt!")
        return

    # Camera intrinsics
    cx, cy = 325.5, 253.5
    fx, fy = 518.0, 519.0
    depth_scale = 1000.0
    K = np.array([[fx, 0, cx], [0, fy, cy], [0, 0, 1]])

    measurements = []
    for index, line in enumerate(lines[:10]):  # Process only the first 10 frames
        time_rgb, rgb_file, time_depth, depth_file = line.strip().split()
        color = cv2.imread(path_to_dataset + "/" + rgb_file)
        depth = cv2.imread(path_to_dataset + "/" + depth_file, -1)
        
        if color is None or depth is None:
            continue
        
        gray = cv2.cvtColor(color, cv2.COLOR_BGR2GRAY)
        
        if index == 0:
            for x in range(10, gray.shape[1] - 10):
                for y in range(10, gray.shape[0] - 10):
                    dx = int(gray[y, x + 1]) - int(gray[y, x - 1])
                    dy = int(gray[y + 1, x]) - int(gray[y - 1, x])
                    if dx * dx + dy * dy < 50 * 50:
                        continue
                    d = depth[y, x]
                    if d == 0:
                        continue
                    p3d = project2Dto3D(x, y, d, fx, fy, cx, cy, depth_scale)
                    grayscale = float(gray[y, x])
                    measurements.append(Measurement(p3d, grayscale))
            prev_color = color.copy()
            print(f"add total {len(measurements)} measurements.")
            continue
        
        # Estimate the camera motion with the direct method
        T = poseEstimationDirect(measurements, gray, K)
        print(f"Pose = \n{T}")

        # Draw the selected pixels: first frame on top, current frame below
        img_show = np.vstack([prev_color, color])
        for m in measurements:
            if np.random.rand() > 0.2:
                continue
            p = m.pos_world
            pixel_prev = project3Dto2D(p[0], p[1], p[2], fx, fy, cx, cy)
            p2 = np.dot(T, np.append(p, 1))[:3]
            pixel_now = project3Dto2D(p2[0], p2[1], p2[2], fx, fy, cx, cy)
            if pixel_now[0] < 0 or pixel_now[0] >= color.shape[1] or pixel_now[1] < 0 or pixel_now[1] >= color.shape[0]:
                continue
            cv2.circle(img_show, (int(pixel_prev[0]), int(pixel_prev[1])), 2, (0, 250, 0), 2)
            cv2.circle(img_show, (int(pixel_now[0]), int(pixel_now[1] + color.shape[0])), 2, (0, 250, 0), 2)
            cv2.line(img_show, (int(pixel_prev[0]), int(pixel_prev[1])), 
                     (int(pixel_now[0]), int(pixel_now[1] + color.shape[0])), (0, 250, 0), 1)
        cv2.imshow("result", img_show)
        cv2.waitKey(1)

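        # prev_color deliberately stays as the first frame: the measurements
        # (and therefore pixel_prev) are defined in that frame's coordinates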

if __name__ == "__main__":
    # if len(sys.argv) != 2:
    #     print("usage: python direct_semidense.py path_to_dataset")
    #     sys.exit(1)
    # main(sys.argv[1])
    main("/Users/bytedance/Desktop/test/slambook_python/ch8/data")


add total 12556 measurements.
Pose = 
[[ 0.99963053  0.02067116 -0.01764972  0.0146247 ]
 [-0.02075666  0.99977363 -0.00467501 -0.00388002]
 [ 0.01754909  0.00503963  0.9998333   0.04754951]
 [ 0.          0.          0.          1.        ]]
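
The per-pixel double loop that selects high-gradient pixels in the first frame is slow in pure Python. A vectorized NumPy version of the same selection rule might look like the sketch below; the function name `select_high_gradient_pixels` is hypothetical, the demo images are random stand-ins, and `border` is assumed to be at least 1.

```python
import numpy as np

def select_high_gradient_pixels(gray, depth, border=10, threshold=50):
    """Return (xs, ys) of pixels with a strong central-difference gradient and valid depth.

    Assumes border >= 1 so that the central differences stay inside the image.
    """
    g = gray.astype(np.int32)            # avoid uint8 wrap-around when subtracting
    dx = np.zeros_like(g)
    dy = np.zeros_like(g)
    dx[:, 1:-1] = g[:, 2:] - g[:, :-2]   # gray[y, x + 1] - gray[y, x - 1]
    dy[1:-1, :] = g[2:, :] - g[:-2, :]   # gray[y + 1, x] - gray[y - 1, x]
    # Keep pixels whose squared gradient reaches the threshold and whose depth is known
    mask = (dx * dx + dy * dy >= threshold * threshold) & (depth != 0)
    # Exclude a margin of `border` pixels on every side
    mask[:border, :] = False
    mask[-border:, :] = False
    mask[:, :border] = False
    mask[:, -border:] = False
    ys, xs = np.nonzero(mask)
    return xs, ys

# Random stand-in images; zeros in `depth` mark missing measurements
rng = np.random.default_rng(0)
gray = rng.integers(0, 256, size=(40, 50), dtype=np.uint8)
depth = rng.integers(0, 3, size=(40, 50)).astype(np.uint16)
xs, ys = select_high_gradient_pixels(gray, depth)
print(f"selected {len(xs)} pixels")
```

The pixels come out in row-major order rather than the column-major order of the loop, which does not matter for the optimization because the residuals are summed.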
