In [71]:
import pyarrow as pa
import pyarrow.fs as fs
import json
import cv2
import numpy as np
import os
from dotenv import load_dotenv
from decord import VideoReader
from decord import cpu
import ray
from typing import Dict, Any

# Load environment variables
load_dotenv()
access_key = os.getenv('access_key')
secret_key = os.getenv('secret_key')
region = os.getenv('region')
namespace = os.getenv('namespace')

# Initialize S3 filesystem with credentials
s3 = fs.S3FileSystem(region=region, access_key=access_key, secret_key=secret_key, endpoint_override=f'https://{namespace}.compat.objectstorage.{region}.oraclecloud.com',)

# Define S3 paths
input_path = 'yt-vtt/RepNetImport/'
output_path = 'yt-vtt/fitclass/'

# List JSON files in the input path
files = s3.get_file_info(fs.FileSelector(input_path))
json_files = [file.path for file in files if file.path.endswith('.json')]

def process_batch(batch:Dict[str, Any]):
            print(batch['video_path'])
            ranges = batch['value'][0]['ranges'][0]
            print(ranges)
            labels = batch['value'][0]['timelinelabels']
            start_frame = ranges['start']
            end_frame = ranges['end']
            vr = batch['video'][0]
            print(vr.shape)
            video_path = batch['video_path'][0]
            fps = batch['fps'][0]
            print(f"Processing video: {video_path}, start: {start_frame}, end: {end_frame}")
            # Create new video segment
            output_file_name = f"{os.path.basename(video_path).split('.')[0]}_{start_frame}_{end_frame}.mp4"
            output_file_path = os.path.join(output_path, output_file_name)

            # Initialize VideoWriter
            frame_width, frame_height = vr[0].shape[1], vr[0].shape[0]
            fourcc = cv2.VideoWriter_fourcc(*'mp4v')
            temp_file = f"/tmp/{output_file_name}"
            vw = cv2.VideoWriter(temp_file, fourcc, fps, (frame_width, frame_height))

            for frame in vr:
                img = frame
                vw.write(cv2.cvtColor(img, cv2.COLOR_RGB2BGR))
            vw.release()
             # Convert the video to H.264 format using ffmpeg
            h264_temp_file = f"/tmp/h264_{output_file_name}"
            os.system(f"ffmpeg -i {temp_file} -c:v libx264 -preset fast -crf 23 -y {h264_temp_file}")

            # Replace the temp_file with the H.264 converted file
            os.remove(temp_file)
            temp_file = h264_temp_file
            
            # Upload video to S3
            with open(temp_file, 'rb') as temp_video:
                with s3.open_output_stream(output_file_path) as output_stream:
                    output_stream.write(temp_video.read())

            os.remove(temp_file)
            print(f"Uploaded video: {output_file_path}")

            # Create JSON metadata
            json_metadata = {
                "data": {
                    "video": f's3://{output_file_path}'
                },
                "annotations": [
                    {
                        "result": [
                            {
                                "type": "number",
                                "value": {
                                    "number": int(labels[0])
                                },
                                "to_name": "video",
                                "from_name": "rep"
                            }
                        ]
                    }
                ]
            }

            # Upload JSON metadata to S3
            json_file_name = f"{os.path.basename(output_file_name).split('.')[0]}.json"
            json_file_path = os.path.join(output_path, json_file_name)
            with s3.open_output_stream(json_file_path) as json_stream:
                json_stream.write(json.dumps(json_metadata).encode('utf-8'))

            print(f"Uploaded JSON: {json_file_path}")
            return batch
            

@ray.remote
def process_file(json_file):
    # Read JSON file
    print(f"Processing file: {json_file}")
    with s3.open_input_file(json_file) as f:
        data = json.load(f)

    # Extract video path
    video_path = data['data']['video'].split('s3://')[1]

    # Read video using decord
    with s3.open_input_file(video_path) as video_file:
        vr = VideoReader(video_file, ctx=cpu(0))
        fps = vr.get_avg_fps()
        
    # Process annotation results using ray.data with map_batches
    results = ray.data.from_items([
        {
            **result,
            "video_path": video_path,
            "fps": fps,
            "video": np.array([vr[frame].asnumpy() for frame in range(result['value']['ranges'][0]['start'], result['value']['ranges'][0]['end'] + 1)])
        }
        for annotation in data['annotations']
        for result in annotation['result']
    ])
    results.filter(lambda x:  int(x['value']['timelinelabels'][0]) > 2).map_batches(process_batch).take_all()

# Initialize Ray
ray.init(ignore_reinit_error=True)

# Process files in parallel
futures = [process_file.remote(json_file) for json_file in json_files]
ray.get(futures)

# Shutdown Ray
ray.shutdown()

