In [18]:
# !pip install pytesseract
# !pip install opencv-python
# !sudo apt-get install tesseract-ocr -y
# !pip install pytesseract opencv-python-headless

In [19]:
import cv2
import pytesseract

import cv2
import numpy as np


import matplotlib.pyplot as plt
import numpy as np
from datetime import datetime
import os

# 设置 Tesseract OCR 的路径
pytesseract.pytesseract.tesseract_cmd = "/usr/bin/tesseract"

def left_or_right_from_mag_ang(mag, ang, threshold=100000):
    """
    判断光流的总体运动方向：向右、向左或静止。
    :param mag: 光流的幅值矩阵。
    :param ang: 光流的角度矩阵（弧度）。
    :param threshold: 判断运动的阈值，低于该值视为静止。
    :return: total_motion 和 'right', 'left', 'static'
    """
    # 计算水平运动分量（正值为右，负值为左）
    horizontal_motion = mag * np.cos(ang)  # 计算水平方向的光流分量
    total_motion = np.sum(horizontal_motion)  # 计算所有像素的水平运动总和

    # 判断运动方向
    if abs(total_motion) < threshold:  # 运动量低于阈值，视为静止
        return total_motion, 'static'
    elif total_motion > 0:  # 水平分量为正，向右运动
        return total_motion, 'right'
    else:  # 水平分量为负，向左运动
        return total_motion, 'left'

In [20]:
#对每一帧picture，判断时间信息

def convert_to_24hr_format(timestamp):
    """将时间戳转换为24小时制格式"""
    try:
        # 假设时间戳是类似 "12:34:56 PM" 或 "01:23:45 AM" 格式
        dt = datetime.strptime(timestamp, "%I:%M:%S")  # 12小时制格式
        hr = dt.hour
        # print(hr)
        if hr < 8:
            # print(hr)
            hr += 12
            dt = dt.replace(hour=hr)

        return dt.strftime("%H:%M:%S")  # 转换为24小时制

    except ValueError:
        return timestamp  # 如果转换失败，直接返回原始时间戳

def timestamp_difference_is_greater_than_one_second(prev_timestamp, curr_timestamp):
    """检查两个时间戳之间的差异是否大于 1 秒"""
    try:
        # 将时间戳转换为 datetime 对象
        prev_time = datetime.strptime(prev_timestamp, "%H:%M:%S")
        curr_time = datetime.strptime(curr_timestamp, "%H:%M:%S")

        # 计算时间差，单位为秒
        time_diff = (curr_time - prev_time).total_seconds()

        # 如果差值大于 1 秒，打印错误信息
        if time_diff > 1:
            print(f"Error: Timestamp difference exceeds 1 second. Prev: {prev_timestamp}, Curr: {curr_timestamp}")
            return True  # 返回 True 表示有问题
        return False  # 差值不超过 1 秒
    except ValueError:
        return False  # 如果时间戳格式不正确，则返回 False


def get_real_time(frame):
    crop_size=(50, 430)
    top_left_crop = frame[0:crop_size[0], 230:crop_size[1]]

    # 转为灰度图像
    gray = cv2.cvtColor(top_left_crop, cv2.COLOR_BGR2GRAY)

    # 二值化处理
    _, thresh = cv2.threshold(gray, 241, 255, cv2.THRESH_BINARY)

    # 可选：反转颜色，如果背景是黑色，文字是白色
    thresh = cv2.bitwise_not(thresh)

    # OCR 提取时间戳
    custom_config = r'--oem 3 --psm 7 -c tessedit_char_whitelist=0123456789:.'  # 只允许数字和冒号
    timestamp = pytesseract.image_to_string(thresh, config=custom_config).strip()

    if timestamp:  # 如果成功提取到内容
        timestamp_24hr = convert_to_24hr_format(timestamp)  # 转换为24小时制格式
    else:
            # 如果没有提取到时间戳，输出错误信息
        print("Error: No timestamp extracted ")
    return timestamp_24hr

