Skip to content

Commit

Permalink
Merge pull request #1292 from pedroSG94/fix-queue
Browse files Browse the repository at this point in the history
Fix queue
  • Loading branch information
pedroSG94 committed Oct 5, 2023
2 parents 0b0e015 + 043945c commit 3036969
Show file tree
Hide file tree
Showing 8 changed files with 138 additions and 120 deletions.
90 changes: 44 additions & 46 deletions rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -28,15 +28,18 @@ import com.pedro.rtmp.utils.BitrateManager
import com.pedro.rtmp.utils.ConnectCheckerRtmp
import com.pedro.rtmp.utils.onMainThread
import com.pedro.rtmp.utils.socket.RtmpSocket
import com.pedro.rtmp.utils.trySend
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
import kotlinx.coroutines.cancelAndJoin
import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runInterruptible
import java.nio.ByteBuffer
import java.util.concurrent.BlockingQueue
import java.util.concurrent.LinkedBlockingQueue
import java.util.concurrent.TimeUnit

/**
* Created by pedro on 8/04/21.
Expand All @@ -51,15 +54,12 @@ class RtmpSender(
private var h265Packet = H265Packet()
@Volatile
private var running = false

private var cacheSize = 60
@Volatile
private var itemsInQueue = 0
private var cacheSize = 200

private var job: Job? = null
private val scope = CoroutineScope(Dispatchers.IO)
private var queue = Channel<FlvPacket>(cacheSize)
private var queueFlow = queue.receiveAsFlow()
@Volatile
private var queue: BlockingQueue<FlvPacket> = LinkedBlockingQueue(cacheSize)
private var audioFramesSent: Long = 0
private var videoFramesSent: Long = 0
var socket: RtmpSocket? = null
Expand Down Expand Up @@ -94,11 +94,9 @@ class RtmpSender(

private fun enqueueVideoFrame(flvPacket: FlvPacket) {
val result = queue.trySend(flvPacket)
if (!result.isSuccess) {
if (!result) {
Log.i(TAG, "Video frame discarded")
droppedVideoFrames++
} else {
itemsInQueue++
}
}

Expand All @@ -120,62 +118,61 @@ class RtmpSender(
if (running) {
aacPacket.createFlvAudioPacket(aacBuffer, info) { flvPacket ->
val result = queue.trySend(flvPacket)
if (!result.isSuccess) {
if (!result) {
Log.i(TAG, "Audio frame discarded")
droppedAudioFrames++
} else {
itemsInQueue++
}
}
}
}

fun start() {
queue = Channel(cacheSize)
queueFlow = queue.receiveAsFlow()
queue.clear()
running = true
job = scope.launch {
queueFlow.collect { flvPacket ->
itemsInQueue--
while (scope.isActive && running) {
val error = runCatching {
var size = 0
if (flvPacket.type == FlvType.VIDEO) {
videoFramesSent++
socket?.let { socket ->
size = commandsManager.sendVideoPacket(flvPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote Video packet, size $size")
}
}
val flvPacket = runInterruptible {
queue.poll(1, TimeUnit.SECONDS)
}
if (flvPacket == null) {
Log.i(TAG, "Skipping iteration, frame null")
} else {
audioFramesSent++
socket?.let { socket ->
size = commandsManager.sendAudioPacket(flvPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote Audio packet, size $size")
var size = 0
if (flvPacket.type == FlvType.VIDEO) {
videoFramesSent++
socket?.let { socket ->
size = commandsManager.sendVideoPacket(flvPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote Video packet, size $size")
}
}
} else {
audioFramesSent++
socket?.let { socket ->
size = commandsManager.sendAudioPacket(flvPacket, socket)
if (isEnableLogs) {
Log.i(TAG, "wrote Audio packet, size $size")
}
}
}
//bytes to bits
bitrateManager.calculateBitrate(size * 8L)
}
//bytes to bits
bitrateManager.calculateBitrate(size * 8L)
}.exceptionOrNull()
if (error != null) {
onMainThread {
connectCheckerRtmp.onConnectionFailedRtmp("Error send packet, " + error.message)
}
Log.e(TAG, "send error: ", error)
return@collect
return@launch
}
}
}
}

suspend fun stop(clear: Boolean = true) {
running = false
queue.cancel()
itemsInQueue = 0
queue = Channel(cacheSize)
queueFlow = queue.receiveAsFlow()
aacPacket.reset()
h264Packet.reset(clear)
h265Packet.reset(clear)
Expand All @@ -185,22 +182,23 @@ class RtmpSender(
resetDroppedVideoFrames()
job?.cancelAndJoin()
job = null
queue.clear()
}

fun hasCongestion(): Boolean {
val size = cacheSize
val remaining = cacheSize - itemsInQueue
val size = queue.size.toFloat()
val remaining = queue.remainingCapacity().toFloat()
val capacity = size + remaining
return size >= capacity * 0.2f //more than 20% queue used. You could have congestion
}

fun resizeCache(newSize: Int) {
if (!scope.isActive) {
val tempQueue = Channel<FlvPacket>(newSize)
queue = tempQueue
queueFlow = queue.receiveAsFlow()
cacheSize = newSize
if (newSize < queue.size - queue.remainingCapacity()) {
throw RuntimeException("Can't fit current cache inside new cache size")
}
val tempQueue: BlockingQueue<FlvPacket> = LinkedBlockingQueue(newSize)
queue.drainTo(tempQueue)
queue = tempQueue
}

fun getCacheSize(): Int {
Expand Down
10 changes: 10 additions & 0 deletions rtmp/src/main/java/com/pedro/rtmp/utils/Utils.kt
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,21 @@ import kotlinx.coroutines.withContext
import java.io.InputStream
import java.io.OutputStream
import java.nio.ByteBuffer
import java.util.concurrent.BlockingQueue

/**
* Created by pedro on 20/04/21.
*/

