In [None]:
# ===============================================
# Video Stutter Detection Notebook
# このノートブックは動画のカクつき（stutter）を検出・解析するための最終段階のコード
# 実行環境（カーネル）はTestProg_video（Python 3.8.19）で行ってください
#
# することリスト（仮）
# 動作確認
# 5分以上経過での停止でプレビューが消えない。所望の動きをしてない。
# コード全体の動きを再確認して問題がないか確認。上の動作確認と一緒にする。
# 動作確認が終わったら、バージョンアップして動作等を改善する。
# ===============================================

In [None]:
# ==========================================================
# 必要なライブラリのインポート（重複除去・カテゴリ別整理）
# ==========================================================

# --- 画像処理・数値計算 ---
import cv2                   # OpenCV: 動画読み込み・画像処理用
import numpy as np            # NumPy: 数値計算・配列処理用

# --- 可視化 ---
import matplotlib.pyplot as plt  # グラフ描画・可視化用

# --- GUI ---
import tkinter as tk             # GUI用
from tkinter import ttk          # GUI用（拡張ウィジェット）

# --- ファイル・システム操作 ---
import os                        # ファイルパス操作・環境操作
import shutil                    # ファイルコピー・削除

# --- 並行処理 ---
import threading                 # マルチスレッド処理
import queue                     # スレッド間通信・キュー処理

# --- その他ユーティリティ ---
import time                      # 時間計測・待機
import gc                        # ガーベジコレクション操作
import re                        # 正規表現処理
import sys                       # システム関連（パス・終了処理など）

In [44]:
# -----------------------------------------------
# list_video_devices 関数 (利用可能ビデオキャプチャデバイス一覧取得関数)
# -----------------------------------------------
"""
概要:
    接続されているビデオキャプチャデバイス（Webカメラなど）を探索し、
    利用可能なデバイス番号と識別名のリストを返す関数。
    この関数を使用することで、ユーザーが利用可能なカメラデバイスを選択可能になる。
    
引数:
    max_devices (int, optional): チェックする最大デバイス番号。デフォルトは10。
    
戻り値:
    devices (list of tuples): 利用可能なデバイスのリスト。
        例: [(0, 'Camera 0'), (1, 'Camera 1'), ...]
        
注意:
    - デバイスマネージャーのカメラでTriforaなど特定のドライバを認識している場合、正常に認識できないことがあります。
    - cv2.VideoCapture() は番号順にデバイスを開くため、接続順序によって番号が変わることがあります。
"""
def list_video_devices(max_devices=10, timeout=1.0):
    devices = []

    def try_open_device(idx, result_dict):
        try:
            cap = cv2.VideoCapture(idx, cv2.CAP_DSHOW)
            if cap.isOpened():
                result_dict["opened"] = True
            cap.release()
        except Exception as e:
            print(f"デバイス {idx} は無視されました: {e}")
            result_dict["opened"] = False

    for idx in range(max_devices):
        result = {"opened": False}
        t = threading.Thread(target=try_open_device, args=(idx, result))
        t.start()
        t.join(timeout=timeout)  # タイムアウト待ち
        if t.is_alive():
            print(f"デバイス {idx} はタイムアウト ({timeout} 秒) によりスキップされました")
        elif result["opened"]:
            devices.append((idx, f"Camera {idx}"))

    return devices


# 実行例
if __name__ == "__main__":
    print("利用可能なビデオキャプチャデバイス:")
    devices = list_video_devices(max_devices=10, timeout=1.0)
    for idx, name in devices:
        print(f"デバイス番号: {idx}, 名前: {name}")


利用可能なビデオキャプチャデバイス:
デバイス番号: 0, 名前: Camera 0
デバイス番号: 1, 名前: Camera 1