2025-05-07 21:38:04,910	INFO worker.py:1843 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


[36m(process_file pid=28925)[0m Processing file: yt-vtt/RepNetImport/converted_-cAGR3GiZIU.json


[36m(process_file pid=28921)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28921)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]


(pid=28921) Running 0: 0.00 row [00:00, ? row/s]

[36m(process_file pid=28930)[0m Processing file: yt-vtt/RepNetImport/converted_1hzqrwvOLn0.json[32m [repeated 15x across cluster][0m
[36m(process_file pid=29337)[0m Processing file: yt-vtt/RepNetImport/converted_47_RmzItlJg.json
[36m(process_file pid=29337)[0m Processing file: yt-vtt/RepNetImport/converted_47_RmzItlJg.json


[36m(process_file pid=28928)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28928)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]


(pid=28928) Running 0: 0.00 row [00:00, ? row/s]

[36m(raylet)[0m Spilled 3053 MiB, 96 objects, write throughput 618 MiB/s. Set RAY_verbose_spill_logs=0 to disable this message.


(pid=28915) Running 0: 0.00 row [00:00, ? row/s]

[36m(process_file pid=28915)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28915)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]
[36m(raylet)[0m Spilled 4623 MiB, 143 objects, write throughput 524 MiB/s.
[36m(raylet)[0m Spilled 4623 MiB, 143 objects, write throughput 524 MiB/s.
[36m(process_file pid=28929)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28929)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]
[36m(process_file pid=28929)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28929)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperat

(pid=28929) Running 0: 0.00 row [00:00, ? row/s]

(pid=28926) Running 0: 0.00 row [00:00, ? row/s]

[36m(process_file pid=28926)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28926)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]
[36m(raylet)[0m Spilled 8339 MiB, 237 objects, write throughput 558 MiB/s.
[36m(raylet)[0m Spilled 8339 MiB, 237 objects, write throughput 558 MiB/s.
[36m(process_file pid=28917)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28917)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]
[36m(process_file pid=28917)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28917)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperat

(pid=28917) Running 0: 0.00 row [00:00, ? row/s]

(pid=28918) Running 0: 0.00 row [00:00, ? row/s]

[36m(process_file pid=28924)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data[32m [repeated 2x across cluster][0m
[36m(process_file pid=28924)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)][32m [repeated 2x across cluster][0m


(pid=28924) Running 0: 0.00 row [00:00, ? row/s]

(pid=28927) Running 0: 0.00 row [00:00, ? row/s]

[36m(process_file pid=28927)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28927)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]
[36m(raylet)[0m Spilled 16745 MiB, 495 objects, write throughput 598 MiB/s.
[36m(raylet)[0m Spilled 16745 MiB, 495 objects, write throughput 598 MiB/s.
[36m(process_file pid=28922)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28922)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]
[36m(process_file pid=28922)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28922)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOper

(pid=28922) Running 0: 0.00 row [00:00, ? row/s]

(pid=28920) Running 0: 0.00 row [00:00, ? row/s]

[36m(process_file pid=28920)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28920)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]


(pid=28923) Running 0: 0.00 row [00:00, ? row/s]

[36m(process_file pid=28923)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28923)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]


(pid=28925) Running 0: 0.00 row [00:00, ? row/s]

[36m(process_file pid=28925)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28925)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]


(pid=28916) Running 0: 0.00 row [00:00, ? row/s]

[36m(process_file pid=28916)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28916)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]


(pid=28919) Running 0: 0.00 row [00:00, ? row/s]

[36m(process_file pid=28919)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28919)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]
[36m(process_file pid=28930)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28930)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]
[36m(process_file pid=28930)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=28930)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]


(pid=28930) Running 0: 0.00 row [00:00, ? row/s]

[36m(process_file pid=29337)[0m Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-07_21-38-00_036548_89962/logs/ray-data
[36m(process_file pid=29337)[0m Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[Filter(<lambda>)->MapBatches(process_batch)]


(pid=29337) Running 0: 0.00 row [00:00, ? row/s]

KeyboardInterrupt: 

In [72]:
ray.shutdown()

In [None]:
vr = VideoReader(video_path, ctx=cpu(0))
fps = vr.get_avg_fps()
print(f"FPS: {fps}")

In [None]:
!python3 ../drivers/ray_pipeline_splitworkouts.py

2025-05-09 16:17:54,680	INFO worker.py:1843 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m
Processing file: yt-vtt/RepNetImport/converted_23P40lccIsY.json
Processing file: yt-vtt/RepNetImport/converted_23P40lccIsY.json
2025-05-09 16:18:09,387	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-09_16-17-51_130865_99818/logs/ray-data
2025-05-09 16:18:09,387	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBatches(process_batch)]
Running 0: 0.00 row [00:00, ? row/s]
- MapBatches(process_batch) 1: 0.00 row [00:00, ? row/s][A2025-05-09 16:18:09,387	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2025-05-09_16-17-51_130865_99818/logs/ray-data
2025-05-09 16:18:09,387	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[MapBat

In [12]:
import ray
ray.init(ignore_reinit_error=True)

2025-05-08 21:04:37,799	INFO worker.py:1843 -- Started a local Ray instance. View the dashboard at [1m[32mhttp://127.0.0.1:8265 [39m[22m


0,1
Python version:,3.11.5
Ray version:,2.44.1
Dashboard:,http://127.0.0.1:8265


In [13]:
ray.nodes()[0]

{'NodeID': 'a91b47950e52ed7b184e47cb281f50874b333c62254d014029f00b3e',
 'Alive': True,
 'NodeManagerAddress': '127.0.0.1',
 'NodeManagerHostname': 'ML-kanantharaman',
 'NodeManagerPort': 60731,
 'ObjectManagerPort': 60730,
 'ObjectStoreSocketName': '/tmp/ray/session_2025-05-08_21-04-34_701633_80767/sockets/plasma_store',
 'RayletSocketName': '/tmp/ray/session_2025-05-08_21-04-34_701633_80767/sockets/raylet',
 'MetricsExportPort': 64354,
 'NodeName': '127.0.0.1',
 'RuntimeEnvAgentPort': 64693,
 'DeathReason': 0,
 'DeathReasonMessage': '',
 'alive': True,
 'Resources': {'memory': 42373300224.0,
  'CPU': 16.0,
  'node:127.0.0.1': 1.0,
  'object_store_memory': 2147483648.0,
  'node:__internal_head__': 1.0},
 'Labels': {'ray.io/node_id': 'a91b47950e52ed7b184e47cb281f50874b333c62254d014029f00b3e'}}

In [1]:
import pyarrow.fs as fs
import os
from dotenv import load_dotenv


In [2]:
# Load environment variables
load_dotenv()
access_key = os.getenv('access_key')
secret_key = os.getenv('secret_key')
region = os.getenv('region')
namespace = os.getenv('namespace')

# Initialize S3 filesystem with credentials
s3 = fs.S3FileSystem(region=region, access_key=access_key, secret_key=secret_key, endpoint_override=f'https://{namespace}.compat.objectstorage.{region}.oraclecloud.com',)


In [3]:
s3 = fs.S3FileSystem(region=region, access_key=access_key, secret_key=secret_key, endpoint_override=f'https://{namespace}.compat.objectstorage.{region}.oraclecloud.com',)


In [4]:
# Define S3 paths
input_path = 'yt-vtt/RepNetImport/'
output_path = 'yt-vtt/fitclass/'

# List JSON files in the input path
files = s3.get_file_info(fs.FileSelector(input_path))
json_files = [file.path for file in files if file.path.endswith('.json')]


In [5]:
def filter_existing_files(json_files, output_path):
    """
    Filters out JSON files whose corresponding output files already exist in the output path.
    """
    filtered_files = []
    out_files = s3.get_file_info(fs.FileSelector(output_path))
    
    for json_file in json_files:
            videoid = json_file.split('converted_')[1].split('.')[0]
            exists = [file.path for file in out_files if videoid in file.path]
            if(len(exists) == 0):
                filtered_files.append(json_file)
            else:
                print(f"File {json_file} already exists in output path. Skipping.")
    return filtered_files


In [6]:
len(filter_existing_files(json_files, output_path))

File yt-vtt/RepNetImport/converted_-cAGR3GiZIU.json already exists in output path. Skipping.
File yt-vtt/RepNetImport/converted_-cDGcc8Wc70.json already exists in output path. Skipping.
File yt-vtt/RepNetImport/converted_-jJm9BK3Pcg.json already exists in output path. Skipping.
File yt-vtt/RepNetImport/converted_0NsesOZEU2A.json already exists in output path. Skipping.
File yt-vtt/RepNetImport/converted_0iqP6WP2ET4.json already exists in output path. Skipping.
File yt-vtt/RepNetImport/converted_1ADarF8TSxE.json already exists in output path. Skipping.
File yt-vtt/RepNetImport/converted_1AGiq77wZ9c.json already exists in output path. Skipping.
File yt-vtt/RepNetImport/converted_1M-TtC2loIM.json already exists in output path. Skipping.
File yt-vtt/RepNetImport/converted_1WpDUlJkf5Q.json already exists in output path. Skipping.
File yt-vtt/RepNetImport/converted_1hzqrwvOLn0.json already exists in output path. Skipping.
File yt-vtt/RepNetImport/converted_2bfAZ4mcGjo.json already exists in 

9