Skip to content

Commit

Permalink
v1.8.0
Browse files Browse the repository at this point in the history
feat: serve can now link the calibration

feat: add auto compression + auto sync

feat: add /v1/reset

fix: fix npy compression.wrong.filename
  • Loading branch information
davidliyutong committed Apr 21, 2023
1 parent 71f5bdd commit 6d758ab
Show file tree
Hide file tree
Showing 13 changed files with 676 additions and 21 deletions.
9 changes: 8 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,14 @@ To launch a remote record station that supports REST API
```shell
python -m realsense_recorder serve
```


To launch a remote record station that is calibrated
```shell
python -m realsense_recorder serve --calibration=path/to/calibration
```

> Note: The calibration.json file is expected to be in the `path/to/calibration` directory. The calibration file can be generated by running `python -m realsense_recorder calibrate`
To run calibration

```shell
Expand Down
31 changes: 31 additions & 0 deletions realsense_recorder/cmd/post_processing.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import argparse
import numpy as np
from typing import Tuple

from realsense_recorder.io import compress_record, sync_cameras

# def detect_fiducials(base_dir: str):
# pass


def worker(base_dir: str):
compress_record(base_dir)
sync_cameras(base_dir)


def main(args):
worker(args.base_dir)
pass


def entry_point(argv):
parser = argparse.ArgumentParser(description='Recorder')
parser.add_argument('--base_dir', type=str, help='Base directory', default='')
args = parser.parse_args(argv)
main(args)


if __name__ == '__main__':
import sys

entry_point(sys.argv[1:])
82 changes: 72 additions & 10 deletions realsense_recorder/cmd/serve.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,10 +4,12 @@
import multiprocessing as mp
import os
import os.path as osp
import shutil
import signal
import time
from concurrent.futures import ThreadPoolExecutor
from typing import Callable, Dict, List, Optional
import threading

import cv2
import numpy as np
Expand All @@ -16,23 +18,28 @@
from fastapi import FastAPI
from fastapi.encoders import jsonable_encoder
from fastapi.responses import JSONResponse, RedirectResponse
import pyrealsense2 as rs

from realsense_recorder.common import (
CALLBACKS,
new_realsense_camera_system_from_yaml_file,
RealsenseSystemModel,
RealsenseSystemCfg,
RealsenseCameraCfg,
get_datetime_tag
get_datetime_tag,
enumerate_devices_that_supports_advanced_mode,
)

from .post_processing import worker as post_processing_worker

app = FastAPI()
logging.basicConfig(level=logging.INFO)

STOP_EV: mp.Event = mp.Event()
FINISH_EV: mp.Event = mp.Event()
READY_EV: mp.Event = mp.Event()
CAPTURE_PROCS: List[mp.Process] = []
POST_PROCESSING_QUEUE: Optional[mp.Queue] = None
ARGS: Optional[argparse.Namespace] = None


Expand Down Expand Up @@ -159,10 +166,10 @@ def save_depth_frame(path, frame):
self.insert_meta_data(cam.friendly_name, ts, sys_ts, frame_counter, frame_basename)
frame_basename = '{:07d}_{}_{}'.format(frame_counter, ts["backend_t"], sys_ts)
if color_image is not None:
save_workers.submit(save_color_frame, osp.join(cam.color_save_path, frame_basename+'.bmp'), color_image)
save_workers.submit(save_color_frame, osp.join(cam.color_save_path, frame_basename + '.bmp'), color_image)

if depth_image is not None:
save_workers.submit(save_depth_frame, osp.join(cam.depth_save_path, frame_basename+'.npy'), depth_image)
save_workers.submit(save_depth_frame, osp.join(cam.depth_save_path, frame_basename + '.npy'), depth_image)

progress_bars[idx].set_description(f'SN={cam.option.sn}, FrameCounter={frame_counter}')
progress_bars[idx].update(1)
Expand Down Expand Up @@ -195,7 +202,8 @@ def capture_frames(stop_ev: mp.Event,
finish_ev: mp.Event,
ready_ev: mp.Event,
config: str,
tag: str):
tag: str,
post_processing_queue: mp.Queue = None):
callbacks = {
CALLBACKS.tag_cb: lambda: tag,
CALLBACKS.save_path_cb: lambda cam_cfg, sys_cfg: osp.join(sys_cfg.base_dir, "r" + cam_cfg.sn[-2:]),
Expand All @@ -204,7 +212,14 @@ def capture_frames(stop_ev: mp.Event,

sys = new_realsense_camera_system_from_yaml_file(RemoteRecordSeq, config, callbacks)

sys.app(stop_ev, finish_ev, ready_ev)
try:
sys.app(stop_ev, finish_ev, ready_ev)
except AttributeError as e:
logging.error(f"capture_frames failed: {e}")

if post_processing_queue is not None:
logging.info(f"adding {sys.options.base_dir} to post processing queue")
post_processing_queue.put(sys.options.base_dir)


@app.get("/")
Expand All @@ -231,7 +246,7 @@ def ready():

@app.post("/v1/start")
def start_process(tag: str = None):
global CAPTURE_PROCS, STOP_EV, FINISH_EV, READY_EV, ARGS
global CAPTURE_PROCS, STOP_EV, FINISH_EV, READY_EV, ARGS, POST_PROCESSING_QUEUE

# Wait until last capture ends
if len(CAPTURE_PROCS) > 0:
Expand Down Expand Up @@ -264,7 +279,8 @@ def start_process(tag: str = None):
FINISH_EV,
READY_EV,
ARGS.config,
tag))]
tag,
POST_PROCESSING_QUEUE))]
DELAY_S = 2 # Magic delay duration to avoid U3V communication error
[(proc.start(), time.sleep(DELAY_S
)) for proc in CAPTURE_PROCS]
Expand All @@ -284,6 +300,18 @@ def stop_process():
return make_response(status_code=500, msg="NOT RUNNING")


@app.post("/v1/reset")
def reset_camera():
global CAPTURE_PROCS, STOP_EV
if len(CAPTURE_PROCS) > 0 and any([proc.is_alive() for proc in CAPTURE_PROCS]):
return make_response(status_code=500, msg="RUNNING")
else:
for dev in enumerate_devices_that_supports_advanced_mode(rs.context()):
dev.hardware_reset()
time.sleep(3)
return make_response(status_code=200, msg="RESET OK")


@app.post("/v1/kill")
def kill_process():
global CAPTURE_PROCS, STOP_EV, FINISH_EV
Expand All @@ -301,23 +329,57 @@ def kill_process():
return make_response(status_code=500, msg="NOT RUNNING")


def post_processing_thread():
global POST_PROCESSING_QUEUE, ARGS
logging.info("post processing thread started")
while True:
if not POST_PROCESSING_QUEUE.empty():
base_dir = POST_PROCESSING_QUEUE.get()
if ARGS.calibration is not None:
try:
_calibration_file_name = "calibration.json"
shutil.copy(osp.join(ARGS.calibration, _calibration_file_name), base_dir)
logging.info(f"copy calibration file from {osp.join(ARGS.calibration, _calibration_file_name)} to {base_dir}")
except Exception as e:
logging.error(f"copy calibration file failed: {e}")
try:
post_processing_worker(base_dir)
except Exception as e:
logging.error(f"post processing failed: {e}")
# p = mp.Process(target=post_processing_worker, args=(sys.options.base_dir,))
else:
time.sleep(1)


def main(args: argparse.Namespace):
global ARGS
global ARGS, POST_PROCESSING_QUEUE
POST_PROCESSING_QUEUE = mp.Queue()
ARGS = args
# Prepare system
logging.info('the server listens at port {}'.format(args.port))

if not osp.exists(args.config):
logging.error(f"config file {args.config} does not exist")
exit(1)

if args.calibration is not None and not osp.exists(args.calibration):
logging.error(f"calibration file {args.calibration} does not exist")
exit(1)

try:
t = threading.Thread(target=post_processing_thread)
t.start()
uvicorn.run(app=app, port=args.port)
except KeyboardInterrupt:
logging.info(f"main() got KeyboardInterrupt")
exit(1)
os._exit(1)


def entry_point(argv):
parser = argparse.ArgumentParser(description='Recorder')
parser.add_argument('--app', type=str, help='', default='')
# parser.add_argument('--app', type=str, help='', default='')
parser.add_argument('--config', type=str, help='The realsense system configuration', default='./realsense_config.yaml')
parser.add_argument('--calibration', type=str, help='The realsense system calibration', default=None)
parser.add_argument('--port', type=int, help="Port to listen", default=5050)
parser.add_argument('--debug', action='store_true', help='Toggle Debug mode')
args = parser.parse_args(argv)
Expand Down
14 changes: 8 additions & 6 deletions realsense_recorder/common/record.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,12 +289,14 @@ def close(self):
def start(self, interval_ms: int = 10):
num_of_cameras = len(self.cameras)
ret = []
with ThreadPoolExecutor(max_workers=num_of_cameras) as executor:
for idx, cam in enumerate(self.cameras):
ret.append(executor.submit(lambda: cam.start(delay_ms=interval_ms * (num_of_cameras - idx))))
executor.shutdown(wait=True)
list(map(lambda x: x.result(), ret))
time.sleep(0.1)
# with ThreadPoolExecutor(max_workers=num_of_cameras) as executor:
# for idx, cam in enumerate(self.cameras):
# ret.append(executor.submit(lambda: cam.start(delay_ms=interval_ms * (num_of_cameras - idx))))
# executor.shutdown(wait=True)
# list(map(lambda x: x.result(), ret))
# time.sleep(0.1)
for idx, cam in enumerate(self.cameras):
cam.start()

def stop(self, interval_ms: int = 0):
num_of_cameras = len(self.cameras)
Expand Down
33 changes: 33 additions & 0 deletions realsense_recorder/common/utils.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import datetime
from typing import Dict, Any, List

import numpy as np
import pyrealsense2 as rs
import cv2

from realsense_recorder.common import new_camera_config_by_device, new_system_config

Expand Down Expand Up @@ -68,3 +70,34 @@ def configure_realsense_system_from_keyboard() -> Dict[str, Dict[str, Any]]:

def get_datetime_tag() -> str:
return datetime.datetime.now().strftime("%Y-%m-%d_%H%M%S")

def rvec_tvec_to_matrix(rvec: np.ndarray=None, tvec: np.ndarray=None) -> np.ndarray:
"""
Convert rotation vector and translation vector to transformation matrix
Args:
rvec: rotation vector
tvec: translation vector
Returns:
transformation matrix
"""
R, _ = cv2.Rodrigues(rvec) if rvec is not None else (np.eye(3), None)
T = np.eye(4)
T[:3, :3] = R
T[:3, 3] = tvec if tvec is not None else np.zeros(3)
return T

def R_T_to_matrix(R: np.ndarray, T:np.ndarray) -> np.ndarray:
"""
Convert R matrix and T matrix to transformation matrix
Args:
R: rotation
T: translation
Returns:
transformation matrix
"""
res = np.eye(4)
res[:3, :3] = R if R is not None else np.eye(3)
res[:3, 3] = T if T is not None else np.zeros(3)
return res
Empty file.
Loading

0 comments on commit 6d758ab

Please sign in to comment.