# Ds_infer_pipeline

Class to build different pipelines. We need a Python class that handles the DeepStream pipeline, the number of cameras, the output video file, and the output object file.

This will simplify the rest of the positrack3 code.

Sources:
One or several cameras (USB or CSI)
One or several files

Pre-inference image processing 
Crop, resize
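
Because the model sees a cropped and resized image, positions found in the model output have to be mapped back to camera pixels before they are saved. The sketch below shows that arithmetic only; the class name `CropResize`, its fields, and the numbers are assumptions for illustration and are not part of the pipeline code.

```python
from dataclasses import dataclass
from typing import Tuple


@dataclass(frozen=True)
class CropResize:
    """Crop rectangle in source pixels, followed by a resize to the model input size."""
    crop_x: int       # left edge of the crop in the source image
    crop_y: int       # top edge of the crop in the source image
    crop_width: int
    crop_height: int
    out_width: int    # width after resizing (model input width)
    out_height: int   # height after resizing (model input height)

    def to_source(self, x: float, y: float) -> Tuple[float, float]:
        """Map a point from resized (model) coordinates back to source pixels."""
        scale_x = self.crop_width / self.out_width
        scale_y = self.crop_height / self.out_height
        return self.crop_x + x * scale_x, self.crop_y + y * scale_y


# Hypothetical numbers: a 960x960 crop of a 1920x1080 frame, resized to 480x480.
prep = CropResize(crop_x=480, crop_y=60, crop_width=960, crop_height=960,
                  out_width=480, out_height=480)
print(prep.to_source(240.0, 240.0))  # centre of the model input -> (960.0, 540.0)
```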

Infer
Any U-Net model

File Outputs
1. Video file with one or more video sources
2. File with the object positions for each frame, with timestamps. Use CV2 blob detection and store time, x, y, and prob per object per frame.
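
As a rough sketch of what the object file could look like, the code below writes one CSV row per object per frame. The CSV format, the `Detection` field names, and the `write_detections` helper are assumptions made for illustration; the blob detection step itself is left out, and the values are made up.

```python
import csv
import io
from typing import Iterable, NamedTuple, TextIO


class Detection(NamedTuple):
    """One detected object in one frame (field names are assumptions)."""
    frame: int    # frame number within the stream
    time: float   # frame timestamp in seconds
    obj: int      # index of the object within the frame
    x: float      # blob centre, in pixels
    y: float
    prob: float   # model probability at the blob


def write_detections(out: TextIO, detections: Iterable[Detection]) -> int:
    """Write a header and one CSV row per detection; return the number of rows."""
    writer = csv.writer(out)
    writer.writerow(Detection._fields)
    count = 0
    for detection in detections:
        writer.writerow(detection)
        count += 1
    return count


# Made-up values, only to show the layout of the file.
buffer = io.StringIO()
n_rows = write_detections(buffer, [
    Detection(frame=0, time=0.0, obj=0, x=312.5, y=240.0, prob=0.98),
    Detection(frame=0, time=0.0, obj=1, x=101.0, y=77.5, prob=0.81),
])
print(buffer.getvalue())
```

Writing to any text stream keeps the same function usable for a real file opened with `open(path, "w", newline="")`.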

Display output.
Display with blob.

Config files for :
1. Inference engine
2. Sources and preprocessing
3. Blob processing
4. Output file format.

YAML file for pipeline configuration.
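
The configuration is a flat mapping with numbered keys (`source0_path`, `output0_type`, and so on). The sketch below lists the keys that `load_pipeline_config` reads and reports any that are missing before the pipeline is built. The helper name `missing_config_keys` is hypothetical, and the example dictionary mirrors the configuration printed in the minimal example of this notebook.

```python
from typing import Any, Dict, List


def missing_config_keys(cfg: Dict[str, Any]) -> List[str]:
    """Return the keys that the pipeline class expects but cfg does not contain."""
    required = ["source_number", "output_number", "inference_config_file",
                "streammux_width", "streammux_height", "streammux_batch-size",
                "streammux_batched-push-timeout", "streammux_live-source"]
    # Every source needs a path, a type and a frame rate.
    for i in range(cfg.get("source_number", 0)):
        required += [f"source{i}_path", f"source{i}_type", f"source{i}_framerate"]
    # Every output needs a type; video files need a few extra fields.
    for i in range(cfg.get("output_number", 0)):
        required.append(f"output{i}_type")
        if cfg.get(f"output{i}_type") == "video_file":
            required += [f"output{i}_width", f"output{i}_height",
                         f"output{i}_bitrate", f"output{i}_default-file-name"]
    return [key for key in required if key not in cfg]


# Same content as the parsed configuration of the minimal example.
example_config = {
    "source_number": 1,
    "source0_path": "/dev/video0",
    "source0_type": "usb",
    "source0_framerate": 30,
    "streammux_width": 1920,
    "streammux_height": 1080,
    "streammux_batch-size": 1,
    "streammux_batched-push-timeout": 4000,
    "streammux_live-source": 1,
    "inference_config_file": "inference_config.txt",
    "output_number": 2,
    "output0_type": "video_file",
    "output0_width": 640,
    "output0_height": 480,
    "output0_bitrate": 4000000,
    "output0_default-file-name": "default_video.mp4",
    "output1_type": "on-screen-display",
}
print(missing_config_keys(example_config))  # -> []
```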

Expose the model output for a batch when it becomes available.

It should work with any U-Net network.

We just need a way to make the new output tensor, frame number, and timestamp available when a batch is processed.
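
One possible shape for this is a small publish/subscribe object: the pipeline class publishes a result for each processed frame, and outside code registers callbacks. This is only a sketch of the idea; `BatchResult` and `BatchPublisher` are hypothetical names. In a DeepStream pipeline, `publish` would probably be called from a buffer probe on the inference element's source pad, and that wiring is not shown here.

```python
from dataclasses import dataclass
from typing import Any, Callable, List


@dataclass
class BatchResult:
    """What the pipeline exposes for one processed frame (assumed fields)."""
    frame_number: int
    timestamp: float   # seconds
    tensor: Any        # model output for this frame, e.g. a NumPy array


class BatchPublisher:
    """Keeps a list of callbacks and calls each one with every new result."""

    def __init__(self) -> None:
        self._callbacks: List[Callable[[BatchResult], None]] = []

    def subscribe(self, callback: Callable[[BatchResult], None]) -> None:
        self._callbacks.append(callback)

    def publish(self, result: BatchResult) -> None:
        for callback in self._callbacks:
            callback(result)


# Outside code registers a consumer; here it only stores the results.
received: List[BatchResult] = []
publisher = BatchPublisher()
publisher.subscribe(received.append)

# The pipeline side would call publish() once a batch has been processed.
publisher.publish(BatchResult(frame_number=0, timestamp=0.0, tensor=[[0.1, 0.9]]))
print(received[0].frame_number, received[0].timestamp)
```

Keeping the callbacks outside the pipeline class means the tracking code can change without touching the pipeline.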

The code outside of this class can deal with model-specific computations (animal position, hd), TTL pulses, and ROS posting, similar to what the current positrack2 programs do.

With this design, we can change the tracking process without having to change much code.


In [1]:
import sys
sys.path.append('../../apps/')
import gi
import math
import ctypes

gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call
import cv2
import pyds
import numpy as np
import os.path
import yaml

In [13]:
class Deepstream_pipeline:
    """
    Class used to build a GStreamer/DeepStream pipeline.
    The configuration of the pipeline is described in a few config files.
    The inference_config_file is in a format prescribed by NVIDIA DeepStream.
    The other config files are YAML files that were developed to work with this class.
    
    This class should allow some flexibility when building the pipeline so that we can use the same class across different experiments.
    
    Here is an idea of what this class deals with.
    
    Source:
    Can be one or several cameras
    Can be one or several video files
    
    Preprocessing:
    Crop, rotate and resize operations
    
    
    Inference:
    TensorRT model
    
    
    Model output processing:
    Steps applied to the model output to obtain the position of objects
    
    
    Output:
    Window with images
    File with the position of objects in each video frame
    File with the images processed
    
    """
    def __init__(self,
                 pipeline_config_file):
    
        self.pipeline_config_file = pipeline_config_file
       
        
        if not os.path.exists(self.pipeline_config_file):
            raise IOError(f"{self.pipeline_config_file} missing")

        self.load_pipeline_config()

        # Standard GStreamer initialization
        Gst.init(None)

        # Create gstreamer elements
        # Create Pipeline element that will form a connection of other elements
        print("Creating Pipeline \n ")
        self.pipeline = Gst.Pipeline()

        if not self.pipeline:
            raise ValueError(" Unable to create Pipeline \n")

        
        # add all the elements in the pipeline
        self.add_source_elements()
        self.add_streammux()
        self.add_inference()  
        self.add_output_elements()
        
        # link all elements of the pipeline
        self.link_pipeline_elements()
        
        
        loop = GLib.MainLoop()
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect ("message", bus_call, loop)
        
        self.pipeline.set_state(Gst.State.PLAYING)
        try:
            loop.run()
        except KeyboardInterrupt:
            # Ctrl+C (or a kernel interrupt) stops the loop; fall through to cleanup
            pass
        # cleanup
        print("Cleanup")
        self.pipeline.set_state(Gst.State.NULL)
        
   
    def link_pipeline_elements(self):
        """
        Link the elements of the pipeline
        
        After adding all elements to the pipeline, we need to link them
        
        """
        self.link_sources() # link the source elements, and the sources to the streammux
    
        self.streammux.link(self.segmentation)
            
        if self.output_number > 1:
            # link the segmentation to the tee
            pass
        
        self.link_outputs()
   

    def link_sources(self):
        """
        Link the elements of the sources
        """
        for i in range(self.source_number):
            s = self.source_list[i]
            if s["type"]=="usb":
                print(f"Link source {i} usb camera")
                ## add elements to the pipeline
                s["source"].link(s["caps_v4l2src"])
                s["caps_v4l2src"].link(s["vidconvsrc"])
                s["vidconvsrc"].link(s["nvvidconvsrc"])
                s["nvvidconvsrc"].link(s["caps_vidconvsrc"])
                
                ## link the source to the streammux
                s["caps_vidconvsrc_srcpad"] = s["caps_vidconvsrc"].get_static_pad("src")
                if not s["caps_vidconvsrc_srcpad"]:
                    raise ValueError("Unable to get source pad of caps_vidconvsrc \n")
                
                # request one streammux sink pad per source (sink_0, sink_1, ...)
                self.streammux_sinkpad = self.streammux.get_request_pad(f"sink_{i}")
                if not self.streammux_sinkpad:
                    raise ValueError("Unable to get the sink pad of streammux \n")

                
                
                s["caps_vidconvsrc_srcpad"].link(self.streammux_sinkpad)
    
    def link_outputs(self):
        """
        Link the elements of the outputs
        """
        
        if self.output_number > 1:
            self.segmentation.link(self.output_tee)
        
        
        for i in range(self.output_number):
            out = self.output_list[i]
            
            if out["type"] == "video_file":
                print(f"Linking video_file {i}")
                if self.output_number > 1:
                    
                    tee_file_pad = self.output_tee.get_request_pad('src_%u')
                    if not tee_file_pad:
                        raise ValueError("Unable to get request tee pads\n")
                    sink_queue_pad = out["queue"].get_static_pad('sink')
                    if not sink_queue_pad:
                        raise ValueError("Unable to get request sink_queue1 pad\n")
                    tee_file_pad.link(sink_queue_pad)
                
                else:
                    ## link to the segmentation
                    print("Link to self.segmentation")
                    self.segmentation.link(out["queue"])
                
                out["queue"].link(out["nvvidconv"])
                out["nvvidconv"].link(out["caps_vidconv"])
                out["caps_vidconv"].link(out["nvvenc"])
                out["nvvenc"].link(out["parse"])
                out["parse"].link(out["mux"])
                out["mux"].link(out["file_sink"])
            
            if out["type"] == "on-screen-display":
                print(f"Linking on-screen-display {i}")
                if self.output_number > 1:
                    
                    tee_osd_pad = self.output_tee.get_request_pad('src_%u')
                    if not tee_osd_pad:
                        raise ValueError("Unable to get request tee pads\n")
                    sink_queue_pad = out["queue"].get_static_pad('sink')
                    if not sink_queue_pad:
                        raise ValueError("Unable to get request sink_queue1 pad\n")
                    tee_osd_pad.link(sink_queue_pad)
                
                else:
                    ## link to the segmentation
                    print("Link to self.segmentation")
                    self.segmentation.link(out["queue"])
                
                out["queue"].link(out["nvosd"])
                out["nvosd"].link(out["sink_osd"])
                
                
    
    def add_streammux(self):
        """
        Add a streammux before the inference element
        """
        self.streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
        if not self.streammux:
            raise ValueError(" Unable to create NvStreamMux \n")
        
       
        
        self.streammux.set_property("nvbuf-memory-type", int(pyds.NVBUF_MEM_CUDA_DEVICE))
        self.streammux.set_property('width', self.streammux_properties["width"])
        self.streammux.set_property('height', self.streammux_properties["height"])
        self.streammux.set_property('batch-size', self.streammux_properties["batch-size"])
        self.streammux.set_property('batched-push-timeout', self.streammux_properties["batched-push-timeout"])
        self.streammux.set_property('live-source', self.streammux_properties["live-source"]) # Essential to get more than 1 Hz
    
        
        self.pipeline.add(self.streammux)
     
    def add_inference(self):
        """
        Add the inference element to the pipeline
        """
        # Create segmentation for primary inference
        self.segmentation = Gst.ElementFactory.make("nvinfer", "primary-nvinference-engine")
        
        # Get configuration from the config file
        if not self.segmentation:
            raise ValueError("Unable to create primary inference\n")
        self.segmentation.set_property('config-file-path', self.inference_config_file)
        pgie_batch_size = self.segmentation.get_property("batch-size")
        
        if pgie_batch_size != self.source_number:
            print("WARNING: Overriding infer-config batch-size", pgie_batch_size,
                  " with number of sources ", self.source_number,
                  " \n")
            self.segmentation.set_property("batch-size", self.source_number)
        
        self.pipeline.add(self.segmentation)
        
        
        
    def add_source_elements(self):
        """
        Add source elements to the pipeline
        
        """
        
        
        for i in range(self.source_number):
            s = self.source_list[i]
            if s["type"]=="usb":
                
                ## create elements
                s["source"] = Gst.ElementFactory.make("v4l2src", f"usb-cam-source{i}")
                if not s["source"]:
                    raise ValueError(" Unable to create Source \n")
                    
                s["caps_v4l2src"] = Gst.ElementFactory.make("capsfilter", f"v4l2src_caps{i}")
                if not s["caps_v4l2src"]:
                    raise ValueError(" Unable to create caps_v4l2src\n")
                
                s["vidconvsrc"] = Gst.ElementFactory.make("videoconvert", f"convertor_src{i}")
                if not s["vidconvsrc"]:
                    raise ValueError(" Unable to create vidconvsrc\n")
                
                
                s["nvvidconvsrc"] = Gst.ElementFactory.make("nvvideoconvert", f"nvvidconv_src{i}")
                if not s["nvvidconvsrc"]:
                    raise ValueError(" Unable to create nvvidconvsrc\n") 
                
                s["caps_vidconvsrc"] = Gst.ElementFactory.make("capsfilter", f"nvmm_caps{i}")
                if not s["caps_vidconvsrc"]:
                    raise ValueError(" Unable to create caps_vidconvsrc\n")
                    
                    
                
                    
                    
                ## set properties
                fr = s["framerate"]
                s["caps_v4l2src"].set_property('caps', Gst.Caps.from_string(f"video/x-raw, framerate={fr}/1"))
                s["caps_vidconvsrc"].set_property('caps', Gst.Caps.from_string("video/x-raw(memory:NVMM)"))
                s["source"].set_property('device', s["path"])
                
                ## add elements to the pipeline
                self.pipeline.add(s["source"])
                self.pipeline.add(s["caps_v4l2src"])
                self.pipeline.add(s["vidconvsrc"])
                self.pipeline.add(s["nvvidconvsrc"])
                self.pipeline.add(s["caps_vidconvsrc"])
    
    

    def add_output_elements(self):
        """
        Add output element to the pipeline
        """
        if self.output_number > 1:
            # we will need a tee
            self.output_tee =  Gst.ElementFactory.make("tee", "tee1")
            if not self.output_tee:
                raise ValueError(" Unable to create tee\n")
            self.pipeline.add(self.output_tee)
            
        
        for i in range(self.output_number):
            out = self.output_list[i]
            if out["type"] == "video_file":
                ## Create part of the pipeline to save a video file
                
                out["queue"] =  Gst.ElementFactory.make("queue", f"queue_output{i}")
                if not out["queue"]:
                    raise ValueError("Unable to create queue\n")

                out["nvvidconv"] = Gst.ElementFactory.make("nvvideoconvert", f"convertor_output{i}")
                if not out["nvvidconv"]:
                    raise ValueError("Unable to create nvvideoconvert\n")

                out["caps_vidconv"] = Gst.ElementFactory.make("capsfilter", f"nvmm_caps_output{i}")
                if not out["caps_vidconv"]:
                    raise ValueError("Unable to create capsfilter\n")

                out["nvvenc"] = Gst.ElementFactory.make("nvv4l2h264enc", f"enc_output{i}")
                if not out["nvvenc"]:
                    raise ValueError("Unable to create nvvenc\n")

                out["parse"] = Gst.ElementFactory.make("h264parse", f"parse_output{i}")
                if not out["parse"]:
                    raise ValueError("Unable to create parse\n")

                out["mux"] =  Gst.ElementFactory.make("matroskamux", f"mux_output{i}")
                if not out["mux"]:
                    raise ValueError("Unable to create mux\n")

                out["file_sink"] =  Gst.ElementFactory.make("filesink", f"fs_output{i}")
                if not out["file_sink"]:
                    raise ValueError(" Unable to create file_sink\n")

                width=out["width"]
                height=out["height"]
                out["caps_vidconv"].set_property('caps', 
                                                 Gst.Caps.from_string(f'video/x-raw(memory:NVMM),width={width}, height={height}'))
                
                
                out["nvvenc"].set_property('bitrate', out["bitrate"])
                out["file_sink"].set_property("location", out["default-file-name"])

                
                self.pipeline.add(out["queue"])
                self.pipeline.add(out["nvvidconv"])
                self.pipeline.add(out["caps_vidconv"])
                self.pipeline.add(out["nvvenc"])
                self.pipeline.add(out["parse"])
                self.pipeline.add(out["mux"])
                self.pipeline.add(out["file_sink"])

            if out["type"] == "on-screen-display":
                
                out["queue"] =  Gst.ElementFactory.make("queue", f"queue_output{i}")
                if not out["queue"]:
                    raise ValueError("Unable to create queue\n")
                
                out["nvosd"] = Gst.ElementFactory.make("nvdsosd", f"onscreendisplay_output{i}")
                if not out["nvosd"]:
                    raise ValueError("Unable to create nvosd\n")
                    
                out["sink_osd"] = Gst.ElementFactory.make("nveglglessink", f"nvvideo-renderer-osd_output{i}")
                if not out["sink_osd"]:
                    raise ValueError("Unable to create egl sink_seg \n")
                
                
                out["sink_osd"].set_property("qos", 0)
                
                self.pipeline.add(out["queue"])
                self.pipeline.add(out["nvosd"])
                self.pipeline.add(out["sink_osd"])
                
        
                        
    def load_pipeline_config(self):
        """
        Load the configuration from a config.yaml file
        """
        if os.path.exists(self.pipeline_config_file):
            print("Loading",self.pipeline_config_file)
            with open(self.pipeline_config_file) as file:
                # safe_load is enough for a plain key/value config file
                self.configDict = yaml.safe_load(file)
    
    
        # process sources
        print(self.configDict)
        self.source_number=self.configDict["source_number"]
        self.source_list = []
        for i in range(self.source_number):
            self.source_list.append({"path":self.configDict[f"source{i}_path"],
                                     "type":self.configDict[f"source{i}_type"],
                                     "framerate":self.configDict[f"source{i}_framerate"]})
        # process streammux
        self.streammux_properties = {"width":self.configDict["streammux_width"],
                                     "height":self.configDict["streammux_height"],
                                     "batch-size":self.configDict["streammux_batch-size"],
                                     "batched-push-timeout":self.configDict["streammux_batched-push-timeout"],
                                     "live-source":self.configDict["streammux_live-source"]}
        # inference
        self.inference_config_file = self.configDict["inference_config_file"]
        
        # output
        self.output_number = self.configDict["output_number"]
        self.output_list = []
        for i in range(self.output_number):
            self.output_list.append({"type":self.configDict[f"output{i}_type"]})
            # type specific fields
            if self.output_list[-1]["type"] == "video_file":
                self.output_list[-1]["width"] = self.configDict[f"output{i}_width"]
                self.output_list[-1]["height"] = self.configDict[f"output{i}_height"]
                self.output_list[-1]["bitrate"] = self.configDict[f"output{i}_bitrate"]
                self.output_list[-1]["default-file-name"] = self.configDict[f"output{i}_default-file-name"]
            
            
        

Start with a minimal example

* 1 usb camera
* facetrack
* save to file


In [18]:
pipe = Deepstream_pipeline(pipeline_config_file="pipeline_config.yalm")

Loading pipeline_config.yalm
{'source_number': 1, 'source0_path': '/dev/video0', 'source0_type': 'usb', 'source0_framerate': 30, 'streammux_width': 1920, 'streammux_height': 1080, 'streammux_batch-size': 1, 'streammux_batched-push-timeout': 4000, 'streammux_live-source': 1, 'inference_config_file': 'inference_config.txt', 'output_number': 2, 'output0_type': 'video_file', 'output0_width': 640, 'output0_height': 480, 'output0_bitrate': 4000000, 'output0_default-file-name': 'default_video.mp4', 'output1_type': 'on-screen-display'}
Creating Pipeline 
 
Link source 0 usb camera
Linking video_file 0
Linking on-screen-display 1


53:31:53.202262568  3499      0x2c9d360 INFO                 nvinfer gstnvinfer.cpp:646:gst_nvinfer_logger:<primary-nvinference-engine> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::deserializeEngineAndBackend() <nvdsinfer_context_impl.cpp:1909> [UID = 1]: deserialized trt engine from :/home/kevin/repo/DeepStreamTutorials/models/serialized_UNet_engine.trt
ERROR: [TRT]: 3: Cannot find binding of given name: softmax_1
53:31:53.227715606  3499      0x2c9d360 INFO                 nvinfer gstnvinfer.cpp:646:gst_nvinfer_logger:<primary-nvinference-engine> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:2012> [UID = 1]: Use deserialized engine model: /home/kevin/repo/DeepStreamTutorials/models/serialized_UNet_engine.trt
53:31:53.239029301  3499      0x2c9d360 INFO                 nvinfer gstnvinfer_impl.cpp:328:notifyLoadModelStatus:<

Cleanup
