In [None]:
# Install runtime dependencies. $PIP_INSTALL is read from the environment
# (presumably a "pip install" command line -- TODO confirm).
# NOTE(review): only pandas is pinned; consider pinning ray/pyarrow/etc. too,
# since the cells below import Ray internals (ray.data._internal.*) that can
# change between releases.
!$PIP_INSTALL -qq pandas==2.1.4 fastapi grpcio ray pyarrow aiohttp opencensus aiohttp_cors

In [1]:
# Notebook setup: enable automatic module reloading and print the versions of
# the packages used in this notebook (via the watermark extension).
%reload_ext watermark
%reload_ext autoreload
%autoreload 2
%watermark -p numpy,sklearn,pandas
%watermark -p ipywidgets,cv2,PIL,matplotlib,plotly,netron
%watermark -p torch,torchvision,torchaudio
# %watermark -p tensorflow,tensorboard,tflite
# %watermark -p onnx,tf2onnx,onnxruntime,tensorrt,tvm
# %matplotlib inline
# %config InlineBackend.figure_format='retina'
# %config IPCompleter.use_jedi = False

# Optional display settings (currently disabled):
# %matplotlib inline
# %matplotlib widget
# from IPython.display import display, Markdown, HTML, IFrame, Image, Javascript
# from IPython.core.magic import register_line_magic, register_cell_magic
# display(HTML('<style>.container { width:%d%% !important; }</style>' % 90))

import sys, os, io, logging, time, random, math
import json, base64, requests, shutil
import argparse, shlex, signal
import numpy as np
from PIL import Image
import matplotlib.pyplot as plt

def _dir(x, dumps=True, ret=True):
    attrs = sorted([y for y in dir(x) if not y.startswith('_')])
    result = '%s: %s' % (str(type(x))[8:-2], json.dumps(attrs) if dumps else attrs)
    if ret:
        return result
    print(result)

numpy  : 1.26.3
sklearn: 1.4.0
pandas : 2.1.4

ipywidgets: 8.1.1
cv2       : 4.9.0
PIL       : 10.2.0
matplotlib: 3.8.2
plotly    : 5.18.0
netron    : 7.3.9

torch      : 2.1.1+cpu
torchvision: 0.16.1+cpu
torchaudio : 2.1.1+cpu



In [2]:
import logging

# Module-level logger, also used by the frame reader defined further below.
logger = logging.getLogger(name=__name__)


def _imshow(image, title=None, color='bgr', figsize=(6, 3), canvas=False):
    """Display a single image with matplotlib inside an IPython/Jupyter session.

    Args:
        image: numpy image array. 2-D arrays (or ``color='gray'``) are drawn with
            a gray colormap.
        title: optional figure title.
        color: ``'bgr'`` (default) converts the array from BGR to RGB with
            ``cv2.cvtColor`` before drawing; ``'gray'`` forces a gray colormap;
            any other value draws the channels as given.
        figsize: matplotlib figure size in inches, or ``'auto'`` to derive it from
            the image size (width capped at 32 inches, aspect ratio kept).
        canvas: if True, switch IPython to the interactive ``widget`` backend
            (ipympl) and configure its toolbar; otherwise use ``inline``.
    """
    import IPython
    plt.close('all')
    if figsize == 'auto':
        ih, iw = image.shape[:2]
        # About 1.5 inches per 80 pixels, plus one.
        fw, fh = int(1.5 * iw / 80) + 1, int(1.5 * ih / 80) + 1
        if fw > 32:
            # Cap the width and rescale the height to keep the aspect ratio.
            # Clamp to >= 1 so very wide images do not collapse to an empty,
            # zero-height figure.
            fh = max(1, int(32 * (fh / fw)))
            fw = 32
        figsize = (fw, fh)
    shell = IPython.get_ipython()
    if canvas:
        shell.enable_matplotlib(gui='widget')
        fig = plt.figure(figsize=figsize)
        # ipympl canvas options: toolbar on the left, no header, keep the footer.
        fig.canvas.toolbar_position = 'left'
        fig.canvas.toolbar_visible = True
        fig.canvas.header_visible = False
        fig.canvas.footer_visible = True
    else:
        shell.enable_matplotlib(gui='inline')
        plt.figure(figsize=figsize)
    plt.axis('off')
    if title is not None:
        plt.title(title)
    if color == 'gray' or len(image.shape) == 2:
        plt.imshow(image, cmap='gray')
    else:
        if color == 'bgr':
            # The input is treated as BGR (OpenCV order); matplotlib expects RGB.
            image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
        plt.imshow(image)