In [21]:
def generate_optical_flow_horizontal(video_path, frame_interval=1):
    """
    生成稠密光流（仅保留左右运动），判断左右运动方向，并保存为图像文件。
    :param video_path: 视频文件路径
    :param frame_interval: 每隔多少帧生成一张光流图，默认值为 1
    """
    count = 0
    flow_visuals = []  # 用于存储每帧光流的结果
    motion_segments = []  # 用于存储每段连续的运动方向
    current_segment = None

    # 创建输出目录，如果不存在则创建
    # if not os.path.exists(output_dir):
    #     os.makedirs(output_dir)

    # 打开视频文件
    cap = cv2.VideoCapture(video_path)
    ret, prev_frame = cap.read()

    # 如果视频无法打开，打印错误信息并退出
    if not ret:
        print("Error: Cannot read video.")
        return

    # 将第一帧转换为灰度图，作为光流计算的参考
    prev_gray = cv2.cvtColor(prev_frame, cv2.COLOR_BGR2GRAY)
    frame_idx = 0  # 当前帧索引

    while True:
        # 读取下一帧
        ret, frame = cap.read()
        if not ret:
            break  # 如果无法读取帧，说明视频结束，退出循环

        # 跳过非目标帧
        if frame_idx % frame_interval != 0:
            frame_idx += 1
            continue

        # 将当前帧转换为灰度图
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

        # 计算稠密光流
        flow = cv2.calcOpticalFlowFarneback(
            prev_gray, gray, None,
            0.5, 3, 15, 3, 5, 1.2, 0
        )

        # 提取水平光流（左右运动）并忽略垂直光流
        horizontal_flow = flow[..., 0]  # 水平方向的光流（左右运动）
        vertical_flow = flow[..., 1]   # 垂直方向的光流
        flow[..., 1] = 0  # 将垂直运动的分量设置为 0

        # Ensure horizontal_flow and vertical_flow are NumPy arrays with the same shape

        # 使用极坐标表示法，直接得到幅值（mag）和方向角度（ang）
        mag, ang = cv2.cartToPolar(horizontal_flow, vertical_flow)

        # 判断当前帧的运动方向（直接使用 mag 和 ang 计算）
        total_motion, direction = left_or_right_from_mag_ang(mag, ang)


        #计算当前帧的时间
        timestamp = get_real_time(frame)
        # print(f"Time {timestamp}: Frame {frame_idx}, Motion={direction}, Total Motion={total_motion}")

        # 可视化光流：使用 HSV 表示法
        hsv = np.zeros_like(frame)  # 创建 HSV 图像
        hsv[..., 1] = 255  # 饱和度固定为最大值
        hsv[..., 0] = ang * 180 / np.pi / 2  # 色调表示方向
        hsv[..., 2] = cv2.normalize(mag, None, 0, 255, cv2.NORM_MINMAX)  # 亮度表示幅值

        # 转换为 BGR 图像
        flow_visual = cv2.cvtColor(hsv, cv2.COLOR_HSV2BGR)
        flow_visuals.append([timestamp,direction])

        # 保存光流图像到指定目录
        # output_path = os.path.join(output_dir, f"flow_{frame_idx:04d}.png")
        # cv2.imwrite(output_path, flow_visual)

        # 更新参考帧为当前灰度帧
        prev_gray = gray.copy()
        frame_idx += 1  # 增加帧索引

    # 释放视频资源
    cap.release()
    return flow_visuals

In [22]:
from datetime import datetime
def time_diff(timestring1, timestring2):
    """
    计算两个时间字符串之间的时间差（单位：秒）。
    :param timestring1: 第一个时间字符串，格式为 HH:MM:SS。
    :param timestring2: 第二个时间字符串，格式为 HH:MM:SS。
    :return: 两个时间之间的差值（秒）。
    """
    # 定义时间格式
    time_format = "%H:%M:%S"

    # 将时间字符串解析为 datetime 对象
    time1 = datetime.strptime(timestring1, time_format)
    time2 = datetime.strptime(timestring2, time_format)

    # 计算时间差并转换为秒
    time_difference = (time2 - time1).total_seconds()
    return float(time_difference)

In [23]:
def get_motion_list(flow_visuals):
    """
    获取完整的向左/向右运动列表。
    :param flow_visuals: 包含 [timestamp, direction, flow_visual] 的列表。
    :return: motion_list，每个元素是 [direct, each_motion_list]。
    """
    # 初始化变量
    motion_list = []  # 存储最终的运动段列表
    each_motion_list = []  # 当前运动段的详细信息
    start_time = flow_visuals[0][0]  # 第一个时间戳作为参考点
    current_direction = 'static'  # 当前运动方向
    segment_start_time = 0  # 运动段开始时间
    segment_duration = 5  # 最大运动段持续时间（秒）

    # 遍历光流数据
    for timestamp, direction in flow_visuals:
        time_diff_seconds = time_diff(start_time, timestamp)  # 计算当前时间戳与起始时间的差值
        # 如果时间间隔小于 2 秒，跳过
        if time_diff_seconds < 2:
            continue
        # 如果超出当前段的持续时间，结束当前段并存储
        if time_diff_seconds - segment_start_time > segment_duration:
            if current_direction != 'static' and each_motion_list:
                motion_list.append([each_motion_list[0],each_motion_list[-1],direction])
            # 重置当前运动段
            current_direction = 'static'
            each_motion_list = []
            segment_start_time = time_diff_seconds

        # 如果当前运动段是静止状态，开始新一段运动
        if current_direction == 'static':
            current_direction = direction
            segment_start_time = time_diff_seconds
            each_motion_list = []
            each_motion_list.append(timestamp)

        # 如果当前运动方向匹配或当前方向是静止，继续记录
        if direction == current_direction or direction == 'static':
            each_motion_list.append(timestamp)
        else:
            current_direction = 'static'
            each_motion_list = []
    return motion_list