In [45]:
# -----------------------------------------------
# launch_gui 関数 (ビデオデバイス選択GUI関数)
# -----------------------------------------------
"""
概要:
    ユーザーが接続されているビデオキャプチャデバイスの中から1つを選択できるGUIを表示し、
    選択されたデバイス番号を返す関数。

引数:
    devices (list of tuples): list_video_devices などで取得した利用可能なデバイスのリスト
        例: [(0, 'Camera 0'), (1, 'Camera 1'), ...]

戻り値:
    int or None: 選択されたデバイス番号
        - ユーザーが選択して「開始」を押した場合は選択番号を返す
        - GUIを閉じるなど何も選ばなかった場合は None を返す

注意:
    - combo.current(0) により、デフォルトで最初のデバイスが選択される
    - GUIを閉じただけで何も選ばなかった場合は None になる
"""
def launch_gui(devices):
    selected_device = {"idx": None}  # 辞書で共有変数

    root = tk.Tk()
    root.title("デバイス選択")
    tk.Label(root, text="使用するデバイスを選択:").pack(pady=5)

    display_names = [f"{name} (番号: {idx})" for idx, name in devices]
    device_var = tk.StringVar()
    combo = ttk.Combobox(root, textvariable=device_var, values=display_names, state="readonly")
    combo.current(0)
    combo.pack(pady=5)

    def on_start():
        selected_device["idx"] = devices[combo.current()][0]
        root.destroy()

    tk.Button(root, text="開始", command=on_start).pack(pady=10)
    root.mainloop()

    return selected_device["idx"]

In [46]:
# ====================================================
# 🔒 安全で完全なフォルダ削除関数（Windows対応・再試行付き）
# ====================================================
def safe_rmtree(folder_path: str, retry=3, wait=0.5):
    if not os.path.exists(folder_path):
        print(f"🗑️ フォルダ不存在: {folder_path}")
        return

    def get_all_entries(path):
        entries = []
        for root, dirs, files in os.walk(path, topdown=False):
            entries.extend([os.path.join(root, f) for f in files])
            entries.extend([os.path.join(root, d) for d in dirs])
        return entries

    for attempt in range(retry):
        try:
            all_entries = get_all_entries(folder_path)
            total = len(all_entries)
            if total == 0:
                os.rmdir(folder_path)
                print(f"🗑️ 完全削除済み: {folder_path}")
                return

            for i, entry in enumerate(all_entries, start=1):
                try:
                    if os.path.isfile(entry) or os.path.islink(entry):
                        os.remove(entry)
                    elif os.path.isdir(entry):
                        os.rmdir(entry)
                except Exception as e:
                    print(f"⚠ 削除失敗: {entry} → {e}")

                # 進捗表示
                pct = int(i / total * 100)
                bar_len = 30
                filled = int(bar_len * pct / 100)
                sys.stdout.write(f"\r削除中: [{'#'*filled}{'-'*(bar_len-filled)}] {pct}%")
                sys.stdout.flush()

            # 最後にフォルダ自体を削除
            try:
                os.rmdir(folder_path)
            except Exception as e:
                print(f"\n⚠ フォルダ自体の削除失敗: {e}")
            else:
                print(f"\n🗑️ 完全削除済み: {folder_path}")
            break

        except Exception as e:
            print(f"\n⚠ 削除中エラー: {e} → {wait}秒後に再試行 ({attempt+1}/{retry})")
            time.sleep(wait)
    else:
        print(f"❌ {folder_path} の削除に失敗しました。")

In [47]:
# ====================================================
# カクつき検知ワーカー（最初のフレームのみ保存）
# ====================================================
def stutter_worker_first_only(frame_queue, output_folder, stop_flag,
                              fps=60, threshold=100, min_time_diff=0.1, stop_no_diff_sec=300):
    min_frame_diff = int(min_time_diff * fps)
    prev_gray = None
    temp_stutter_indices = []
    stutter_frames = []
    last_diff_time = time.time()

    os.makedirs(output_folder, exist_ok=True)

    try:
        while not stop_flag.is_set():
            try:
                item = frame_queue.get(timeout=1)
            except queue.Empty:
                continue

            if item is None:
                frame_queue.task_done()
                break

            idx, frame = item
            gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)

            if prev_gray is not None:
                diff = cv2.absdiff(gray, prev_gray)
                non_zero_count = np.count_nonzero(diff)

                if non_zero_count <= threshold:
                    temp_stutter_indices.append((idx, frame))
                    if len(temp_stutter_indices) > min_frame_diff * 3:
                        temp_stutter_indices.pop(0)
                else:
                    if len(temp_stutter_indices) >= min_frame_diff:
                        first_idx, first_frame = temp_stutter_indices[0]
                        stutter_frames.append(first_idx)
                        cv2.imwrite(os.path.join(output_folder, f"stutter_{first_idx:05d}.png"), first_frame)
                    temp_stutter_indices = []
                    last_diff_time = time.time()

            prev_gray = gray

            # 差分無しが指定秒続いた場合 → 停止
            if time.time() - last_diff_time > stop_no_diff_sec:
                print("⚠ 5分以上変化なし：キャプチャ停止（ワーカー）")
                stop_flag.set()
                cv2.destroyAllWindows()
                break

            frame_queue.task_done()
            del frame, gray

            if len(temp_stutter_indices) > 0 and (time.time() - last_diff_time) > 5:
                temp_stutter_indices = temp_stutter_indices[-min_frame_diff:]
                gc.collect()

    finally:
        if len(temp_stutter_indices) >= min_frame_diff:
            first_idx, first_frame = temp_stutter_indices[0]
            stutter_frames.append(first_idx)
            cv2.imwrite(os.path.join(output_folder, f"stutter_{first_idx:05d}.png"), first_frame)

        print(f"✅ カクつき検出終了: {len(stutter_frames)} フレーム保存完了")
        gc.collect()

