diff --git a/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt b/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt index 41720d13f..3696f8984 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/rtmp/RtmpSender.kt @@ -28,15 +28,18 @@ import com.pedro.rtmp.utils.BitrateManager import com.pedro.rtmp.utils.ConnectCheckerRtmp import com.pedro.rtmp.utils.onMainThread import com.pedro.rtmp.utils.socket.RtmpSocket +import com.pedro.rtmp.utils.trySend import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job import kotlinx.coroutines.cancelAndJoin -import kotlinx.coroutines.channels.Channel -import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.runInterruptible import java.nio.ByteBuffer +import java.util.concurrent.BlockingQueue +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit /** * Created by pedro on 8/04/21. @@ -51,15 +54,12 @@ class RtmpSender( private var h265Packet = H265Packet() @Volatile private var running = false - - private var cacheSize = 60 - @Volatile - private var itemsInQueue = 0 + private var cacheSize = 200 private var job: Job? = null private val scope = CoroutineScope(Dispatchers.IO) - private var queue = Channel(cacheSize) - private var queueFlow = queue.receiveAsFlow() + @Volatile + private var queue: BlockingQueue = LinkedBlockingQueue(cacheSize) private var audioFramesSent: Long = 0 private var videoFramesSent: Long = 0 var socket: RtmpSocket? = null @@ -94,11 +94,9 @@ class RtmpSender( private fun enqueueVideoFrame(flvPacket: FlvPacket) { val result = queue.trySend(flvPacket) - if (!result.isSuccess) { + if (!result) { Log.i(TAG, "Video frame discarded") droppedVideoFrames++ - } else { - itemsInQueue++ } } @@ -120,51 +118,54 @@ class RtmpSender( if (running) { aacPacket.createFlvAudioPacket(aacBuffer, info) { flvPacket -> val result = queue.trySend(flvPacket) - if (!result.isSuccess) { + if (!result) { Log.i(TAG, "Audio frame discarded") droppedAudioFrames++ - } else { - itemsInQueue++ } } } } fun start() { - queue = Channel(cacheSize) - queueFlow = queue.receiveAsFlow() + queue.clear() running = true job = scope.launch { - queueFlow.collect { flvPacket -> - itemsInQueue-- + while (scope.isActive && running) { val error = runCatching { - var size = 0 - if (flvPacket.type == FlvType.VIDEO) { - videoFramesSent++ - socket?.let { socket -> - size = commandsManager.sendVideoPacket(flvPacket, socket) - if (isEnableLogs) { - Log.i(TAG, "wrote Video packet, size $size") - } - } + val flvPacket = runInterruptible { + queue.poll(1, TimeUnit.SECONDS) + } + if (flvPacket == null) { + Log.i(TAG, "Skipping iteration, frame null") } else { - audioFramesSent++ - socket?.let { socket -> - size = commandsManager.sendAudioPacket(flvPacket, socket) - if (isEnableLogs) { - Log.i(TAG, "wrote Audio packet, size $size") + var size = 0 + if (flvPacket.type == FlvType.VIDEO) { + videoFramesSent++ + socket?.let { socket -> + size = commandsManager.sendVideoPacket(flvPacket, socket) + if (isEnableLogs) { + Log.i(TAG, "wrote Video packet, size $size") + } + } + } else { + audioFramesSent++ + socket?.let { socket -> + size = commandsManager.sendAudioPacket(flvPacket, socket) + if (isEnableLogs) { + Log.i(TAG, "wrote Audio packet, size $size") + } } } + //bytes to bits + bitrateManager.calculateBitrate(size * 8L) } - //bytes to bits - bitrateManager.calculateBitrate(size * 8L) }.exceptionOrNull() if (error != null) { onMainThread { connectCheckerRtmp.onConnectionFailedRtmp("Error send packet, " + error.message) } Log.e(TAG, "send error: ", error) - return@collect + return@launch } } } @@ -172,10 +173,6 @@ class RtmpSender( suspend fun stop(clear: Boolean = true) { running = false - queue.cancel() - itemsInQueue = 0 - queue = Channel(cacheSize) - queueFlow = queue.receiveAsFlow() aacPacket.reset() h264Packet.reset(clear) h265Packet.reset(clear) @@ -185,22 +182,23 @@ class RtmpSender( resetDroppedVideoFrames() job?.cancelAndJoin() job = null + queue.clear() } fun hasCongestion(): Boolean { - val size = cacheSize - val remaining = cacheSize - itemsInQueue + val size = queue.size.toFloat() + val remaining = queue.remainingCapacity().toFloat() val capacity = size + remaining return size >= capacity * 0.2f //more than 20% queue used. You could have congestion } fun resizeCache(newSize: Int) { - if (!scope.isActive) { - val tempQueue = Channel(newSize) - queue = tempQueue - queueFlow = queue.receiveAsFlow() - cacheSize = newSize + if (newSize < queue.size - queue.remainingCapacity()) { + throw RuntimeException("Can't fit current cache inside new cache size") } + val tempQueue: BlockingQueue = LinkedBlockingQueue(newSize) + queue.drainTo(tempQueue) + queue = tempQueue } fun getCacheSize(): Int { diff --git a/rtmp/src/main/java/com/pedro/rtmp/utils/Utils.kt b/rtmp/src/main/java/com/pedro/rtmp/utils/Utils.kt index 14947703b..057e5e3b5 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/utils/Utils.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/utils/Utils.kt @@ -21,11 +21,21 @@ import kotlinx.coroutines.withContext import java.io.InputStream import java.io.OutputStream import java.nio.ByteBuffer +import java.util.concurrent.BlockingQueue /** * Created by pedro on 20/04/21. */ +inline infix fun BlockingQueue.trySend(item: T): Boolean { + return try { + this.add(item) + true + } catch (e: IllegalStateException) { + false + } +} + suspend fun onMainThread(code: () -> Unit) { withContext(Dispatchers.Main) { code() diff --git a/rtmp/src/main/java/com/pedro/rtmp/utils/socket/TcpSocket.kt b/rtmp/src/main/java/com/pedro/rtmp/utils/socket/TcpSocket.kt index d5347a93b..0c149d70e 100644 --- a/rtmp/src/main/java/com/pedro/rtmp/utils/socket/TcpSocket.kt +++ b/rtmp/src/main/java/com/pedro/rtmp/utils/socket/TcpSocket.kt @@ -18,6 +18,7 @@ package com.pedro.rtmp.utils.socket import com.pedro.rtmp.utils.TLSSocketFactory import java.io.BufferedInputStream +import java.io.BufferedOutputStream import java.io.ByteArrayInputStream import java.io.ByteArrayOutputStream import java.io.IOException @@ -34,8 +35,8 @@ import java.security.GeneralSecurityException class TcpSocket(private val host: String, private val port: Int, private val secured: Boolean): RtmpSocket() { private var socket: Socket = Socket() - private var input: BufferedInputStream = BufferedInputStream(ByteArrayInputStream(byteArrayOf())) - private var output: OutputStream = ByteArrayOutputStream() + private var input = ByteArrayInputStream(byteArrayOf()).buffered() + private var output = ByteArrayOutputStream().buffered() override fun getOutStream(): OutputStream = output @@ -58,8 +59,8 @@ class TcpSocket(private val host: String, private val port: Int, private val sec val socketAddress: SocketAddress = InetSocketAddress(host, port) socket.connect(socketAddress, timeout) } - output = socket.getOutputStream() - input = BufferedInputStream(socket.getInputStream()) + output = socket.getOutputStream().buffered() + input = socket.getInputStream().buffered() socket.soTimeout = timeout } diff --git a/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt b/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt index 07675a0fe..96ce6891f 100644 --- a/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt +++ b/rtsp/src/main/java/com/pedro/rtsp/rtsp/RtspSender.kt @@ -26,6 +26,7 @@ import com.pedro.rtsp.utils.BitrateManager import com.pedro.rtsp.utils.ConnectCheckerRtsp import com.pedro.rtsp.utils.RtpConstants import com.pedro.rtsp.utils.onMainThread +import com.pedro.rtsp.utils.trySend import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -34,6 +35,7 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.runInterruptible import java.io.IOException import java.io.OutputStream import java.nio.ByteBuffer @@ -54,14 +56,12 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) { get() = 10 * 1024 * 1024 / RtpConstants.MTU private var cacheSize = defaultCacheSize @Volatile - private var itemsInQueue = 0 - @Volatile private var running = false private var job: Job? = null private val scope = CoroutineScope(Dispatchers.IO) - private var queue = Channel(cacheSize) - private var queueFlow = queue.receiveAsFlow() + @Volatile + private var queue: BlockingQueue = LinkedBlockingQueue(cacheSize) private var audioFramesSent: Long = 0 private var videoFramesSent: Long = 0 @@ -108,11 +108,9 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) { if (running) { videoPacket?.createAndSendPacket(h264Buffer, info) { rtpFrame -> val result = queue.trySend(rtpFrame) - if (!result.isSuccess) { + if (!result) { Log.i(TAG, "Video frame discarded") droppedVideoFrames++ - } else { - itemsInQueue++ } } } @@ -122,19 +120,16 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) { if (running) { aacPacket?.createAndSendPacket(aacBuffer, info) { rtpFrame -> val result = queue.trySend(rtpFrame) - if (!result.isSuccess) { + if (!result) { Log.i(TAG, "Audio frame discarded") droppedAudioFrames++ - } else { - itemsInQueue++ } } } } fun start() { - queue = Channel(cacheSize) - queueFlow = queue.receiveAsFlow() + queue.clear() running = true job = scope.launch { val ssrcVideo = Random().nextInt().toLong() @@ -143,30 +138,34 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) { videoPacket?.setSSRC(ssrcVideo) aacPacket?.setSSRC(ssrcAudio) val isTcp = rtpSocket is RtpSocketTcp - queueFlow.collect { rtpFrame -> - itemsInQueue-- + while (scope.isActive && running) { val error = runCatching { - rtpSocket?.sendFrame(rtpFrame, isEnableLogs) - //bytes to bits (4 is tcp header length) - val packetSize = if (isTcp) rtpFrame.length + 4 else rtpFrame.length - bitrateManager.calculateBitrate(packetSize * 8.toLong()) - if (rtpFrame.isVideoFrame()) { - videoFramesSent++ - } else { - audioFramesSent++ + val rtpFrame = runInterruptible { + queue.poll(1, TimeUnit.SECONDS) } - if (baseSenderReport?.update(rtpFrame, isEnableLogs) == true) { + if (rtpFrame != null) { + rtpSocket?.sendFrame(rtpFrame, isEnableLogs) //bytes to bits (4 is tcp header length) - val reportSize = if (isTcp) baseSenderReport?.PACKET_LENGTH ?: (0 + 4) else baseSenderReport?.PACKET_LENGTH ?: 0 - bitrateManager.calculateBitrate(reportSize * 8.toLong()) + val packetSize = if (isTcp) rtpFrame.length + 4 else rtpFrame.length + bitrateManager.calculateBitrate(packetSize * 8.toLong()) + if (rtpFrame.isVideoFrame()) { + videoFramesSent++ + } else { + audioFramesSent++ + } + if (baseSenderReport?.update(rtpFrame, isEnableLogs) == true) { + //bytes to bits (4 is tcp header length) + val reportSize = if (isTcp) baseSenderReport?.PACKET_LENGTH ?: (0 + 4) else baseSenderReport?.PACKET_LENGTH ?: 0 + bitrateManager.calculateBitrate(reportSize * 8.toLong()) + } } }.exceptionOrNull() if (error != null) { onMainThread { - connectCheckerRtsp.onConnectionFailedRtsp("Error send packet, " + error.message) + connectCheckerRtsp.onConnectionFailedRtsp("Error send packet, ${error.message}") } Log.e(TAG, "send error: ", error) - return@collect + return@launch } } } @@ -174,10 +173,6 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) { suspend fun stop() { running = false - queue.cancel() - itemsInQueue = 0 - queue = Channel(cacheSize) - queueFlow = queue.receiveAsFlow() baseSenderReport?.reset() baseSenderReport?.close() rtpSocket?.close() @@ -189,24 +184,23 @@ class RtspSender(private val connectCheckerRtsp: ConnectCheckerRtsp) { resetDroppedVideoFrames() job?.cancelAndJoin() job = null + queue.clear() } fun hasCongestion(): Boolean { - val size = cacheSize - val remaining = cacheSize - itemsInQueue + val size = queue.size.toFloat() + val remaining = queue.remainingCapacity().toFloat() val capacity = size + remaining return size >= capacity * 0.2f //more than 20% queue used. You could have congestion } fun resizeCache(newSize: Int) { - if (!scope.isActive) { - val tempQueue = Channel(newSize) - queue = tempQueue - queueFlow = queue.receiveAsFlow() - cacheSize = newSize - } else { - throw RuntimeException("resize cache while streaming is not available") + if (newSize < queue.size - queue.remainingCapacity()) { + throw RuntimeException("Can't fit current cache inside new cache size") } + val tempQueue: BlockingQueue = LinkedBlockingQueue(newSize) + queue.drainTo(tempQueue) + queue = tempQueue } fun getCacheSize(): Int { diff --git a/rtsp/src/main/java/com/pedro/rtsp/utils/Extensions.kt b/rtsp/src/main/java/com/pedro/rtsp/utils/Extensions.kt index 7c574e17b..dbfb33890 100644 --- a/rtsp/src/main/java/com/pedro/rtsp/utils/Extensions.kt +++ b/rtsp/src/main/java/com/pedro/rtsp/utils/Extensions.kt @@ -17,9 +17,20 @@ package com.pedro.rtsp.utils import android.util.Base64 +import com.pedro.rtsp.rtsp.RtpFrame import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.withContext import java.nio.ByteBuffer +import java.util.concurrent.BlockingQueue + +inline infix fun BlockingQueue.trySend(item: T): Boolean { + return try { + this.add(item) + true + } catch (e: IllegalStateException) { + false + } +} fun ByteArray.encodeToString(flags: Int = Base64.NO_WRAP): String { return Base64.encodeToString(this, flags) diff --git a/srt/src/main/java/com/pedro/srt/srt/CommandsManager.kt b/srt/src/main/java/com/pedro/srt/srt/CommandsManager.kt index 9423a7dc9..c7e4d307f 100644 --- a/srt/src/main/java/com/pedro/srt/srt/CommandsManager.kt +++ b/srt/src/main/java/com/pedro/srt/srt/CommandsManager.kt @@ -97,7 +97,6 @@ class CommandsManager { packetHandlingQueue.add(dataPacket) dataPacket.write() socket?.write(dataPacket) - Log.i(TAG, dataPacket.toString()) return dataPacket.getSize() } } diff --git a/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt b/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt index ecf438427..9b5b165d5 100644 --- a/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt +++ b/srt/src/main/java/com/pedro/srt/srt/SrtSender.kt @@ -34,6 +34,7 @@ import com.pedro.srt.utils.BitrateManager import com.pedro.srt.utils.ConnectCheckerSrt import com.pedro.srt.utils.SrtSocket import com.pedro.srt.utils.onMainThread +import com.pedro.srt.utils.trySend import kotlinx.coroutines.CoroutineScope import kotlinx.coroutines.Dispatchers import kotlinx.coroutines.Job @@ -42,7 +43,11 @@ import kotlinx.coroutines.channels.Channel import kotlinx.coroutines.flow.receiveAsFlow import kotlinx.coroutines.isActive import kotlinx.coroutines.launch +import kotlinx.coroutines.runInterruptible import java.nio.ByteBuffer +import java.util.concurrent.BlockingQueue +import java.util.concurrent.LinkedBlockingQueue +import java.util.concurrent.TimeUnit import java.util.concurrent.atomic.AtomicInteger /** @@ -66,15 +71,12 @@ class SrtSender( @Volatile private var running = false - - private var cacheSize = 60 - @Volatile - private var itemsInQueue = 0 + private var cacheSize = 200 private var job: Job? = null private val scope = CoroutineScope(Dispatchers.IO) - private var queue = Channel>(cacheSize) - private var queueFlow = queue.receiveAsFlow() + @Volatile + private var queue: BlockingQueue> = LinkedBlockingQueue(cacheSize) private var audioFramesSent: Long = 0 private var videoFramesSent: Long = 0 var socket: SrtSocket? = null @@ -120,11 +122,9 @@ class SrtSender( checkSendInfo() h26XPacket.createAndSendPacket(h264Buffer, info) { mpegTsPackets -> val result = queue.trySend(mpegTsPackets) - if (!result.isSuccess) { + if (!result) { Log.i(TAG, "Video frame discarded") droppedVideoFrames++ - } else { - itemsInQueue++ } } } @@ -135,21 +135,17 @@ class SrtSender( checkSendInfo() aacPacket.createAndSendPacket(aacBuffer, info) { mpegTsPackets -> val result = queue.trySend(mpegTsPackets) - if (!result.isSuccess) { + if (!result) { Log.i(TAG, "Audio frame discarded") droppedAudioFrames++ - } else { - itemsInQueue++ } } } } fun start() { + queue.clear() setTrackConfig(!commandsManager.videoDisabled, !commandsManager.audioDisabled) - - queue = Channel(cacheSize) - queueFlow = queue.receiveAsFlow() running = true job = scope.launch { //send config @@ -157,9 +153,11 @@ class SrtSender( MpegTsPacket(b, MpegType.PSI, PacketPosition.SINGLE) } queue.trySend(psiPackets) - queueFlow.collect { mpegTsPackets -> - itemsInQueue-- + while (scope.isActive && running) { val error = runCatching { + val mpegTsPackets = runInterruptible { + queue.poll(1, TimeUnit.SECONDS) + } mpegTsPackets.forEach { mpegTsPacket -> var size = 0 size += commandsManager.writeData(mpegTsPacket, socket) @@ -175,7 +173,7 @@ class SrtSender( connectCheckerSrt.onConnectionFailedSrt("Error send packet, " + error.message) } Log.e(TAG, "send error: ", error) - return@collect + return@launch } } } @@ -207,10 +205,6 @@ class SrtSender( suspend fun stop() { running = false - queue.cancel() - itemsInQueue = 0 - queue = Channel(cacheSize) - queueFlow = queue.receiveAsFlow() psiManager.reset() service.clear() mpegTsPacketizer.reset() @@ -222,22 +216,23 @@ class SrtSender( resetDroppedVideoFrames() job?.cancelAndJoin() job = null + queue.clear() } fun hasCongestion(): Boolean { - val size = cacheSize - val remaining = cacheSize - itemsInQueue + val size = queue.size.toFloat() + val remaining = queue.remainingCapacity().toFloat() val capacity = size + remaining return size >= capacity * 0.2f //more than 20% queue used. You could have congestion } fun resizeCache(newSize: Int) { - if (!scope.isActive) { - val tempQueue = Channel>(newSize) - queue = tempQueue - queueFlow = queue.receiveAsFlow() - cacheSize = newSize + if (newSize < queue.size - queue.remainingCapacity()) { + throw RuntimeException("Can't fit current cache inside new cache size") } + val tempQueue: BlockingQueue> = LinkedBlockingQueue(newSize) + queue.drainTo(tempQueue) + queue = tempQueue } fun getCacheSize(): Int { diff --git a/srt/src/main/java/com/pedro/srt/utils/Extensions.kt b/srt/src/main/java/com/pedro/srt/utils/Extensions.kt index 5ca960bf9..51c290293 100644 --- a/srt/src/main/java/com/pedro/srt/utils/Extensions.kt +++ b/srt/src/main/java/com/pedro/srt/utils/Extensions.kt @@ -21,6 +21,16 @@ import kotlinx.coroutines.withContext import java.io.InputStream import java.io.OutputStream import java.nio.ByteBuffer +import java.util.concurrent.BlockingQueue + +inline infix fun BlockingQueue.trySend(item: T): Boolean { + return try { + this.add(item) + true + } catch (e: IllegalStateException) { + false + } +} fun ByteBuffer.toByteArray(): ByteArray { return if (this.hasArray() && !isDirect) {