In [24]:
import json
import os
from datetime import datetime

def parse_json_time(json_time):
    """
    解析 JSON 中的时间字符串为标准时间格式。
    :param json_time: JSON 时间字段，例如 "Nov 15, 2024 13:25:49.422189000 PST"
    :return: 标准时间字符串，格式为 "HH:MM:SS"
    """
    time_with_millis = json_time.split()[3]
    return time_with_millis.split(".")[0]

def save_data_per_motion(motion_list, from_path, to_path):
    """
    根据 motion_list 的每个元素生成单独的文件，提取 JSON 数据并保存。
    :param motion_list: 包含 [start_time, end_time, direction] 的列表。
    :param from_path: 输入 JSON 文件路径。
    :param to_path: 输出 JSON 文件保存目录。
    """
    # 确保输出目录存在
    if not os.path.exists(to_path):
        os.makedirs(to_path)

    # 读取原始文件名
    base_filename = os.path.splitext(os.path.basename(from_path))[0]

    # 读取 JSON 文件
    with open(from_path, 'r', encoding='utf-8') as f:
        json_data = json.load(f)

    motion_index = 1  # 用于编号运动段

    for start_time, end_time, direction in motion_list:
        # 解析开始和结束时间为时间对象
        start_time_obj = datetime.strptime(start_time, "%H:%M:%S").time()
        end_time_obj = datetime.strptime(end_time, "%H:%M:%S").time()

        # 提取匹配的数据
        extracted_data = []
        for record in json_data:
            try:
                # 从 JSON 数据中提取时间
                json_time_str = record["_source"]["layers"]["frame"]["frame.time"]
                record_time = datetime.strptime(parse_json_time(json_time_str), "%H:%M:%S").time()

                # 检查时间是否在 start_time 和 end_time 范围内
                if start_time_obj <= record_time <= end_time_obj:
                    # 添加匹配的记录并附加运动方向信息
                    record["motion_direction"] = direction
                    extracted_data.append(record)
            except KeyError:
                # 跳过缺少关键字段的记录
                continue

        # 生成新的文件名
        output_filename = f"{base_filename}_{direction}_{motion_index}.json"
        output_file = os.path.join(to_path, output_filename)

        # 保存提取的数据到新文件
        with open(output_file, 'w', encoding='utf-8') as f:
            json.dump(extracted_data, f, indent=4)

        print(f"Extracted data saved to {output_file}")

        motion_index += 1  # 增加运动段编号


In [None]:
import os
def process_videos(video_path, from_path, to_path):
    """
    遍历视频文件目录，计算光流，提取运动方向，匹配 JSON 文件并保存结果。
    :param video_path: 视频文件目录。
    :param from_path: JSON 文件目录。
    :param to_path: 输出 JSON 文件保存目录。
    """
    # 遍历视频目录下的每个文件
    for video_file in sorted(os.listdir(video_path)):
        # 跳过非视频文件
        if not video_file.endswith('.mp4'):
            continue

        # 视频文件完整路径
        video_file_path = os.path.join(video_path, video_file)
        print(f"Processing video: {video_file_path}")

        # 生成光流并提取运动段列表
        flow_visuals = generate_optical_flow_horizontal(video_file_path)
        motion_list = get_motion_list(flow_visuals)
        print("Motion List:",motion_list)

        # 找到对应的 JSON 文件
        base_name = os.path.splitext(video_file)[0]  # 去掉扩展名
        json_file = os.path.join(from_path, f"{base_name}.json")
        print(f"Processing file: {json_file}")

        # 检查 JSON 文件是否存在
        if not os.path.exists(json_file):
            print(f"Warning: JSON file not found for {base_name}")
            continue

        # 保存提取的数据到指定目录
        save_data_per_motion(motion_list, json_file, to_path)
        print(f"Completed processing for video: {video_file_path}")


: 

In [None]:
# 示例调用
video_path = '202_video'
from_path = '202_packet_json_new'
to_path = '202_direction_json'

process_videos(video_path, from_path, to_path)

Processing video: 202_video/b1_p1_0.mp4
