In [1]:
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from pathlib import Path
import imageio as iio
import deeplabcut
import subprocess
import datetime
from tqdm import tqdm


Loading DLC 3.0.0rc6...
DLC loaded in light mode; you cannot use any GUI (labeling, relabeling and standalone GUI)


In [6]:
import subprocess
import numpy as np
import cv2
import datetime
import deeplabcut

def read_frames_chunked(filename, output_file, chunk_size=100, fps=30, pixel_format="gray16le", movie_dtype="uint16", frame_size=None, mapping="DEPTH", finfo=None, **kwargs):
    if finfo is None:
        finfo = get_video_info(filename, **kwargs)
    
    if not frame_size:
        frame_size = finfo["dims"]
    
    total_frames = finfo["nframes"]
    fourcc = cv2.VideoWriter_fourcc(*'XVID')
    video_writer = cv2.VideoWriter(str(output_file), fourcc, fps, (frame_size[0], frame_size[1]), isColor=False)
    
    for start in range(0, total_frames, chunk_size):
        end = min(start + chunk_size, total_frames)
        command = [
            "ffmpeg", "-loglevel", "fatal", "-ss", str(datetime.timedelta(seconds=start / fps)), "-i", filename,
            "-vframes", str(end - start), "-f", "image2pipe", "-s", f"{frame_size[0]}x{frame_size[1]}",
            "-pix_fmt", pixel_format, "-vcodec", "rawvideo", "-"
        ]
        
        if isinstance(mapping, str):
            mapping_dict = get_stream_names(filename)
            mapping = mapping_dict.get(mapping, 0)
        
        if filename.endswith((".mkv", ".avi")):
            command += ["-map", f"0:{mapping}", "-vsync", "0"]
        
        pipe = subprocess.Popen(command, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        out, err = pipe.communicate()
        
        if err:
            print("Error:", err)
            continue
        
        video_chunk = np.frombuffer(out, dtype=movie_dtype).reshape((end - start, frame_size[1], frame_size[0]))
        
        # Clipping and normalization
        video_chunk = np.clip(video_chunk, 0, 1000)
        video_chunk = (video_chunk / video_chunk.max() * 255).astype(np.uint8)
        
        for frame in video_chunk:
            video_writer.write(frame)
    
    video_writer.release()

def process_video(path, chunk_size=100, dlc_config='/n/groups/datta/jlove/data/rat_seq/rat_seq_paper/keypoint_model/config-v2.yaml'):
    print('Processing video in chunks')
    output_file = path.parent / 'ir_clipped.avi'
    read_frames_chunked(path.as_posix(), output_file, chunk_size=chunk_size)
    
    videos = [output_file.as_posix()]
    deeplabcut.analyze_videos(
        dlc_config, videos, videotype='avi', shuffle=1, trainingsetindex=0, gputouse=0, save_as_csv=True, destfolder=None, dynamic=(True, .5, 10)
    )
    deeplabcut.filterpredictions(config, videos)
    deeplabcut.create_labeled_video(dlc_config, videos, filtered=True, pcutoff=.3)


In [7]:
folder = Path('/n/groups/datta/jlove/data/rat_seq/rat_seq_paper/data/14weeks/arid1b/cohort15')
irs = list(folder.glob('**/ir.avi'))

In [None]:
process_video(irs[0])

Processing video in chunks