def _imgrid(images, nrow=None, padding=4, pad_value=127, labels=None,
            font_scale=1.0, font_thickness=1, text_color=(255,), text_color_bg=None):
    count = len(images)
    if isinstance(images, dict):
        labels = [lab for lab in images.keys()]
        images = [img for img in images.values()]

    if not isinstance(images, (list, tuple, np.ndarray)) or count == 0 or not isinstance(images[0], np.ndarray):
        return
    if nrow is None or nrow > count:
        nrow = count

    max_h, max_w = np.asarray([img.shape[:2] for img in images]).max(axis=0)
    if labels is not None:
        text_org = int(0.1 * max_w), int(0.9 * max_h)
        shape_length = 3
    else:
        shape_length = np.asarray([len(img.shape) for img in images]).max()
    lack = count % nrow
    rows = np.intp(np.ceil(count / nrow))
    hpad_size = [max_h, padding] 
    if rows > 1:
        vpad_size = [padding, nrow * max_w + (nrow - 1) * padding]
        if lack > 0:
            lack_size = [max_h, max_w]
    if shape_length == 3:
        hpad_size.append(3)
        if rows > 1:
            vpad_size.append(3)
            if lack > 0:
                lack_size.append(3)
    hpadding = pad_value * np.ones(hpad_size, dtype=np.uint8)
    if rows > 1:
        vpadding = pad_value * np.ones(vpad_size, dtype=np.uint8)
        if lack > 0:
            lack_image = pad_value * np.ones(lack_size, dtype=np.uint8)
            images.extend([lack_image] * lack)
            if labels is not None:
                labels.extend([''] * lack)
    vlist = []
    for i in range(rows):
        hlist = []
        for j in range(nrow):
            if j != 0:
                hlist.append(hpadding)
            timg = images[i * nrow + j].copy()
            th, tw = timg.shape[:2]
            if th != max_h or tw != max_w:
                timg = cv2.resize(timg, (max_w, max_h))
            if len(timg.shape) != shape_length:
                timg = cv2.cvtColor(timg, cv2.COLOR_GRAY2BGR)
            if labels is not None:
                text = str(labels[i * nrow + j])
                if len(text) > 0:
                    if text_color_bg is not None:
                        (tw, th), _ = cv2.getTextSize(text, cv2.FONT_HERSHEY_SIMPLEX, font_scale, font_thickness)
                        pos1 = text_org[0] - int(font_scale * 5), text_org[1] - th - int(font_scale * 5)
                        pos2 = text_org[0] + int(font_scale * 5) + tw, text_org[1] + int(font_scale * 8)
                        cv2.rectangle(timg, pos1, pos2, text_color_bg, -1)
                    cv2.putText(timg, text, text_org, cv2.FONT_HERSHEY_SIMPLEX, font_scale, text_color, font_thickness)
            hlist.append(timg)
        if i != 0:
            vlist.append(vpadding)
        vlist.append(np.hstack(hlist))
    if rows > 1:
        return np.vstack(vlist)
    return vlist[0]


In [3]:
from ray.data._internal.delegating_block_builder import DelegatingBlockBuilder
from ray.data.block import BlockMetadata
from ray.data.datasource.datasource import Datasource, ReadTask
from ray.util.annotations import DeveloperAPI

import ray
import cv2
import os

# Number of decoded frames per block yielded by `_read_frames`; the final block
# of a video may contain fewer.
DATASOURCE_READER_BATCH_SIZE = 8


