In [2]:
import dask
import dask.dataframe as dd
import dask.array as da
import dask.delayed as delayed
import numpy as np
import numba as nb
import matplotlib.pyplot as plt
from dask.array import to_zarr, from_zarr
import cv2

In [3]:
# capture = cv2.VideoCapture('../media/test2_original.mp4')

# frame_count = int(capture.get(cv2.CAP_PROP_FRAME_COUNT))
# width = int(capture.get(cv2.CAP_PROP_FRAME_WIDTH))
# height = int(capture.get(cv2.CAP_PROP_FRAME_HEIGHT))

# frames = np.zeros((frame_count, height, width, 3), dtype=np.uint8)

# frame_index = 0
# while True:
#     ret, frame = capture.read()
#     if not ret:
#         break
#     frames[frame_index] = frame
#     frame_index += 1
# capture.release()

# # print memory size of array
# print(frames.nbytes / 1e6, 'MB')

In [4]:
# from dask.array import to_zarr

# chunk_size = (1, height, width, 3)
# df = da.from_array(frames, chunks=chunk_size)
# to_zarr(df, '../data/test2_original_frames')
# df

In [5]:
chunk_size = (1, 1080, 1920, 3)
df = from_zarr('../data/test2_original_frames')
df

Unnamed: 0,Array,Chunk
Bytes,13.90 GiB,5.93 MiB
Shape,"(2400, 1080, 1920, 3)","(1, 1080, 1920, 3)"
Dask graph,2400 chunks in 2 graph layers,2400 chunks in 2 graph layers
Data type,uint8 numpy.ndarray,uint8 numpy.ndarray
"Array Chunk Bytes 13.90 GiB 5.93 MiB Shape (2400, 1080, 1920, 3) (1, 1080, 1920, 3) Dask graph 2400 chunks in 2 graph layers Data type uint8 numpy.ndarray",2400  1  3  1920  1080,

Unnamed: 0,Array,Chunk
Bytes,13.90 GiB,5.93 MiB
Shape,"(2400, 1080, 1920, 3)","(1, 1080, 1920, 3)"
Dask graph,2400 chunks in 2 graph layers,2400 chunks in 2 graph layers
Data type,uint8 numpy.ndarray,uint8 numpy.ndarray


In [6]:
# show memory usage

# import psutil
# import os

# #print(df.nbytes / 1e6, 'MB')

# process = psutil.Process(os.getpid())
# print(process.memory_info().rss / 1e6, 'MB')

In [7]:
def convert_to_gray(frames):
    grays = []
    for frame in frames:
        gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        grays.append(gray[None])
    return np.array(grays)

gray_df = df.map_blocks(
    convert_to_gray,
    chunks=(1, 1, 1080, 1920),
    dtype=np.uint8
)

to_zarr(
    gray_df,
    '../data/test2_original_frames_gray',
    overwrite=True
)
gray_df

Unnamed: 0,Array,Chunk
Bytes,4.63 GiB,1.98 MiB
Shape,"(2400, 1, 1080, 1920)","(1, 1, 1080, 1920)"
Dask graph,2400 chunks in 3 graph layers,2400 chunks in 3 graph layers
Data type,uint8 numpy.ndarray,uint8 numpy.ndarray
"Array Chunk Bytes 4.63 GiB 1.98 MiB Shape (2400, 1, 1080, 1920) (1, 1, 1080, 1920) Dask graph 2400 chunks in 3 graph layers Data type uint8 numpy.ndarray",2400  1  1920  1080  1,

Unnamed: 0,Array,Chunk
Bytes,4.63 GiB,1.98 MiB
Shape,"(2400, 1, 1080, 1920)","(1, 1, 1080, 1920)"
Dask graph,2400 chunks in 3 graph layers,2400 chunks in 3 graph layers
Data type,uint8 numpy.ndarray,uint8 numpy.ndarray


In [8]:
def get_good_features(frames):
    features = []
    for frame in frames:
        points = cv2.goodFeaturesToTrack(
            frame[0],
            maxCorners=200,
            qualityLevel=0.01,
            minDistance=30,
            blockSize=3
        )

        if points is None or points.shape != (200, 1, 2):
            points = np.zeros((200, 1, 2), dtype=np.float32)
            points[:, :, 0] = np.nan
        features.append(points)
    return np.array(features)

points_df = gray_df.map_blocks(
    get_good_features,
    chunks=(1, 200, 1, 2),
    dtype='float32',
)

to_zarr(
    points_df,
    '../data/test2_original_frames_points',
    overwrite=True
)

points_df

Unnamed: 0,Array,Chunk
Bytes,3.66 MiB,1.56 kiB
Shape,"(2400, 200, 1, 2)","(1, 200, 1, 2)"
Dask graph,2400 chunks in 4 graph layers,2400 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray
"Array Chunk Bytes 3.66 MiB 1.56 kiB Shape (2400, 200, 1, 2) (1, 200, 1, 2) Dask graph 2400 chunks in 4 graph layers Data type float32 numpy.ndarray",2400  1  2  1  200,

Unnamed: 0,Array,Chunk
Bytes,3.66 MiB,1.56 kiB
Shape,"(2400, 200, 1, 2)","(1, 200, 1, 2)"
Dask graph,2400 chunks in 4 graph layers,2400 chunks in 4 graph layers
Data type,float32 numpy.ndarray,float32 numpy.ndarray


In [12]:
def compute_motion(prev_frame, prev_points, next_frame):
    prev_frame = prev_frame[0][0][0].compute()
    prev_points = prev_points[0][0][0].compute()
    next_frame = next_frame[0][0][0].compute()

    prev_frame = prev_frame[0][0]
    prev_points = prev_points[0][0]
    next_frame = next_frame[0][0]

    if np.any(np.isnan(prev_points)):
        return np.array([0,0,0])

    try:
        next_points, status, error = cv2.calcOpticalFlowPyrLK(
            prev_frame,
            next_frame,
            prev_points,
            None
        )
    except:
        print('pp', prev_points)
        raise

    idx = np.where(status == 1)[0]
    ppp = prev_points[idx]
    npp = next_points[idx]

    if len(npp) <= 0:
        return np.array([0,0,0])

    try:
        m = cv2.estimateAffinePartial2D(ppp, npp)[0]
    except:
        print(next_points)
        raise

    if m is None:
        return np.array([0,0,0])

    dx = m[0, 2]
    dy = m[1, 2]
    da = np.arctan2(m[1, 0], m[0, 0])
    return np.array([dx, dy, da])


gray_delayed = gray_df.to_delayed()
points_delayed = points_df.to_delayed()

delayed_motion = []
for i in range(len(gray_delayed)-1):

    prev_frame = gray_delayed[i]
    prev_points = points_delayed[i]
    next_frame = gray_delayed[i + 1]

    delayed_motion.append(
        delayed(compute_motion)(
            prev_frame,
            prev_points,
            next_frame
        )
    )

dask.compute(*delayed_motion)

[[1459.5083 1123.6526]]


error: OpenCV(4.6.0) /Users/runner/work/opencv-python/opencv-python/opencv/modules/calib3d/src/ptsetreg.cpp:176: error: (-215:Assertion failed) count >= 0 && count2 == count in function 'run'