inline infix fun <reified T: Any> BlockingQueue<T>.trySend(item: T): Boolean {
return try {
this.add(item)
true
} catch (e: IllegalStateException) {
false
}
}

suspend fun onMainThread(code: () -> Unit) {
withContext(Dispatchers.Main) {
code()
Expand Down
9 changes: 5 additions & 4 deletions rtmp/src/main/java/com/pedro/rtmp/utils/socket/TcpSocket.kt
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ package com.pedro.rtmp.utils.socket

import com.pedro.rtmp.utils.TLSSocketFactory
import java.io.BufferedInputStream
import java.io.BufferedOutputStream
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.IOException
Expand All @@ -34,8 +35,8 @@ import java.security.GeneralSecurityException
class TcpSocket(private val host: String, private val port: Int, private val secured: Boolean): RtmpSocket() {

private var socket: Socket = Socket()
private var input: BufferedInputStream = BufferedInputStream(ByteArrayInputStream(byteArrayOf()))
private var output: OutputStream = ByteArrayOutputStream()
private var input = ByteArrayInputStream(byteArrayOf()).buffered()
private var output = ByteArrayOutputStream().buffered()

override fun getOutStream(): OutputStream = output

Expand All @@ -58,8 +59,8 @@ class TcpSocket(private val host: String, private val port: Int, private val sec
val socketAddress: SocketAddress = InetSocketAddress(host, port)
socket.connect(socketAddress, timeout)
}
output = socket.getOutputStream()
input = BufferedInputStream(socket.getInputStream())
output = socket.getOutputStream().buffered()
input = socket.getInputStream().buffered()
socket.soTimeout = timeout
}

Expand Down
74 changes: 34 additions & 40 deletions rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import com.pedro.rtsp.utils.BitrateManager
import com.pedro.rtsp.utils.ConnectCheckerRtsp
import com.pedro.rtsp.utils.RtpConstants
import com.pedro.rtsp.utils.onMainThread
import com.pedro.rtsp.utils.trySend
import kotlinx.coroutines.CoroutineScope
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.Job
Expand All @@ -34,6 +35,7 @@ import kotlinx.coroutines.channels.Channel
import kotlinx.coroutines.flow.receiveAsFlow
import kotlinx.coroutines.isActive
import kotlinx.coroutines.launch
import kotlinx.coroutines.runInterruptible
import java.io.IOException
import java.io.OutputStream
import java.nio.ByteBuffer
Expand All @@ -54,14 +56,12 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) {
get() = 10 * 1024 * 1024 / RtpConstants.MTU
private var cacheSize = defaultCacheSize
@Volatile
private var itemsInQueue = 0
@Volatile
private var running = false

private var job: Job? = null
private val scope = CoroutineScope(Dispatchers.IO)
private var queue = Channel<RtpFrame>(cacheSize)
private var queueFlow = queue.receiveAsFlow()
@Volatile
private var queue: BlockingQueue<RtpFrame> = LinkedBlockingQueue(cacheSize)

private var audioFramesSent: Long = 0
private var videoFramesSent: Long = 0
Expand Down Expand Up @@ -108,11 +108,9 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) {
if (running) {
videoPacket?.createAndSendPacket(h264Buffer, info) { rtpFrame ->
val result = queue.trySend(rtpFrame)
if (!result.isSuccess) {
if (!result) {
Log.i(TAG, "Video frame discarded")
droppedVideoFrames++
} else {
itemsInQueue++
}
}
}
Expand All @@ -122,19 +120,16 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) {
if (running) {
aacPacket?.createAndSendPacket(aacBuffer, info) { rtpFrame ->
val result = queue.trySend(rtpFrame)
if (!result.isSuccess) {
if (!result) {
Log.i(TAG, "Audio frame discarded")
droppedAudioFrames++
} else {
itemsInQueue++
}
}
}
}

fun start() {
queue = Channel(cacheSize)
queueFlow = queue.receiveAsFlow()
queue.clear()
running = true
job = scope.launch {
val ssrcVideo = Random().nextInt().toLong()
Expand All @@ -143,41 +138,41 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) {
videoPacket?.setSSRC(ssrcVideo)
aacPacket?.setSSRC(ssrcAudio)
val isTcp = rtpSocket is RtpSocketTcp
queueFlow.collect { rtpFrame ->
itemsInQueue--
while (scope.isActive && running) {
val error = runCatching {
rtpSocket?.sendFrame(rtpFrame, isEnableLogs)
//bytes to bits (4 is tcp header length)
val packetSize = if (isTcp) rtpFrame.length + 4 else rtpFrame.length
bitrateManager.calculateBitrate(packetSize * 8.toLong())
if (rtpFrame.isVideoFrame()) {
videoFramesSent++
} else {
audioFramesSent++
val rtpFrame = runInterruptible {
queue.poll(1, TimeUnit.SECONDS)
}
if (baseSenderReport?.update(rtpFrame, isEnableLogs) == true) {
if (rtpFrame != null) {
rtpSocket?.sendFrame(rtpFrame, isEnableLogs)
//bytes to bits (4 is tcp header length)
val reportSize = if (isTcp) baseSenderReport?.PACKET_LENGTH ?: (0 + 4) else baseSenderReport?.PACKET_LENGTH ?: 0
bitrateManager.calculateBitrate(reportSize * 8.toLong())
val packetSize = if (isTcp) rtpFrame.length + 4 else rtpFrame.length
bitrateManager.calculateBitrate(packetSize * 8.toLong())
if (rtpFrame.isVideoFrame()) {
videoFramesSent++
} else {
audioFramesSent++
}
if (baseSenderReport?.update(rtpFrame, isEnableLogs) == true) {
//bytes to bits (4 is tcp header length)
val reportSize = if (isTcp) baseSenderReport?.PACKET_LENGTH ?: (0 + 4) else baseSenderReport?.PACKET_LENGTH ?: 0
bitrateManager.calculateBitrate(reportSize * 8.toLong())
}
}
}.exceptionOrNull()
if (error != null) {
onMainThread {
connectCheckerRtsp.onConnectionFailedRtsp("Error send packet, " + error.message)
connectCheckerRtsp.onConnectionFailedRtsp("Error send packet, ${error.message}")
}
Log.e(TAG, "send error: ", error)
return@collect
return@launch
}
}
}
}

suspend fun stop() {
running = false
queue.cancel()
itemsInQueue = 0
queue = Channel(cacheSize)
queueFlow = queue.receiveAsFlow()
baseSenderReport?.reset()
baseSenderReport?.close()
rtpSocket?.close()
Expand All @@ -189,24 +184,23 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) {
resetDroppedVideoFrames()
job?.cancelAndJoin()
job = null
queue.clear()
}

fun hasCongestion(): Boolean {
val size = cacheSize
val remaining = cacheSize - itemsInQueue
val size = queue.size.toFloat()
val remaining = queue.remainingCapacity().toFloat()
val capacity = size + remaining
return size >= capacity * 0.2f //more than 20% queue used. You could have congestion
}

fun resizeCache(newSize: Int) {
if (!scope.isActive) {
val tempQueue = Channel<RtpFrame>(newSize)
queue = tempQueue
queueFlow = queue.receiveAsFlow()
cacheSize = newSize
} else {
throw RuntimeException("resize cache while streaming is not available")
if (newSize < queue.size - queue.remainingCapacity()) {
throw RuntimeException("Can't fit current cache inside new cache size")
}
val tempQueue: BlockingQueue<RtpFrame> = LinkedBlockingQueue(newSize)
queue.drainTo(tempQueue)
queue = tempQueue
}

fun getCacheSize(): Int {
Expand Down

0 comments on commit 3036969

Please sign in to comment.