Skip to content

Commit

Permalink
Merge pull request #99 from pipeless-ai/rtsp
Browse files Browse the repository at this point in the history
feat: Support for RTSP output
  • Loading branch information
miguelaeh committed Dec 8, 2023
2 parents eb69d12 + f03df6c commit d41fc9f
Show file tree
Hide file tree
Showing 13 changed files with 141 additions and 37 deletions.
4 changes: 2 additions & 2 deletions .github/workflows/pipeless-build-and-release.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ jobs:
gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-tools \
gstreamer1.0-x gstreamer1.0-alsa gstreamer1.0-gl gstreamer1.0-gtk3 \
gstreamer1.0-qt5 gstreamer1.0-pulseaudio
gstreamer1.0-qt5 gstreamer1.0-pulseaudio gstreamer1.0-rtsp
- name: Build
run: |
Expand Down Expand Up @@ -170,7 +170,7 @@ jobs:
gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-tools \
gstreamer1.0-x gstreamer1.0-alsa gstreamer1.0-gl gstreamer1.0-gtk3 \
gstreamer1.0-qt5 gstreamer1.0-pulseaudio
gstreamer1.0-qt5 gstreamer1.0-pulseaudio gstreamer1.0-rtsp
- name: Crates.io
run: cargo publish --manifest-path pipeless/Cargo.toml --token ${CRATES_IO_TOKEN}
Expand Down
2 changes: 1 addition & 1 deletion examples/yolo/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ def hook(frame, context):
model = context['model']
input_fps = frame['fps']
delay = time.time() - frame['input_ts']
if delay > 1 / input_fps:
if input_fps > 0 and delay > 1 / input_fps:
print('Skipping frame to maintain real-time')
else:
prediction = next(model(rgb_frame, stream=True))
Expand Down
2 changes: 1 addition & 1 deletion install.sh
Original file line number Diff line number Diff line change
Expand Up @@ -205,7 +205,7 @@ libgstreamer-plugins-bad1.0-dev gstreamer1.0-plugins-base \
gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-tools \
gstreamer1.0-x gstreamer1.0-alsa gstreamer1.0-gl gstreamer1.0-gtk3 \
gstreamer1.0-qt5 gstreamer1.0-pulseaudio"
gstreamer1.0-qt5 gstreamer1.0-pulseaudio gstreamer1.0-rtsp"
elif [ "${OS}" == "darwin" ]; then
echo "brew install gstreamer"
fi
Expand Down
2 changes: 1 addition & 1 deletion package/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ RUN install_packages \
gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-tools \
gstreamer1.0-x gstreamer1.0-alsa gstreamer1.0-gl gstreamer1.0-gtk3 \
gstreamer1.0-qt5 gstreamer1.0-pulseaudio \
gstreamer1.0-qt5 gstreamer1.0-pulseaudio gstreamer1.0-rtsp \
# Gstreamer deps
libcairo2-dev libgirepository1.0-dev \
# Python
Expand Down
2 changes: 1 addition & 1 deletion package/Dockerfile-cuda
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ RUN install_packages \
gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-tools \
gstreamer1.0-x gstreamer1.0-alsa gstreamer1.0-gl gstreamer1.0-gtk3 \
gstreamer1.0-qt5 gstreamer1.0-pulseaudio \
gstreamer1.0-qt5 gstreamer1.0-pulseaudio gstreamer1.0-rtsp \
# Gstreamer deps
libcairo2-dev libgirepository1.0-dev \
# Python
Expand Down
2 changes: 1 addition & 1 deletion package/Dockerfile-tensorrt
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ RUN install_packages \
gstreamer1.0-plugins-good gstreamer1.0-plugins-bad \
gstreamer1.0-plugins-ugly gstreamer1.0-libav gstreamer1.0-tools \
gstreamer1.0-x gstreamer1.0-alsa gstreamer1.0-gl gstreamer1.0-gtk3 \
gstreamer1.0-qt5 gstreamer1.0-pulseaudio \
gstreamer1.0-qt5 gstreamer1.0-pulseaudio gstreamer1.0-rtsp \
# Gstreamer deps
libcairo2-dev libgirepository1.0-dev \
# Python
Expand Down
54 changes: 53 additions & 1 deletion pipeless/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 2 additions & 1 deletion pipeless/Cargo.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[package]
name = "pipeless-ai"
version = "1.2.1"
version = "1.3.0"
edition = "2021"
authors = ["Miguel A. Cabrera Minagorri"]
description = "An open-source computer vision framework to build and deploy applications in minutes"
Expand Down Expand Up @@ -44,6 +44,7 @@ sled = "0.34.7"
lazy_static = "1.4.0"
rayon = "1.8.0"
num_cpus = "1.16.0"
gstreamer-rtsp = "0.21.0"

