diff --git a/client-sdk-rust b/client-sdk-rust index 8d4d069a..d60a7ae0 160000 --- a/client-sdk-rust +++ b/client-sdk-rust @@ -1 +1 @@ -Subproject commit 8d4d069ac6b47e610160bf44b8891a730ce91154 +Subproject commit d60a7ae091196e8d9974089c18cdbf7ca45328cf diff --git a/include/livekit/data_track_stream.h b/include/livekit/data_track_stream.h index 5c8c3cc3..16e7a49d 100644 --- a/include/livekit/data_track_stream.h +++ b/include/livekit/data_track_stream.h @@ -18,8 +18,6 @@ #include #include -#include -#include #include #include @@ -32,7 +30,8 @@ namespace livekit { namespace proto { class FfiEvent; -} +class DataTrackStreamReadResponse; +} // namespace proto /** * Represents a pull-based stream of frames from a remote data track. @@ -111,6 +110,9 @@ class LIVEKIT_API DataTrackStream { /// FFI event handler, called by FfiClient. void onFfiEvent(const proto::FfiEvent& event); + /// Handle the immediate response returned by a read request. + void handleReadResponse(const proto::DataTrackStreamReadResponse& response); + /// Push a received DataTrackFrame to the internal storage. void pushFrame(DataTrackFrame&& frame); diff --git a/src/data_track_stream.cpp b/src/data_track_stream.cpp index fc86f3e7..cf76be6c 100644 --- a/src/data_track_stream.cpp +++ b/src/data_track_stream.cpp @@ -16,6 +16,7 @@ #include "livekit/data_track_stream.h" +#include #include #include "data_track.pb.h" @@ -27,6 +28,17 @@ namespace livekit { using proto::FfiEvent; +namespace { + +std::optional terminalErrorFromEos(const proto::DataTrackStreamEOS& eos) { + if (!eos.has_error()) { + return std::nullopt; + } + return SubscribeDataTrackError::fromProto(eos.error()); +} + +} // namespace + DataTrackStream::~DataTrackStream() { close(); } void DataTrackStream::init(FfiHandle subscription_handle) { @@ -36,6 +48,8 @@ void DataTrackStream::init(FfiHandle subscription_handle) { } bool DataTrackStream::read(DataTrackFrame& out) { + proto::DataTrackStreamReadResponse read_response; + { const std::scoped_lock lock(mutex_); if (closed_ || eof_) { @@ -50,9 +64,18 @@ bool DataTrackStream::read(DataTrackFrame& out) { proto::FfiRequest req; auto* msg = req.mutable_data_track_stream_read(); msg->set_stream_handle(subscription_handle); - FfiClient::instance().sendRequest(req); + const proto::FfiResponse resp = FfiClient::instance().sendRequest(req); + if (!resp.has_data_track_stream_read()) { + // should we throw here? This would mean the proto is mismatched + LK_LOG_ERROR("DataTrackStream::read: FFI response missing data_track_stream_read"); + return false; + } + + read_response = resp.data_track_stream_read(); } + handleReadResponse(read_response); + std::unique_lock lock(mutex_); cv_.wait(lock, [this] { return frame_.has_value() || eof_ || closed_; }); @@ -113,13 +136,15 @@ void DataTrackStream::onFfiEvent(const FfiEvent& event) { DataTrackFrame frame = DataTrackFrame::fromOwnedInfo(fr); pushFrame(std::move(frame)); } else if (dts.has_eos()) { - std::optional error; - const auto& eos = dts.eos(); - if (eos.has_error()) { - error = SubscribeDataTrackError::fromProto(eos.error()); - } - pushEos(std::move(error)); + pushEos(terminalErrorFromEos(dts.eos())); + } +} + +void DataTrackStream::handleReadResponse(const proto::DataTrackStreamReadResponse& response) { + if (!response.has_eos_event()) { + return; } + pushEos(terminalErrorFromEos(response.eos_event())); } void DataTrackStream::pushFrame(DataTrackFrame&& frame) { @@ -141,7 +166,7 @@ void DataTrackStream::pushFrame(DataTrackFrame&& frame) { void DataTrackStream::pushEos(std::optional error) { { const std::scoped_lock lock(mutex_); - if (eof_) { + if (closed_ || eof_) { return; } eof_ = true; diff --git a/src/tests/unit/test_data_track_stream.cpp b/src/tests/unit/test_data_track_stream.cpp index 71d14e8c..1bcc8d8a 100644 --- a/src/tests/unit/test_data_track_stream.cpp +++ b/src/tests/unit/test_data_track_stream.cpp @@ -33,6 +33,10 @@ class DataTrackStreamTest : public ::testing::Test { static void pushEvent(DataTrackStream& stream, const proto::FfiEvent& event) { stream.onFfiEvent(event); } + static void handleReadResponse(DataTrackStream& stream, const proto::DataTrackStreamReadResponse& response) { + stream.handleReadResponse(response); + } + static proto::FfiEvent makeEosEvent(std::optional code = std::nullopt, const std::string& message = {}) { proto::FfiEvent event; @@ -47,6 +51,18 @@ class DataTrackStreamTest : public ::testing::Test { return event; } + static proto::DataTrackStreamReadResponse makeEosReadResponse( + std::optional code = std::nullopt, const std::string& message = {}) { + proto::DataTrackStreamReadResponse response; + auto* eos = response.mutable_eos_event(); + if (code.has_value()) { + auto* error = eos->mutable_error(); + error->set_code(code.value()); + error->set_message(message); + } + return response; + } + static proto::FfiEvent makeAudioStreamEvent() { proto::FfiEvent event; event.mutable_audio_stream_event()->set_stream_handle(0); @@ -71,6 +87,15 @@ TEST_F(DataTrackStreamTest, TerminalErrorEmptyForNormalEos) { EXPECT_FALSE(stream->terminalError().has_value()); } +TEST_F(DataTrackStreamTest, ReadResponseNormalEosEndsStreamWithoutTerminalError) { + auto stream = makeStream(); + handleReadResponse(*stream, makeEosReadResponse()); + + DataTrackFrame frame; + EXPECT_FALSE(stream->read(frame)); + EXPECT_FALSE(stream->terminalError().has_value()); +} + TEST_F(DataTrackStreamTest, TerminalErrorStoredForSubscribeFailureEos) { auto stream = makeStream(); pushEvent(*stream, makeEosEvent(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_UNPUBLISHED, @@ -85,6 +110,16 @@ TEST_F(DataTrackStreamTest, TerminalErrorStoredForSubscribeFailureEos) { EXPECT_EQ(error->message, "track unpublished before subscription completed"); } +TEST_F(DataTrackStreamTest, ReadResponseSubscribeFailureEosStoresTerminalError) { + auto stream = makeStream(); + handleReadResponse(*stream, makeEosReadResponse(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_UNPUBLISHED, + "track unpublished before read completed")); + + DataTrackFrame frame; + EXPECT_FALSE(stream->read(frame)); + expectTerminalError(*stream, SubscribeDataTrackErrorCode::UNPUBLISHED, "track unpublished before read completed"); +} + TEST_F(DataTrackStreamTest, CloseBeforeEosSuppressesLaterTerminalError) { auto stream = makeStream(); stream->close();