Skip to content

Commit

Permalink
src: stream: Add TCP endpoint support for RTP payloads.
Browse files Browse the repository at this point in the history
  • Loading branch information
joaoantoniocardoso committed Jun 10, 2022
1 parent d35f7c7 commit 12bbaf5
Show file tree
Hide file tree
Showing 6 changed files with 92 additions and 4 deletions.
15 changes: 14 additions & 1 deletion src/stream/gst/pipeline_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,15 @@ impl Pipeline {
)))
}
};
Ok(pipeline_payload.to_string())

let mut pipeline_payload = pipeline_payload.to_string();

// We need to add RTP Stream Payloader when using TCP endpoints (https://datatracker.ietf.org/doc/html/rfc4571)
if video_and_stream_information.stream_information.endpoints[0].scheme() == "tcp" {
pipeline_payload = format!("{pipeline_payload} ! rtpstreampay");
}

Ok(pipeline_payload)
}

fn build_pipeline_sink(
Expand Down Expand Up @@ -220,6 +228,11 @@ impl Pipeline {
.join(",");
format!(" ! multiudpsink clients={clients}")
}
"tcp" => {
let host = endpoints[0].host().unwrap();
let port = endpoints[0].port().unwrap();
format!(" ! tcpserversink host={host} port={port}")
}
_ => "".to_string(),
};
Ok(pipeline_sink)
Expand Down
4 changes: 4 additions & 0 deletions src/stream/manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,9 @@ pub fn start() {
StreamType::UDP(stream) => {
stream.start();
}
StreamType::TCP(stream) => {
stream.start();
}
StreamType::RTSP(stream) => {
stream.start();
}
Expand Down Expand Up @@ -139,6 +142,7 @@ pub fn remove_stream(stream_name: &str) -> Result<(), SimpleError> {
fn get_stream_mavtype(stream_type: &StreamType) -> Option<mavlink::common::VideoStreamType> {
match stream_type {
StreamType::UDP(_) => Some(mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTPUDP),
StreamType::TCP(_) => None,
StreamType::RTSP(_) => Some(mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTSP),
StreamType::REDIRECT(video_strem_redirect) => match video_strem_redirect.scheme.as_str() {
"rtsp" => Some(mavlink::common::VideoStreamType::VIDEO_STREAM_TYPE_RTSP),
Expand Down
1 change: 1 addition & 0 deletions src/stream/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ pub mod stream_backend;
pub mod types;
pub mod video_stream_redirect;
pub mod video_stream_rtsp;
pub mod video_stream_tcp;
pub mod video_stream_udp;
pub mod video_stream_webrtc;
pub mod webrtc;
17 changes: 16 additions & 1 deletion src/stream/stream_backend.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
use super::types::*;
use super::video_stream_redirect::VideoStreamRedirect;
use super::video_stream_rtsp::VideoStreamRtsp;
use super::video_stream_tcp::VideoStreamTcp;
use super::video_stream_udp::VideoStreamUdp;
use super::video_stream_webrtc::VideoStreamWebRTC;
use super::webrtc::utils::{is_webrtcsink_available, webrtc_usage_hint};
Expand Down Expand Up @@ -104,7 +105,7 @@ fn check_scheme_and_encoding_compatibility(
};
} else {
match scheme {
"udp" | "rtsp" | "webrtc" | "stun" | "turn" | "ws" => (), // No encoding restrictions for these schemes.
"udp" | "tcp" | "rtsp" | "webrtc" | "stun" | "turn" | "ws" => (), // No encoding restrictions for these schemes.
"udp265" => {
if VideoEncodeType::H265 != encode {
return Err(SimpleError::new(format!("Endpoint with \"udp265\" scheme only supports H265 encode. Encode: {encode:?}, Endpoints: {endpoints:#?}")));
Expand Down Expand Up @@ -196,6 +197,19 @@ fn create_rtsp_stream(
)?))
}

fn create_tcp_stream(
video_and_stream_information: &VideoAndStreamInformation,
) -> Result<StreamType, SimpleError> {
let endpoints = &video_and_stream_information.stream_information.endpoints;

check_for_host_and_port(endpoints)?;
check_for_multiple_endpoints(endpoints)?;

Ok(StreamType::TCP(VideoStreamTcp::new(
video_and_stream_information,
)?))
}

fn create_redirect_stream(
video_and_stream_information: &VideoAndStreamInformation,
) -> Result<StreamType, SimpleError> {
Expand Down Expand Up @@ -277,6 +291,7 @@ fn create_stream(
.unwrap();
match endpoint.scheme() {
"udp" => create_udp_stream(video_and_stream_information),
"tcp" => create_tcp_stream(video_and_stream_information),
"rtsp" => create_rtsp_stream(video_and_stream_information),
"webrtc" | "stun" | "turn" | "ws" => {
create_webrtc_turn_stream(video_and_stream_information)
Expand Down
7 changes: 5 additions & 2 deletions src/stream/types.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use super::{
stream_backend::StreamBackend, video_stream_redirect::VideoStreamRedirect,
video_stream_rtsp::VideoStreamRtsp, video_stream_udp::VideoStreamUdp,
video_stream_webrtc::VideoStreamWebRTC,
video_stream_rtsp::VideoStreamRtsp, video_stream_tcp::VideoStreamTcp,
video_stream_udp::VideoStreamUdp, video_stream_webrtc::VideoStreamWebRTC,
};
use crate::{
video::types::{FrameInterval, VideoEncodeType},
Expand All @@ -16,6 +16,7 @@ use url::Url;
#[allow(dead_code)]
pub enum StreamType {
UDP(VideoStreamUdp),
TCP(VideoStreamTcp),
RTSP(VideoStreamRtsp),
REDIRECT(VideoStreamRedirect),
WEBRTC(VideoStreamWebRTC),
Expand All @@ -25,6 +26,7 @@ impl StreamType {
pub fn inner(&self) -> &(dyn StreamBackend + '_) {
match self {
StreamType::UDP(backend) => backend,
StreamType::TCP(backend) => backend,
StreamType::RTSP(backend) => backend,
StreamType::REDIRECT(backend) => backend,
StreamType::WEBRTC(backend) => backend,
Expand All @@ -34,6 +36,7 @@ impl StreamType {
pub fn mut_inner(&mut self) -> &mut (dyn StreamBackend + '_) {
match self {
StreamType::UDP(backend) => backend,
StreamType::TCP(backend) => backend,
StreamType::RTSP(backend) => backend,
StreamType::REDIRECT(backend) => backend,
StreamType::WEBRTC(backend) => backend,
Expand Down
52 changes: 52 additions & 0 deletions src/stream/video_stream_tcp.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
use super::{
gst::pipeline_builder::Pipeline, gst::pipeline_runner::PipelineRunner,
stream_backend::StreamBackend,
};

#[derive(Debug)]
#[allow(dead_code)]
pub struct VideoStreamTcp {
pipeline_runner: PipelineRunner,
}

impl VideoStreamTcp {
pub fn new(
video_and_stream_information: &crate::video_stream::types::VideoAndStreamInformation,
) -> Result<Self, simple_error::SimpleError> {
Ok(VideoStreamTcp {
pipeline_runner: PipelineRunner::new(Pipeline::new(video_and_stream_information)?),
})
}
}

impl Drop for VideoStreamTcp {
fn drop(&mut self) {
self.stop();
}
}

impl StreamBackend for VideoStreamTcp {
fn start(&mut self) -> bool {
self.pipeline_runner.start()
}

fn stop(&mut self) -> bool {
self.pipeline_runner.stop()
}

fn restart(&mut self) {
self.pipeline_runner.restart()
}

fn is_running(&self) -> bool {
self.pipeline_runner.is_running()
}

fn pipeline(&self) -> String {
self.pipeline_runner.pipeline()
}

fn allow_same_endpoints(&self) -> bool {
false
}
}

0 comments on commit 12bbaf5

Please sign in to comment.