Skip to content

Commit

Permalink
fix: Fixed some race conditions when handling process exit and settin…
Browse files Browse the repository at this point in the history
…g process result
  • Loading branch information
vemilyus committed May 10, 2024
1 parent 14a3a6b commit ff485f3
Show file tree
Hide file tree
Showing 7 changed files with 49 additions and 42 deletions.
23 changes: 12 additions & 11 deletions src/main/kotlin/io/v47/jaffree/ffmpeg/FFmpegProcessHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ internal class FFmpegProcessHandler(
private val outputListener: OutputListener?
) : LinesProcessHandler<FFmpegResult>(), ProcessAccessor {
private lateinit var processAccess: ProcessAccess
private var possibleResult: FFmpegResult? = null

override fun setProcessAccess(processAccess: ProcessAccess) {
this.processAccess = processAccess
Expand All @@ -44,9 +45,9 @@ internal class FFmpegProcessHandler(
if ("frame=" in line && "bitrate=" in line && "speed=" in line)
return

val possibleResult = ParseUtil.parseResult(line)
if (possibleResult != null) {
result = possibleResult
val parseResult = ParseUtil.parseResult(line)
if (parseResult != null) {
possibleResult = parseResult
finalErrorMessage = null
return
}
Expand All @@ -64,13 +65,13 @@ internal class FFmpegProcessHandler(
appendOrLogLine(ffmpegLogger, line.trim())
}

override fun onExit() {
override fun onExit(exitCode: Int) {
processLastLogMessage(ffmpegLogger, ::handleLogMessage)

if (finalErrorMessage != null)
exception = JaffreeException(finalErrorMessage)
else if (result == null)
result = FFmpegResult(null, null, null, null, null, null)
if (exitCode != 0 && finalErrorMessage != null)
finishExceptionally(JaffreeException(finalErrorMessage))
else
finish(possibleResult ?: FFmpegResult(null, null, null, null, null, null))
}

private fun handleLogMessage(logLevel: LogLevel, message: String) {
Expand All @@ -81,9 +82,9 @@ internal class FFmpegProcessHandler(
ffmpegLogger.warn("Exception in output listener", x)
}

val possibleResult = ParseUtil.parseResult(message)
if (possibleResult != null) {
result = possibleResult
val parseResult = ParseUtil.parseResult(message)
if (parseResult != null) {
possibleResult = parseResult
finalErrorMessage = null
}
}
Expand Down
10 changes: 5 additions & 5 deletions src/main/kotlin/io/v47/jaffree/ffprobe/FFprobeProcessHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,14 +56,14 @@ internal class FFprobeProcessHandler(
contentWritten = true
}

override fun onExit() {
override fun onExit(exitCode: Int) {
stdOutputWriter.close()
stdOutputContent.close()

processLastLogMessage(ffprobeLogger)

if (finalErrorMessage != null) {
exception = JaffreeException(finalErrorMessage)
if (exitCode != 0 && finalErrorMessage != null) {
finishExceptionally(JaffreeException(finalErrorMessage))
return
}

Expand All @@ -73,9 +73,9 @@ internal class FFprobeProcessHandler(

parser.parse(input)
}.onFailure { x ->
exception = JaffreeException("Failed to parse probe data", x)
finishExceptionally(JaffreeException("Failed to parse probe data", x))
}.onSuccess { probeData ->
result = FFprobeResult(probeData)
finish(FFprobeResult(probeData))
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ internal class DelegatingProcessHandler(
private val delegate: JaffreeProcessHandler<*>,
) : DefaultProcessHandler() {
override fun onExit(exitCode: Int) {
delegate.onExit()
delegate.onExit(exitCode)
}

override fun onStdout(buffer: ByteBuffer, closed: Boolean) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,11 @@ import com.github.kokorin.jaffree.log.LogMessage
import java.nio.ByteBuffer

internal interface JaffreeProcessHandler<R> {
val result: R?
val exception: Exception?

val errorLogMessages: List<LogMessage>

fun onStderr(buffer: ByteBuffer, closed: Boolean)
fun onStdout(buffer: ByteBuffer, closed: Boolean)
fun onExit()
fun onExit(exitCode: Int)

fun await(): Result<R>
}
37 changes: 21 additions & 16 deletions src/main/kotlin/io/v47/jaffree/process/LinesProcessHandler.kt
Original file line number Diff line number Diff line change
Expand Up @@ -20,24 +20,13 @@ import com.github.kokorin.jaffree.LogLevel
import com.github.kokorin.jaffree.log.LogMessage
import org.slf4j.Logger
import java.nio.ByteBuffer
import kotlin.concurrent.Volatile
import java.util.concurrent.CompletableFuture

private const val CARRIAGE_RETURN = '\r'.code.toByte()
private const val NEWLINE = '\n'.code.toByte()

internal abstract class LinesProcessHandler<R> : JaffreeProcessHandler<R> {
@Volatile
override var result: R? = null
protected set

@Volatile
override var exception: Exception? = null
protected set(value) {
if (field == null)
field = value
else
value?.let { field!!.addSuppressed(it) }
}
private val result = CompletableFuture<R>()

override val errorLogMessages = mutableListOf<LogMessage>()

Expand Down Expand Up @@ -94,6 +83,7 @@ internal abstract class LinesProcessHandler<R> : JaffreeProcessHandler<R> {
val message = "$lastLogMessageBuilder"

when (logLevel) {
LogLevel.QUIET,
LogLevel.TRACE -> logger.trace(message)

LogLevel.VERBOSE,
Expand All @@ -102,7 +92,6 @@ internal abstract class LinesProcessHandler<R> : JaffreeProcessHandler<R> {
LogLevel.INFO -> logger.info(message)
LogLevel.WARNING -> logger.warn(message)

LogLevel.QUIET,
LogLevel.PANIC,
LogLevel.FATAL,
LogLevel.ERROR -> {
Expand All @@ -111,12 +100,28 @@ internal abstract class LinesProcessHandler<R> : JaffreeProcessHandler<R> {
}
}

if (logLevel.isErrorOrHigher)
finalErrorMessage = message
finalErrorMessage =
if (logLevel.isErrorOrHigher)
message
else
null

additionalAction?.invoke(logLevel, message)
}
}

override fun await(): Result<R> =
runCatching {
result.get()
}

protected fun finish(value: R) {
result.complete(value)
}

protected fun finishExceptionally(ex: Exception) {
result.completeExceptionally(ex)
}
}

private fun addToBytes(source: ByteBuffer, target: MutableList<ByteArray>): Boolean {
Expand Down
8 changes: 5 additions & 3 deletions src/main/kotlin/io/v47/jaffree/process/ProcessRunner.kt
Original file line number Diff line number Diff line change
Expand Up @@ -100,16 +100,18 @@ internal class ProcessRunner<T>(
it.get()
}

val result = processHandler.await()

if (status != 0)
throw JaffreeAbnormalExitException(
errorExceptionMessage(status),
processHandler.errorLogMessages
).also {
processHandler.exception?.let { x -> it.initCause(x) }
if (result.isFailure)
it.addSuppressed(result.exceptionOrNull()!!)
}

processHandler.result
?: throw NullPointerException("The result must not be null")
result.getOrThrow()
}.whenComplete { result, x ->
if (x != null)
logger.trace(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ internal class VersionInfoProcessHandler : LinesProcessHandler<VersionInfo>() {
}
}

override fun onExit() {
result = createVersionInfo()
override fun onExit(exitCode: Int) {
finish(createVersionInfo())
}

private fun createVersionInfo(): VersionInfo {
Expand Down

0 comments on commit ff485f3

Please sign in to comment.