Skip to content

Commit

Permalink
Merge pull request #1303 from pedroSG94/feature/expose-congestion-per…
Browse files Browse the repository at this point in the history
…cent

expose hascongestion precent used
  • Loading branch information
pedroSG94 committed Oct 15, 2023
2 parents 2b5d3d1 + e81dc7f commit caeb371
Show file tree
Hide file tree
Showing 11 changed files with 111 additions and 31 deletions.
13 changes: 6 additions & 7 deletions library/src/main/java/com/pedro/library/base/StreamBase.kt
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ abstract class StreamBase(
audioSource: AudioManager.Source
) {

private val getMicrophoneData = object: GetMicrophoneData {
override fun inputPCMData(frame: Frame) {
audioEncoder.inputPCMData(frame)
}
}
//video and audio encoders
private val videoEncoder by lazy { VideoEncoder(getVideoData) }
private val audioEncoder by lazy { AudioEncoder(getAacData) }
Expand Down Expand Up @@ -145,7 +150,7 @@ abstract class StreamBase(
fun startRecord(path: String, listener: RecordController.Listener) {
recordController.startRecord(path, listener)
if (!isStreaming) startSources()
else requestKeyframe()
else videoEncoder.requestKeyframe()
}

/**
Expand Down Expand Up @@ -503,12 +508,6 @@ abstract class StreamBase(
return videoEncoder.prepareVideoEncoder() && audioEncoder.prepareAudioEncoder()
}

private val getMicrophoneData = object: GetMicrophoneData {
override fun inputPCMData(frame: Frame) {
audioEncoder.inputPCMData(frame)
}
}

private val getAacData: GetAacData = object : GetAacData {
override fun getAacData(aacBuffer: ByteBuffer, info: MediaCodec.BufferInfo) {
getAacDataRtp(aacBuffer, info)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ class RtmpStreamClient(
rtmpClient.reConnect(delay, backupUrl)
}

override fun hasCongestion(): Boolean = rtmpClient.hasCongestion()
override fun hasCongestion(percentUsed: Float): Boolean = rtmpClient.hasCongestion(percentUsed)

override fun setLogs(enabled: Boolean) {
rtmpClient.setLogs(enabled)
Expand All @@ -73,6 +73,10 @@ class RtmpStreamClient(
rtmpClient.resizeCache(newSize)
}

override fun clearCache() {
rtmpClient.clearCache()
}

override fun getCacheSize(): Int = rtmpClient.cacheSize

override fun getSentAudioFrames(): Long = rtmpClient.sentAudioFrames
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class RtspStreamClient(
rtspClient.reConnect(delay, backupUrl)
}

override fun hasCongestion(): Boolean = rtspClient.hasCongestion()
override fun hasCongestion(percentUsed: Float): Boolean = rtspClient.hasCongestion(percentUsed)

override fun setLogs(enabled: Boolean) {
rtspClient.setLogs(enabled)
Expand All @@ -49,6 +49,10 @@ class RtspStreamClient(
rtspClient.resizeCache(newSize)
}

override fun clearCache() {
rtspClient.clearCache()
}

override fun getCacheSize(): Int = rtspClient.cacheSize

override fun getSentAudioFrames(): Long = rtspClient.sentAudioFrames
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ class SrtStreamClient(
srtClient.reConnect(delay, backupUrl)
}

override fun hasCongestion(): Boolean = srtClient.hasCongestion()
override fun hasCongestion(percentUsed: Float): Boolean = srtClient.hasCongestion(percentUsed)

override fun setLogs(enabled: Boolean) {
srtClient.setLogs(enabled)
Expand All @@ -38,6 +38,10 @@ class SrtStreamClient(
srtClient.resizeCache(newSize)
}

override fun clearCache() {
srtClient.clearCache()
}

override fun getCacheSize(): Int = srtClient.cacheSize

override fun getSentAudioFrames(): Long = srtClient.sentAudioFrames
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,11 +34,13 @@ abstract class StreamBaseClient(
protected abstract fun shouldRetry(reason: String): Boolean
protected abstract fun reConnect(delay: Long, backupUrl: String?)
abstract fun setReTries(reTries: Int)
abstract fun hasCongestion(): Boolean
fun hasCongestion(): Boolean = hasCongestion(20f)
abstract fun hasCongestion(percentUsed: Float): Boolean
abstract fun setLogs(enabled: Boolean)
abstract fun setCheckServerAlive(enabled: Boolean)
@Throws(RuntimeException::class)
abstract fun resizeCache(newSize: Int)
abstract fun clearCache()
abstract fun getCacheSize(): Int
abstract fun getSentAudioFrames(): Long
abstract fun getSentVideoFrames(): Long
Expand Down
10 changes: 8 additions & 2 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -542,8 +542,10 @@ class RtmpClient(private val connectCheckerRtmp: ConnectCheckerRtmp) {
}
}

fun hasCongestion(): Boolean {
return rtmpSender.hasCongestion()
@JvmOverloads
@Throws(IllegalArgumentException::class)
fun hasCongestion(percentUsed: Float = 20f): Boolean {
return rtmpSender.hasCongestion(percentUsed)
}

fun resetSentAudioFrames() {
Expand All @@ -570,4 +572,8 @@ class RtmpClient(private val connectCheckerRtmp: ConnectCheckerRtmp) {
fun setLogs(enable: Boolean) {
rtmpSender.setLogs(enable)
}

fun clearCache() {
rtmpSender.clearCache()
}
}
24 changes: 20 additions & 4 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ import com.pedro.rtmp.utils.trySend
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.delay
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runInterruptible
Expand Down Expand Up @@ -130,6 +132,15 @@ class RtmpSender(
queue.clear()
running = true
job = scope.launch {
var bytesSend = 0L
val bitrateTask = async {
while (scope.isActive && running) {
//bytes to bits
bitrateManager.calculateBitrate(bytesSend * 8)
bytesSend = 0
delay(timeMillis = 1000)
}
}
while (scope.isActive && running) {
val error = runCatching {
val flvPacket = runInterruptible {
Expand All @@ -156,8 +167,7 @@ class RtmpSender(
}
}
}
//bytes to bits
bitrateManager.calculateBitrate(size * 8L)
bytesSend += size
}
}.exceptionOrNull()
if (error != null) {
Expand Down Expand Up @@ -185,11 +195,13 @@ class RtmpSender(
queue.clear()
}

fun hasCongestion(): Boolean {
@Throws(IllegalArgumentException::class)
fun hasCongestion(percentUsed: Float = 20f): Boolean {
if (percentUsed < 0 || percentUsed > 100) throw IllegalArgumentException("the value must be in range 0 to 100")
val size = queue.size.toFloat()
val remaining = queue.remainingCapacity().toFloat()
val capacity = size + remaining
return size >= capacity * 0.2f //more than 20% queue used. You could have congestion
return size >= capacity * (percentUsed / 100f)
}

fun resizeCache(newSize: Int) {
Expand All @@ -205,6 +217,10 @@ class RtmpSender(
return cacheSize
}

fun clearCache() {
queue.clear()
}

fun getSentAudioFrames(): Long {
return audioFramesSent
}
Expand Down
10 changes: 8 additions & 2 deletions rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -423,8 +423,10 @@ class RtspClient(private val connectCheckerRtsp: ConnectCheckerRtsp) {
}
}

fun hasCongestion(): Boolean {
return rtspSender.hasCongestion()
@JvmOverloads
@Throws(IllegalArgumentException::class)
fun hasCongestion(percentUsed: Float = 20f): Boolean {
return rtspSender.hasCongestion(percentUsed)
}

@JvmOverloads
Expand Down Expand Up @@ -462,4 +464,8 @@ class RtspClient(private val connectCheckerRtsp: ConnectCheckerRtsp) {
fun setLogs(enable: Boolean) {
rtspSender.setLogs(enable)
}

fun clearCache() {
rtspSender.clearCache()
}
}
29 changes: 23 additions & 6 deletions rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,10 @@ import com.pedro.rtsp.utils.trySend
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -138,25 +140,34 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) {
videoPacket?.setSSRC(ssrcVideo)
aacPacket?.setSSRC(ssrcAudio)
val isTcp = rtpSocket is RtpSocketTcp
var bytesSend = 0L
val bitrateTask = async {
while (scope.isActive && running) {
//bytes to bits
bitrateManager.calculateBitrate(bytesSend * 8)
bytesSend = 0
delay(timeMillis = 1000)
}
}
while (scope.isActive && running) {
val error = runCatching {
val rtpFrame = runInterruptible {
queue.poll(1, TimeUnit.SECONDS)
}
if (rtpFrame != null) {
rtpSocket?.sendFrame(rtpFrame, isEnableLogs)
//bytes to bits (4 is tcp header length)
//4 is tcp header length
val packetSize = if (isTcp) rtpFrame.length + 4 else rtpFrame.length
bitrateManager.calculateBitrate(packetSize * 8.toLong())
bytesSend += packetSize
if (rtpFrame.isVideoFrame()) {
videoFramesSent++
} else {
audioFramesSent++
}
if (baseSenderReport?.update(rtpFrame, isEnableLogs) == true) {
//bytes to bits (4 is tcp header length)
//4 is tcp header length
val reportSize = if (isTcp) baseSenderReport?.PACKET_LENGTH ?: (0 + 4) else baseSenderReport?.PACKET_LENGTH ?: 0
bitrateManager.calculateBitrate(reportSize * 8.toLong())
bytesSend += reportSize
}
}
}.exceptionOrNull()
Expand Down Expand Up @@ -187,11 +198,13 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) {
queue.clear()
}

fun hasCongestion(): Boolean {
@Throws(IllegalArgumentException::class)
fun hasCongestion(percentUsed: Float = 20f): Boolean {
if (percentUsed < 0 || percentUsed > 100) throw IllegalArgumentException("the value must be in range 0 to 100")
val size = queue.size.toFloat()
val remaining = queue.remainingCapacity().toFloat()
val capacity = size + remaining
return size >= capacity * 0.2f //more than 20% queue used. You could have congestion
return size >= capacity * (percentUsed / 100f)
}

fun resizeCache(newSize: Int) {
Expand All @@ -207,6 +220,10 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) {
return cacheSize
}

fun clearCache() {
queue.clear()
}

fun getSentAudioFrames(): Long {
return audioFramesSent
}
Expand Down
10 changes: 8 additions & 2 deletions srt/src/main/java/com/pedro/srt/srt/SrtClient.kt
Original file line number Diff line number Diff line change
Expand Up @@ -358,8 +358,10 @@ class SrtClient(private val connectCheckerSrt: ConnectCheckerSrt) {
}
}

fun hasCongestion(): Boolean {
return srtSender.hasCongestion()
@JvmOverloads
@Throws(IllegalArgumentException::class)
fun hasCongestion(percentUsed: Float = 20f): Boolean {
return srtSender.hasCongestion(percentUsed)
}

fun resetSentAudioFrames() {
Expand All @@ -386,4 +388,8 @@ class SrtClient(private val connectCheckerSrt: ConnectCheckerSrt) {
fun setLogs(enable: Boolean) {
srtSender.setLogs(enable)
}

fun clearCache() {
srtSender.clearCache()
}
}
24 changes: 20 additions & 4 deletions srt/src/main/java/com/pedro/srt/srt/SrtSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,10 @@ import com.pedro.srt.utils.trySend
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.async
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.delay
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
Expand Down Expand Up @@ -153,6 +155,15 @@ class SrtSender(
MpegTsPacket(b, MpegType.PSI, PacketPosition.SINGLE)
}
queue.trySend(psiPackets)
var bytesSend = 0L
val bitrateTask = async {
while (scope.isActive && running) {
//bytes to bits
bitrateManager.calculateBitrate(bytesSend * 8)
bytesSend = 0
delay(timeMillis = 1000)
}
}
while (scope.isActive && running) {
val error = runCatching {
val mpegTsPackets = runInterruptible {
Expand All @@ -164,8 +175,7 @@ class SrtSender(
if (isEnableLogs) {
Log.i(TAG, "wrote ${mpegTsPacket.type.name} packet, size $size")
}
//bytes to bits
bitrateManager.calculateBitrate(size * 8L)
bytesSend += size
}
}.exceptionOrNull()
if (error != null) {
Expand Down Expand Up @@ -219,11 +229,13 @@ class SrtSender(
queue.clear()
}

fun hasCongestion(): Boolean {
@Throws(IllegalArgumentException::class)
fun hasCongestion(percentUsed: Float = 20f): Boolean {
if (percentUsed < 0 || percentUsed > 100) throw IllegalArgumentException("the value must be in range 0 to 100")
val size = queue.size.toFloat()
val remaining = queue.remainingCapacity().toFloat()
val capacity = size + remaining
return size >= capacity * 0.2f //more than 20% queue used. You could have congestion
return size >= capacity * (percentUsed / 100f)
}

fun resizeCache(newSize: Int) {
Expand All @@ -239,6 +251,10 @@ class SrtSender(
return cacheSize
}

fun clearCache() {
queue.clear()
}

fun getSentAudioFrames(): Long {
return audioFramesSent
}
Expand Down

0 comments on commit caeb371

Please sign in to comment.