In [48]:
# ====================================================
# 自動閾値による全フレームカクつき検出
# ====================================================
def detect_stutter_auto_threshold(temp_folder, fps=60, min_time_diff=0.1, output_folder=None, threshold_method='std', k=2.0):
    if output_folder is None:
        desktop = os.path.join(os.path.expanduser("~"), "Desktop")
        output_folder = os.path.join(desktop, "stutter_frames")

    safe_rmtree(output_folder)
    os.makedirs(output_folder, exist_ok=True)
    print(f"{output_folder} を空の状態で作成しました。")

    frame_files = sorted([f for f in os.listdir(temp_folder) if f.endswith('.png')])
    if not frame_files:
        print("フレームが見つかりません。")
        return []

    stutter_frames = []
    temp_stutter_indices = []
    min_frame_diff = int(min_time_diff * fps)

    prev_frame = cv2.imread(os.path.join(temp_folder, frame_files[0]), cv2.IMREAD_GRAYSCALE)
    diff_values = []

    for fname in frame_files[1:]:
        curr_frame = cv2.imread(os.path.join(temp_folder, fname), cv2.IMREAD_GRAYSCALE)
        diff_values.append(np.count_nonzero(cv2.absdiff(curr_frame, prev_frame)))
        prev_frame = curr_frame
        del curr_frame

    diff_array = np.array(diff_values)
    if threshold_method == 'std':
        threshold = np.mean(diff_array) - k * np.std(diff_array)
    elif threshold_method == 'median':
        threshold = np.median(diff_array) - k * np.std(diff_array)
    else:
        raise ValueError("threshold_method は 'std' か 'median' を指定してください。")
    threshold = max(threshold, 0)
    print(f"自動計算された差分しきい値: {threshold:.2f}")

    prev_frame = cv2.imread(os.path.join(temp_folder, frame_files[0]), cv2.IMREAD_GRAYSCALE)
    for i, fname in enumerate(frame_files[1:], start=1):
        curr_frame = cv2.imread(os.path.join(temp_folder, fname), cv2.IMREAD_GRAYSCALE)
        non_zero_count = np.count_nonzero(cv2.absdiff(curr_frame, prev_frame))

        if non_zero_count <= threshold:
            temp_stutter_indices.append(i)
        else:
            if len(temp_stutter_indices) >= min_frame_diff:
                stutter_frames.extend(temp_stutter_indices)
                for idx in temp_stutter_indices:
                    save_indices = [max(idx-1,0), idx, min(idx+1,len(frame_files)-1)]
                    for sidx in save_indices:
                        src = os.path.join(temp_folder, frame_files[sidx])
                        dst = os.path.join(output_folder, f"stutter_{frame_files[sidx]}")
                        shutil.copy(src, dst)
            temp_stutter_indices = []

        prev_frame = curr_frame
        del curr_frame

    if len(temp_stutter_indices) >= min_frame_diff:
        stutter_frames.extend(temp_stutter_indices)
        for idx in temp_stutter_indices:
            save_indices = [max(idx-1,0), idx, min(idx+1,len(frame_files)-1)]
            for sidx in save_indices:
                src = os.path.join(temp_folder, frame_files[sidx])
                dst = os.path.join(output_folder, f"stutter_{frame_files[sidx]}")
                shutil.copy(src, dst)

    print(f"{len(stutter_frames)} フレームがカクつきとして検出され、前後も含めて {output_folder} に保存されました。")
    return stutter_frames