@DeveloperAPI
class VideoFrameDatasource(Datasource):
    """Ray Data datasource that decodes all frames of a single video file.

    The file is read sequentially by one ``ReadTask`` that runs
    ``_read_frames``, which yields blocks with a ``"frames"`` column.
    """

    def __init__(self, video_path: str):
        assert os.path.exists(video_path), 'video not found: %s' % video_path
        self._video_path = video_path

    def get_read_tasks(self, parallelism):
        """Return exactly one read task covering the whole video.

        A video is decoded sequentially, so the file is never split. A single
        task is returned whatever ``parallelism`` is requested; Ray accepts
        fewer tasks than requested.
        """
        meta = BlockMetadata(
            num_rows=None,  # frame count is not computed up front
            size_bytes=None,
            schema=None,
            input_files=[self._video_path],
            exec_stats=None,
        )
        # Bind the path as a default argument so the task closure captures only
        # the path string, not `self`.
        read_task = ReadTask(
            lambda p=self._video_path: _read_frames(p),
            metadata=meta,
        )
        return [read_task]

    def estimate_inmemory_data_size(self):
        # No size estimate is available before decoding.
        return None


def _read_frames(video_path: str, batch_size=None):
    """Decode ``video_path`` with OpenCV and yield Ray blocks of frames.

    Each yielded block has one ``"frames"`` column. It holds up to
    ``batch_size`` frames (default ``DATASOURCE_READER_BATCH_SIZE``) exactly as
    returned by ``cv2.VideoCapture.read``; the last block may be shorter.

    The capture is released in ``finally``, so it is freed even when the
    consumer stops early (e.g. ``take_batch`` / ``limit``) and the generator
    is closed before the stream ends.

    Raises:
        AssertionError: if OpenCV cannot open the file.
        ValueError: if ``batch_size`` < 1.
    """
    if batch_size is None:
        batch_size = DATASOURCE_READER_BATCH_SIZE
    if batch_size < 1:
        raise ValueError('batch_size must be >= 1, got %r' % (batch_size,))
    cap = cv2.VideoCapture(video_path)
    try:
        assert cap.isOpened(), 'cannot open video: %s' % video_path
        batch = []
        while True:
            success, frame = cap.read()
            if not success:
                # End of stream (or a decode failure): log the container's frame count.
                logger.warning("Count:%d", cap.get(cv2.CAP_PROP_FRAME_COUNT))
                break
            batch.append(frame)
            if len(batch) == batch_size:
                yield _frames_to_block(batch)
                # Start a fresh list rather than clearing the one just handed over.
                batch = []
        if batch:
            yield _frames_to_block(batch)
    finally:
        cap.release()


def _frames_to_block(frames):
    """Build one Ray block with a single ``"frames"`` column from a list of frames."""
    builder = DelegatingBlockBuilder()
    builder.add_batch({"frames": frames})
    return builder.build()


In [4]:
# Input video; override the machine-specific default with the VIDEO_PATH
# environment variable.
video_path = os.environ.get(
    'VIDEO_PATH', '/data/source/hzcsai_com/hzcsbet/gamebet/datasets/0bfacc_5.mp4')
# VideoFrameDatasource always produces a single read task, so request parallelism=1.
ds = ray.data.read_datasource(VideoFrameDatasource(video_path=video_path), parallelism=1)

2024-01-24 22:27:28,602	INFO worker.py:1715 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m


In [7]:
# Pull a small sample (2 rows) from the dataset for inspection.
single_batch = ds.take_batch(batch_size=2)

2024-01-24 22:29:18,581	INFO streaming_executor.py:112 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[ReadVideoFrame] -> LimitOperator[limit=2]
2024-01-24 22:29:18,582	INFO streaming_executor.py:113 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), exclude_resources=ExecutionResources(cpu=0, gpu=0, object_store_memory=0), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)
2024-01-24 22:29:18,582	INFO streaming_executor.py:115 -- Tip: For detailed progress reporting, run `ray.data.DataContext.get_current().execution_options.verbose_progress = True`


Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

In [6]:
# Sanity-check the sampled batch before the inspection cells below use it.
# `single_batch` must stay alive here: deleting it at this point broke
# Restart & Run All, because the following cells read it.
assert 'frames' in single_batch and len(single_batch['frames']) > 0

In [None]:
# Column names of the sample and how many frames it holds.
(single_batch.keys(), len(single_batch['frames']))

In [None]:
# Render the first sampled frame.
_imshow(image=single_batch['frames'][0])

In [None]:
# List the public API of the Ray Dataset object.
_dir(x=ds)

In [None]:
# Count all decoded frames by streaming the dataset, using the same batch size
# as the reader's blocks.
count = sum(len(batch['frames'])
            for batch in ds.iter_batches(batch_size=DATASOURCE_READER_BATCH_SIZE))
count