diff --git a/CMakeLists.txt b/CMakeLists.txt index fa5762e..cc5a94e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -6,6 +6,8 @@ set(CMAKE_CXX_STANDARD 17) set(CMAKE_CXX_STANDARD_REQUIRED ON) set(CMAKE_POSITION_INDEPENDENT_CODE ON) +############## Protobuf ################ + # ---- Protobuf (FFI protos) ---- set(FFI_PROTO_DIR ${CMAKE_SOURCE_DIR}/client-sdk-rust/livekit-ffi/protocol) set(FFI_PROTO_FILES @@ -25,7 +27,7 @@ set(FFI_PROTO_FILES set(PROTO_BINARY_DIR ${CMAKE_BINARY_DIR}/generated) file(MAKE_DIRECTORY ${PROTO_BINARY_DIR}) -find_package(Protobuf REQUIRED) # protobuf::libprotobuf, protoc +find_package(Protobuf REQUIRED) find_package(absl CONFIG REQUIRED) # Object library that owns generated .pb.cc/.pb.h @@ -44,6 +46,44 @@ protobuf_generate( IMPORT_DIRS ${FFI_PROTO_DIR} ) +########### auto-gen build.h ####################### + +# Where to place the generated header +set(GENERATED_INCLUDE_DIR "${CMAKE_CURRENT_BINARY_DIR}/generated") +file(MAKE_DIRECTORY "${GENERATED_INCLUDE_DIR}") + +# Try to capture git commit; fall back to "unknown" if git isn't available or repo isn't present. +set(GIT_COMMIT "unknown") +find_package(Git QUIET) +if(GIT_FOUND AND EXISTS "${CMAKE_SOURCE_DIR}/.git") + execute_process( + COMMAND "${GIT_EXECUTABLE}" rev-parse --short HEAD + WORKING_DIRECTORY "${CMAKE_SOURCE_DIR}" + OUTPUT_VARIABLE GIT_COMMIT + OUTPUT_STRIP_TRAILING_WHITESPACE + ERROR_QUIET + ) +endif() + +# Build timestamp +string(TIMESTAMP BUILD_DATE "%Y-%m-%d %H:%M:%S") + +# Comment shown at the top of the generated header +set(GENERATED_COMMENT "This file was auto-generated by CMake on ${LIVEKIT_BUILD_DATE}. Do NOT edit manually. Edit build.h.in instead.") + +# Generate the header from the template +configure_file( + "${CMAKE_CURRENT_SOURCE_DIR}/build.h.in" + "${GENERATED_INCLUDE_DIR}/build.h" + @ONLY +) + +# Include the directory for the headers +include_directories("${GENERATED_INCLUDE_DIR}") + + +########### Livekit Rust SDK ####################### + # Find cargo find_program(CARGO_EXECUTABLE NAMES cargo REQUIRED) @@ -104,10 +144,19 @@ add_custom_target(build_rust_ffi ALL # ---- C++ wrapper library ---- add_library(livekit include/livekit/room.h + include/livekit/ffi_handle.h include/livekit/ffi_client.h + include/livekit/participant.h include/livekit/livekit.h + include/livekit/stats.h + include/livekit/track.h + src/ffi_handle.cpp src/ffi_client.cpp src/room.cpp + src/room_event_converter.cpp + src/room_event_converter.h + src/stats.cpp + src/track.cpp ) # Add generated proto objects to the wrapper @@ -115,8 +164,10 @@ target_sources(livekit PRIVATE $) target_include_directories(livekit PUBLIC ${CMAKE_SOURCE_DIR}/include + ${CMAKE_SOURCE_DIR}/src ${RUST_ROOT}/livekit-ffi/include ${PROTO_BINARY_DIR} + ${GENERATED_INCLUDE_DIR} ) target_link_libraries(livekit diff --git a/README.md b/README.md index a5a7903..0b5f2e0 100644 --- a/README.md +++ b/README.md @@ -68,6 +68,7 @@ curl https://sh.rustup.rs -sSf | sh ## 🛠️ Development Tips ### Update Rust version ```bash +cd client-sdk-cpp git fetch origin git switch -c try-rust-main origin/main @@ -76,6 +77,7 @@ git submodule sync --recursive git submodule update --init --recursive --checkout # Now, in case the nested submodule under yuv-sys didn’t materialize, force it explicitly: +cd .. git -C client-sdk-rust/yuv-sys submodule sync --recursive git -C client-sdk-rust/yuv-sys submodule update --init --recursive --checkout @@ -90,8 +92,13 @@ cargo build -p yuv-sys -vv ``` ### Full clean (Rust + C++ build folders) - In some cases, you may need to perform a full clean that deletes all build artifacts from both the Rust and C++ folders: ```bash ./build.sh clean-all +``` + +### Clang format +CPP SDK is using clang C++ format +```bash +brew install clang-format ``` \ No newline at end of file diff --git a/build.h.in b/build.h.in new file mode 100644 index 0000000..eb7e8c3 --- /dev/null +++ b/build.h.in @@ -0,0 +1,20 @@ +// =============================================================== +// ⚠️ @GENERATED_COMMENT@ +// =============================================================== + +#pragma once + +#define LIVEKIT_BUILD_VERSION "0.1.0" // Manually maintained +#define LIVEKIT_BUILD_FLAVOR "cpp" + +#ifdef DEBUG +#define LIVEKIT_BUILD_SUFFIX "-debug" +#else +#define LIVEKIT_BUILD_SUFFIX "-release" +#endif + +// Follow params are auto generated by CMakeLists.txt +#define LIVEKIT_BUILD_DATE "@BUILD_DATE@" +#define LIVEKIT_BUILD_COMMIT "@GIT_COMMIT@" + +#define LIVEKIT_BUILD_VERSION_FULL LIVEKIT_BUILD_VERSION LIVEKIT_BUILD_SUFFIX \ No newline at end of file diff --git a/examples/simple_room/main.cpp b/examples/simple_room/main.cpp index 045164a..71f788a 100644 --- a/examples/simple_room/main.cpp +++ b/examples/simple_room/main.cpp @@ -1,116 +1,182 @@ -#include -#include -#include -#include -#include #include #include #include +#include +#include +#include +#include +#include #include "livekit/livekit.h" +// TODO(shijing), remove this livekit_ffi.h as it should be internal only. +#include "livekit_ffi.h" + using namespace livekit; namespace { + std::atomic g_running{true}; -void print_usage(const char* prog) { - std::cerr << "Usage:\n" - << " " << prog << " \n" - << "or:\n" - << " " << prog << " --url= --token=\n" - << " " << prog << " --url --token \n\n" - << "Env fallbacks:\n" - << " LIVEKIT_URL, LIVEKIT_TOKEN\n"; +void print_usage(const char *prog) { + std::cerr << "Usage:\n" + << " " << prog << " \n" + << "or:\n" + << " " << prog << " --url= --token=\n" + << " " << prog << " --url --token \n\n" + << "Env fallbacks:\n" + << " LIVEKIT_URL, LIVEKIT_TOKEN\n"; } -void handle_sigint(int) { - g_running = false; -} +void handle_sigint(int) { g_running = false; } -bool parse_args(int argc, char* argv[], std::string& url, std::string& token) { - // 1) --help - for (int i = 1; i < argc; ++i) { - std::string a = argv[i]; - if (a == "-h" || a == "--help") { - return false; - } +bool parse_args(int argc, char *argv[], std::string &url, std::string &token) { + // 1) --help + for (int i = 1; i < argc; ++i) { + std::string a = argv[i]; + if (a == "-h" || a == "--help") { + return false; } - - // 2) flags: --url= / --token= or split form - auto get_flag_value = [&](const std::string& name, int& i) -> std::string { - std::string arg = argv[i]; - const std::string eq = name + "="; - if (arg.rfind(name, 0) == 0) { // starts with name - if (arg.size() > name.size() && arg[name.size()] == '=') { - return arg.substr(eq.size()); - } else if (i + 1 < argc) { - return std::string(argv[++i]); - } - } - return {}; - }; - - for (int i = 1; i < argc; ++i) { - const std::string a = argv[i]; - if (a.rfind("--url", 0) == 0) { - auto v = get_flag_value("--url", i); - if (!v.empty()) url = v; - } else if (a.rfind("--token", 0) == 0) { - auto v = get_flag_value("--token", i); - if (!v.empty()) token = v; - } + } + + // 2) flags: --url= / --token= or split form + auto get_flag_value = [&](const std::string &name, int &i) -> std::string { + std::string arg = argv[i]; + const std::string eq = name + "="; + if (arg.rfind(name, 0) == 0) { // starts with name + if (arg.size() > name.size() && arg[name.size()] == '=') { + return arg.substr(eq.size()); + } else if (i + 1 < argc) { + return std::string(argv[++i]); + } } - - // 3) positional if still empty - if (url.empty() || token.empty()) { - std::vector pos; - for (int i = 1; i < argc; ++i) { - std::string a = argv[i]; - if (a.rfind("--", 0) == 0) continue; // skip flags we already parsed - pos.push_back(std::move(a)); - } - if (pos.size() >= 2) { - if (url.empty()) url = pos[0]; - if (token.empty()) token = pos[1]; - } + return {}; + }; + + for (int i = 1; i < argc; ++i) { + const std::string a = argv[i]; + if (a.rfind("--url", 0) == 0) { + auto v = get_flag_value("--url", i); + if (!v.empty()) + url = v; + } else if (a.rfind("--token", 0) == 0) { + auto v = get_flag_value("--token", i); + if (!v.empty()) + token = v; } + } - // 4) env fallbacks - if (url.empty()) { - const char* e = std::getenv("LIVEKIT_URL"); - if (e) url = e; + // 3) positional if still empty + if (url.empty() || token.empty()) { + std::vector pos; + for (int i = 1; i < argc; ++i) { + std::string a = argv[i]; + if (a.rfind("--", 0) == 0) + continue; // skip flags we already parsed + pos.push_back(std::move(a)); } - if (token.empty()) { - const char* e = std::getenv("LIVEKIT_TOKEN"); - if (e) token = e; + if (pos.size() >= 2) { + if (url.empty()) + url = pos[0]; + if (token.empty()) + token = pos[1]; } - - return !(url.empty() || token.empty()); + } + + // 4) env fallbacks + if (url.empty()) { + const char *e = std::getenv("LIVEKIT_URL"); + if (e) + url = e; + } + if (token.empty()) { + const char *e = std::getenv("LIVEKIT_TOKEN"); + if (e) + token = e; + } + + return !(url.empty() || token.empty()); } -} // namespace -int main(int argc, char* argv[]) { - std::string url, token; - if (!parse_args(argc, argv, url, token)) { - print_usage(argv[0]); - return 1; - } - - std::cout << "Connecting to: " << url << std::endl; - - // Handle Ctrl-C to exit the idle loop - std::signal(SIGINT, handle_sigint); +class SimpleRoomDelegate : public livekit::RoomDelegate { +public: + void onParticipantConnected( + livekit::Room & /*room*/, + const livekit::ParticipantConnectedEvent &ev) override { + std::cout << "[Room] participant connected: identity=" << ev.identity + << " name=" << ev.name << "\n"; + } + + void onTrackSubscribed(livekit::Room & /*room*/, + const livekit::TrackSubscribedEvent &ev) override { + std::cout << "[Room] track subscribed: participant_identity=" + << ev.participant_identity << " track_sid=" << ev.track_sid + << " name=" << ev.track_name << "\n"; + // TODO(shijing): when you expose Track kind/source here, you can check + // whether this is a video track and start a VideoStream-like consumer. Use + // the python code as reference. + } +}; - Room room{}; - room.Connect(url.c_str(), token.c_str()); - - // TODO: replace with proper event loop / callbacks. - // For now, keep the app alive until Ctrl-C. - while (g_running.load()) { - std::this_thread::sleep_for(std::chrono::milliseconds(100)); - } +} // namespace - std::cout << "Exiting.\n"; - return 0; +int main(int argc, char *argv[]) { + std::string url, token; + if (!parse_args(argc, argv, url, token)) { + print_usage(argv[0]); + return 1; + } + + // Exit if token and url are not set + if (url.empty() || token.empty()) { + std::cerr << "LIVEKIT_URL and LIVEKIT_TOKEN (or CLI args) are required\n"; + return 1; + } + + std::cout << "Connecting to: " << url << std::endl; + + // Handle Ctrl-C to exit the idle loop + std::signal(SIGINT, handle_sigint); + + livekit::Room room{}; + SimpleRoomDelegate delegate; + room.setDelegate(&delegate); + + bool res = room.Connect(url, token); + std::cout << "Connect result is " << std::boolalpha << res << std::endl; + if (!res) { + std::cerr << "Failed to connect to room\n"; + FfiClient::instance().shutdown(); + return 1; + } + + auto info = room.room_info(); + std::cout << "Connected to room:\n" + << " SID: " << (info.sid ? *info.sid : "(none)") << "\n" + << " Name: " << info.name << "\n" + << " Metadata: " << info.metadata << "\n" + << " Max participants: " << info.max_participants << "\n" + << " Num participants: " << info.num_participants << "\n" + << " Num publishers: " << info.num_publishers << "\n" + << " Active recording: " << (info.active_recording ? "yes" : "no") + << "\n" + << " Empty timeout (s): " << info.empty_timeout << "\n" + << " Departure timeout (s): " << info.departure_timeout << "\n" + << " Lossy DC low threshold: " + << info.lossy_dc_buffered_amount_low_threshold << "\n" + << " Reliable DC low threshold: " + << info.reliable_dc_buffered_amount_low_threshold << "\n" + << " Creation time (ms): " << info.creation_time << "\n"; + + // TOD(shijing), implement local and remoteParticipants in the room + + // Keep the app alive until Ctrl-C so we continue receiving events, + // similar to asyncio.run(main()) keeping the loop running. + while (g_running.load()) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + + FfiClient::instance().shutdown(); + std::cout << "Exiting.\n"; + return 0; } diff --git a/include/livekit/ffi_client.h b/include/livekit/ffi_client.h index cdded54..7d259b9 100644 --- a/include/livekit/ffi_client.h +++ b/include/livekit/ffi_client.h @@ -1,7 +1,7 @@ /* * Copyright 2023 LiveKit * - * Licensed under the Apache License, Version 2.0 (the “License”); + * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * @@ -17,64 +17,107 @@ #ifndef LIVEKIT_FFI_CLIENT_H #define LIVEKIT_FFI_CLIENT_H +#include +#include #include #include -#include #include #include -#include "ffi.pb.h" - -namespace livekit -{ - using FfiCallbackFn = void(*)(const uint8_t*, size_t); - extern "C" void livekit_ffi_initialize(FfiCallbackFn cb, - bool capture_logs, - const char* sdk, - const char* sdk_version); - - extern "C" void LivekitFfiCallback(const uint8_t *buf, size_t len); - - - // The FfiClient is used to communicate with the FFI interface of the Rust SDK - // We use the generated protocol messages to facilitate the communication - class FfiClient - { - public: - using ListenerId = int; - using Listener = std::function; - - FfiClient(const FfiClient&) = delete; - FfiClient& operator=(const FfiClient&) = delete; - - static FfiClient& getInstance() { - static FfiClient instance; - return instance; - } - - ListenerId AddListener(const Listener& listener); - void RemoveListener(ListenerId id); - - proto::FfiResponse SendRequest(const proto::FfiRequest& request)const; - - private: - std::unordered_map listeners_; - ListenerId nextListenerId = 1; - mutable std::mutex lock_; - - FfiClient(); - ~FfiClient() = default; - - void PushEvent(const proto::FfiEvent& event) const; - friend void LivekitFfiCallback(const uint8_t *buf, size_t len); - }; - - struct FfiHandle { - uintptr_t handle; - - FfiHandle(uintptr_t handle); - ~FfiHandle(); - }; -} +#include "livekit/stats.h" + +namespace livekit { + +namespace proto { +class FfiEvent; +class FfiResponse; +class FfiRequest; +class RoomInfo; +} // namespace proto + +using FfiCallbackFn = void (*)(const uint8_t *, size_t); +extern "C" void livekit_ffi_initialize(FfiCallbackFn cb, bool capture_logs, + const char *sdk, + const char *sdk_version); + +extern "C" void livekit_ffi_dispose(); + +extern "C" void LivekitFfiCallback(const uint8_t *buf, size_t len); + +// The FfiClient is used to communicate with the FFI interface of the Rust SDK +// We use the generated protocol messages to facilitate the communication +class FfiClient { +public: + using ListenerId = int; + using Listener = std::function; + using AsyncId = std::uint64_t; + + FfiClient(const FfiClient &) = delete; + FfiClient &operator=(const FfiClient &) = delete; + FfiClient(FfiClient &&) = delete; + FfiClient &operator=(FfiClient &&) = delete; + + static FfiClient &instance() noexcept { + static FfiClient instance; + return instance; + } + + // Called only once. After calling shutdown(), no further calls into FfiClient + // are valid. + void shutdown() noexcept; + + ListenerId AddListener(const Listener &listener); + void RemoveListener(ListenerId id); + + // Room APIs + std::future connectAsync(const std::string &url, + const std::string &token); + + // Track APIs + std::future> getTrackStatsAsync(uintptr_t track_handle); + + // Generic function for sending a request to the Rust FFI. + // Note: For asynchronous requests, use the dedicated async functions instead + // of SendRequest. + proto::FfiResponse SendRequest(const proto::FfiRequest &request) const; + +private: + // Base class for type-erased pending ops + struct PendingBase { + virtual ~PendingBase() = default; + virtual bool matches(const proto::FfiEvent &event) const = 0; + virtual void complete(const proto::FfiEvent &event) = 0; + }; + template struct Pending : PendingBase { + std::promise promise; + std::function match; + std::function &)> handler; + + bool matches(const proto::FfiEvent &event) const override { + return match && match(event); + } + + void complete(const proto::FfiEvent &event) override { + handler(event, promise); + } + }; + + template + std::future registerAsync( + std::function match, + std::function &)> handler); + + std::unordered_map listeners_; + ListenerId nextListenerId = 1; + mutable std::mutex lock_; + mutable std::vector> pending_; + + FfiClient(); + ~FfiClient() = default; + + void PushEvent(const proto::FfiEvent &event) const; + friend void LivekitFfiCallback(const uint8_t *buf, size_t len); +}; +} // namespace livekit #endif /* LIVEKIT_FFI_CLIENT_H */ diff --git a/include/livekit/ffi_handle.h b/include/livekit/ffi_handle.h new file mode 100644 index 0000000..c710454 --- /dev/null +++ b/include/livekit/ffi_handle.h @@ -0,0 +1,61 @@ +/* + * Copyright 2023 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace livekit { + +/** + * @brief RAII wrapper for an FFI handle (uintptr_t) coming from Rust. + * + * Ensures that the handle is automatically released via + * livekit_ffi_drop_handle() when the object goes out of scope. + */ +class FfiHandle { +public: + explicit FfiHandle(uintptr_t h = 0) noexcept; + ~FfiHandle(); + + // Non-copyable + FfiHandle(const FfiHandle &) = delete; + FfiHandle &operator=(const FfiHandle &) = delete; + + // Movable + FfiHandle(FfiHandle &&other) noexcept; + FfiHandle &operator=(FfiHandle &&other) noexcept; + + // Replace the current handle with a new one, dropping the old if needed + void reset(uintptr_t new_handle = 0) noexcept; + + // Release ownership of the handle without dropping it + [[nodiscard]] uintptr_t release() noexcept; + + // Whether the handle is valid (non-zero) + [[nodiscard]] bool valid() const noexcept; + + // Get the raw handle value + [[nodiscard]] uintptr_t get() const noexcept; + + // Allow `if (handle)` syntax + explicit operator bool() const noexcept { return valid(); } + +private: + uintptr_t handle_{0}; +}; + +} // namespace livekit \ No newline at end of file diff --git a/include/livekit/livekit.h b/include/livekit/livekit.h index bc29d09..bfb2287 100644 --- a/include/livekit/livekit.h +++ b/include/livekit/livekit.h @@ -1,7 +1,7 @@ /* * Copyright 2023 LiveKit * - * Licensed under the Apache License, Version 2.0 (the “License”); + * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * @@ -15,3 +15,4 @@ */ #include "room.h" +#include "room_delegate.h" diff --git a/include/livekit/participant.h b/include/livekit/participant.h new file mode 100644 index 0000000..b09f4e8 --- /dev/null +++ b/include/livekit/participant.h @@ -0,0 +1,88 @@ +/* + * Copyright 2023 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include + +#include "livekit/ffi_handle.h" +#include "livekit_ffi.h" + +namespace livekit { + +enum class ParticipantKind { Standard = 0, Ingress, Egress, Sip, Agent }; + +enum class DisconnectReason { + Unknown = 0, + ClientInitiated, + DuplicateIdentity, + ServerShutdown, + ParticipantRemoved, + RoomDeleted, + StateMismatch, + JoinFailure, + Migration, + SignalClose, + RoomClosed, + UserUnavailable, + UserRejected, + SipTrunkFailure, + ConnectionTimeout, + MediaFailure +}; + +class Participant { +public: + // TODO, consider holding a weak ptr of FfiHandle if it is useful. + Participant(std::weak_ptr handle, std::string sid, + std::string name, std::string identity, std::string metadata, + std::unordered_map attributes, + ParticipantKind kind, DisconnectReason reason) + : handle_(handle), sid_(std::move(sid)), name_(std::move(name)), + identity_(std::move(identity)), metadata_(std::move(metadata)), + attributes_(std::move(attributes)), kind_(kind), reason_(reason) {} + + // Plain getters/setters (caller ensures threading) + const std::string &sid() const noexcept { return sid_; } + const std::string &name() const noexcept { return name_; } + const std::string &identity() const noexcept { return identity_; } + const std::string &metadata() const noexcept { return metadata_; } + const std::unordered_map & + attributes() const noexcept { + return attributes_; + } + ParticipantKind kind() const noexcept { return kind_; } + DisconnectReason disconnectReason() const noexcept { return reason_; } + + uintptr_t ffiHandleId() const noexcept { + if (auto h = handle_.lock()) { + return h->get(); + } + return INVALID_HANDLE; + } + +private: + std::weak_ptr handle_; + std::string sid_, name_, identity_, metadata_; + std::unordered_map attributes_; + ParticipantKind kind_; + DisconnectReason reason_; +}; + +} // namespace livekit \ No newline at end of file diff --git a/include/livekit/room.h b/include/livekit/room.h index b5c68d0..8fe9a34 100644 --- a/include/livekit/room.h +++ b/include/livekit/room.h @@ -1,7 +1,7 @@ /* * Copyright 2023 LiveKit * - * Licensed under the Apache License, Version 2.0 (the “License”); + * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * @@ -17,29 +17,36 @@ #ifndef LIVEKIT_ROOM_H #define LIVEKIT_ROOM_H -#include -#include "ffi.pb.h" #include "livekit/ffi_client.h" -#include "livekit_ffi.h" - -namespace livekit -{ - class Room - { - public: - void Connect(const std::string& url, const std::string& token); - - private: - void OnConnect(const proto::ConnectCallback& cb); - - mutable std::mutex lock_; - FfiHandle handle_{INVALID_HANDLE}; - bool connected_{false}; - uint64_t connectAsyncId_{0}; - - - void OnEvent(const proto::FfiEvent& event); - }; +#include "livekit/ffi_handle.h" +#include "livekit/room_delegate.h" +#include + +namespace livekit { + +class RoomDelegate; +struct RoomInfoData; +namespace proto { +class FfiEvent; } +class Room { +public: + Room() = default; + void setDelegate(RoomDelegate *delegate); + bool Connect(const std::string &url, const std::string &token); + + // Accessors + RoomInfoData room_info() const; + +private: + mutable std::mutex lock_; + bool connected_{false}; + RoomDelegate *delegate_ = nullptr; // Not owned + RoomInfoData room_info_; + + void OnEvent(const proto::FfiEvent &event); +}; +} // namespace livekit + #endif /* LIVEKIT_ROOM_H */ diff --git a/include/livekit/room_delegate.h b/include/livekit/room_delegate.h new file mode 100644 index 0000000..8e5f03b --- /dev/null +++ b/include/livekit/room_delegate.h @@ -0,0 +1,451 @@ +/* + * Copyright 2023 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include + +namespace livekit { + +class Room; + +enum class ConnectionQuality { + Poor, + Good, + Excellent, + Lost, +}; + +enum class ConnectionState { + Disconnected, + Connected, + Reconnecting, +}; + +enum class DataPacketKind { + Lossy, + Reliable, +}; + +enum class EncryptionState { + // mirror your proto enum values as needed + Unknown, + On, + Off, +}; + +enum class DisconnectReason { + // mirror your proto DisconnectReason values as needed + Unknown, + ClientInitiated, + ServerInitiated, +}; + +// --------------------------------------------------------- +// Basic data types corresponding to proto messages +// --------------------------------------------------------- + +struct TranscriptionSegmentData { + std::string id; + std::string text; + std::uint64_t start_time = 0; + std::uint64_t end_time = 0; + bool is_final = false; + std::string language; +}; + +struct ChatMessageData { + std::string id; + std::int64_t timestamp = 0; + std::string message; + std::optional edit_timestamp; + bool deleted = false; + bool generated = false; +}; + +struct UserPacketData { + std::vector data; + std::optional topic; // optional +}; + +struct SipDtmfData { + std::uint32_t code = 0; + std::optional digit; +}; + +struct RoomInfoData { + std::optional sid; + std::string name; + std::string metadata; + std::uint64_t lossy_dc_buffered_amount_low_threshold = 0; + std::uint64_t reliable_dc_buffered_amount_low_threshold = 0; + std::uint32_t empty_timeout = 0; + std::uint32_t departure_timeout = 0; + std::uint32_t max_participants = 0; + std::int64_t creation_time = 0; + std::uint32_t num_participants = 0; + std::uint32_t num_publishers = 0; + bool active_recording = false; +}; + +struct AttributeEntry { + std::string key; + std::string value; +}; + +struct DataStreamHeaderData { + std::string stream_id; + std::int64_t timestamp = 0; + std::string mime_type; + std::string topic; + std::optional total_length; + std::map attributes; + + // For content_header + enum class ContentType { + None, + Text, + Byte, + } content_type = ContentType::None; + + // TextHeader fields + enum class OperationType { + Create = 0, + Update = 1, + Delete = 2, + Reaction = 3, + }; + std::optional operation_type; + std::optional version; + std::optional reply_to_stream_id; + std::vector attached_stream_ids; + std::optional generated; + + // ByteHeader fields + std::optional name; +}; + +struct DataStreamChunkData { + std::string stream_id; + std::uint64_t chunk_index = 0; + std::vector content; + std::optional version; + std::vector iv; +}; + +struct DataStreamTrailerData { + std::string stream_id; + std::string reason; + std::map attributes; +}; + +// --------------------------------------------------------- +// Event structs – “public” representations of RoomEvent.* +// --------------------------------------------------------- + +struct ParticipantConnectedEvent { + // Typically you’d also attach a handle / participant object + std::string identity; // from OwnedParticipant / ParticipantInfo + std::string metadata; + std::string name; +}; + +struct ParticipantDisconnectedEvent { + std::string participant_identity; + DisconnectReason reason = DisconnectReason::Unknown; +}; + +struct LocalTrackPublishedEvent { + std::string track_sid; +}; + +struct LocalTrackUnpublishedEvent { + std::string publication_sid; +}; + +struct LocalTrackSubscribedEvent { + std::string track_sid; +}; + +struct TrackPublishedEvent { + std::string participant_identity; + std::string publication_sid; + std::string track_name; + std::string track_kind; // or an enum if you have one + std::string track_source; // or enum +}; + +struct TrackUnpublishedEvent { + std::string participant_identity; + std::string publication_sid; +}; + +struct TrackSubscribedEvent { + std::string participant_identity; + std::string track_sid; + std::string track_name; + std::string track_kind; // or enum + std::string track_source; // or enum +}; + +struct TrackUnsubscribedEvent { + std::string participant_identity; + std::string track_sid; +}; + +struct TrackSubscriptionFailedEvent { + std::string participant_identity; + std::string track_sid; + std::string error; +}; + +struct TrackMutedEvent { + std::string participant_identity; + std::string track_sid; +}; + +struct TrackUnmutedEvent { + std::string participant_identity; + std::string track_sid; +}; + +struct ActiveSpeakersChangedEvent { + std::vector participant_identities; +}; + +struct RoomMetadataChangedEvent { + std::string metadata; +}; + +struct RoomSidChangedEvent { + std::string sid; +}; + +struct ParticipantMetadataChangedEvent { + std::string participant_identity; + std::string metadata; +}; + +struct ParticipantNameChangedEvent { + std::string participant_identity; + std::string name; +}; + +struct ParticipantAttributesChangedEvent { + std::string participant_identity; + std::vector attributes; + std::vector changed_attributes; +}; + +struct ParticipantEncryptionStatusChangedEvent { + std::string participant_identity; + bool is_encrypted = false; +}; + +struct ConnectionQualityChangedEvent { + std::string participant_identity; + ConnectionQuality quality = ConnectionQuality::Good; +}; + +struct DataPacketReceivedEvent { + DataPacketKind kind = DataPacketKind::Reliable; + std::string participant_identity; // may be empty + std::optional user; + std::optional sip_dtmf; +}; + +struct TranscriptionReceivedEvent { + std::optional participant_identity; + std::optional track_sid; + std::vector segments; +}; + +struct ConnectionStateChangedEvent { + ConnectionState state = ConnectionState::Disconnected; +}; + +struct DisconnectedEvent { + DisconnectReason reason = DisconnectReason::Unknown; +}; + +struct ReconnectingEvent {}; +struct ReconnectedEvent {}; + +struct RoomEosEvent {}; + +struct DataStreamHeaderReceivedEvent { + std::string participant_identity; + DataStreamHeaderData header; +}; + +struct DataStreamChunkReceivedEvent { + std::string participant_identity; + DataStreamChunkData chunk; +}; + +struct DataStreamTrailerReceivedEvent { + std::string participant_identity; + DataStreamTrailerData trailer; +}; + +struct DataChannelBufferedAmountLowThresholdChangedEvent { + DataPacketKind kind = DataPacketKind::Reliable; + std::uint64_t threshold = 0; +}; + +struct ByteStreamOpenedEvent { + std::uint64_t reader_handle = 0; // from OwnedByteStreamReader.handle + std::string participant_identity; +}; + +struct TextStreamOpenedEvent { + std::uint64_t reader_handle = 0; // from OwnedTextStreamReader.handle + std::string participant_identity; +}; + +struct RoomUpdatedEvent { + RoomInfoData info; +}; + +struct RoomMovedEvent { + RoomInfoData info; +}; + +struct ParticipantsUpdatedEvent { + // You can expand this into a richer participant struct later + std::vector participant_identities; +}; + +struct E2eeStateChangedEvent { + std::string participant_identity; + EncryptionState state = EncryptionState::Unknown; +}; + +struct ChatMessageReceivedEvent { + ChatMessageData message; + std::string participant_identity; +}; + +// --------------------------------------------------------- +// RoomDelegate interface – NO protobuf dependency +// --------------------------------------------------------- + +class RoomDelegate { +public: + virtual ~RoomDelegate() = default; + + // Optional: generic hook with no payload + virtual void onRoomEvent(Room & /*room*/) {} + + // Per-event callbacks. All default no-op so you can add more later + // without breaking existing user code. + + // Participant lifecycle + virtual void onParticipantConnected(Room &, + const ParticipantConnectedEvent &) {} + virtual void onParticipantDisconnected(Room &, + const ParticipantDisconnectedEvent &) { + } + + // Local track publication + virtual void onLocalTrackPublished(Room &, const LocalTrackPublishedEvent &) { + } + virtual void onLocalTrackUnpublished(Room &, + const LocalTrackUnpublishedEvent &) {} + virtual void onLocalTrackSubscribed(Room &, + const LocalTrackSubscribedEvent &) {} + + // Remote track publication/subscription + virtual void onTrackPublished(Room &, const TrackPublishedEvent &) {} + virtual void onTrackUnpublished(Room &, const TrackUnpublishedEvent &) {} + virtual void onTrackSubscribed(Room &, const TrackSubscribedEvent &) {} + virtual void onTrackUnsubscribed(Room &, const TrackUnsubscribedEvent &) {} + virtual void onTrackSubscriptionFailed(Room &, + const TrackSubscriptionFailedEvent &) { + } + virtual void onTrackMuted(Room &, const TrackMutedEvent &) {} + virtual void onTrackUnmuted(Room &, const TrackUnmutedEvent &) {} + + // Active speakers + virtual void onActiveSpeakersChanged(Room &, + const ActiveSpeakersChangedEvent &) {} + + // Room info / metadata + virtual void onRoomMetadataChanged(Room &, const RoomMetadataChangedEvent &) { + } + virtual void onRoomSidChanged(Room &, const RoomSidChangedEvent &) {} + virtual void onRoomUpdated(Room &, const RoomUpdatedEvent &) {} + virtual void onRoomMoved(Room &, const RoomMovedEvent &) {} + + // Participant info changes + virtual void + onParticipantMetadataChanged(Room &, + const ParticipantMetadataChangedEvent &) {} + virtual void onParticipantNameChanged(Room &, + const ParticipantNameChangedEvent &) {} + virtual void + onParticipantAttributesChanged(Room &, + const ParticipantAttributesChangedEvent &) {} + virtual void onParticipantEncryptionStatusChanged( + Room &, const ParticipantEncryptionStatusChangedEvent &) {} + + // Connection quality / state + virtual void + onConnectionQualityChanged(Room &, const ConnectionQualityChangedEvent &) {} + virtual void onConnectionStateChanged(Room &, + const ConnectionStateChangedEvent &) {} + virtual void onDisconnected(Room &, const DisconnectedEvent &) {} + virtual void onReconnecting(Room &, const ReconnectingEvent &) {} + virtual void onReconnected(Room &, const ReconnectedEvent &) {} + + // E2EE + virtual void onE2eeStateChanged(Room &, const E2eeStateChangedEvent &) {} + + // EOS + virtual void onRoomEos(Room &, const RoomEosEvent &) {} + + // Data / transcription / chat + virtual void onDataPacketReceived(Room &, const DataPacketReceivedEvent &) {} + virtual void onTranscriptionReceived(Room &, + const TranscriptionReceivedEvent &) {} + virtual void onChatMessageReceived(Room &, const ChatMessageReceivedEvent &) { + } + + // Data streams + virtual void + onDataStreamHeaderReceived(Room &, const DataStreamHeaderReceivedEvent &) {} + virtual void onDataStreamChunkReceived(Room &, + const DataStreamChunkReceivedEvent &) { + } + virtual void + onDataStreamTrailerReceived(Room &, const DataStreamTrailerReceivedEvent &) {} + virtual void onDataChannelBufferedAmountLowThresholdChanged( + Room &, const DataChannelBufferedAmountLowThresholdChangedEvent &) {} + + // High-level byte/text streams + virtual void onByteStreamOpened(Room &, const ByteStreamOpenedEvent &) {} + virtual void onTextStreamOpened(Room &, const TextStreamOpenedEvent &) {} + + // Participants snapshot + virtual void onParticipantsUpdated(Room &, const ParticipantsUpdatedEvent &) { + } +}; + +} // namespace livekit diff --git a/include/livekit/stats.h b/include/livekit/stats.h new file mode 100644 index 0000000..d05a22f --- /dev/null +++ b/include/livekit/stats.h @@ -0,0 +1,534 @@ +/* + * Copyright 2023 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include +#include +#include +#include +#include + +namespace livekit { + +namespace proto { +class RtcStats; +class RtcStatsData; +class CodecStats; +class RtpStreamStats; +class ReceivedRtpStreamStats; +class InboundRtpStreamStats; +class SentRtpStreamStats; +class OutboundRtpStreamStats; +class RemoteInboundRtpStreamStats; +class RemoteOutboundRtpStreamStats; +class MediaSourceStats; +class AudioSourceStats; +class VideoSourceStats; +class AudioPlayoutStats; +class PeerConnectionStats; +class DataChannelStats; +class TransportStats; +class CandidatePairStats; +class IceCandidateStats; +class CertificateStats; +class StreamStats; +} // namespace proto + +// ---------------------- +// SDK enums (decoupled from proto enums) +// ---------------------- + +enum class DataChannelState { + Connecting, + Open, + Closing, + Closed, + Unknown, +}; + +enum class QualityLimitationReason { + None, + Cpu, + Bandwidth, + Other, +}; + +enum class IceRole { + Unknown, + Controlling, + Controlled, +}; + +enum class DtlsTransportState { + New, + Connecting, + Connected, + Closed, + Failed, + Unknown, +}; + +enum class IceTransportState { + New, + Checking, + Connected, + Completed, + Disconnected, + Failed, + Closed, + Unknown, +}; + +enum class DtlsRole { + Client, + Server, + Unknown, +}; + +enum class IceCandidatePairState { + Frozen, + Waiting, + InProgress, + Failed, + Succeeded, + Unknown, +}; + +enum class IceCandidateType { + Host, + Srflx, + Prflx, + Relay, + Unknown, +}; + +enum class IceServerTransportProtocol { + Udp, + Tcp, + Tls, + Unknown, +}; + +enum class IceTcpCandidateType { + Active, + Passive, + So, + Unknown, +}; + +// ---------------------- +// Leaf stats types +// ---------------------- + +struct RtcStatsData { + std::string id; + std::int64_t timestamp_ms; +}; + +struct CodecStats { + std::uint32_t payload_type; + std::string transport_id; + std::string mime_type; + std::uint32_t clock_rate; + std::uint32_t channels; + std::string sdp_fmtp_line; +}; + +struct RtpStreamStats { + std::uint32_t ssrc; + std::string kind; + std::string transport_id; + std::string codec_id; +}; + +struct ReceivedRtpStreamStats { + std::uint64_t packets_received; + std::int64_t packets_lost; + double jitter; +}; + +struct InboundRtpStreamStats { + std::string track_identifier; + std::string mid; + std::string remote_id; + std::uint32_t frames_decoded; + std::uint32_t key_frames_decoded; + std::uint32_t frames_rendered; + std::uint32_t frames_dropped; + std::uint32_t frame_width; + std::uint32_t frame_height; + double frames_per_second; + std::uint64_t qp_sum; + double total_decode_time; + double total_inter_frame_delay; + double total_squared_inter_frame_delay; + std::uint32_t pause_count; + double total_pause_duration; + std::uint32_t freeze_count; + double total_freeze_duration; + double last_packet_received_timestamp; + std::uint64_t header_bytes_received; + std::uint64_t packets_discarded; + std::uint64_t fec_bytes_received; + std::uint64_t fec_packets_received; + std::uint64_t fec_packets_discarded; + std::uint64_t bytes_received; + std::uint32_t nack_count; + std::uint32_t fir_count; + std::uint32_t pli_count; + double total_processing_delay; + double estimated_playout_timestamp; + double jitter_buffer_delay; + double jitter_buffer_target_delay; + std::uint64_t jitter_buffer_emitted_count; + double jitter_buffer_minimum_delay; + std::uint64_t total_samples_received; + std::uint64_t concealed_samples; + std::uint64_t silent_concealed_samples; + std::uint64_t concealment_events; + std::uint64_t inserted_samples_for_deceleration; + std::uint64_t removed_samples_for_acceleration; + double audio_level; + double total_audio_energy; + double total_samples_duration; + std::uint64_t frames_received; + std::string decoder_implementation; + std::string playout_id; + bool power_efficient_decoder; + std::uint64_t frames_assembled_from_multiple_packets; + double total_assembly_time; + std::uint64_t retransmitted_packets_received; + std::uint64_t retransmitted_bytes_received; + std::uint32_t rtx_ssrc; + std::uint32_t fec_ssrc; +}; + +struct SentRtpStreamStats { + std::uint64_t packets_sent; + std::uint64_t bytes_sent; +}; + +struct OutboundRtpStreamStats { + std::string mid; + std::string media_source_id; + std::string remote_id; + std::string rid; + std::uint64_t header_bytes_sent; + std::uint64_t retransmitted_packets_sent; + std::uint64_t retransmitted_bytes_sent; + std::uint32_t rtx_ssrc; + double target_bitrate; + std::uint64_t total_encoded_bytes_target; + std::uint32_t frame_width; + std::uint32_t frame_height; + double frames_per_second; + std::uint32_t frames_sent; + std::uint32_t huge_frames_sent; + std::uint32_t frames_encoded; + std::uint32_t key_frames_encoded; + std::uint64_t qp_sum; + double total_encode_time; + double total_packet_send_delay; + QualityLimitationReason quality_limitation_reason; + std::unordered_map quality_limitation_durations; + std::uint32_t quality_limitation_resolution_changes; + std::uint32_t nack_count; + std::uint32_t fir_count; + std::uint32_t pli_count; + std::string encoder_implementation; + bool power_efficient_encoder; + bool active; + std::string scalability_mode; +}; + +struct RemoteInboundRtpStreamStats { + std::string local_id; + double round_trip_time; + double total_round_trip_time; + double fraction_lost; + std::uint64_t round_trip_time_measurements; +}; + +struct RemoteOutboundRtpStreamStats { + std::string local_id; + double remote_timestamp; + std::uint64_t reports_sent; + double round_trip_time; + double total_round_trip_time; + std::uint64_t round_trip_time_measurements; +}; + +struct MediaSourceStats { + std::string track_identifier; + std::string kind; +}; + +struct AudioSourceStats { + double audio_level; + double total_audio_energy; + double total_samples_duration; + double echo_return_loss; + double echo_return_loss_enhancement; + double dropped_samples_duration; + std::uint32_t dropped_samples_events; + double total_capture_delay; + std::uint64_t total_samples_captured; +}; + +struct VideoSourceStats { + std::uint32_t width; + std::uint32_t height; + std::uint32_t frames; + double frames_per_second; +}; + +struct AudioPlayoutStats { + std::string kind; + double synthesized_samples_duration; + std::uint32_t synthesized_samples_events; + double total_samples_duration; + double total_playout_delay; + std::uint64_t total_samples_count; +}; + +struct PeerConnectionStats { + std::uint32_t data_channels_opened; + std::uint32_t data_channels_closed; +}; + +struct DataChannelStats { + std::string label; + std::string protocol; + std::int32_t data_channel_identifier; + std::optional state; + std::uint32_t messages_sent; + std::uint64_t bytes_sent; + std::uint32_t messages_received; + std::uint64_t bytes_received; +}; + +struct TransportStats { + std::uint64_t packets_sent; + std::uint64_t packets_received; + std::uint64_t bytes_sent; + std::uint64_t bytes_received; + IceRole ice_role; + std::string ice_local_username_fragment; + std::optional dtls_state; + std::optional ice_state; + std::string selected_candidate_pair_id; + std::string local_certificate_id; + std::string remote_certificate_id; + std::string tls_version; + std::string dtls_cipher; + DtlsRole dtls_role; + std::string srtp_cipher; + std::uint32_t selected_candidate_pair_changes; +}; + +struct CandidatePairStats { + std::string transport_id; + std::string local_candidate_id; + std::string remote_candidate_id; + std::optional state; + bool nominated; + std::uint64_t packets_sent; + std::uint64_t packets_received; + std::uint64_t bytes_sent; + std::uint64_t bytes_received; + double last_packet_sent_timestamp; + double last_packet_received_timestamp; + double total_round_trip_time; + double current_round_trip_time; + double available_outgoing_bitrate; + double available_incoming_bitrate; + std::uint64_t requests_received; + std::uint64_t requests_sent; + std::uint64_t responses_received; + std::uint64_t responses_sent; + std::uint64_t consent_requests_sent; + std::uint32_t packets_discarded_on_send; + std::uint64_t bytes_discarded_on_send; +}; + +struct IceCandidateStats { + std::string transport_id; + std::string address; + std::int32_t port; + std::string protocol; + std::optional candidate_type; + std::int32_t priority; + std::string url; + std::optional relay_protocol; + std::string foundation; + std::string related_address; + std::int32_t related_port; + std::string username_fragment; + std::optional tcp_type; +}; + +struct CertificateStats { + std::string fingerprint; + std::string fingerprint_algorithm; + std::string base64_certificate; + std::string issuer_certificate_id; +}; + +struct StreamStats { + std::string id; + std::string stream_identifier; +}; + +// ---------------------- +// High-level RtcStats wrapper +// ---------------------- + +struct RtcCodecStats { + RtcStatsData rtc; + CodecStats codec; +}; + +struct RtcInboundRtpStats { + RtcStatsData rtc; + RtpStreamStats stream; + ReceivedRtpStreamStats received; + InboundRtpStreamStats inbound; +}; + +struct RtcOutboundRtpStats { + RtcStatsData rtc; + RtpStreamStats stream; + SentRtpStreamStats sent; + OutboundRtpStreamStats outbound; +}; + +struct RtcRemoteInboundRtpStats { + RtcStatsData rtc; + RtpStreamStats stream; + ReceivedRtpStreamStats received; + RemoteInboundRtpStreamStats remote_inbound; +}; + +struct RtcRemoteOutboundRtpStats { + RtcStatsData rtc; + RtpStreamStats stream; + SentRtpStreamStats sent; + RemoteOutboundRtpStreamStats remote_outbound; +}; + +struct RtcMediaSourceStats { + RtcStatsData rtc; + MediaSourceStats source; + AudioSourceStats audio; + VideoSourceStats video; +}; + +struct RtcMediaPlayoutStats { + RtcStatsData rtc; + AudioPlayoutStats audio_playout; +}; + +struct RtcPeerConnectionStats { + RtcStatsData rtc; + PeerConnectionStats pc; +}; + +struct RtcDataChannelStats { + RtcStatsData rtc; + DataChannelStats dc; +}; + +struct RtcTransportStats { + RtcStatsData rtc; + TransportStats transport; +}; + +struct RtcCandidatePairStats { + RtcStatsData rtc; + CandidatePairStats candidate_pair; +}; + +struct RtcLocalCandidateStats { + RtcStatsData rtc; + IceCandidateStats candidate; +}; + +struct RtcRemoteCandidateStats { + RtcStatsData rtc; + IceCandidateStats candidate; +}; + +struct RtcCertificateStats { + RtcStatsData rtc; + CertificateStats certificate; +}; + +struct RtcStreamStats { + RtcStatsData rtc; + StreamStats stream; +}; + +// Deprecated Track omitted on purpose. + +using RtcStatsVariant = + std::variant; + +struct RtcStats { + RtcStatsVariant stats; +}; + +// ---------------------- +// fromProto declarations +// ---------------------- + +RtcStatsData fromProto(const proto::RtcStatsData &); + +CodecStats fromProto(const proto::CodecStats &); +RtpStreamStats fromProto(const proto::RtpStreamStats &); +ReceivedRtpStreamStats fromProto(const proto::ReceivedRtpStreamStats &); +InboundRtpStreamStats fromProto(const proto::InboundRtpStreamStats &); +SentRtpStreamStats fromProto(const proto::SentRtpStreamStats &); +OutboundRtpStreamStats fromProto(const proto::OutboundRtpStreamStats &); +RemoteInboundRtpStreamStats +fromProto(const proto::RemoteInboundRtpStreamStats &); +RemoteOutboundRtpStreamStats +fromProto(const proto::RemoteOutboundRtpStreamStats &); +MediaSourceStats fromProto(const proto::MediaSourceStats &); +AudioSourceStats fromProto(const proto::AudioSourceStats &); +VideoSourceStats fromProto(const proto::VideoSourceStats &); +AudioPlayoutStats fromProto(const proto::AudioPlayoutStats &); +PeerConnectionStats fromProto(const proto::PeerConnectionStats &); +DataChannelStats fromProto(const proto::DataChannelStats &); +TransportStats fromProto(const proto::TransportStats &); +CandidatePairStats fromProto(const proto::CandidatePairStats &); +IceCandidateStats fromProto(const proto::IceCandidateStats &); +CertificateStats fromProto(const proto::CertificateStats &); +StreamStats fromProto(const proto::StreamStats &); + +// High-level: +RtcStats fromProto(const proto::RtcStats &); + +// helper if you have repeated RtcStats in proto: +std::vector fromProto(const std::vector &); + +} // namespace livekit diff --git a/include/livekit/track.h b/include/livekit/track.h new file mode 100644 index 0000000..ae5fd0c --- /dev/null +++ b/include/livekit/track.h @@ -0,0 +1,129 @@ +/* + * Copyright 2023 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once +#include "livekit/ffi_handle.h" +#include "livekit/stats.h" +#include +#include +#include +#include +#include +#include +#include + +namespace livekit { + +// ----- Enums from track.proto ----- +enum class TrackKind { + KIND_UNKNOWN = 0, + KIND_AUDIO = 1, + KIND_VIDEO = 2, +}; + +enum class TrackSource { + SOURCE_UNKNOWN = 0, + SOURCE_CAMERA = 1, + SOURCE_MICROPHONE = 2, + SOURCE_SCREENSHARE = 3, + SOURCE_SCREENSHARE_AUDIO = 4, +}; + +enum class StreamState { + STATE_UNKNOWN = 0, + STATE_ACTIVE = 1, + STATE_PAUSED = 2, +}; + +enum class AudioTrackFeature { + TF_STEREO = 0, + TF_NO_DTX = 1, + TF_AUTO_GAIN_CONTROL = 2, + TF_ECHO_CANCELLATION = 3, + TF_NOISE_SUPPRESSION = 4, + TF_ENHANCED_NOISE_CANCELLATION = 5, + TF_PRECONNECT_BUFFER = 6, +}; + +// ============================================================ +// Base Track +// ============================================================ +class Track { +public: + virtual ~Track() = default; + + // Read-only properties + const std::string &sid() const noexcept { return sid_; } + const std::string &name() const noexcept { return name_; } + TrackKind kind() const noexcept { return kind_; } + StreamState stream_state() const noexcept { return state_; } + bool muted() const noexcept { return muted_; } + bool remote() const noexcept { return remote_; } + + // Optional publication info + std::optional source() const noexcept { return source_; } + std::optional simulcasted() const noexcept { return simulcasted_; } + std::optional width() const noexcept { return width_; } + std::optional height() const noexcept { return height_; } + std::optional mime_type() const noexcept { return mime_type_; } + + // Handle access + bool has_handle() const noexcept { return !handle_.expired(); } + uintptr_t ffi_handle_id() const noexcept { + if (auto h = handle_.lock()) + return h->get(); + return 0; + } + std::shared_ptr lock_handle() const noexcept { + return handle_.lock(); + } + + // Async get stats + std::future> getStats() const; + + // Internal updates (called by Room) + void setStreamState(StreamState s) noexcept { state_ = s; } + void setMuted(bool m) noexcept { muted_ = m; } + void setName(std::string n) noexcept { name_ = std::move(n); } + +protected: + Track(std::weak_ptr handle, std::string sid, std::string name, + TrackKind kind, StreamState state, bool muted, bool remote); + + void setPublicationFields(std::optional source, + std::optional simulcasted, + std::optional width, + std::optional height, + std::optional mime_type); + +private: + std::weak_ptr handle_; // non-owning + + std::string sid_; + std::string name_; + TrackKind kind_{TrackKind::KIND_UNKNOWN}; + StreamState state_{StreamState::STATE_UNKNOWN}; + bool muted_{false}; + bool remote_{false}; + + std::optional source_; + std::optional simulcasted_; + std::optional width_; + std::optional height_; + std::optional mime_type_; +}; + +} // namespace livekit \ No newline at end of file diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 4cef546..d09d048 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -1,7 +1,7 @@ /* * Copyright 2023 LiveKit * - * Licensed under the Apache License, Version 2.0 (the “License”); + * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * @@ -16,84 +16,201 @@ #include -#include "livekit/ffi_client.h" +#include "build.h" #include "ffi.pb.h" +#include "livekit/ffi_client.h" +#include "livekit/ffi_handle.h" #include "livekit_ffi.h" -namespace livekit -{ +namespace livekit { FfiClient::FfiClient() { - livekit_ffi_initialize(&LivekitFfiCallback, - true, - "cpp", - "0.0.0-dev"); + livekit_ffi_initialize(&LivekitFfiCallback, true, LIVEKIT_BUILD_FLAVOR, + LIVEKIT_BUILD_VERSION_FULL); } -FfiClient::ListenerId FfiClient::AddListener(const FfiClient::Listener& listener) { - std::lock_guard guard(lock_); - FfiClient::ListenerId id = nextListenerId++; - listeners_[id] = listener; - return id; +void FfiClient::shutdown() noexcept { livekit_ffi_dispose(); } + +FfiClient::ListenerId +FfiClient::AddListener(const FfiClient::Listener &listener) { + std::lock_guard guard(lock_); + FfiClient::ListenerId id = nextListenerId++; + listeners_[id] = listener; + return id; } void FfiClient::RemoveListener(ListenerId id) { - std::lock_guard guard(lock_); - listeners_.erase(id); + std::lock_guard guard(lock_); + listeners_.erase(id); } -proto::FfiResponse FfiClient::SendRequest(const proto::FfiRequest &request) const { - std::string bytes; - if (!request.SerializeToString(&bytes) || bytes.empty()) { - throw std::runtime_error("failed to serialize FfiRequest"); - } - const uint8_t* resp_ptr = nullptr; - size_t resp_len = 0; - FfiHandleId handle = livekit_ffi_request( - reinterpret_cast(bytes.data()), - bytes.size(), &resp_ptr, &resp_len); - std::cout << "receive a handle " << handle << std::endl; - - if (handle == INVALID_HANDLE) { - throw std::runtime_error("failed to send request, received an invalid handle"); - } +proto::FfiResponse +FfiClient::SendRequest(const proto::FfiRequest &request) const { + std::string bytes; + if (!request.SerializeToString(&bytes) || bytes.empty()) { + throw std::runtime_error("failed to serialize FfiRequest"); + } + const uint8_t *resp_ptr = nullptr; + size_t resp_len = 0; + FfiHandleId handle = + livekit_ffi_request(reinterpret_cast(bytes.data()), + bytes.size(), &resp_ptr, &resp_len); + std::cout << "receive a handle " << handle << std::endl; + + if (handle == INVALID_HANDLE) { + throw std::runtime_error( + "failed to send request, received an invalid handle"); + } + + // Ensure we drop the handle exactly once on all paths + FfiHandle handle_guard(static_cast(handle)); + if (!resp_ptr || resp_len == 0) { + throw std::runtime_error("FFI returned empty response bytes"); + } + + proto::FfiResponse response; + if (!response.ParseFromArray(resp_ptr, static_cast(resp_len))) { + throw std::runtime_error("failed to parse FfiResponse"); + } + return response; +} - // Ensure we drop the handle exactly once on all paths - FfiHandle handle_guard(static_cast(handle)); - if (!resp_ptr || resp_len == 0) { - throw std::runtime_error("FFI returned empty response bytes"); +void FfiClient::PushEvent(const proto::FfiEvent &event) const { + std::vector> to_complete; + { + std::lock_guard guard(lock_); + for (auto it = pending_.begin(); it != pending_.end();) { + if ((*it)->matches(event)) { + to_complete.push_back(std::move(*it)); + it = pending_.erase(it); + } else { + ++it; + } } + } - proto::FfiResponse response; - if (!response.ParseFromArray(resp_ptr, static_cast(resp_len))) { - throw std::runtime_error("failed to parse FfiResponse"); - } - return response; -} + // Run handlers outside lock + for (auto &p : to_complete) { + p->complete(event); + } -void FfiClient::PushEvent(const proto::FfiEvent &event) const { - // Dispatch the events to the internal listeners + // Notify listeners. Note, we copy the listeners here to avoid calling into + // the listeners under the lock, which could potentially cause deadlock. + std::vector listeners_copy; + { std::lock_guard guard(lock_); - for (auto& [_, listener] : listeners_) { - listener(event); + listeners_copy.reserve(listeners_.size()); + for (auto &[_, listener] : listeners_) { + listeners_copy.push_back(listener); } + } + for (auto &listener : listeners_copy) { + listener(event); + } } void LivekitFfiCallback(const uint8_t *buf, size_t len) { - proto::FfiEvent event; - event.ParseFromArray(buf, len); + proto::FfiEvent event; + event.ParseFromArray(buf, len); - FfiClient::getInstance().PushEvent(event); + FfiClient::instance().PushEvent(event); } -// FfiHandle - -FfiHandle::FfiHandle(uintptr_t id) : handle(id) {} +template +std::future FfiClient::registerAsync( + std::function match, + std::function &)> handler) { + auto pending = std::make_unique>(); + auto fut = pending->promise.get_future(); + pending->match = std::move(match); + pending->handler = std::move(handler); + { + std::lock_guard guard(lock_); + pending_.push_back(std::move(pending)); + } + return fut; +} -FfiHandle::~FfiHandle() { - if (handle != INVALID_HANDLE) { - livekit_ffi_drop_handle(handle); - } +// Room APIs Implementation +std::future +FfiClient::connectAsync(const std::string &url, const std::string &token) { + + livekit::proto::FfiRequest req; + auto *connect = req.mutable_connect(); + connect->set_url(url); + connect->set_token(token); + connect->mutable_options()->set_auto_subscribe(true); + + livekit::proto::FfiResponse resp = SendRequest(req); + if (!resp.has_connect()) { + throw std::runtime_error("FfiResponse missing connect"); + } + + const AsyncId async_id = resp.connect().async_id(); + + // Now we register an async op that completes with RoomInfo + return registerAsync( + // match lambda: is this the connect event with our async_id? + [async_id](const livekit::proto::FfiEvent &event) { + return event.has_connect() && event.connect().async_id() == async_id; + }, + // handler lambda: fill the promise with RoomInfo or an exception + [](const livekit::proto::FfiEvent &event, + std::promise &pr) { + const auto &ce = event.connect(); + + if (!ce.error().empty()) { + pr.set_exception( + std::make_exception_ptr(std::runtime_error(ce.error()))); + return; + } + + // ce.result().room().info() is a const ref, so we copy it + livekit::proto::RoomInfo info = ce.result().room().info(); + pr.set_value(std::move(info)); + }); } +// Track APIs Implementation +std::future> +FfiClient::getTrackStatsAsync(uintptr_t track_handle) { + proto::FfiRequest req; + auto *get_stats_req = req.mutable_get_stats(); + get_stats_req->set_track_handle(track_handle); + proto::FfiResponse resp = SendRequest(req); + if (!resp.has_get_stats()) { + throw std::runtime_error("FfiResponse missing get_stats"); + } + + const AsyncId async_id = resp.get_stats().async_id(); + + // Register pending op: + // - match: event.has_get_stats() && ids equal + // - handler: convert proto stats to C++ wrapper + fulfill promise + return registerAsync>( + // match + [async_id](const proto::FfiEvent &event) { + return event.has_get_stats() && + event.get_stats().async_id() == async_id; + }, + // handler + [](const proto::FfiEvent &event, + std::promise> &pr) { + const auto &gs = event.get_stats(); + + if (!gs.error().empty()) { + pr.set_exception( + std::make_exception_ptr(std::runtime_error(gs.error()))); + return; + } + + std::vector stats_vec; + stats_vec.reserve(gs.stats_size()); + for (const auto &ps : gs.stats()) { + stats_vec.push_back(fromProto(ps)); + } + pr.set_value(std::move(stats_vec)); + }); } + +} // namespace livekit diff --git a/src/ffi_handle.cpp b/src/ffi_handle.cpp new file mode 100644 index 0000000..d6c61cd --- /dev/null +++ b/src/ffi_handle.cpp @@ -0,0 +1,52 @@ +/* + * Copyright 2023 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "livekit/ffi_handle.h" +#include "livekit_ffi.h" + +namespace livekit { + +FfiHandle::FfiHandle(uintptr_t h) noexcept : handle_(h) {} + +FfiHandle::~FfiHandle() { reset(); } + +FfiHandle::FfiHandle(FfiHandle &&other) noexcept : handle_(other.release()) {} + +FfiHandle &FfiHandle::operator=(FfiHandle &&other) noexcept { + if (this != &other) { + reset(other.release()); + } + return *this; +} + +void FfiHandle::reset(uintptr_t new_handle) noexcept { + if (handle_) { + livekit_ffi_drop_handle(handle_); + } + handle_ = new_handle; +} + +uintptr_t FfiHandle::release() noexcept { + uintptr_t old = handle_; + handle_ = 0; + return old; +} + +bool FfiHandle::valid() const noexcept { return handle_ != 0; } + +uintptr_t FfiHandle::get() const noexcept { return handle_; } + +} // namespace livekit diff --git a/src/room.cpp b/src/room.cpp index fbf66b7..e33109e 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -1,7 +1,7 @@ /* * Copyright 2023 LiveKit * - * Licensed under the Apache License, Version 2.0 (the “License”); + * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * @@ -15,93 +15,289 @@ */ #include "livekit/room.h" + #include "livekit/ffi_client.h" +#include "livekit/room_delegate.h" #include "ffi.pb.h" #include "room.pb.h" +#include "room_event_converter.h" #include #include -namespace livekit -{ +namespace livekit { +using proto::ConnectCallback; +using proto::ConnectRequest; +using proto::FfiEvent; using proto::FfiRequest; using proto::FfiResponse; -using proto::ConnectRequest; using proto::RoomOptions; -using proto::ConnectCallback; -using proto::FfiEvent; -void Room::Connect(const std::string& url, const std::string& token) { - // Register listener first (outside Room lock to avoid lock inversion) - auto listenerId = FfiClient::getInstance().AddListener( - std::bind(&Room::OnEvent, this, std::placeholders::_1)); - - // Build request without heap allocs - livekit::proto::FfiRequest req; - auto* connect = req.mutable_connect(); - connect->set_url(url); - connect->set_token(token); - connect->mutable_options()->set_auto_subscribe(true); +void Room::setDelegate(RoomDelegate *delegate) { + std::lock_guard g(lock_); + delegate_ = delegate; +} - // Mark “connecting” under lock, but DO NOT keep the lock across SendRequest - { - std::lock_guard g(lock_); - if (connected_) { - FfiClient::getInstance().RemoveListener(listenerId); - throw std::runtime_error("already connected"); - } - connectAsyncId_ = listenerId; +bool Room::Connect(const std::string &url, const std::string &token) { + auto listenerId = FfiClient::instance().AddListener( + std::bind(&Room::OnEvent, this, std::placeholders::_1)); + { + std::lock_guard g(lock_); + if (connected_) { + FfiClient::instance().RemoveListener(listenerId); + throw std::runtime_error("already connected"); } - - // Call into FFI with no Room lock held (avoid re-entrancy deadlock) - livekit::proto::FfiResponse resp = FfiClient::getInstance().SendRequest(req); - // Store async id under lock + } + auto fut = FfiClient::instance().connectAsync(url, token); + try { + auto info = fut.get(); // fut will throw if it fails to connect to the room { - std::lock_guard g(lock_); - connectAsyncId_ = resp.connect().async_id(); + std::lock_guard g(lock_); + connected_ = true; + room_info_ = fromProto(info); } + return true; + } catch (const std::exception &e) { + // On error, remove the listener and rethrow + FfiClient::instance().RemoveListener(listenerId); + std::cerr << "Room::Connect failed: " << e.what() << std::endl; + return false; + } } -void Room::OnEvent(const FfiEvent& event) { - // TODO, it is not a good idea to lock all the callbacks, improve it. +RoomInfoData Room::room_info() const { + std::lock_guard g(lock_); + return room_info_; +} + +void Room::OnEvent(const FfiEvent &event) { + // Take a snapshot of the delegate under lock, but do NOT call it under the + // lock. + RoomDelegate *delegate_snapshot = nullptr; + + { std::lock_guard guard(lock_); - switch (event.message_case()) { - case FfiEvent::kConnect: - OnConnect(event.connect()); - break; + delegate_snapshot = delegate_; + // If you want, you can also update internal state here (participants, room + // info, etc.). + } - // TODO: Handle other FfiEvent types here (e.g. room_event, track_event, etc.) - default: - break; - } -} + if (!delegate_snapshot) { + return; + } -void Room::OnConnect(const ConnectCallback& cb) { - // Match the async_id with the pending connectAsyncId_ - if (cb.async_id() != connectAsyncId_) { - return; - } + switch (event.message_case()) { + case FfiEvent::kRoomEvent: { + const proto::RoomEvent &re = event.room_event(); - std::cout << "Received ConnectCallback" << std::endl; + // Optional generic hook + delegate_snapshot->onRoomEvent(*this); - if (cb.message_case() == ConnectCallback::kError) { - std::cerr << "Failed to connect to room: " << cb.error() << std::endl; - connected_ = false; - return; + switch (re.message_case()) { + case proto::RoomEvent::kParticipantConnected: { + auto ev = fromProto(re.participant_connected()); + delegate_snapshot->onParticipantConnected(*this, ev); + break; + } + case proto::RoomEvent::kParticipantDisconnected: { + auto ev = fromProto(re.participant_disconnected()); + delegate_snapshot->onParticipantDisconnected(*this, ev); + break; + } + case proto::RoomEvent::kLocalTrackPublished: { + auto ev = fromProto(re.local_track_published()); + delegate_snapshot->onLocalTrackPublished(*this, ev); + break; + } + case proto::RoomEvent::kLocalTrackUnpublished: { + auto ev = fromProto(re.local_track_unpublished()); + delegate_snapshot->onLocalTrackUnpublished(*this, ev); + break; + } + case proto::RoomEvent::kLocalTrackSubscribed: { + auto ev = fromProto(re.local_track_subscribed()); + delegate_snapshot->onLocalTrackSubscribed(*this, ev); + break; + } + case proto::RoomEvent::kTrackPublished: { + auto ev = fromProto(re.track_published()); + delegate_snapshot->onTrackPublished(*this, ev); + break; + } + case proto::RoomEvent::kTrackUnpublished: { + auto ev = fromProto(re.track_unpublished()); + delegate_snapshot->onTrackUnpublished(*this, ev); + break; + } + case proto::RoomEvent::kTrackSubscribed: { + auto ev = fromProto(re.track_subscribed()); + delegate_snapshot->onTrackSubscribed(*this, ev); + break; + } + case proto::RoomEvent::kTrackUnsubscribed: { + auto ev = fromProto(re.track_unsubscribed()); + delegate_snapshot->onTrackUnsubscribed(*this, ev); + break; + } + case proto::RoomEvent::kTrackSubscriptionFailed: { + auto ev = fromProto(re.track_subscription_failed()); + delegate_snapshot->onTrackSubscriptionFailed(*this, ev); + break; + } + case proto::RoomEvent::kTrackMuted: { + auto ev = fromProto(re.track_muted()); + delegate_snapshot->onTrackMuted(*this, ev); + break; + } + case proto::RoomEvent::kTrackUnmuted: { + auto ev = fromProto(re.track_unmuted()); + delegate_snapshot->onTrackUnmuted(*this, ev); + break; + } + case proto::RoomEvent::kActiveSpeakersChanged: { + auto ev = fromProto(re.active_speakers_changed()); + delegate_snapshot->onActiveSpeakersChanged(*this, ev); + break; + } + case proto::RoomEvent::kRoomMetadataChanged: { + auto ev = fromProto(re.room_metadata_changed()); + delegate_snapshot->onRoomMetadataChanged(*this, ev); + break; + } + case proto::RoomEvent::kRoomSidChanged: { + auto ev = fromProto(re.room_sid_changed()); + delegate_snapshot->onRoomSidChanged(*this, ev); + break; + } + case proto::RoomEvent::kParticipantMetadataChanged: { + auto ev = fromProto(re.participant_metadata_changed()); + delegate_snapshot->onParticipantMetadataChanged(*this, ev); + break; + } + case proto::RoomEvent::kParticipantNameChanged: { + auto ev = fromProto(re.participant_name_changed()); + delegate_snapshot->onParticipantNameChanged(*this, ev); + break; + } + case proto::RoomEvent::kParticipantAttributesChanged: { + auto ev = fromProto(re.participant_attributes_changed()); + delegate_snapshot->onParticipantAttributesChanged(*this, ev); + break; + } + case proto::RoomEvent::kParticipantEncryptionStatusChanged: { + auto ev = fromProto(re.participant_encryption_status_changed()); + delegate_snapshot->onParticipantEncryptionStatusChanged(*this, ev); + break; + } + case proto::RoomEvent::kConnectionQualityChanged: { + auto ev = fromProto(re.connection_quality_changed()); + delegate_snapshot->onConnectionQualityChanged(*this, ev); + break; + } + case proto::RoomEvent::kConnectionStateChanged: { + auto ev = fromProto(re.connection_state_changed()); + delegate_snapshot->onConnectionStateChanged(*this, ev); + break; + } + case proto::RoomEvent::kDisconnected: { + auto ev = fromProto(re.disconnected()); + delegate_snapshot->onDisconnected(*this, ev); + break; + } + case proto::RoomEvent::kReconnecting: { + auto ev = fromProto(re.reconnecting()); + delegate_snapshot->onReconnecting(*this, ev); + break; + } + case proto::RoomEvent::kReconnected: { + auto ev = fromProto(re.reconnected()); + delegate_snapshot->onReconnected(*this, ev); + break; + } + case proto::RoomEvent::kE2EeStateChanged: { + auto ev = fromProto(re.e2ee_state_changed()); + delegate_snapshot->onE2eeStateChanged(*this, ev); + break; + } + case proto::RoomEvent::kEos: { + auto ev = fromProto(re.eos()); + delegate_snapshot->onRoomEos(*this, ev); + break; + } + case proto::RoomEvent::kDataPacketReceived: { + auto ev = fromProto(re.data_packet_received()); + delegate_snapshot->onDataPacketReceived(*this, ev); + break; + } + case proto::RoomEvent::kTranscriptionReceived: { + auto ev = fromProto(re.transcription_received()); + delegate_snapshot->onTranscriptionReceived(*this, ev); + break; + } + case proto::RoomEvent::kChatMessage: { + auto ev = fromProto(re.chat_message()); + delegate_snapshot->onChatMessageReceived(*this, ev); + break; + } + case proto::RoomEvent::kStreamHeaderReceived: { + auto ev = fromProto(re.stream_header_received()); + delegate_snapshot->onDataStreamHeaderReceived(*this, ev); + break; + } + case proto::RoomEvent::kStreamChunkReceived: { + auto ev = fromProto(re.stream_chunk_received()); + delegate_snapshot->onDataStreamChunkReceived(*this, ev); + break; + } + case proto::RoomEvent::kStreamTrailerReceived: { + auto ev = fromProto(re.stream_trailer_received()); + delegate_snapshot->onDataStreamTrailerReceived(*this, ev); + break; + } + case proto::RoomEvent::kDataChannelLowThresholdChanged: { + auto ev = fromProto(re.data_channel_low_threshold_changed()); + delegate_snapshot->onDataChannelBufferedAmountLowThresholdChanged(*this, + ev); + break; + } + case proto::RoomEvent::kByteStreamOpened: { + auto ev = fromProto(re.byte_stream_opened()); + delegate_snapshot->onByteStreamOpened(*this, ev); + break; + } + case proto::RoomEvent::kTextStreamOpened: { + auto ev = fromProto(re.text_stream_opened()); + delegate_snapshot->onTextStreamOpened(*this, ev); + break; + } + case proto::RoomEvent::kRoomUpdated: { + auto ev = roomUpdatedFromProto(re.room_updated()); + delegate_snapshot->onRoomUpdated(*this, ev); + break; + } + case proto::RoomEvent::kMoved: { + auto ev = roomMovedFromProto(re.moved()); + delegate_snapshot->onRoomMoved(*this, ev); + break; + } + case proto::RoomEvent::kParticipantsUpdated: { + auto ev = fromProto(re.participants_updated()); + delegate_snapshot->onParticipantsUpdated(*this, ev); + break; } - // Success path - const auto& result = cb.result(); - const auto& owned_room = result.room(); - // OwnedRoom { FfiOwnedHandle handle = 1; RoomInfo info = 2; } - handle_ = FfiHandle(static_cast(owned_room.handle().id())); - if (owned_room.info().has_sid()) { - std::cout << "Room SID: " << owned_room.info().sid() << std::endl; + case proto::RoomEvent::MESSAGE_NOT_SET: + default: + break; } - connected_ = true; - std::cout << "Connected to room" << std::endl; -} + break; + } + default: + break; + } } + +} // namespace livekit diff --git a/src/room_event_converter.cpp b/src/room_event_converter.cpp new file mode 100644 index 0000000..651896f --- /dev/null +++ b/src/room_event_converter.cpp @@ -0,0 +1,532 @@ +/* + * Copyright 2023 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "room_event_converter.h" + +#include "room.pb.h" + +namespace livekit { + +// --------- enum conversions --------- + +ConnectionQuality toConnectionQuality(proto::ConnectionQuality src) { + switch (src) { + case proto::QUALITY_POOR: + return ConnectionQuality::Poor; + case proto::QUALITY_GOOD: + return ConnectionQuality::Good; + case proto::QUALITY_EXCELLENT: + return ConnectionQuality::Excellent; + case proto::QUALITY_LOST: + return ConnectionQuality::Lost; + default: + return ConnectionQuality::Good; + } +} + +ConnectionState toConnectionState(proto::ConnectionState src) { + switch (src) { + case proto::CONN_DISCONNECTED: + return ConnectionState::Disconnected; + case proto::CONN_CONNECTED: + return ConnectionState::Connected; + case proto::CONN_RECONNECTING: + return ConnectionState::Reconnecting; + default: + return ConnectionState::Disconnected; + } +} + +DataPacketKind toDataPacketKind(proto::DataPacketKind src) { + switch (src) { + case proto::KIND_LOSSY: + return DataPacketKind::Lossy; + case proto::KIND_RELIABLE: + return DataPacketKind::Reliable; + default: + return DataPacketKind::Reliable; + } +} + +EncryptionState toEncryptionState(proto::EncryptionState /*src*/) { + // TODO: fill out once you have the proto::EncryptionState enum + return EncryptionState::Unknown; +} + +DisconnectReason toDisconnectReason(proto::DisconnectReason /*src*/) { + // TODO: map each proto::DisconnectReason to your DisconnectReason enum + return DisconnectReason::Unknown; +} + +// --------- basic helper conversions --------- + +TranscriptionSegmentData fromProto(const proto::TranscriptionSegment &src) { + TranscriptionSegmentData out; + out.id = src.id(); + out.text = src.text(); + out.start_time = src.start_time(); + out.end_time = src.end_time(); + out.is_final = src.final(); + out.language = src.language(); + return out; +} + +ChatMessageData fromProto(const proto::ChatMessage &src) { + ChatMessageData out; + out.id = src.id(); + out.timestamp = src.timestamp(); + out.message = src.message(); + if (src.has_edit_timestamp()) { + out.edit_timestamp = src.edit_timestamp(); + } + if (src.has_deleted()) { + out.deleted = src.deleted(); + } + if (src.has_generated()) { + out.generated = src.generated(); + } + return out; +} + +UserPacketData fromProto(const proto::UserPacket &src) { + UserPacketData out; + // TODO, double check following code is safe + const auto &buf = src.data().data(); + auto ptr = reinterpret_cast(buf.data_ptr()); + auto len = static_cast(buf.data_len()); + out.data.assign(ptr, ptr + len); + if (src.has_topic()) { + out.topic = src.topic(); + } + return out; +} + +SipDtmfData fromProto(const proto::SipDTMF &src) { + SipDtmfData out; + out.code = src.code(); + if (src.has_digit()) { + out.digit = src.digit(); + } + return out; +} + +RoomInfoData fromProto(const proto::RoomInfo &src) { + RoomInfoData out; + if (src.has_sid()) { + out.sid = src.sid(); + } + out.name = src.name(); + out.metadata = src.metadata(); + out.lossy_dc_buffered_amount_low_threshold = + src.lossy_dc_buffered_amount_low_threshold(); + out.reliable_dc_buffered_amount_low_threshold = + src.reliable_dc_buffered_amount_low_threshold(); + out.empty_timeout = src.empty_timeout(); + out.departure_timeout = src.departure_timeout(); + out.max_participants = src.max_participants(); + out.creation_time = src.creation_time(); + out.num_participants = src.num_participants(); + out.num_publishers = src.num_publishers(); + out.active_recording = src.active_recording(); + return out; +} + +AttributeEntry fromProto(const proto::AttributesEntry &src) { + AttributeEntry a; + a.key = src.key(); + a.value = src.value(); + return a; +} + +DataStreamHeaderData fromProto(const proto::DataStream_Header &src) { + DataStreamHeaderData out; + out.stream_id = src.stream_id(); + out.timestamp = src.timestamp(); + out.mime_type = src.mime_type(); + out.topic = src.topic(); + if (src.has_total_length()) { + out.total_length = src.total_length(); + } + for (const auto &kv : src.attributes()) { + out.attributes.emplace(kv.first, kv.second); + } + + // content_header oneof + switch (src.content_header_case()) { + case proto::DataStream_Header::kTextHeader: { + out.content_type = DataStreamHeaderData::ContentType::Text; + const auto &t = src.text_header(); + out.operation_type = + static_cast(t.operation_type()); + if (t.has_version()) { + out.version = t.version(); + } + if (t.has_reply_to_stream_id()) { + out.reply_to_stream_id = t.reply_to_stream_id(); + } + for (const auto &id : t.attached_stream_ids()) { + out.attached_stream_ids.push_back(id); + } + if (t.has_generated()) { + out.generated = t.generated(); + } + break; + } + case proto::DataStream_Header::kByteHeader: { + out.content_type = DataStreamHeaderData::ContentType::Byte; + const auto &b = src.byte_header(); + out.name = b.name(); + break; + } + case proto::DataStream_Header::CONTENT_HEADER_NOT_SET: + default: + out.content_type = DataStreamHeaderData::ContentType::None; + break; + } + + return out; +} + +DataStreamChunkData fromProto(const proto::DataStream_Chunk &src) { + DataStreamChunkData out; + out.stream_id = src.stream_id(); + out.chunk_index = src.chunk_index(); + out.content.assign(src.content().begin(), src.content().end()); + if (src.has_version()) { + out.version = src.version(); + } + if (src.has_iv()) { + out.iv.assign(src.iv().begin(), src.iv().end()); + } + return out; +} + +DataStreamTrailerData fromProto(const proto::DataStream_Trailer &src) { + DataStreamTrailerData out; + out.stream_id = src.stream_id(); + out.reason = src.reason(); + for (const auto &kv : src.attributes()) { + out.attributes.emplace(kv.first, kv.second); + } + return out; +} + +// --------- event conversions --------- + +ParticipantConnectedEvent fromProto(const proto::ParticipantConnected &src) { + ParticipantConnectedEvent ev; + // src.info() is OwnedParticipant; you can fill more fields once you inspect + // it. For now, leave metadata/name/identity as TODO. + // TODO: map src.info().info().identity(), name(), metadata(), etc. + return ev; +} + +ParticipantDisconnectedEvent +fromProto(const proto::ParticipantDisconnected &src) { + ParticipantDisconnectedEvent ev; + ev.participant_identity = src.participant_identity(); + ev.reason = toDisconnectReason(src.disconnect_reason()); + return ev; +} + +LocalTrackPublishedEvent fromProto(const proto::LocalTrackPublished &src) { + LocalTrackPublishedEvent ev; + ev.track_sid = src.track_sid(); + return ev; +} + +LocalTrackUnpublishedEvent fromProto(const proto::LocalTrackUnpublished &src) { + LocalTrackUnpublishedEvent ev; + ev.publication_sid = src.publication_sid(); + return ev; +} + +LocalTrackSubscribedEvent fromProto(const proto::LocalTrackSubscribed &src) { + LocalTrackSubscribedEvent ev; + ev.track_sid = src.track_sid(); + return ev; +} + +TrackPublishedEvent fromProto(const proto::TrackPublished &src) { + TrackPublishedEvent ev; + ev.participant_identity = src.participant_identity(); + // OwnedTrackPublication publication = 2; + // TODO: map publication info once you inspect OwnedTrackPublication + // ev.publication_sid = src.publication().info().sid(); + // ev.track_name = src.publication().info().name(); + // ev.track_kind = ...; + // ev.track_source = ...; + return ev; +} + +TrackUnpublishedEvent fromProto(const proto::TrackUnpublished &src) { + TrackUnpublishedEvent ev; + ev.participant_identity = src.participant_identity(); + ev.publication_sid = src.publication_sid(); + return ev; +} + +TrackSubscribedEvent fromProto(const proto::TrackSubscribed &src) { + TrackSubscribedEvent ev; + ev.participant_identity = src.participant_identity(); + // OwnedTrack track = 2; + // TODO: map track info once you inspect OwnedTrack + // ev.track_sid = src.track().info().sid(); + // ev.track_name = src.track().info().name(); + // ev.track_kind = ...; + // ev.track_source = ...; + return ev; +} + +TrackUnsubscribedEvent fromProto(const proto::TrackUnsubscribed &src) { + TrackUnsubscribedEvent ev; + ev.participant_identity = src.participant_identity(); + ev.track_sid = src.track_sid(); + return ev; +} + +TrackSubscriptionFailedEvent +fromProto(const proto::TrackSubscriptionFailed &src) { + TrackSubscriptionFailedEvent ev; + ev.participant_identity = src.participant_identity(); + ev.track_sid = src.track_sid(); + ev.error = src.error(); + return ev; +} + +TrackMutedEvent fromProto(const proto::TrackMuted &src) { + TrackMutedEvent ev; + ev.participant_identity = src.participant_identity(); + ev.track_sid = src.track_sid(); + return ev; +} + +TrackUnmutedEvent fromProto(const proto::TrackUnmuted &src) { + TrackUnmutedEvent ev; + ev.participant_identity = src.participant_identity(); + ev.track_sid = src.track_sid(); + return ev; +} + +ActiveSpeakersChangedEvent fromProto(const proto::ActiveSpeakersChanged &src) { + ActiveSpeakersChangedEvent ev; + for (const auto &id : src.participant_identities()) { + ev.participant_identities.push_back(id); + } + return ev; +} + +RoomMetadataChangedEvent fromProto(const proto::RoomMetadataChanged &src) { + RoomMetadataChangedEvent ev; + ev.metadata = src.metadata(); + return ev; +} + +RoomSidChangedEvent fromProto(const proto::RoomSidChanged &src) { + RoomSidChangedEvent ev; + ev.sid = src.sid(); + return ev; +} + +ParticipantMetadataChangedEvent +fromProto(const proto::ParticipantMetadataChanged &src) { + ParticipantMetadataChangedEvent ev; + ev.participant_identity = src.participant_identity(); + ev.metadata = src.metadata(); + return ev; +} + +ParticipantNameChangedEvent +fromProto(const proto::ParticipantNameChanged &src) { + ParticipantNameChangedEvent ev; + ev.participant_identity = src.participant_identity(); + ev.name = src.name(); + return ev; +} + +ParticipantAttributesChangedEvent +fromProto(const proto::ParticipantAttributesChanged &src) { + ParticipantAttributesChangedEvent ev; + ev.participant_identity = src.participant_identity(); + for (const auto &a : src.attributes()) { + ev.attributes.push_back(fromProto(a)); + } + for (const auto &a : src.changed_attributes()) { + ev.changed_attributes.push_back(fromProto(a)); + } + return ev; +} + +ParticipantEncryptionStatusChangedEvent +fromProto(const proto::ParticipantEncryptionStatusChanged &src) { + ParticipantEncryptionStatusChangedEvent ev; + ev.participant_identity = src.participant_identity(); + ev.is_encrypted = src.is_encrypted(); + return ev; +} + +ConnectionQualityChangedEvent +fromProto(const proto::ConnectionQualityChanged &src) { + ConnectionQualityChangedEvent ev; + ev.participant_identity = src.participant_identity(); + ev.quality = toConnectionQuality(src.quality()); + return ev; +} + +DataPacketReceivedEvent fromProto(const proto::DataPacketReceived &src) { + DataPacketReceivedEvent ev; + ev.kind = toDataPacketKind(src.kind()); + ev.participant_identity = src.participant_identity(); + + switch (src.value_case()) { + case proto::DataPacketReceived::kUser: + ev.user = fromProto(src.user()); + break; + case proto::DataPacketReceived::kSipDtmf: + ev.sip_dtmf = fromProto(src.sip_dtmf()); + break; + case proto::DataPacketReceived::VALUE_NOT_SET: + default: + break; + } + + return ev; +} + +TranscriptionReceivedEvent fromProto(const proto::TranscriptionReceived &src) { + TranscriptionReceivedEvent ev; + if (src.has_participant_identity()) { + ev.participant_identity = src.participant_identity(); + } + if (src.has_track_sid()) { + ev.track_sid = src.track_sid(); + } + for (const auto &seg : src.segments()) { + ev.segments.push_back(fromProto(seg)); + } + return ev; +} + +ConnectionStateChangedEvent +fromProto(const proto::ConnectionStateChanged &src) { + ConnectionStateChangedEvent ev; + ev.state = toConnectionState(src.state()); + return ev; +} + +DisconnectedEvent fromProto(const proto::Disconnected &src) { + DisconnectedEvent ev; + ev.reason = toDisconnectReason(src.reason()); + return ev; +} + +ReconnectingEvent fromProto(const proto::Reconnecting & /*src*/) { + return ReconnectingEvent{}; +} + +ReconnectedEvent fromProto(const proto::Reconnected & /*src*/) { + return ReconnectedEvent{}; +} + +RoomEosEvent fromProto(const proto::RoomEOS & /*src*/) { + return RoomEosEvent{}; +} + +DataStreamHeaderReceivedEvent +fromProto(const proto::DataStreamHeaderReceived &src) { + DataStreamHeaderReceivedEvent ev; + ev.participant_identity = src.participant_identity(); + ev.header = fromProto(src.header()); + return ev; +} + +DataStreamChunkReceivedEvent +fromProto(const proto::DataStreamChunkReceived &src) { + DataStreamChunkReceivedEvent ev; + ev.participant_identity = src.participant_identity(); + ev.chunk = fromProto(src.chunk()); + return ev; +} + +DataStreamTrailerReceivedEvent +fromProto(const proto::DataStreamTrailerReceived &src) { + DataStreamTrailerReceivedEvent ev; + ev.participant_identity = src.participant_identity(); + ev.trailer = fromProto(src.trailer()); + return ev; +} + +DataChannelBufferedAmountLowThresholdChangedEvent +fromProto(const proto::DataChannelBufferedAmountLowThresholdChanged &src) { + DataChannelBufferedAmountLowThresholdChangedEvent ev; + ev.kind = toDataPacketKind(src.kind()); + ev.threshold = src.threshold(); + return ev; +} + +ByteStreamOpenedEvent fromProto(const proto::ByteStreamOpened &src) { + ByteStreamOpenedEvent ev; + // TODO: map reader handle once OwnedByteStreamReader is known + // ev.reader_handle = src.reader().handle().id(); + ev.participant_identity = src.participant_identity(); + return ev; +} + +TextStreamOpenedEvent fromProto(const proto::TextStreamOpened &src) { + TextStreamOpenedEvent ev; + // TODO: map reader handle once OwnedTextStreamReader is known + // ev.reader_handle = src.reader().handle().id(); + ev.participant_identity = src.participant_identity(); + return ev; +} + +RoomUpdatedEvent roomUpdatedFromProto(const proto::RoomInfo &src) { + RoomUpdatedEvent ev; + ev.info = fromProto(src); + return ev; +} + +RoomMovedEvent roomMovedFromProto(const proto::RoomInfo &src) { + RoomMovedEvent ev; + ev.info = fromProto(src); + return ev; +} + +ParticipantsUpdatedEvent fromProto(const proto::ParticipantsUpdated &src) { + ParticipantsUpdatedEvent ev; + // We only know that it has ParticipantInfo participants = 1; + // TODO: fill real identities once you inspect proto::ParticipantInfo + for (const auto &p : src.participants()) { + ev.participant_identities.push_back(p.identity()); + } + return ev; +} + +E2eeStateChangedEvent fromProto(const proto::E2eeStateChanged &src) { + E2eeStateChangedEvent ev; + ev.participant_identity = src.participant_identity(); + ev.state = toEncryptionState(src.state()); + return ev; +} + +ChatMessageReceivedEvent fromProto(const proto::ChatMessageReceived &src) { + ChatMessageReceivedEvent ev; + ev.message = fromProto(src.message()); + ev.participant_identity = src.participant_identity(); + return ev; +} + +} // namespace livekit diff --git a/src/room_event_converter.h b/src/room_event_converter.h new file mode 100644 index 0000000..96d9c39 --- /dev/null +++ b/src/room_event_converter.h @@ -0,0 +1,108 @@ +/* + * Copyright 2023 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "livekit/room_delegate.h" +#include "room.pb.h" + +namespace livekit { + +// --------- basic helper conversions --------- + +ConnectionQuality toConnectionQuality(proto::ConnectionQuality src); +ConnectionState toConnectionState(proto::ConnectionState src); +DataPacketKind toDataPacketKind(proto::DataPacketKind src); +EncryptionState toEncryptionState(proto::EncryptionState src); +DisconnectReason toDisconnectReason(proto::DisconnectReason src); + +TranscriptionSegmentData fromProto(const proto::TranscriptionSegment &src); +ChatMessageData fromProto(const proto::ChatMessage &src); +UserPacketData fromProto(const proto::UserPacket &src); +SipDtmfData fromProto(const proto::SipDTMF &src); +RoomInfoData fromProto(const proto::RoomInfo &src); +AttributeEntry fromProto(const proto::AttributesEntry &src); + +DataStreamHeaderData fromProto(const proto::DataStream_Header &src); +DataStreamChunkData fromProto(const proto::DataStream_Chunk &src); +DataStreamTrailerData fromProto(const proto::DataStream_Trailer &src); + +// --------- event conversions (RoomEvent.oneof message) --------- + +ParticipantConnectedEvent fromProto(const proto::ParticipantConnected &src); +ParticipantDisconnectedEvent +fromProto(const proto::ParticipantDisconnected &src); + +LocalTrackPublishedEvent fromProto(const proto::LocalTrackPublished &src); +LocalTrackUnpublishedEvent fromProto(const proto::LocalTrackUnpublished &src); +LocalTrackSubscribedEvent fromProto(const proto::LocalTrackSubscribed &src); + +TrackPublishedEvent fromProto(const proto::TrackPublished &src); +TrackUnpublishedEvent fromProto(const proto::TrackUnpublished &src); +TrackSubscribedEvent fromProto(const proto::TrackSubscribed &src); +TrackUnsubscribedEvent fromProto(const proto::TrackUnsubscribed &src); +TrackSubscriptionFailedEvent +fromProto(const proto::TrackSubscriptionFailed &src); +TrackMutedEvent fromProto(const proto::TrackMuted &src); +TrackUnmutedEvent fromProto(const proto::TrackUnmuted &src); + +ActiveSpeakersChangedEvent fromProto(const proto::ActiveSpeakersChanged &src); + +RoomMetadataChangedEvent fromProto(const proto::RoomMetadataChanged &src); +RoomSidChangedEvent fromProto(const proto::RoomSidChanged &src); + +ParticipantMetadataChangedEvent +fromProto(const proto::ParticipantMetadataChanged &src); +ParticipantNameChangedEvent fromProto(const proto::ParticipantNameChanged &src); +ParticipantAttributesChangedEvent +fromProto(const proto::ParticipantAttributesChanged &src); +ParticipantEncryptionStatusChangedEvent +fromProto(const proto::ParticipantEncryptionStatusChanged &src); + +ConnectionQualityChangedEvent +fromProto(const proto::ConnectionQualityChanged &src); + +DataPacketReceivedEvent fromProto(const proto::DataPacketReceived &src); +TranscriptionReceivedEvent fromProto(const proto::TranscriptionReceived &src); + +ConnectionStateChangedEvent fromProto(const proto::ConnectionStateChanged &src); +DisconnectedEvent fromProto(const proto::Disconnected &src); +ReconnectingEvent fromProto(const proto::Reconnecting &src); +ReconnectedEvent fromProto(const proto::Reconnected &src); +RoomEosEvent fromProto(const proto::RoomEOS &src); + +DataStreamHeaderReceivedEvent +fromProto(const proto::DataStreamHeaderReceived &src); +DataStreamChunkReceivedEvent +fromProto(const proto::DataStreamChunkReceived &src); +DataStreamTrailerReceivedEvent +fromProto(const proto::DataStreamTrailerReceived &src); + +DataChannelBufferedAmountLowThresholdChangedEvent +fromProto(const proto::DataChannelBufferedAmountLowThresholdChanged &src); + +ByteStreamOpenedEvent fromProto(const proto::ByteStreamOpened &src); +TextStreamOpenedEvent fromProto(const proto::TextStreamOpened &src); + +RoomUpdatedEvent +roomUpdatedFromProto(const proto::RoomInfo &src); // room_updated +RoomMovedEvent roomMovedFromProto(const proto::RoomInfo &src); // moved + +ParticipantsUpdatedEvent fromProto(const proto::ParticipantsUpdated &src); +E2eeStateChangedEvent fromProto(const proto::E2eeStateChanged &src); +ChatMessageReceivedEvent fromProto(const proto::ChatMessageReceived &src); + +} // namespace livekit diff --git a/src/stats.cpp b/src/stats.cpp new file mode 100644 index 0000000..8f3ba27 --- /dev/null +++ b/src/stats.cpp @@ -0,0 +1,658 @@ +/* + * Copyright 2023 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "livekit/stats.h" + +#include "stats.pb.h" + +namespace livekit { + +namespace { + +// --- enum helpers --- + +DataChannelState fromProto(livekit::proto::DataChannelState s) { + using P = livekit::proto::DataChannelState; + switch (s) { + case P::DC_CONNECTING: + return DataChannelState::Connecting; + case P::DC_OPEN: + return DataChannelState::Open; + case P::DC_CLOSING: + return DataChannelState::Closing; + case P::DC_CLOSED: + return DataChannelState::Closed; + default: + return DataChannelState::Unknown; + } +} + +QualityLimitationReason fromProto(livekit::proto::QualityLimitationReason r) { + using P = livekit::proto::QualityLimitationReason; + switch (r) { + case P::LIMITATION_NONE: + return QualityLimitationReason::None; + case P::LIMITATION_CPU: + return QualityLimitationReason::Cpu; + case P::LIMITATION_BANDWIDTH: + return QualityLimitationReason::Bandwidth; + case P::LIMITATION_OTHER: + return QualityLimitationReason::Other; + default: + return QualityLimitationReason::Other; + } +} + +IceRole fromProto(livekit::proto::IceRole r) { + using P = livekit::proto::IceRole; + switch (r) { + case P::ICE_CONTROLLING: + return IceRole::Controlling; + case P::ICE_CONTROLLED: + return IceRole::Controlled; + case P::ICE_UNKNOWN: + default: + return IceRole::Unknown; + } +} + +DtlsTransportState fromProto(livekit::proto::DtlsTransportState s) { + using P = livekit::proto::DtlsTransportState; + switch (s) { + case P::DTLS_TRANSPORT_NEW: + return DtlsTransportState::New; + case P::DTLS_TRANSPORT_CONNECTING: + return DtlsTransportState::Connecting; + case P::DTLS_TRANSPORT_CONNECTED: + return DtlsTransportState::Connected; + case P::DTLS_TRANSPORT_CLOSED: + return DtlsTransportState::Closed; + case P::DTLS_TRANSPORT_FAILED: + return DtlsTransportState::Failed; + default: + return DtlsTransportState::Unknown; + } +} + +IceTransportState fromProto(livekit::proto::IceTransportState s) { + using P = livekit::proto::IceTransportState; + switch (s) { + case P::ICE_TRANSPORT_NEW: + return IceTransportState::New; + case P::ICE_TRANSPORT_CHECKING: + return IceTransportState::Checking; + case P::ICE_TRANSPORT_CONNECTED: + return IceTransportState::Connected; + case P::ICE_TRANSPORT_COMPLETED: + return IceTransportState::Completed; + case P::ICE_TRANSPORT_DISCONNECTED: + return IceTransportState::Disconnected; + case P::ICE_TRANSPORT_FAILED: + return IceTransportState::Failed; + case P::ICE_TRANSPORT_CLOSED: + return IceTransportState::Closed; + default: + return IceTransportState::Unknown; + } +} + +DtlsRole fromProto(livekit::proto::DtlsRole r) { + using P = livekit::proto::DtlsRole; + switch (r) { + case P::DTLS_CLIENT: + return DtlsRole::Client; + case P::DTLS_SERVER: + return DtlsRole::Server; + case P::DTLS_UNKNOWN: + default: + return DtlsRole::Unknown; + } +} + +IceCandidatePairState fromProto(livekit::proto::IceCandidatePairState s) { + using P = livekit::proto::IceCandidatePairState; + switch (s) { + case P::PAIR_FROZEN: + return IceCandidatePairState::Frozen; + case P::PAIR_WAITING: + return IceCandidatePairState::Waiting; + case P::PAIR_IN_PROGRESS: + return IceCandidatePairState::InProgress; + case P::PAIR_FAILED: + return IceCandidatePairState::Failed; + case P::PAIR_SUCCEEDED: + return IceCandidatePairState::Succeeded; + default: + return IceCandidatePairState::Unknown; + } +} + +IceCandidateType fromProto(livekit::proto::IceCandidateType t) { + using P = livekit::proto::IceCandidateType; + switch (t) { + case P::HOST: + return IceCandidateType::Host; + case P::SRFLX: + return IceCandidateType::Srflx; + case P::PRFLX: + return IceCandidateType::Prflx; + case P::RELAY: + return IceCandidateType::Relay; + default: + return IceCandidateType::Unknown; + } +} + +IceServerTransportProtocol +fromProto(livekit::proto::IceServerTransportProtocol p) { + using P = livekit::proto::IceServerTransportProtocol; + switch (p) { + case P::TRANSPORT_UDP: + return IceServerTransportProtocol::Udp; + case P::TRANSPORT_TCP: + return IceServerTransportProtocol::Tcp; + case P::TRANSPORT_TLS: + return IceServerTransportProtocol::Tls; + default: + return IceServerTransportProtocol::Unknown; + } +} + +IceTcpCandidateType fromProto(livekit::proto::IceTcpCandidateType t) { + using P = livekit::proto::IceTcpCandidateType; + switch (t) { + case P::CANDIDATE_ACTIVE: + return IceTcpCandidateType::Active; + case P::CANDIDATE_PASSIVE: + return IceTcpCandidateType::Passive; + case P::CANDIDATE_SO: + return IceTcpCandidateType::So; + default: + return IceTcpCandidateType::Unknown; + } +} + +} // namespace + +// ---------------------- +// Leaf conversions +// ---------------------- + +RtcStatsData fromProto(const proto::RtcStatsData &s) { + RtcStatsData out; + out.id = s.id(); + out.timestamp_ms = s.timestamp(); + return out; +} + +CodecStats fromProto(const proto::CodecStats &s) { + CodecStats out; + out.payload_type = s.payload_type(); + out.transport_id = s.transport_id(); + out.mime_type = s.mime_type(); + out.clock_rate = s.clock_rate(); + out.channels = s.channels(); + out.sdp_fmtp_line = s.sdp_fmtp_line(); + return out; +} + +RtpStreamStats fromProto(const proto::RtpStreamStats &s) { + RtpStreamStats out; + out.ssrc = s.ssrc(); + out.kind = s.kind(); + out.transport_id = s.transport_id(); + out.codec_id = s.codec_id(); + return out; +} + +ReceivedRtpStreamStats fromProto(const proto::ReceivedRtpStreamStats &s) { + ReceivedRtpStreamStats out; + out.packets_received = s.packets_received(); + out.packets_lost = s.packets_lost(); + out.jitter = s.jitter(); + return out; +} + +InboundRtpStreamStats fromProto(const proto::InboundRtpStreamStats &s) { + InboundRtpStreamStats out; + out.track_identifier = s.track_identifier(); + out.mid = s.mid(); + out.remote_id = s.remote_id(); + out.frames_decoded = s.frames_decoded(); + out.key_frames_decoded = s.key_frames_decoded(); + out.frames_rendered = s.frames_rendered(); + out.frames_dropped = s.frames_dropped(); + out.frame_width = s.frame_width(); + out.frame_height = s.frame_height(); + out.frames_per_second = s.frames_per_second(); + out.qp_sum = s.qp_sum(); + out.total_decode_time = s.total_decode_time(); + out.total_inter_frame_delay = s.total_inter_frame_delay(); + out.total_squared_inter_frame_delay = s.total_squared_inter_frame_delay(); + out.pause_count = s.pause_count(); + out.total_pause_duration = s.total_pause_duration(); + out.freeze_count = s.freeze_count(); + out.total_freeze_duration = s.total_freeze_duration(); + out.last_packet_received_timestamp = s.last_packet_received_timestamp(); + out.header_bytes_received = s.header_bytes_received(); + out.packets_discarded = s.packets_discarded(); + out.fec_bytes_received = s.fec_bytes_received(); + out.fec_packets_received = s.fec_packets_received(); + out.fec_packets_discarded = s.fec_packets_discarded(); + out.bytes_received = s.bytes_received(); + out.nack_count = s.nack_count(); + out.fir_count = s.fir_count(); + out.pli_count = s.pli_count(); + out.total_processing_delay = s.total_processing_delay(); + out.estimated_playout_timestamp = s.estimated_playout_timestamp(); + out.jitter_buffer_delay = s.jitter_buffer_delay(); + out.jitter_buffer_target_delay = s.jitter_buffer_target_delay(); + out.jitter_buffer_emitted_count = s.jitter_buffer_emitted_count(); + out.jitter_buffer_minimum_delay = s.jitter_buffer_minimum_delay(); + out.total_samples_received = s.total_samples_received(); + out.concealed_samples = s.concealed_samples(); + out.silent_concealed_samples = s.silent_concealed_samples(); + out.concealment_events = s.concealment_events(); + out.inserted_samples_for_deceleration = s.inserted_samples_for_deceleration(); + out.removed_samples_for_acceleration = s.removed_samples_for_acceleration(); + out.audio_level = s.audio_level(); + out.total_audio_energy = s.total_audio_energy(); + out.total_samples_duration = s.total_samples_duration(); + out.frames_received = s.frames_received(); + out.decoder_implementation = s.decoder_implementation(); + out.playout_id = s.playout_id(); + out.power_efficient_decoder = s.power_efficient_decoder(); + out.frames_assembled_from_multiple_packets = + s.frames_assembled_from_multiple_packets(); + out.total_assembly_time = s.total_assembly_time(); + out.retransmitted_packets_received = s.retransmitted_packets_received(); + out.retransmitted_bytes_received = s.retransmitted_bytes_received(); + out.rtx_ssrc = s.rtx_ssrc(); + out.fec_ssrc = s.fec_ssrc(); + return out; +} + +SentRtpStreamStats fromProto(const proto::SentRtpStreamStats &s) { + SentRtpStreamStats out; + out.packets_sent = s.packets_sent(); + out.bytes_sent = s.bytes_sent(); + return out; +} + +OutboundRtpStreamStats fromProto(const proto::OutboundRtpStreamStats &s) { + OutboundRtpStreamStats out; + out.mid = s.mid(); + out.media_source_id = s.media_source_id(); + out.remote_id = s.remote_id(); + out.rid = s.rid(); + out.header_bytes_sent = s.header_bytes_sent(); + out.retransmitted_packets_sent = s.retransmitted_packets_sent(); + out.retransmitted_bytes_sent = s.retransmitted_bytes_sent(); + out.rtx_ssrc = s.rtx_ssrc(); + out.target_bitrate = s.target_bitrate(); + out.total_encoded_bytes_target = s.total_encoded_bytes_target(); + out.frame_width = s.frame_width(); + out.frame_height = s.frame_height(); + out.frames_per_second = s.frames_per_second(); + out.frames_sent = s.frames_sent(); + out.huge_frames_sent = s.huge_frames_sent(); + out.frames_encoded = s.frames_encoded(); + out.key_frames_encoded = s.key_frames_encoded(); + out.qp_sum = s.qp_sum(); + out.total_encode_time = s.total_encode_time(); + out.total_packet_send_delay = s.total_packet_send_delay(); + out.quality_limitation_reason = fromProto(s.quality_limitation_reason()); + out.quality_limitation_durations.clear(); + for (const auto &kv : s.quality_limitation_durations()) { + out.quality_limitation_durations.emplace(kv.first, kv.second); + } + out.quality_limitation_resolution_changes = + s.quality_limitation_resolution_changes(); + out.nack_count = s.nack_count(); + out.fir_count = s.fir_count(); + out.pli_count = s.pli_count(); + out.encoder_implementation = s.encoder_implementation(); + out.power_efficient_encoder = s.power_efficient_encoder(); + out.active = s.active(); + out.scalability_mode = s.scalability_mode(); + return out; +} + +RemoteInboundRtpStreamStats +fromProto(const proto::RemoteInboundRtpStreamStats &s) { + RemoteInboundRtpStreamStats out; + out.local_id = s.local_id(); + out.round_trip_time = s.round_trip_time(); + out.total_round_trip_time = s.total_round_trip_time(); + out.fraction_lost = s.fraction_lost(); + out.round_trip_time_measurements = s.round_trip_time_measurements(); + return out; +} + +RemoteOutboundRtpStreamStats +fromProto(const proto::RemoteOutboundRtpStreamStats &s) { + RemoteOutboundRtpStreamStats out; + out.local_id = s.local_id(); + out.remote_timestamp = s.remote_timestamp(); + out.reports_sent = s.reports_sent(); + out.round_trip_time = s.round_trip_time(); + out.total_round_trip_time = s.total_round_trip_time(); + out.round_trip_time_measurements = s.round_trip_time_measurements(); + return out; +} + +MediaSourceStats fromProto(const proto::MediaSourceStats &s) { + MediaSourceStats out; + out.track_identifier = s.track_identifier(); + out.kind = s.kind(); + return out; +} + +AudioSourceStats fromProto(const proto::AudioSourceStats &s) { + AudioSourceStats out; + out.audio_level = s.audio_level(); + out.total_audio_energy = s.total_audio_energy(); + out.total_samples_duration = s.total_samples_duration(); + out.echo_return_loss = s.echo_return_loss(); + out.echo_return_loss_enhancement = s.echo_return_loss_enhancement(); + out.dropped_samples_duration = s.dropped_samples_duration(); + out.dropped_samples_events = s.dropped_samples_events(); + out.total_capture_delay = s.total_capture_delay(); + out.total_samples_captured = s.total_samples_captured(); + return out; +} + +VideoSourceStats fromProto(const proto::VideoSourceStats &s) { + VideoSourceStats out; + out.width = s.width(); + out.height = s.height(); + out.frames = s.frames(); + out.frames_per_second = s.frames_per_second(); + return out; +} + +AudioPlayoutStats fromProto(const proto::AudioPlayoutStats &s) { + AudioPlayoutStats out; + out.kind = s.kind(); + out.synthesized_samples_duration = s.synthesized_samples_duration(); + out.synthesized_samples_events = s.synthesized_samples_events(); + out.total_samples_duration = s.total_samples_duration(); + out.total_playout_delay = s.total_playout_delay(); + out.total_samples_count = s.total_samples_count(); + return out; +} + +PeerConnectionStats fromProto(const proto::PeerConnectionStats &s) { + PeerConnectionStats out; + out.data_channels_opened = s.data_channels_opened(); + out.data_channels_closed = s.data_channels_closed(); + return out; +} + +DataChannelStats fromProto(const proto::DataChannelStats &s) { + DataChannelStats out; + out.label = s.label(); + out.protocol = s.protocol(); + out.data_channel_identifier = s.data_channel_identifier(); + if (s.has_state()) { + out.state = fromProto(s.state()); + } else { + out.state.reset(); + } + out.messages_sent = s.messages_sent(); + out.bytes_sent = s.bytes_sent(); + out.messages_received = s.messages_received(); + out.bytes_received = s.bytes_received(); + return out; +} + +TransportStats fromProto(const proto::TransportStats &s) { + TransportStats out; + out.packets_sent = s.packets_sent(); + out.packets_received = s.packets_received(); + out.bytes_sent = s.bytes_sent(); + out.bytes_received = s.bytes_received(); + out.ice_role = fromProto(s.ice_role()); + out.ice_local_username_fragment = s.ice_local_username_fragment(); + if (s.has_dtls_state()) { + out.dtls_state = fromProto(s.dtls_state()); + } else { + out.dtls_state.reset(); + } + if (s.has_ice_state()) { + out.ice_state = fromProto(s.ice_state()); + } else { + out.ice_state.reset(); + } + out.selected_candidate_pair_id = s.selected_candidate_pair_id(); + out.local_certificate_id = s.local_certificate_id(); + out.remote_certificate_id = s.remote_certificate_id(); + out.tls_version = s.tls_version(); + out.dtls_cipher = s.dtls_cipher(); + out.dtls_role = fromProto(s.dtls_role()); + out.srtp_cipher = s.srtp_cipher(); + out.selected_candidate_pair_changes = s.selected_candidate_pair_changes(); + return out; +} + +CandidatePairStats fromProto(const proto::CandidatePairStats &s) { + CandidatePairStats out; + out.transport_id = s.transport_id(); + out.local_candidate_id = s.local_candidate_id(); + out.remote_candidate_id = s.remote_candidate_id(); + if (s.has_state()) { + out.state = fromProto(s.state()); + } else { + out.state.reset(); + } + out.nominated = s.nominated(); + out.packets_sent = s.packets_sent(); + out.packets_received = s.packets_received(); + out.bytes_sent = s.bytes_sent(); + out.bytes_received = s.bytes_received(); + out.last_packet_sent_timestamp = s.last_packet_sent_timestamp(); + out.last_packet_received_timestamp = s.last_packet_received_timestamp(); + out.total_round_trip_time = s.total_round_trip_time(); + out.current_round_trip_time = s.current_round_trip_time(); + out.available_outgoing_bitrate = s.available_outgoing_bitrate(); + out.available_incoming_bitrate = s.available_incoming_bitrate(); + out.requests_received = s.requests_received(); + out.requests_sent = s.requests_sent(); + out.responses_received = s.responses_received(); + out.responses_sent = s.responses_sent(); + out.consent_requests_sent = s.consent_requests_sent(); + out.packets_discarded_on_send = s.packets_discarded_on_send(); + out.bytes_discarded_on_send = s.bytes_discarded_on_send(); + return out; +} + +IceCandidateStats fromProto(const proto::IceCandidateStats &s) { + IceCandidateStats out; + out.transport_id = s.transport_id(); + out.address = s.address(); + out.port = s.port(); + out.protocol = s.protocol(); + if (s.has_candidate_type()) { + out.candidate_type = fromProto(s.candidate_type()); + } else { + out.candidate_type.reset(); + } + out.priority = s.priority(); + out.url = s.url(); + if (s.has_relay_protocol()) { + out.relay_protocol = fromProto(s.relay_protocol()); + } else { + out.relay_protocol.reset(); + } + out.foundation = s.foundation(); + out.related_address = s.related_address(); + out.related_port = s.related_port(); + out.username_fragment = s.username_fragment(); + if (s.has_tcp_type()) { + out.tcp_type = fromProto(s.tcp_type()); + } else { + out.tcp_type.reset(); + } + return out; +} + +CertificateStats fromProto(const proto::CertificateStats &s) { + CertificateStats out; + out.fingerprint = s.fingerprint(); + out.fingerprint_algorithm = s.fingerprint_algorithm(); + out.base64_certificate = s.base64_certificate(); + out.issuer_certificate_id = s.issuer_certificate_id(); + return out; +} + +StreamStats fromProto(const proto::StreamStats &s) { + StreamStats out; + out.id = s.id(); + out.stream_identifier = s.stream_identifier(); + return out; +} + +// ---------------------- +// High-level RtcStats fromProto +// ---------------------- + +RtcStats fromProto(const proto::RtcStats &s) { + using P = proto::RtcStats; + + switch (s.stats_case()) { + case P::kCodec: { + RtcCodecStats out; + out.rtc = fromProto(s.codec().rtc()); + out.codec = fromProto(s.codec().codec()); + return RtcStats{std::move(out)}; + } + case P::kInboundRtp: { + RtcInboundRtpStats out; + out.rtc = fromProto(s.inbound_rtp().rtc()); + out.stream = fromProto(s.inbound_rtp().stream()); + out.received = fromProto(s.inbound_rtp().received()); + out.inbound = fromProto(s.inbound_rtp().inbound()); + return RtcStats{std::move(out)}; + } + case P::kOutboundRtp: { + RtcOutboundRtpStats out; + out.rtc = fromProto(s.outbound_rtp().rtc()); + out.stream = fromProto(s.outbound_rtp().stream()); + out.sent = fromProto(s.outbound_rtp().sent()); + out.outbound = fromProto(s.outbound_rtp().outbound()); + return RtcStats{std::move(out)}; + } + case P::kRemoteInboundRtp: { + RtcRemoteInboundRtpStats out; + out.rtc = fromProto(s.remote_inbound_rtp().rtc()); + out.stream = fromProto(s.remote_inbound_rtp().stream()); + out.received = fromProto(s.remote_inbound_rtp().received()); + out.remote_inbound = fromProto(s.remote_inbound_rtp().remote_inbound()); + return RtcStats{std::move(out)}; + } + case P::kRemoteOutboundRtp: { + RtcRemoteOutboundRtpStats out; + out.rtc = fromProto(s.remote_outbound_rtp().rtc()); + out.stream = fromProto(s.remote_outbound_rtp().stream()); + out.sent = fromProto(s.remote_outbound_rtp().sent()); + out.remote_outbound = fromProto(s.remote_outbound_rtp().remote_outbound()); + return RtcStats{std::move(out)}; + } + case P::kMediaSource: { + RtcMediaSourceStats out; + out.rtc = fromProto(s.media_source().rtc()); + out.source = fromProto(s.media_source().source()); + out.audio = fromProto(s.media_source().audio()); + out.video = fromProto(s.media_source().video()); + return RtcStats{std::move(out)}; + } + case P::kMediaPlayout: { + RtcMediaPlayoutStats out; + out.rtc = fromProto(s.media_playout().rtc()); + out.audio_playout = fromProto(s.media_playout().audio_playout()); + return RtcStats{std::move(out)}; + } + case P::kPeerConnection: { + RtcPeerConnectionStats out; + out.rtc = fromProto(s.peer_connection().rtc()); + out.pc = fromProto(s.peer_connection().pc()); + return RtcStats{std::move(out)}; + } + case P::kDataChannel: { + RtcDataChannelStats out; + out.rtc = fromProto(s.data_channel().rtc()); + out.dc = fromProto(s.data_channel().dc()); + return RtcStats{std::move(out)}; + } + case P::kTransport: { + RtcTransportStats out; + out.rtc = fromProto(s.transport().rtc()); + out.transport = fromProto(s.transport().transport()); + return RtcStats{std::move(out)}; + } + case P::kCandidatePair: { + RtcCandidatePairStats out; + out.rtc = fromProto(s.candidate_pair().rtc()); + out.candidate_pair = fromProto(s.candidate_pair().candidate_pair()); + return RtcStats{std::move(out)}; + } + case P::kLocalCandidate: { + RtcLocalCandidateStats out; + out.rtc = fromProto(s.local_candidate().rtc()); + out.candidate = fromProto(s.local_candidate().candidate()); + return RtcStats{std::move(out)}; + } + case P::kRemoteCandidate: { + RtcRemoteCandidateStats out; + out.rtc = fromProto(s.remote_candidate().rtc()); + out.candidate = fromProto(s.remote_candidate().candidate()); + return RtcStats{std::move(out)}; + } + case P::kCertificate: { + RtcCertificateStats out; + out.rtc = fromProto(s.certificate().rtc()); + out.certificate = fromProto(s.certificate().certificate()); + return RtcStats{std::move(out)}; + } + case P::kStream: { + RtcStreamStats out; + out.rtc = fromProto(s.stream().rtc()); + out.stream = fromProto(s.stream().stream()); + return RtcStats{std::move(out)}; + } + case P::kTrack: + // Deprecated; fall through to default + case P::STATS_NOT_SET: + default: { + // You might want to handle this differently (throw, assert, etc.) + RtcCodecStats dummy{}; + dummy.rtc = RtcStatsData{}; + dummy.codec = CodecStats{}; + return RtcStats{std::move(dummy)}; + } + } +} + +std::vector fromProto(const std::vector &src) { + std::vector out; + out.reserve(src.size()); + for (const auto &s : src) { + out.push_back(fromProto(s)); + } + return out; +} + +} // namespace livekit diff --git a/src/track.cpp b/src/track.cpp new file mode 100644 index 0000000..f817001 --- /dev/null +++ b/src/track.cpp @@ -0,0 +1,55 @@ +/* + * Copyright 2023 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an “AS IS” BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "livekit/track.h" + +#include "livekit/ffi_client.h" +#include +#include + +namespace livekit { + +Track::Track(std::weak_ptr handle, std::string sid, std::string name, + TrackKind kind, StreamState state, bool muted, bool remote) + : handle_(std::move(handle)), sid_(std::move(sid)), name_(std::move(name)), + kind_(kind), state_(state), muted_(muted), remote_(remote) {} + +void Track::setPublicationFields(std::optional source, + std::optional simulcasted, + std::optional width, + std::optional height, + std::optional mime_type) { + source_ = source; + simulcasted_ = simulcasted; + width_ = width; + height_ = height; + mime_type_ = std::move(mime_type); +} + +std::future> Track::getStats() const { + auto id = ffi_handle_id(); + if (!id) { + // make a ready future with an empty vector + std::promise> pr; + pr.set_value({}); + return pr.get_future(); + } + + // just forward the future from FfiClient + return FfiClient::instance().getTrackStatsAsync(id); +} + +} // namespace livekit \ No newline at end of file