Skip to content

Commit

Permalink
[RSDK-7815] test FakeCamera APIs (#254)
Browse files Browse the repository at this point in the history
  • Loading branch information
mattjperez committed Jul 15, 2024
1 parent 6cdae63 commit fef8589
Show file tree
Hide file tree
Showing 4 changed files with 314 additions and 33 deletions.
248 changes: 247 additions & 1 deletion micro-rdk/src/common/camera/fake_camera.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use std::{

use crate::common::{config::ConfigType, registry::ComponentRegistry, registry::Dependency};

pub static FAKE_JPEG: &[u8] = include_bytes!("./fake_image.jpg");
static FAKE_JPEG: &[u8] = include_bytes!("./fake_image.jpg");

pub(crate) fn register_models(registry: &mut ComponentRegistry) {
if registry
Expand Down Expand Up @@ -76,3 +76,249 @@ impl Status for FakeCamera {
}))
}
}

mod tests {
use std::{
convert::Infallible,
net::{SocketAddr, TcpListener, TcpStream},
sync::{Arc, Mutex},
time::Duration,
};

use async_io::Timer;

use super::FAKE_JPEG;
use crate::{
common::{
app_client::encode_request,
config::DynamicComponentConfig,
conn::server::{AsyncableTcpListener, Http2Connector},
grpc::GrpcError,
grpc::{GrpcBody, GrpcServer},
robot::{LocalRobot, RobotError},
},
google::api::HttpBody,
native::{
exec::NativeExecutor,
tcp::{NativeListener, NativeStream},
},
proto::component::camera::v1::{GetImageRequest, GetImageResponse, RenderFrameRequest},
};

use http_body_util::{combinators::BoxBody, BodyExt, Collected, Full};
use hyper::{
body::Incoming,
client::conn::http2::SendRequest,
header::{CONTENT_TYPE, TE},
server::conn::http2,
Method,
};
use prost::Message;

static SUCCESS: i32 = 0;

fn setup_robot() -> Result<LocalRobot, RobotError> {
let mut robot = LocalRobot::default();

let mut conf = Vec::new();

#[cfg(feature = "camera")]
conf.push(Some(DynamicComponentConfig {
name: "camera".to_string(),
namespace: "rdk".to_string(),
r#type: "camera".to_string(),
model: "rdk:builtin:fake".to_string(),
attributes: None,
..Default::default()
}));

robot.process_components(conf, Box::default())?;

Ok(robot)
}

async fn setup_grpc_server(exec: NativeExecutor, addr: SocketAddr) {
let mut listener = NativeListener::new((addr).into(), None)
.unwrap()
.as_async_listener()
.await
.unwrap();

let robot = Arc::new(Mutex::new(setup_robot().unwrap()));

loop {
let incoming = listener.accept().await;
assert!(incoming.is_ok());
let stream = incoming.unwrap();
let srv = GrpcServer::new(robot.clone(), GrpcBody::new());
Box::new(http2::Builder::new(exec.clone()).serve_connection(stream, srv))
.await
.unwrap();
}
}

async fn check_response(
resp: hyper::Response<Incoming>,
code: i32,
) -> Result<Collected<bytes::Bytes>, String> {
let (parts, body) = resp.into_parts();
let body = body.collect().await.unwrap();
assert!(body.trailers().is_some());
assert_eq!(
body.trailers()
.as_ref()
.unwrap()
.get("grpc-status")
.unwrap()
.to_str()
.unwrap(),
code.to_string()
);

assert_eq!(parts.status, 200);
Ok(body)
}

async fn build_request<M: Message + bytes::Buf + 'static>(
host: String,
path: String,
message: M,
) -> hyper::Request<BoxBody<M, Infallible>> {
hyper::Request::builder()
.method(Method::POST)
.uri(host + &path)
.header(CONTENT_TYPE, "application/grpc")
.header(TE, "trailers")
.body(Full::new(message).boxed())
.unwrap()
}

async fn test_get_image(
mut send_request: SendRequest<BoxBody<bytes::Bytes, Infallible>>,
host: &str,
) -> Result<(), String> {
let get_image_path = "/viam.component.camera.v1.CameraService/GetImage";
// valid
let mut message = GetImageRequest::default();
message.name = "camera".to_string();
let message = encode_request(message).unwrap();

assert!(send_request.ready().await.is_ok());
let req = build_request(host.to_string(), get_image_path.to_string(), message).await;

let resp = send_request.send_request(req).await;
assert!(resp.is_ok());
let body = check_response(resp.unwrap(), SUCCESS).await.unwrap();

let resp = GetImageResponse::decode(body.to_bytes().split_off(5));
assert!(resp.is_ok());
let resp = resp.unwrap();
assert_eq!(resp.mime_type, "image/jpeg");
assert_eq!(resp.image.len(), FAKE_JPEG.len());

// invalid
let mut message = GetImageRequest::default();
message.name = "non-existant-camera".to_string();
let message = encode_request(message).unwrap();

assert!(send_request.ready().await.is_ok());
let req = build_request(host.to_string(), get_image_path.to_string(), message).await;

let resp = send_request.send_request(req).await;
assert!(resp.is_ok());
let _body = check_response(resp.unwrap(), GrpcError::RpcUnavailable as i32)
.await
.unwrap();

Ok(())
}