[dependencies.uuid]
version = "1.4.1"
Expand Down
2 changes: 1 addition & 1 deletion pipeless/src/config/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -304,4 +304,4 @@ mod tests {
let not_found_entry = table.find_by_pipeline_id(non_existent_pipeline_id);
assert_eq!(not_found_entry, None);
}
}
}
2 changes: 1 addition & 1 deletion pipeless/src/dispatcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,4 @@ pub fn start(
}
}).await;
});
}
}
5 changes: 1 addition & 4 deletions pipeless/src/input/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -103,10 +103,7 @@ fn on_new_sample(
gst::ClockTime::ZERO
}
};
let duration = buffer.duration().ok_or_else(|| {
error!("Unable to get duration");
gst::FlowError::Error
})?;
let duration = buffer.duration().or(Some(gst::ClockTime::from_mseconds(0))).unwrap();
let buffer_info = buffer.map_readable().or_else(|_| {
error!("Unable to extract the info from the sample buffer.");
Err(gst::FlowError::Error)
Expand Down
94 changes: 74 additions & 20 deletions pipeless/src/output/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ fn set_pipeline_tags(pipeline: &gst::Pipeline, new_tag_list: &gst::TagList) {
debug!("Updating pipeline with new tags: {}", new_tag_list.to_string());
// NOTE: We always must have an element called taginject on the pipeline
match pipeline.by_name("taginject") {
None => warn!("Taginject element not found in the pipeline"),
None => debug!("Taginject element not found in the pipeline"),
Some(t_inject) => {
t_inject.set_property(
"tags",
Expand Down Expand Up @@ -159,6 +159,37 @@ fn create_processing_bin(stream: &StreamDef) -> Result<gst::Bin, OutputPipelineE
let ghostpath_src = gst::GhostPad::with_target(&muxer_src_pad)
.map_err(|_| { OutputPipelineError::new("Unable to create the ghost pad to link bin") })?;
bin.add_pad(&ghostpath_src).map_err(|_| { OutputPipelineError::new("Unable to add ghostpad source") })?;
} else if stream.video.get_protocol() == "rtsp" {
let videoconvert = pipeless::gst::utils::create_generic_component(
"videoconvert", "videoconvert")?;
let capsfilter = pipeless::gst::utils::create_generic_component(
"capsfilter", "capsfilter")?;
let encoder = pipeless::gst::utils::create_generic_component(
"x264enc", "encoder")?;
bin.add_many([
&videoconvert, &capsfilter, &encoder
]).map_err(|_| { OutputPipelineError::new("Unable to add elements to processing bin") })?;

let caps = gst::Caps::from_str("video/x-h264,profile=baseline")
.map_err(|_| { OutputPipelineError::new("Unable to create caps from provided string") })?;
capsfilter.set_property("caps", caps);

videoconvert.link(&encoder)
.map_err(|_| { OutputPipelineError::new("Unable to link videoconvert to encoder") })?;
encoder.link(&capsfilter)
.map_err(|_| { OutputPipelineError::new("Unable to link encoder to capsfilter") })?;

// Ghost pads to be able to plug other components to the bin
let videoconvert_sink_pad = videoconvert.static_pad("sink")
.ok_or_else(|| { OutputPipelineError::new("Failed to create the pipeline. Unable to get videoconvert sink pad.") })?;
let ghostpath_sink = gst::GhostPad::with_target(&videoconvert_sink_pad)
.map_err(|_| { OutputPipelineError::new("Unable to create the sink ghost pad to link bin") })?;
bin.add_pad(&ghostpath_sink).map_err(|_| { OutputPipelineError::new("Unable to add ghostpad sink") })?;
let capsfilter_src_pad = capsfilter.static_pad("src")
.ok_or_else(|| { OutputPipelineError::new("Failed to create the pipeline. Unable to get capsfilter source pad.") })?;
let ghostpath_src = gst::GhostPad::with_target(&capsfilter_src_pad)
.map_err(|_| { OutputPipelineError::new("Unable to create the ghost pad to link bin") })?;
bin.add_pad(&ghostpath_src).map_err(|_| { OutputPipelineError::new("Unable to add ghostpad source") })?;
} else if stream.video.get_protocol() == "screen" {
let queue1 = pipeless::gst::utils::create_generic_component(
"queue", "queue1")?;
Expand Down Expand Up @@ -207,9 +238,18 @@ fn create_sink(stream: &StreamDef) -> Result<gst::Element, BoolError> {
return match stream.video.get_protocol() {
// TODO: implement processing bin for all the below protocols
"file" => get_sink("filesink", Some(location)),
"https" => get_sink("souphttpsink", Some(location)),
"rtmp" => get_sink("rtmpsink", Some(format!("rtmp://{}", location).as_ref())),
"rstp" => get_sink("rtspclientsink", Some(location)),
"rtsp" => {
let sink = get_sink("rtspclientsink", Some(format!("rtsp://{}", location).as_ref()))?;
// TODO: allow the user to configure the profiles per stream (not globally to pipeless)
// sink.set_property("profiles", gstreamer_rtsp::RTSPProfile::AVPF);

// TODO: allow the user to configure the protocols
// Using TCP by default because UDP split frames to MTU size and may produce problems in some multimedia servers.
// Also, UDP may produce errors when there are NATs between the client and server
sink.set_property("protocols", gstreamer_rtsp::RTSPLowerTrans::TCP);
Ok(sink)
},
"screen" => get_sink("autovideosink", None),
_ => {
warn!("unsupported output protocol, defaulting to screen");
Expand Down Expand Up @@ -284,7 +324,7 @@ fn on_bus_message(

fn create_gst_pipeline(
output_stream_def: &StreamDef,
caps: &str
caps: &str,
) -> Result<(gst::Pipeline, gst::BufferPool), OutputPipelineError> {
let pipeline = gst::Pipeline::new();
let input_stream_caps = gst::Caps::from_str(caps)
Expand Down Expand Up @@ -415,7 +455,11 @@ impl Pipeline {
Ok(())
}

pub fn on_new_frame(&self, frame: pipeless::data::Frame) -> Result<(), OutputPipelineError>{
pub fn on_new_frame(
&self,
frame: pipeless::data::Frame,
pipeless_bus_sender: &tokio::sync::mpsc::UnboundedSender<pipeless::events::Event>
) -> Result<(), OutputPipelineError>{
match frame {
pipeless::data::Frame::RgbFrame(mut rgb_frame) => {
let modified_pixels = rgb_frame.get_modified_pixels();
Expand All @@ -442,20 +486,30 @@ impl Pipeline {
// data_slice.par_iter_mut().for_each(|byte| {
// *byte = (*byte + 1) % 256;
// });
gst_buffer_mut.copy_from_slice(0, out_frame_data)
.map_err(|_| { OutputPipelineError::new("Unable to copy slice into buffer") })?;

if copy_timestamps {
let pts = rgb_frame.get_pts();
let dts = rgb_frame.get_dts();
let duration = rgb_frame.get_duration();
gst_buffer_mut.set_pts(pts);
gst_buffer_mut.set_dts(dts);
gst_buffer_mut.set_duration(duration);
}

if let Err(err) = appsrc.push_buffer(gst_buffer) {
return Err(OutputPipelineError::new(&format!("Failed to send the output buffer: {}", err)));
if out_frame_data.len() > gst_buffer_mut.size() {
warn!("
The frame produced is bigger than the buffer.
This may happen if your input stream changes the frame size in the input capabilitites.
Stopping pipeline. If the pipeline is set to automatically restart it will be recreated with the new capabilities.
");
// If the pipeline is set to restart automatically, after the error, a new one will be created
pipeless::events::publish_output_stream_error_event_sync(pipeless_bus_sender, "The size of the input frame has changed.");
} else {
gst_buffer_mut.copy_from_slice(0, out_frame_data)
.map_err(|_| { OutputPipelineError::new("Unable to copy slice into buffer") })?;

if copy_timestamps {
let pts = rgb_frame.get_pts();
let dts = rgb_frame.get_dts();
let duration = rgb_frame.get_duration();
gst_buffer_mut.set_pts(pts);
gst_buffer_mut.set_dts(dts);
gst_buffer_mut.set_duration(duration);
}

if let Err(err) = appsrc.push_buffer(gst_buffer) {
return Err(OutputPipelineError::new(&format!("Failed to send the output buffer: {}", err)));
}
}
}
}
Expand All @@ -466,7 +520,7 @@ impl Pipeline {
pub fn on_new_tags(&self, new_tags: gst::TagList) {
let merged_tags = new_tags;
match self.gst_pipeline.by_name("taginject") {
None => warn!("Taginject element not found, skipping tags update."),
None => debug!("Taginject element not found, skipping tags update."),
Some(_t_inject) => {
// FIXME: Gstreamer bug taginject tags are not readable when they should.
// Uncomment the following 2 lines when fixed to update tags properly.
Expand Down
4 changes: 2 additions & 2 deletions pipeless/src/pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ impl Manager {
let read_guard = rw_pipeline.read().await;
match &read_guard.output_pipeline {
Some(pipe) => {
if let Err(err) = pipe.on_new_frame(out_frame) {
if let Err(err) = pipe.on_new_frame(out_frame, &pipeless_bus_sender) {
error!("{}", err);
}
}
Expand Down Expand Up @@ -339,4 +339,4 @@ impl Manager {
let pipeline_id = read_guard.id;
pipeline_id
}
}
}

0 comments on commit d41fc9f

Please sign in to comment.