In [49]:
# ====================================================
# キャプチャ＆逐次カクつき検知（完全版）
# ====================================================
def start_capture_and_detect(device_number=0, width=720, height=480,
                             capture_fps=60, display_fps=10, threshold=100,
                             min_time_diff=0.1, max_temp_frames=36000, stop_no_diff_sec=300,
                             threshold_method='std', k=2.0):

    desktop = os.path.join(os.path.expanduser("~"), "Desktop")
    temp_folder = os.path.join(desktop, "temp_frames")
    output_folder = os.path.join(desktop, "stutter_frames")

    for folder in (temp_folder, output_folder):
        safe_rmtree(folder)
        os.makedirs(folder, exist_ok=True)
    print(f"📁 {temp_folder} と {output_folder} を作成しました。")

    cap = cv2.VideoCapture(device_number, cv2.CAP_DSHOW)
    if not cap.isOpened():
        print("❌ Error: キャプチャデバイスを開けませんでした。")
        return 0, output_folder, 0

    cap.set(cv2.CAP_PROP_FRAME_WIDTH, width)
    cap.set(cv2.CAP_PROP_FRAME_HEIGHT, height)
    cap.set(cv2.CAP_PROP_FPS, capture_fps)

    stop_flag = threading.Event()
    frame_queue = queue.Queue(maxsize=5)

    worker = threading.Thread(
        target=stutter_worker_first_only,
        args=(frame_queue, output_folder, stop_flag, capture_fps, threshold, min_time_diff, stop_no_diff_sec),
        daemon=True
    )
    worker.start()

    frame_count = 0
    display_interval = 1.0 / display_fps
    prev_display_time = time.time()
    start_time = time.time()

    print("🎥 映像キャプチャ開始。'q'キーで終了できます。")

    try:
        while cap.isOpened():
            if stop_flag.is_set():
                print("🛑 停止フラグ検知：メインループ終了")
                break

            ret, frame = cap.read()
            if not ret:
                print("⚠ フレーム取得失敗、停止します。")
                break

            frame_file = os.path.join(temp_folder, f"frame_{frame_count:05d}.png")
            cv2.imwrite(frame_file, frame)

            try:
                frame_queue.put_nowait((frame_count, frame.copy()))
            except queue.Full:
                pass

            curr_time = time.time()
            if curr_time - prev_display_time >= display_interval:
                fps_display = 1.0 / (curr_time - prev_display_time)
                prev_display_time = curr_time
                display_frame = frame.copy()
                cv2.putText(display_frame, f"Display FPS: {fps_display:.2f}",
                            (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 0, 255), 2)
                cv2.imshow("Preview (FPS limited)", display_frame)
                del display_frame

            frame_count += 1

            if frame_count % max_temp_frames == 0:
                safe_rmtree(temp_folder)
                os.makedirs(temp_folder, exist_ok=True)
                print(f"🧹 一時フォルダをクリアしました（{max_temp_frames}フレーム到達）")

            if cv2.waitKey(1) & 0xFF == ord('q'):
                print("🧍 ユーザーによる終了操作")
                stop_flag.set()
                break

            if frame_count % 300 == 0:
                gc.collect()

    finally:
        stop_flag.set()
        frame_queue.put(None)
        frame_queue.join()
        worker.join(timeout=3)
        cap.release()
        cv2.destroyAllWindows()
        gc.collect()
        total_time = time.time() - start_time
        actual_fps = frame_count / total_time if total_time > 0 else 0
        print(f"✅ 完全終了: {frame_count} フレーム（実測FPS: {actual_fps:.2f}）")

    # キャプチャ終了後、自動閾値で全フレーム解析
    stutter_frames = detect_stutter_auto_threshold(temp_folder, fps=capture_fps, min_time_diff=min_time_diff,
                                                   threshold_method=threshold_method, k=k)

    return frame_count, output_folder, actual_fps, stutter_frames