async fn test_render_frame(
mut send_request: SendRequest<BoxBody<bytes::Bytes, Infallible>>,
host: &str,
) -> Result<(), String> {
let get_image_path = "/viam.component.camera.v1.CameraService/RenderFrame";
// valid
let mut message = RenderFrameRequest::default();
message.name = "camera".to_string();
let message = encode_request(message).unwrap();

assert!(send_request.ready().await.is_ok());
let req = build_request(host.to_string(), get_image_path.to_string(), message).await;
let resp = send_request.send_request(req).await;
assert!(resp.is_ok());

let body = check_response(resp.unwrap(), SUCCESS).await.unwrap();
let resp = HttpBody::decode(body.to_bytes().split_off(5));

assert!(resp.is_ok());
let resp = resp.unwrap();
assert_eq!(resp.content_type, "image/jpeg");
assert_eq!(resp.data.len(), FAKE_JPEG.len());

// invalid
let mut message = RenderFrameRequest::default();
message.name = "non-existant-camera".to_string();
let message = encode_request(message).unwrap();

assert!(send_request.ready().await.is_ok());
let req = build_request(host.to_string(), get_image_path.to_string(), message).await;

let resp = send_request.send_request(req).await;
assert!(resp.is_ok());
let _body = check_response(resp.unwrap(), GrpcError::RpcUnavailable as i32)
.await
.unwrap();

Ok(())
}

#[test_log::test]
fn test_fake_camera() {
let exec = NativeExecutor::default();

let addr = TcpListener::bind("127.0.0.1:0")
.unwrap()
.local_addr()
.unwrap();

let cloned = exec.clone();
exec.spawn(async move {
setup_grpc_server(cloned, addr).await;
})
.detach();

let host = format!("http://{}", addr);
exec.block_on(async {
Timer::after(Duration::from_millis(100)).await;
});
let stream = exec.block_on(async { async_io::Async::<TcpStream>::connect(addr).await });
let stream = match stream {
Ok(s) => NativeStream::LocalPlain(s),
Err(e) => {
println!("{:?}", e.to_string());
panic!();
}
};

let send_request: SendRequest<BoxBody<bytes::Bytes, Infallible>> = exec.block_on(async {
let client = hyper::client::conn::http2::Builder::new(exec.clone())
.handshake(stream)
.await;

assert!(client.is_ok());
let (send_request, conn) = client.unwrap();
exec.spawn(async move {
let _ = conn.await;
})
.detach();
send_request
});

let get_image = exec.block_on(async { test_get_image(send_request.clone(), &host).await });
assert!(get_image.is_ok());
let render_frame =
exec.block_on(async { test_render_frame(send_request.clone(), &host).await });
assert!(render_frame.is_ok());
}
}
25 changes: 23 additions & 2 deletions micro-rdk/src/common/camera/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ pub enum CameraError {
}

pub trait Camera: Status + DoCommand {
/// returns an image from a camera of the underlying robot. A specific MIME type
/// can be requested but may not necessarily be the same one returned
/// Returns a structured image response from a camera of the underlying robot.
/// A specific MIME type can be requested but may not necessarily be the same one returned
fn get_image(&mut self, _buffer: BytesMut) -> Result<BytesMut, CameraError> {
Err(CameraError::CameraMethodUnimplemented("get_image"))
}
Expand Down Expand Up @@ -71,3 +71,24 @@ where
self.get_mut().unwrap().render_frame(buffer)
}
}

impl<L> Camera for Arc<Mutex<L>>
where
L: ?Sized + Camera,
{
fn get_image(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.lock().unwrap().get_image(buffer)
}
fn get_images(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.lock().unwrap().get_images(buffer)
}
fn get_point_cloud(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.lock().unwrap().get_point_cloud(buffer)
}
fn get_properties(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.lock().unwrap().get_properties(buffer)
}
fn render_frame(&mut self, buffer: BytesMut) -> Result<BytesMut, CameraError> {
self.lock().unwrap().render_frame(buffer)
}
}
22 changes: 11 additions & 11 deletions micro-rdk/src/common/grpc.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,32 +8,32 @@ use std::{
};

use crate::{
common::analog::AnalogReader,
common::board::Board,
common::robot::LocalRobot,
common::{
analog::AnalogReader, board::Board, motor::Motor, robot::LocalRobot,
webrtc::grpc::WebRtcGrpcService,
},
google::rpc::Status,
proto::{self, component, robot},
};
use bytes::{BufMut, BytesMut};
use futures_lite::{future, Future};
use http_body_util::BodyExt;
use hyper::body::{Body, Frame};
use hyper::{
body::{self, Bytes},
body::{self, Body, Bytes, Frame},
http::HeaderValue,
service::Service,
HeaderMap, Request, Response,
};
use log::*;
use prost::Message;
use std::cell::RefCell;
use std::pin::Pin;
use std::rc::Rc;
use std::task::{Context, Poll};
use std::{
cell::RefCell,
pin::Pin,
rc::Rc,
task::{Context, Poll},
};
use thiserror::Error;

use super::{motor::Motor, webrtc::grpc::WebRtcGrpcService};

#[cfg(feature = "camera")]
static GRPC_BUFFER_SIZE: usize = 1024 * 30; // 30KB
#[cfg(not(feature = "camera"))]
Expand Down
Loading

0 comments on commit fef8589

Please sign in to comment.