From 319347aa9fce4f9cfa785398fd5fce051f3ef8af Mon Sep 17 00:00:00 2001 From: Anand Sivaram Palassery Date: Thu, 23 Jun 2022 10:43:49 +0530 Subject: [PATCH] ion-sfu-rtp example - This example code is used to interact with external RTP source and destination. - Extra gstreamer script is provided to simulate the RTP source and destination. - Multicast gstreamer is used in order to minimize the encoding load on the computer. - This application could also be used to create multiple call load on the SFU. --- example/ion-sfu-rtp/README.md | 116 +++++++ example/ion-sfu-rtp/gst.sh | 277 ++++++++++++++++ example/ion-sfu-rtp/main.go | 597 ++++++++++++++++++++++++++++++++++ example/ion-sfu-rtp/run.sh | 109 +++++++ 4 files changed, 1099 insertions(+) create mode 100644 example/ion-sfu-rtp/README.md create mode 100755 example/ion-sfu-rtp/gst.sh create mode 100644 example/ion-sfu-rtp/main.go create mode 100755 example/ion-sfu-rtp/run.sh diff --git a/example/ion-sfu-rtp/README.md b/example/ion-sfu-rtp/README.md new file mode 100644 index 00000000..0e97632d --- /dev/null +++ b/example/ion-sfu-rtp/README.md @@ -0,0 +1,116 @@ +# ION-SFU RTP + +## Features +This application can take RTP from remote and forward received RTP from SFU to remote. +- RTP streams could be generated from anywhere. +- For simplicity, separate gstreamer scripts are used as RTP source and destination. +- All communication with gstreamer are with udpsrc and udpsink on either loopback, unicast or multicast. +- Instead of gstreamer, any other RTP streams could be used, including from IP Camera, generated by VLC etc. +- This application can also manage two streams - first normal stream and the second presentation screen share simulation stream. +- Received peer RTP streams from SFU could be forwarded to a decoding gstreamer pipeline to decode audio and video for both first stream and the remote presentation screen share stream. +- Every incoming RTP stream and track is analyzed for packet losses. This is applicable for both the gstreamer generated local RTP streams and the SFU forwarded remote RTP streams. +- All the streams and tracks related stats are printed periodically (stat_interval in main.go) and during application exit with SIGINT or SIGTERM signals. +- This also joins the room and periodically sends messages to all other clients. +- There is a run script included so that multiple ion-sfu-rtp client processes could be started to simulate load on SFU. + +## Requirement +- Currently this example is tested on Debian GNU/Linux and Ubuntu only. + +### gstreamer +- Install gstreamer + +`sudo apt-get install libgstreamer1.0-dev libgstreamer-plugins-base1.0-dev gstreamer1.0-plugins-good gstreamer1.0-plugins-bad gstreamer1.0-plugins-ugly gstreamer1.0-tools` + +### ion +- Install the ion servers as given at +[https://pionion.github.io](https://pionion.github.io/) + + +### ion-app-web +- Install [ion-app-web](https://github.com/pion/ion-app-web) +- Start the development server using `npm start` +- Better to have a camera and mic for the browser client connecting to this server. + +## Use cases + +### Command line help +- To get command line of different applications, open a shell and try the following +``` +./gst.sh +go run main.go -help +./run.sh +``` +- The **default codecs are VP8 and Opus**. In order to use H.264 codec, do export **VCODEC environment**. This has to be used for each shell before starting the script and the application. +``` +export VCODEC=H264 +``` + +### Basic gstreamer encode and decode +- As a basic step, make sure that gstreamer scripts are working by entering the following commands on **two** different shells. + +``` +./gst.sh -e 2 +./gst.sh -d 2 225.0.0.225 21000 +``` +- Two separate gstreamer video displays should be opened. The first one with [SMPTE pattern](https://en.wikipedia.org/wiki/SMPTE_color_bars) and the second one with a moving ball. +- This also produces an audio [acoustics beat](https://en.wikipedia.org/wiki/Beat_(acoustics)) with 1Hz audible beat frequency. The beat is useful to understand that two audio streams are coming properly. + +### Basic call with a browser client +- open a browser client to connect to the ion-app-web npm server. Join the room. +- Open **two** different shells and run the following commands in any order. +``` +./gst.sh -e 1 +go run main.go -addr "localhost:5551" -session "ion" +``` +- Now the browser client should display the SMPTE pattern with a single 1000 Hz tone. + +### Call with a browser client with ion-sfu-rtp sending screen share also +- Join the room from the browser client +- Open **two** different shells and run the following commands in any order. +``` +./gst.sh -e 2 +go run main.go -addr "localhost:5551" -session "ion" -nr_stream 2 +``` +- Now the browser client should display both the SMPTE pattern and the moving ball with the audible beat. +- The screen share stream is started after a delay (delay_stream in main.go code) + +### Call with a browser client with both ion-sfu-rtp and the browser sending screen share +- Open **three** different shells and run the following commands in any order. +``` +./gst.sh -e 2 +./gst.sh -d 2 +go run main.go -addr "localhost:5551" -session "ion" -nr_stream 2 -rtp_fwd 1 +``` +- Now join the room from the browser client, better to start the browser client after the above commands so that the key video frames are not missed. +- Start a screen share presentation from the browser client +- Now the browser client should display both the SMPTE pattern and the moving ball with the audible beat +- As both RTP forward and the gstreamer decode pipeline are started, two separate video displays should be opened. One for the main camera video from the browser and the second for the screen share. +- Use rtp_fwd only with one client to avoid confusion to the gstreamer decode pipeline. + +### Multiple clients to load the SFU +- Open **two** different shells and run the following commands. +- For example, to start 5 clients with 2 streams per client with session name ion +``` +./gst.sh -e 2 +./run.sh ion 5 2 +./run.sh stop +``` +- Better to start the gstreamer encode script first. +- The run script would spawn multiple ion-sfu-rtp client processes and all of them take the same encoded streams from the multicast ports. +- Now, a browser client also could be opened to monitor all of them. +- It could be stopped with the same run script with stop parameter. + +## Frequently Asked Questions (FAQ) +- Why the gstreamer script is provided separately? + - The ion-sfu-rtp Go application becomes simple, it just listens for RTP packets on UDP ports. + - There is no need to use Cgo at all. + - Instead of gstreamer any other RTP generating application or system could be used. + - The gstreamer script could be easily modified for different scenario. + - With the help of the optional gstreamer decode script, bidirection calls could be easily simulated. +- What is the use of multicast IP in the gstreamer script and the ion-sfu-rtp main application? + - By default both the gstreamer script and the ion-sfu-rtp main application uses 225.0.0.225 multicast group. + - This could be easily overridden by arguments from both the main application and the gstreamer script. + - With multicast group, the gstreamer script could be run on any other computer also. + - Multiple ion-sfu-rtp clients could also be started on different computers. + - With multicast, only one audio and video source and encoder is required, for any number of clients. + diff --git a/example/ion-sfu-rtp/gst.sh b/example/ion-sfu-rtp/gst.sh new file mode 100755 index 00000000..2c153068 --- /dev/null +++ b/example/ion-sfu-rtp/gst.sh @@ -0,0 +1,277 @@ +#!/bin/bash + +#set -x +#set -e + +tx_port_begin=${TX_PORT_BEGIN:-21000} +rx_port_begin=${RX_PORT_BEGIN:-22000} + +# this same default multicast address is used in main.go too +default_ip=${DEFAULT_IP:-225.0.0.225} + +# video parameters, width, height, fps etc. +w=${WIDTH:-640} +h=${HEIGHT:-360} +fps=${FPS:-15} +# video bitrate and audio bitrate +vb=${VB:-256000} +ab=${AB:-20000} +vb_kbps=$((vb / 1000)) + +# use the v4l2-ctl command to find out the formats supported +# v4l2-ctl -d /dev/video0 --list-formats-ext +v4l2_device=${V4L2_DEVICE:-/dev/video0} + +gst='gst-launch-1.0' +#gst="${gst} -v" + +sigint() +{ + echo "Received SIGINT" + sleep 1 + ps -f | grep ${gst} + ps -f | grep ${gst} | awk '{print $2}' | xargs kill +} + +sigterm() +{ + echo "Received SIGTERM" + sleep 1 + ps -f | grep ${gst} + ps -f | grep ${gst} | awk '{print $2}' | xargs kill +} + + +# pattern 0 smpte, 18 moving ball +video_tx_vp8() +{ + echo "video tx vp8" + port=${1} + pattern=${2} + if [ "${SRC}" == "v4l2" ]; + then + videosrc="v4l2src device=${v4l2_device}" + capsrc="video/x-raw,format=YUY2,width=${w},height=${h},framerate=${fps}/1" + else + videosrc="videotestsrc pattern=${pattern}" + capsrc="video/x-raw,format=I420,width=${w},height=${h},framerate=${fps}/1" + fi + ${gst} \ + ${videosrc} ! ${capsrc} ! \ + videoscale ! videorate ! videoconvert ! \ + timeoverlay halignment=center valignment=center ! \ + vp8enc error-resilient=partitions keyframe-max-dist=10 auto-alt-ref=true cpu-used=5 deadline=1 target-bitrate=${vb} ! \ + rtpvp8pay pt=120 ! udpsink host=${remote_ip} port=${port} & +} + +video_tx_h264() { + echo "video tx h264" + port=${1} + pattern=${2} + if [ "${SRC}" == "v4l2" ]; + then + videosrc="v4l2src device=${v4l2_device}" + capsrc="video/x-raw,format=YUY2,width=${w},height=${h},framerate=${fps}/1" + else + videosrc="videotestsrc pattern=${pattern}" + capsrc="video/x-raw,format=I420,width=${w},height=${h},framerate=${fps}/1" + fi + ${gst} \ + ${videosrc} ! ${capsrc} ! \ + videoscale ! videorate ! videoconvert ! \ + timeoverlay halignment=center valignment=center ! \ + x264enc bframes=0 cabac=0 dct8x8=0 speed-preset=ultrafast tune=zerolatency key-int-max=20 bitrate=${vb_kbps} ! video/x-h264,stream-format=byte-stream ! \ + rtph264pay pt=126 ! udpsink host=${remote_ip} port=${port} & +} + +# wave 0 sine, 8 ticks +# when multiple streams are used, select frequency very near so that +# audio beats are there +# this is a way of distinguishing different streams easily. +audio_tx_opus() +{ + echo "audio tx opus" + port=${1} + wave=${2} + freq=${3} + ${gst} \ + audiotestsrc wave=${wave} freq=${freq} ! audioresample ! audio/x-raw,channels=1,rate=48000 ! \ + opusenc bitrate=${ab} ! rtpopuspay pt=109 ! udpsink host=${remote_ip} port=${port} & +} + +video_rx_vp8() +{ + echo "video rx vp8" + port=${1} + ${gst} \ + udpsrc address=${listen_ip} port=${port} \ + caps='application/x-rtp, media=(string)video, clock-rate=(int)90000' ! \ + rtpvp8depay ! vp8dec ! \ + videoconvert ! autovideosink & +} + +video_rx_h264() +{ + echo "video rx h264" + port=${1} + #decoder=decodebin + decoder=openh264dec + ${gst} \ + udpsrc address=${listen_ip} port=${port} \ + caps='application/x-rtp, media=(string)video, clock-rate=(int)90000' ! \ + rtph264depay ! h264parse ! ${decoder} ! \ + videoconvert ! autovideosink +} + +audio_rx_opus() +{ + echo "audio rx opus" + port=${1} + ${gst} \ + udpsrc address=${listen_ip} port=${port} \ + caps="application/x-rtp, media=(string)audio" ! \ + rtpopusdepay ! opusdec ! autoaudiosink & +} + +set_udp_port() +{ + let audio_tx_port1=${tx_port_begin} + let video_tx_port1=${tx_port_begin}+2 + let audio_rx_port1=${rx_port_begin} + let video_rx_port1=${rx_port_begin}+2 + + let audio_tx_port2=${tx_port_begin}+4 + let video_tx_port2=${tx_port_begin}+6 + let audio_rx_port2=${rx_port_begin}+4 + let video_rx_port2=${rx_port_begin}+6 +} + +print_info() { + echo "" + echo "mod = ${mod}" + echo "nr_stream = ${nr_stream}" + echo "remote_ip = ${remote_ip}" + echo "listen_ip = ${listen_ip}" + echo "tx_port_begin = ${tx_port_begin}" + echo "rx_port_begin = ${rx_port_begin}" + echo "audio_tx_port1 = ${audio_tx_port1}" + echo "video_tx_port1 = ${video_tx_port1}" + echo "audio_rx_port1 = ${audio_rx_port1}" + echo "video_rx_port1 = ${video_rx_port1}" + echo "audio_tx_port2 = ${audio_tx_port2}" + echo "video_tx_port2 = ${video_tx_port2}" + echo "audio_rx_port2 = ${audio_rx_port2}" + echo "video_rx_port2 = ${video_rx_port2}" + echo "" +} + +usage() +{ + echo + echo "$0 <-d|-e> [remote_ip|listen_ip] [port]" + echo + echo "-d = decode mode" + echo "-e = encode mode" + echo "remote_ip, listen_ip optional, default multicast ${default_ip}" + echo "port is either remote send port or local listen port" + echo "port must be preceded with ip argument" + echo + echo "To use H.264 codec for both decode and encode, use the environment" + echo "export VCODEC=H264" + echo "To use v4l2 camera instead of videotestsrc use the environment" + echo "v4l2 works only with 1 stream" + echo "export SRC=v4l2" + echo "The default video codec is VP8" + echo +} + +if [ "$#" -lt "2" ]; +then + usage + exit 2 +fi + +mod=${1} +nr_stream=${2} +shift 2 +if [ "$#" -ne "0" ] && [ "$#" -ne "2" ]; +then + echo "Both IP and Port must be specified" + usage + exit 2 +fi +ip=${1} +port=${2} +if [ "${ip}" == "" ]; +then + remote_ip=${default_ip} + listen_ip=${default_ip} +else + remote_ip=${ip} + listen_ip=${ip} +fi +if [ "${port}" != "" ]; +then + tx_port_begin=${port} + rx_port_begin=${port} +fi +set_udp_port +print_info +sleep 1 + +if [ "${VCODEC}" == "H264" ]; +then + video_tx=video_tx_h264 + video_rx=video_rx_h264 +else + video_tx=video_tx_vp8 + video_rx=video_rx_vp8 +fi +audio_tx=audio_tx_opus +audio_rx=audio_rx_opus + +trap 'sigint' INT +trap 'sigterm' TERM + +case "${mod}" in + -e) + ${audio_tx} ${audio_tx_port1} 0 1000 + ${video_tx} ${video_tx_port1} 0 + if [ "${nr_stream}" -eq 2 ]; + then + ${audio_tx} ${audio_tx_port2} 0 1001 + ${video_tx} ${video_tx_port2} 18 + fi + ;; + -d) + ${audio_rx} ${audio_rx_port1} + ${video_rx} ${video_rx_port1} + if [ "${nr_stream}" -eq 2 ]; + then + ${audio_rx} ${audio_rx_port2} + ${video_rx} ${video_rx_port2} + fi + ;; + *) + usage + exit 2 + ;; +esac + +sleep 1 +echo "" +ps -eao pid,cmd | grep $0 | grep $grep $$ +childpid=$(jobs -p) +for pid in ${childpid}; +do + ps -eao pid,cmd | grep ${gst} | grep ${pid} +done +echo "" + +wait + +echo "exiting" +echo "completed" + +exit 0 + diff --git a/example/ion-sfu-rtp/main.go b/example/ion-sfu-rtp/main.go new file mode 100644 index 00000000..42c13994 --- /dev/null +++ b/example/ion-sfu-rtp/main.go @@ -0,0 +1,597 @@ +package main + +import ( + "os" + "fmt" + "flag" + "net" + "io" + "time" + "syscall" + "sync" + "os/signal" + + ilog "github.com/pion/ion-log" + sdk "github.com/pion/ion-sdk-go" + rtp "github.com/pion/rtp" + "github.com/pion/webrtc/v3" +) + +// media info +type minfo struct { + nr_rtp int + trackid string + streamid string + mime string + mtype string + fwdport int + pt uint8 + seq uint16 + ts uint32 + ssrc uint32 + nr_pkt_lost int + conn *net.UDPConn +} + +// main configuration +type main_data struct { + room *sdk.Room + rtc *sdk.RTC + session, addr, gst_ip, log_level, uid, rid, client_id string + video_mtype, audio_mtype, vcodec_type, acodec_type string + pid int + nr_stream int + at1, at2, vt1, vt2 *webrtc.TrackLocalStaticRTP + al1, al2, vl1, vl2 *net.UDPConn +} + +const ( + listen_audio1 = 21000 + listen_video1 = 21002 + listen_audio2 = 21004 + listen_video2 = 21006 + print_count = 1000000 + stat_interval = 10 + delay_stream = 10 + datefmt = "20060102:150405.999999Z07:00" +) + +var ( + md main_data + listen_ip1 string = "225.0.0.225" + forward_ip1 string = "225.0.0.225" + rtp_fwd int = 0 + fwd_audio_cur int = 22000 + fwd_video_cur int = 22002 + log = ilog.NewLoggerWithFields(ilog.TraceLevel, "ion-sfu-rtp", nil) + lmap map[string]*minfo + rmap map[string]*minfo + lmutex sync.RWMutex + rmutex sync.RWMutex +) + +func print_stat() { + nr := 0 + log.Infof("Printing Stat %s", time.Now().Format(datefmt)) + fmt.Printf("\n") + fmt.Printf("***********\n") + fmt.Printf("Printing Local Stat\n") + fmt.Printf("***********\n") + for n, mi := range lmap { + m := fmt.Sprintf("[%2d] [%s] rtp:%-5d mtype:%-10s " + + "fwd:%-5d pt:%-3d ssrc:%08x:%-10d pktlost:%d\n", + nr, n, mi.nr_rtp, mi.mtype, + mi.fwdport, mi.pt, mi.ssrc, mi.ssrc, mi.nr_pkt_lost) + fmt.Printf(m) + nr++ + } + fmt.Printf("\n") + nr = 0 + fmt.Printf("***********\n") + fmt.Printf("Printing Remote Stat\n") + fmt.Printf("***********\n") + for n, mi := range rmap { + m := fmt.Sprintf("[%2d] [%s] rtp:%-5d mtype:%-10s " + + "fwd:%-5d pt:%-3d ssrc:%08x:%-10d pktlost:%d\n", + nr, n, mi.nr_rtp, mi.mtype, + mi.fwdport, mi.pt, mi.ssrc, mi.ssrc, mi.nr_pkt_lost) + fmt.Printf(m) + nr++ + } + fmt.Printf("***********\n") + fmt.Printf("\n") +} + +func write_chat() { + var payload map[string]interface{} + var msg map[string]string + + dname := md.rid + "_name" + tm := time.Now().Format("15:04:05.999") + txt := fmt.Sprintf("from %s at %s", md.rid, tm) + payload = make(map[string]interface{}) + msg = make(map[string]string) + msg["uid"] = md.rid + msg["name"] = dname + msg["text"] = txt + payload["msg"] = msg + err := md.room.SendMessage(md.session, dname, "all", payload) + if err != nil { + panic(err) + } + return +} + +func forward_rtp_packet(mi *minfo, b []uint8, n int) { + if(mi.conn == nil) { + rem := fmt.Sprintf("%s:%d", forward_ip1, mi.fwdport) + remaddr, err := net.ResolveUDPAddr("udp", rem) + log.Infof("forward_rtp_packet: remote %+v\n", remaddr) + if err != nil { + panic(err) + } + mi.conn, err = net.DialUDP("udp", nil, remaddr) + if err != nil { + panic(err) + } + } + n1, err := mi.conn.Write(b[:n]) + if err != nil { + /* panic(err) */ + } + if n != n1 { + /* panic(err) */ + } + //log.Infof("mi %+v, len %d %d\n", mi, n, n1) + + return +} + +func analyze_rtp_packet(mi *minfo, b []uint8, n int) { + var nr_lost uint16 + + h := &rtp.Header{} + h.Unmarshal(b) + + if(mi.nr_rtp == 0) { + mi.pt = h.PayloadType + mi.ssrc = h.SSRC + mi.seq = h.SequenceNumber - 1 + mi.ts = h.Timestamp + } + nr_lost = h.SequenceNumber - mi.seq - 1 + mi.nr_pkt_lost += int(nr_lost) + mi.seq = h.SequenceNumber + mi.ts = h.Timestamp + mi.nr_rtp++ + if(mi.pt != h.PayloadType) { + log.Errorf("Payload Type Changed from %d to %d %s", + mi.pt, h.PayloadType, mi.trackid) + mi.pt = h.PayloadType + } + if(mi.ssrc != h.SSRC) { + log.Errorf("SSRC Changed from %d to %d %08x %s", + mi.ssrc, h.SSRC, h.SSRC, mi.trackid) + mi.ssrc = h.SSRC + } + if(mi.nr_rtp < 10) { + log.Infof(">> [%5d %5d] - %s - %x %08x %5d %d\n", + mi.nr_rtp, n, mi.trackid, h.PayloadType, h.SSRC, + h.SequenceNumber, h.Timestamp) + } + + return +} + +func process_local_rtp(name string, uid string, mtype string, + listener *net.UDPConn, track *webrtc.TrackLocalStaticRTP) () { + var mi *minfo + + streamid := track.StreamID() + key := streamid + "_" + name + lmutex.Lock() + mi, ok := lmap[key] + lmutex.Unlock() + if(!ok) { + mi = new(minfo) + mi.trackid = name + mi.streamid = streamid + mi.mtype = mtype + lmutex.Lock() + lmap[key] = mi + lmutex.Unlock() + } + + b := make([]byte, 1600) // UDP MTU + for { + n, _, err := listener.ReadFrom(b) + if err != nil { + panic(fmt.Sprintf("error during read: %s", err)) + } + n1, err := track.Write(b[:n]) + if err != nil { + log.Infof("RTP %s - %d - %d - %+v", name, n, n1, err) + } + analyze_rtp_packet(mi, b, n) + if((mi.nr_rtp % print_count) == 0) { + log.Infof("LOCAL %s [%d] [%s %s] %5d", + uid, mi.nr_rtp, streamid, name, n1) + } + } + + return +} + +func read_remote_rtp(uid string, rtc *sdk.RTC) { + rtc.OnTrack = func(track *webrtc.TrackRemote, receiver *webrtc.RTPReceiver) { + ssrc := track.SSRC(); + m := fmt.Sprintf("read_remote_rtp() OnTrack " + + "streamId=%v trackId=%v kind=%v ssrc=%d(%08x) ", + track.StreamID(), track.ID(), track.Kind(), ssrc, ssrc) + log.Infof(m) + + b := make([]byte, 1600) + for { + select { + default: + n, _, err := track.Read(b) + if err != nil { + if err == io.EOF { + log.Errorf("id=%v track.ReadRTP err=%v", rtc, err) + return + } + log.Errorf("id=%v Error reading track rtp %s", rtc, err) + continue + } + streamid := track.StreamID() + trackid := track.ID() + mime := track.Codec().RTPCodecCapability.MimeType + key := streamid + "_" + trackid + rmutex.Lock() + mi, ok := rmap[key] + rmutex.Unlock() + if(!ok) { + mi = new(minfo) + mi.trackid = trackid + mi.streamid = streamid + mi.mime = mime + mi.mtype = mime[0:5] + rmutex.Lock() + if(mi.mtype == "audio") { + mi.fwdport = fwd_audio_cur + fwd_audio_cur += 4 + } else { + mi.fwdport = fwd_video_cur + fwd_video_cur += 4 + } + rmap[key] = mi + rmutex.Unlock() + } + switch(mi.mime) { + case sdk.MimeTypeVP8: + break + case sdk.MimeTypeVP9: + break + case sdk.MimeTypeH264: + break + case sdk.MimeTypeOpus: + break + default: + break + } + analyze_rtp_packet(mi, b, n) + if((mi.nr_rtp % print_count) == 0) { + log.Infof("REMOTE %s [%d] [%s %s] %5d, %s", + uid, mi.nr_rtp, streamid, trackid, n, mime) + } + if(rtp_fwd != 0) { + forward_rtp_packet(mi, b, n) + } + } + } + } +} + +func create_local_track(streamid_audio, streamid_video, + trackid_audio, trackid_video, acodec_type, vcodec_type string) ( + audio_track, video_track *webrtc.TrackLocalStaticRTP) { + video_track, err := webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{MimeType: "video/" + vcodec_type}, + trackid_video, streamid_video) + if err != nil { + panic(err) + } + + audio_track, err = webrtc.NewTrackLocalStaticRTP( + webrtc.RTPCodecCapability{MimeType: "audio/" + acodec_type}, + trackid_audio, streamid_audio) + if err != nil { + panic(err) + } + + return +} + +func create_local_listener(ip string, + aport, vport int) (aconn, vconn *net.UDPConn) { + aconn, err := net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP(ip), + Port: aport}) + if err != nil { + panic(err) + } + vconn, err = net.ListenUDP("udp", &net.UDPAddr{IP: net.ParseIP(ip), + Port: vport}) + if err != nil { + panic(err) + } + + return +} + +func rtc_main(connector *sdk.Connector) { + // new sdk engine + config := sdk.RTCConfig{ + WebRTC: sdk.WebRTCTransportConfig{ + VideoMime: md.video_mtype, + }, + } + + rtc, err := sdk.NewRTC(connector, config) + if err != nil { + panic(err) + } + md.rtc = rtc + + // keeping an easily identifiable unique id by concatanating + // the given client_id and the process id. + streamid_audio1 := fmt.Sprintf("p1_%s", md.uid) + streamid_video1 := fmt.Sprintf("p1_%s", md.uid) + streamid_audio2 := fmt.Sprintf("p2_%s", md.uid) + streamid_video2 := fmt.Sprintf("p2_%s", md.uid) + trackid_audio1 := fmt.Sprintf("a1_%s", md.uid) + trackid_video1 := fmt.Sprintf("v1_%s", md.uid) + trackid_audio2 := fmt.Sprintf("a2_%s", md.uid) + trackid_video2 := fmt.Sprintf("v2_%s", md.uid) + log.Infof("main [%s]: remote %s", md.uid, md.addr) + log.Infof("main [%s]: session %s", md.uid, md.session) + log.Infof("main [%s]: gst_ip %s %s %s", + md.uid, md.gst_ip, listen_ip1, forward_ip1) + log.Infof("main [%s]: log_level %s", md.uid, md.log_level) + log.Infof("main [%s]: nr_stream %d", md.uid, md.nr_stream) + log.Infof("main [%s]: rtp_fwd %d", md.uid, rtp_fwd) + log.Infof("main [%s]: %s, %s", md.uid, streamid_audio1, streamid_video1) + log.Infof("main [%s]: %s, %s", md.uid, streamid_audio2, streamid_video2) + log.Infof("main [%s]: %s, %s", md.uid, trackid_audio1, trackid_video1) + log.Infof("main [%s]: %s, %s", md.uid, trackid_audio2, trackid_video2) + + read_remote_rtp(md.uid, rtc) + rtc.OnDataChannel = func(dc *webrtc.DataChannel) { + log.Infof("main [%s]: OnDataChannel", md.uid) + } + + // client join a session + err = rtc.Join(md.session, md.rid) + if err != nil { + log.Errorf("rtc_main: join err=%v", err) + panic(err) + } + + if(md.nr_stream > 0) { + log.Infof("main [%s]: Publishing First Track", md.uid) + md.at1, md.vt1 = create_local_track(streamid_audio1, streamid_video1, + trackid_audio1, trackid_video1, md.acodec_type, md.vcodec_type) + md.al1, md.vl1 = create_local_listener(listen_ip1, + listen_audio1, listen_video1) + log.Infof("main [%s]: listener [%+v %+v]\n%+v\n%+v", + md.uid, md.al1, md.vl1, md.at1, md.vt1) + + _, _ = rtc.Publish(md.vt1, md.at1) + go process_local_rtp(trackid_audio1, md.uid, md.audio_mtype, + md.al1, md.at1) + go process_local_rtp(trackid_video1, md.uid, md.video_mtype, + md.vl1, md.vt1) + } + + if(md.nr_stream > 1) { + delay := delay_stream + for i := 0; i < delay; i++ { + log.Infof("main [%s]: Publishing Second Stream in %d", + md.uid, (delay -i)) + time.Sleep(time.Millisecond * 1000) + } + log.Infof("main [%s]: Publishing Second Stream", md.uid) + md.at2, md.vt2 = create_local_track(streamid_audio2, streamid_video2, + trackid_audio2, trackid_video2, md.acodec_type, md.vcodec_type) + md.al2, md.vl2 = create_local_listener(listen_ip1, + listen_audio2, listen_video2) + log.Infof("main [%s]: listener [%+v %+v]\n%+v\n%+v", + md.uid, md.al2, md.vl2, md.at2, md.vt2) + + _, _ = rtc.Publish(md.vt2, md.at2) + go process_local_rtp(trackid_audio2, md.uid, md.audio_mtype, md.al2, md.at2) + go process_local_rtp(trackid_video2, md.uid, md.video_mtype, md.vl2, md.vt2) + } else { + log.Infof("main [%s]: No Second Track", md.uid) + } + + return +} + +func room_main(connector *sdk.Connector) { + dname := md.rid + "_name" + room := sdk.NewRoom(connector) + md.room = room + + err := room.CreateRoom(sdk.RoomInfo{Sid: md.session}) + if err != nil { + log.Errorf("room_main: err=%v", err) + panic(err) + } + + err = room.AddPeer(sdk.PeerInfo{Sid: md.session, Uid: md.rid}) + if err != nil { + log.Errorf("room_main: err=%v", err) + panic(err) + } + + err = room.UpdatePeer(sdk.PeerInfo{Sid: md.session, + Uid: md.rid, DisplayName: dname}) + if err != nil { + log.Errorf("room_main: err=%v", err) + panic(err) + } + peers := room.GetPeers(md.session) + log.Infof("room_main: peers %d", len(peers)) + for n, p := range peers { + log.Infof("room_main: peer [%2d] = %+v", n, p) + } + + err = room.UpdateRoom(sdk.RoomInfo{Sid: md.session, Lock: true}) + if err != nil { + log.Errorf("room_main: err=%v", err) + panic(err) + } + + err = room.UpdateRoom(sdk.RoomInfo{Sid: md.session, Lock: false}) + if err != nil { + log.Errorf("room_main: err=%v", err) + panic(err) + } + + /* + err = room.EndRoom(md.session, "conference end", true) + if err != nil { + log.Errorf("room_main: err=%v", err) + panic(err) + } + */ + + room.OnJoin = func(success bool, info sdk.RoomInfo, err error) { + log.Infof("room_main: OnJoin success %+v, info = %+v", + success, info) + // calling rtc_main one room join is done + rtc_main(connector) + } + + room.OnLeave = func(success bool, err error) { + log.Infof("room_main: OnLeave success %+v", success) + } + + room.OnPeerEvent = func(state sdk.PeerState, peer sdk.PeerInfo) { + log.Infof("room_main: OnPeerEvent state %+v, peer = %+v", + state, peer) + } + + room.OnMessage = func(from, to string, data map[string]interface{}) { + log.Infof("room_main: OnMessage from %+v, to = %+v, data = %+v", + from, to, data) + } + + room.OnDisconnect = func(sid, reason string) { + log.Infof("room_main: OnDisconnect sid = %+v, reason = %+v", + sid, reason) + } + + room.OnRoomInfo = func(info sdk.RoomInfo) { + log.Infof("room_main: OnRoomInfo info = %+v", info) + } + + jinfo := sdk.JoinInfo{ + Sid: md.session, + Uid: md.rid, + DisplayName: dname, + Role: sdk.Role_Host, + Protocol: sdk.Protocol_WebRTC, + Direction: sdk.Peer_BILATERAL, + } + err = room.Join(jinfo) + return +} + +func exit_cleanup() { + fmt.Printf("exit_cleanup:\n") + if(md.room != nil) { + md.room.Leave(md.session, md.rid) + } + time.Sleep(time.Millisecond * 1000) + return +} + +func main() { + // parse flag + flag.StringVar(&md.addr, "addr", "localhost:5551", "ion-sfu grpc addr") + flag.StringVar(&md.session, "session", "ion", "join session name") + flag.StringVar(&md.gst_ip, "gst_ip", "null", + "gstreamer ip to listen or forward") + flag.StringVar(&md.log_level, "log", "info", + "log level:debug|info|warn|error") + flag.StringVar(&md.client_id, "client_id", "cl0", + "client id of this client") + flag.IntVar(&md.nr_stream, "nr_stream", 1, "number of streams 1 or 2") + flag.IntVar(&rtp_fwd, "rtp_fwd", 0, "rtp_fwd 0 or 1") + flag.Parse() + if(md.gst_ip != "null") { + listen_ip1 = md.gst_ip + forward_ip1 = md.gst_ip + } + md.acodec_type = "opus" + md.audio_mtype = sdk.MimeTypeOpus + vcodec_env := os.Getenv("VCODEC") + if(vcodec_env == "H264") { + md.video_mtype = sdk.MimeTypeH264 + md.vcodec_type = "h264" + } else { + md.video_mtype = sdk.MimeTypeVP8 + md.vcodec_type = "vp8" + } + sigchan := make(chan os.Signal, 1) + donechan := make(chan bool, 1) + do_exit := 0 + signal.Notify(sigchan, syscall.SIGINT, syscall.SIGTERM) + lmap = make(map[string]*minfo) + rmap = make(map[string]*minfo) + + connector := sdk.NewConnector(md.addr) + md.pid = os.Getpid() + md.uid = fmt.Sprintf("%s_%d", md.client_id, md.pid) + md.rid = sdk.RandomKey(6) + fmt.Printf("[%s %s] %+v\n", md.uid, md.rid, connector) + room_main(connector) + // calling rtc_main directly without room join + //rtc_main(connector) + + m := fmt.Sprintf("main [%s]: waiting", md.uid) + log.Infof(m) + fmt.Printf("%s\n", m) + time.Sleep(time.Millisecond * 1000) + tick1 := time.NewTicker(stat_interval * time.Second) + + go func () { + sig := <- sigchan + fmt.Printf("***********\n") + fmt.Printf("main [%s]: signal received [%+v]\n", md.uid, sig) + fmt.Printf("***********\n") + print_stat() + exit_cleanup() + donechan <- true + do_exit = 1 + }() + + for do_exit == 0 { + select { + case <- tick1.C: + fmt.Printf("main [%s]: tick\n", md.uid) + write_chat() + print_stat() + case done := <-donechan: + fmt.Printf("main [%s]: done received %+v\n", md.uid, done) + tick1.Stop() + break + } + } + + fmt.Printf("main [%s]: exiting\n", md.uid) + + return +} + diff --git a/example/ion-sfu-rtp/run.sh b/example/ion-sfu-rtp/run.sh new file mode 100755 index 00000000..5ae8ec82 --- /dev/null +++ b/example/ion-sfu-rtp/run.sh @@ -0,0 +1,109 @@ +#!/bin/bash + +# default values +sn='ion' +nr_client=-1 +nr_stream=-1 +program=${PWD}/ion-sfu-rtp +pidfile=/tmp/ion_sfu_rtp_run_${USER}.pid + +usage() +{ + echo + echo "$0 " + echo "$0 stop - to stop all the clients" + echo "" + echo "session name = any valid session name" + echo "nr_client = 1 or above, upto whatever the system could handle" + echo "nr_stream = either 1 or 2 only" + echo "" + echo "For example:" + echo "to start 5 clients with 2 streams per client with session name ion" + echo + echo "$0 ion 5 2" + echo +} + +stop_clients() +{ + echo stopping + childpid=$(pidof ${program}) + nr=0 + for pid in ${childpid}; + do + ps -eao pid,cmd | grep ${program} | grep ${pid} + let nr=${nr}+1 + done + echo "Number of processes: ${nr}" + sleep 1 + echo killall -15 ${program} + for pid in ${childpid}; + do + kill -15 ${pid} + done + sleep 1 + echo killall -9 ${program} + killall -9 ${program} + echo "exiting" + echo "completed" +} + +if [ "$#" -lt 1 ]; +then + usage + exit 2 +fi + +sn=${1} +if [ "$sn" == "stop" ]; +then + stop_clients + exit 0 +fi + +if [ "$#" -ne 3 ]; +then + usage + exit 2 +fi + +nr_client=${2} +nr_stream=${3} +args="-addr localhost:5551 -session ${sn} -nr_stream ${nr_stream}" + +if [ ! -f ${program} ]; +then + echo "${program} not found" + go build . + exit 2 +fi + +echo "Session Name: ${sn}" +echo "NR Client: ${nr_client}" +echo "NR Stream: ${nr_stream}" +echo "${program} ${args} -client_id cl000" +echo + +for((n=0;n<${nr_client};n++)); +do + sleep 2 + client_id=$(printf "cl%03d" ${n}) + echo ${n} ${client_id} + echo setsid -f ${program} ${args} -client_id ${client_id} + setsid -f ${program} ${args} -client_id ${client_id} +done + +sleep 5 +echo "started" +rm -f ${pidfile} +childpid=$(pidof ${program}) +echo ${childpid} > ${pidfile} +for pid in ${childpid}; +do + ps -eao pid,cmd | grep ${program} | grep ${pid} +done +echo cat ${pidfile} +cat ${pidfile} + +exit 0 +