In [50]:
# -----------------------------------------------
# 可視化関数（時間軸と差分値で表示）
# -----------------------------------------------
"""
frames: 読み込んだ動画フレームのリスト
stutter_frames: カクつきと検出されたフレーム番号のリスト
fps: フレームレート（デフォルト30fps）

横軸: 時間（秒）
縦軸: フレーム間差分の値
"""
def plot_stutter(frames, stutter_frames, fps=60):
    if not frames or not stutter_frames:
        print("フレームまたはカクつきフレームが存在しないため、可視化はスキップします。")
        return

    # 前フレームとの差分を計算して数値化
    diff_values = [0]  # 1フレーム目は比較できないので0
    for i in range(1, len(frames)):
        diff = cv2.absdiff(frames[i], frames[i-1])
        diff_gray = cv2.cvtColor(diff, cv2.COLOR_BGR2GRAY)
        diff_values.append(np.count_nonzero(diff_gray))

    # 横軸を時間に変換
    times = np.arange(len(diff_values)) / fps

    plt.figure(figsize=(12, 5))
    plt.plot(times, diff_values, label="Frame difference")
    
    # カクつきフレームを赤で表示
    stutter_times = np.array(stutter_frames) / fps
    stutter_diff = [diff_values[i] for i in stutter_frames]
    plt.scatter(stutter_times, stutter_diff, color='red', label="Stutter frames")

    plt.title('Video Stutter Detection')
    plt.xlabel('Time (s)')
    plt.ylabel('Frame Difference (Non-zero pixels)')
    plt.legend()
    plt.show()

In [52]:
# -----------------------------------------------
# 実行例
# -----------------------------------------------

# Python では、この書き方をすると「このファイルを直接実行したときだけ、以下の処理を動かす」という意味
if __name__ == "__main__":  
    # list_video_devices 関数を呼んで、PCに接続されているカメラを探して、そのリストを devices に入れる
    devices = list_video_devices()
    if not devices:
        print("利用可能なカメラが見つかりませんでした")
    else:
        # launch_gui 関数を呼んで、どのカメラを使うか選ばせる　戻り値は選ばれたカメラのデバイス番号
        selected_device = launch_gui(devices)
        if selected_device is not None:
            print(f"選択されたデバイス番号: {selected_device}")
        else:
            print("カメラが選択されませんでした（GUIを閉じました）")

# 「カメラが選ばれたときだけキャプチャを開始する」 という安全策
    if selected_device is not None: 
        frame_count, output_folder, actual_fps, stutter_frames = start_capture_and_detect(
            device_number=0,
            capture_fps=60,
            display_fps=10,
            min_time_diff=0.1,
            stop_no_diff_sec=300,
            threshold_method='std',
            k=2.0
        )

        # 保存済み PNG からフレームを読み込む
        temp_folder = os.path.join(os.path.expanduser("~"), "Desktop", "temp_frames")
        frame_files = sorted([f for f in os.listdir(temp_folder) if f.endswith('.png')])
        frames = [cv2.imread(os.path.join(temp_folder, f)) for f in frame_files]

        # stutter_frames は output_folder 内の PNG からフレーム番号を取得
        output_folder = os.path.join(os.path.expanduser("~"), "Desktop", "stutter_frames")
        stutter_frames = []
        for f in os.listdir(output_folder):
            if f.startswith('stutter_') and f.endswith('.png'):
                # ファイル名から最後の数値部分を抽出
                match = re.search(r'(\d+)\.png$', f)
                if match:
                    stutter_frames.append(int(match.group(1)))

        stutter_frames = sorted(stutter_frames)

        # 可視化
        plot_stutter(frames, stutter_frames, fps=60)

選択されたデバイス番号: 0
削除中: [##############################] 100%
🗑️ 完全削除済み: C:\Users\HP_PC\Desktop\temp_frames
削除中: [##############################] 100%
🗑️ 完全削除済み: C:\Users\HP_PC\Desktop\stutter_frames
📁 C:\Users\HP_PC\Desktop\temp_frames と C:\Users\HP_PC\Desktop\stutter_frames を作成しました。
🎥 映像キャプチャ開始。'q'キーで終了できます。
⚠ 5分以上変化なし：キャプチャ停止（ワーカー）


: 