Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion client-sdk-rust
8 changes: 5 additions & 3 deletions include/livekit/data_track_stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,6 @@

#include <condition_variable>
#include <cstdint>
#include <deque>
#include <memory>
#include <mutex>
#include <optional>

Expand All @@ -32,7 +30,8 @@ namespace livekit {

namespace proto {
class FfiEvent;
}
class DataTrackStreamReadResponse;
} // namespace proto

/**
* Represents a pull-based stream of frames from a remote data track.
Expand Down Expand Up @@ -111,6 +110,9 @@ class LIVEKIT_API DataTrackStream {
/// FFI event handler, called by FfiClient.
void onFfiEvent(const proto::FfiEvent& event);

/// Handle the immediate response returned by a read request.
void handleReadResponse(const proto::DataTrackStreamReadResponse& response);

/// Push a received DataTrackFrame to the internal storage.
void pushFrame(DataTrackFrame&& frame);

Expand Down
41 changes: 33 additions & 8 deletions src/data_track_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@

#include "livekit/data_track_stream.h"

#include <optional>
#include <utility>

#include "data_track.pb.h"
Expand All @@ -27,6 +28,17 @@ namespace livekit {

using proto::FfiEvent;

namespace {

std::optional<SubscribeDataTrackError> terminalErrorFromEos(const proto::DataTrackStreamEOS& eos) {
if (!eos.has_error()) {
return std::nullopt;
}
return SubscribeDataTrackError::fromProto(eos.error());
}

} // namespace

DataTrackStream::~DataTrackStream() { close(); }

void DataTrackStream::init(FfiHandle subscription_handle) {
Expand All @@ -36,6 +48,8 @@ void DataTrackStream::init(FfiHandle subscription_handle) {
}

bool DataTrackStream::read(DataTrackFrame& out) {
proto::DataTrackStreamReadResponse read_response;

{
const std::scoped_lock<std::mutex> lock(mutex_);
if (closed_ || eof_) {
Expand All @@ -50,9 +64,18 @@ bool DataTrackStream::read(DataTrackFrame& out) {
proto::FfiRequest req;
auto* msg = req.mutable_data_track_stream_read();
msg->set_stream_handle(subscription_handle);
FfiClient::instance().sendRequest(req);
const proto::FfiResponse resp = FfiClient::instance().sendRequest(req);
if (!resp.has_data_track_stream_read()) {
// should we throw here? This would mean the proto is mismatched
LK_LOG_ERROR("DataTrackStream::read: FFI response missing data_track_stream_read");
return false;
}

read_response = resp.data_track_stream_read();
}

handleReadResponse(read_response);

std::unique_lock<std::mutex> lock(mutex_);
cv_.wait(lock, [this] { return frame_.has_value() || eof_ || closed_; });

Expand Down Expand Up @@ -113,13 +136,15 @@ void DataTrackStream::onFfiEvent(const FfiEvent& event) {
DataTrackFrame frame = DataTrackFrame::fromOwnedInfo(fr);
pushFrame(std::move(frame));
} else if (dts.has_eos()) {
std::optional<SubscribeDataTrackError> error;
const auto& eos = dts.eos();
if (eos.has_error()) {
error = SubscribeDataTrackError::fromProto(eos.error());
}
pushEos(std::move(error));
pushEos(terminalErrorFromEos(dts.eos()));
}
}

void DataTrackStream::handleReadResponse(const proto::DataTrackStreamReadResponse& response) {
if (!response.has_eos_event()) {
return;
}
pushEos(terminalErrorFromEos(response.eos_event()));
}

void DataTrackStream::pushFrame(DataTrackFrame&& frame) {
Expand All @@ -141,7 +166,7 @@ void DataTrackStream::pushFrame(DataTrackFrame&& frame) {
void DataTrackStream::pushEos(std::optional<SubscribeDataTrackError> error) {
{
const std::scoped_lock<std::mutex> lock(mutex_);
if (eof_) {
if (closed_ || eof_) {
return;
}
eof_ = true;
Expand Down
35 changes: 35 additions & 0 deletions src/tests/unit/test_data_track_stream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,10 @@ class DataTrackStreamTest : public ::testing::Test {

static void pushEvent(DataTrackStream& stream, const proto::FfiEvent& event) { stream.onFfiEvent(event); }

static void handleReadResponse(DataTrackStream& stream, const proto::DataTrackStreamReadResponse& response) {
stream.handleReadResponse(response);
}

static proto::FfiEvent makeEosEvent(std::optional<proto::SubscribeDataTrackErrorCode> code = std::nullopt,
const std::string& message = {}) {
proto::FfiEvent event;
Expand All @@ -47,6 +51,18 @@ class DataTrackStreamTest : public ::testing::Test {
return event;
}

static proto::DataTrackStreamReadResponse makeEosReadResponse(
std::optional<proto::SubscribeDataTrackErrorCode> code = std::nullopt, const std::string& message = {}) {
proto::DataTrackStreamReadResponse response;
auto* eos = response.mutable_eos_event();
if (code.has_value()) {
auto* error = eos->mutable_error();
error->set_code(code.value());
error->set_message(message);
}
return response;
}

static proto::FfiEvent makeAudioStreamEvent() {
proto::FfiEvent event;
event.mutable_audio_stream_event()->set_stream_handle(0);
Expand All @@ -71,6 +87,15 @@ TEST_F(DataTrackStreamTest, TerminalErrorEmptyForNormalEos) {
EXPECT_FALSE(stream->terminalError().has_value());
}

TEST_F(DataTrackStreamTest, ReadResponseNormalEosEndsStreamWithoutTerminalError) {
auto stream = makeStream();
handleReadResponse(*stream, makeEosReadResponse());

DataTrackFrame frame;
EXPECT_FALSE(stream->read(frame));
EXPECT_FALSE(stream->terminalError().has_value());
}

TEST_F(DataTrackStreamTest, TerminalErrorStoredForSubscribeFailureEos) {
auto stream = makeStream();
pushEvent(*stream, makeEosEvent(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_UNPUBLISHED,
Expand All @@ -85,6 +110,16 @@ TEST_F(DataTrackStreamTest, TerminalErrorStoredForSubscribeFailureEos) {
EXPECT_EQ(error->message, "track unpublished before subscription completed");
}

TEST_F(DataTrackStreamTest, ReadResponseSubscribeFailureEosStoresTerminalError) {
auto stream = makeStream();
handleReadResponse(*stream, makeEosReadResponse(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_UNPUBLISHED,
"track unpublished before read completed"));

DataTrackFrame frame;
EXPECT_FALSE(stream->read(frame));
expectTerminalError(*stream, SubscribeDataTrackErrorCode::UNPUBLISHED, "track unpublished before read completed");
}

TEST_F(DataTrackStreamTest, CloseBeforeEosSuppressesLaterTerminalError) {
auto stream = makeStream();
stream->close();
Expand Down
Loading