From a529b2b81bc5957d006c8def11d0bfc69a5cc2ac Mon Sep 17 00:00:00 2001 From: aee <117306596+aee-google@users.noreply.github.com> Date: Mon, 29 Apr 2024 13:36:50 -0700 Subject: [PATCH] Clean up telemetry_test and fix. (#3047) b/330355045 The fix includes fixing the starboard implementation of `MessagePump`: `MessagePumpUIStarboard` and `MessagePumpIOStarboard`. --- base/BUILD.gn | 5 + .../message_loop/message_pump_io_starboard.cc | 66 ++-- base/message_loop/message_pump_io_starboard.h | 24 +- .../message_pump_io_starboard_unittest.cc | 284 ++++++++++++++++++ .../message_loop/message_pump_ui_starboard.cc | 162 +++++----- base/message_loop/message_pump_ui_starboard.h | 34 +-- .../waitable_event_watcher_starboard.cc | 16 + cobalt/black_box_tests/black_box_tests.py | 2 +- .../testdata/telemetry_test.html | 98 +++--- .../cobalt_metrics_services_manager.cc | 24 +- .../metrics/cobalt_metrics_services_manager.h | 5 +- cobalt/h5vcc/h5vcc_metrics.cc | 43 ++- cobalt/h5vcc/h5vcc_metrics.h | 11 +- cobalt/h5vcc/h5vcc_metrics.idl | 4 +- .../metrics/metrics_upload_scheduler.cc | 5 +- components/metrics/metrics_upload_scheduler.h | 3 - net/socket/tcp_socket_starboard.cc | 1 + net/socket/udp_socket_starboard.cc | 1 + 18 files changed, 556 insertions(+), 232 deletions(-) create mode 100644 base/message_loop/message_pump_io_starboard_unittest.cc create mode 100644 base/synchronization/waitable_event_watcher_starboard.cc diff --git a/base/BUILD.gn b/base/BUILD.gn index 18d32d13d39..90b49d1a464 100644 --- a/base/BUILD.gn +++ b/base/BUILD.gn @@ -994,6 +994,7 @@ component("base") { "synchronization/condition_variable_starboard.cc", "synchronization/lock_impl_starboard.cc", "synchronization/waitable_event_starboard.cc", + "synchronization/waitable_event_watcher_starboard.cc", "sys_info_starboard.cc", "threading/platform_thread_starboard.cc", "threading/thread_local_storage_starboard.cc", @@ -3896,6 +3897,10 @@ test("base_unittests") { sources += [ "debug/allocation_trace_unittest.cc" ] } + if (is_starboard) { + sources += [ "message_loop/message_pump_io_starboard_unittest.cc" ] + } + if (is_ios && !is_starboard) { sources += [ "ios/device_util_unittest.mm", diff --git a/base/message_loop/message_pump_io_starboard.cc b/base/message_loop/message_pump_io_starboard.cc index bec2652af4d..d9ce934a204 100644 --- a/base/message_loop/message_pump_io_starboard.cc +++ b/base/message_loop/message_pump_io_starboard.cc @@ -25,11 +25,12 @@ namespace base { -MessagePumpIOStarboard::SocketWatcher::SocketWatcher() - : interests_(kSbSocketWaiterInterestNone), +MessagePumpIOStarboard::SocketWatcher::SocketWatcher(const Location& from_here) + : created_from_location_(from_here), + interests_(kSbSocketWaiterInterestNone), socket_(kSbSocketInvalid), - pump_(NULL), - watcher_(NULL), + pump_(nullptr), + watcher_(nullptr), weak_factory_(this) {} MessagePumpIOStarboard::SocketWatcher::~SocketWatcher() { @@ -39,7 +40,10 @@ MessagePumpIOStarboard::SocketWatcher::~SocketWatcher() { } bool MessagePumpIOStarboard::SocketWatcher::StopWatchingSocket() { + watcher_ = nullptr; + interests_ = kSbSocketWaiterInterestNone; if (!SbSocketIsValid(socket_)) { + pump_ = nullptr; // If this watcher is not watching anything, no-op and return success. return true; } @@ -50,9 +54,7 @@ bool MessagePumpIOStarboard::SocketWatcher::StopWatchingSocket() { DCHECK(pump_); result = pump_->StopWatching(socket); } - pump_ = NULL; - watcher_ = NULL; - interests_ = kSbSocketWaiterInterestNone; + pump_ = nullptr; return result; } @@ -92,7 +94,6 @@ void MessagePumpIOStarboard::SocketWatcher::OnSocketReadyToWrite( MessagePumpIOStarboard::MessagePumpIOStarboard() : keep_running_(true), - in_run_(false), processed_io_events_(false), waiter_(SbSocketWaiterCreate()) {} @@ -170,16 +171,14 @@ void MessagePumpIOStarboard::RemoveIOObserver(IOObserver* obs) { // Reentrant! void MessagePumpIOStarboard::Run(Delegate* delegate) { - AutoReset auto_reset_in_run(&in_run_, true); + AutoReset auto_reset_keep_running(&keep_running_, true); for (;;) { Delegate::NextWorkInfo next_work_info = delegate->DoWork(); - bool has_more_immediate_work = next_work_info.is_immediate(); - if (!keep_running_) - break; + bool immediate_work_available = next_work_info.is_immediate(); - if (has_more_immediate_work) - continue; + if (should_quit()) + break; // NOTE: We need to have a wake-up pending any time there is work queued, // and the MessageLoop only wakes up the pump when the work queue goes from @@ -193,44 +192,37 @@ void MessagePumpIOStarboard::Run(Delegate* delegate) { // loop and call delegate->DoWork() before we decide to block. SbSocketWaiterResult result = SbSocketWaiterWaitTimed(waiter_, 0); DCHECK_NE(kSbSocketWaiterResultInvalid, result); - has_more_immediate_work |= - (result == kSbSocketWaiterResultWokenUp) || processed_io_events_; + + bool attempt_more_work = + (result == kSbSocketWaiterResultWokenUp) || immediate_work_available || processed_io_events_; processed_io_events_ = false; - if (!keep_running_) + + if (should_quit()) break; - if (has_more_immediate_work) + if (attempt_more_work) continue; - has_more_immediate_work = delegate->DoIdleWork(); - if (!keep_running_) + attempt_more_work = delegate->DoIdleWork(); + + if (should_quit()) break; - if (has_more_immediate_work) + if (attempt_more_work) continue; - if (!next_work_info.delayed_run_time.is_null()) { - delayed_work_time_ = next_work_info.delayed_run_time; - } - if (delayed_work_time_.is_null()) { + if (next_work_info.delayed_run_time.is_max()) { SbSocketWaiterWait(waiter_); } else { - TimeDelta delay = delayed_work_time_ - TimeTicks::Now(); - if (delay > TimeDelta()) { - SbSocketWaiterWaitTimed(waiter_, delay.InMicroseconds()); - } else { - // It looks like delayed_work_time_ indicates a time in the past, so we - // need to call DoDelayedWork now. - delayed_work_time_ = TimeTicks(); - } + SbSocketWaiterWaitTimed(waiter_, next_work_info.remaining_delay().InMicroseconds()); } - } - keep_running_ = true; + if (should_quit()) + break; + } } void MessagePumpIOStarboard::Quit() { - DCHECK(in_run_); // Tell both the SbObjectWaiter and Run that they should break out of their // loops. keep_running_ = false; @@ -246,8 +238,6 @@ void MessagePumpIOStarboard::ScheduleDelayedWork( // We know that we can't be blocked on Wait right now since this method can // only be called on the same thread as Run, so we only need to update our // record of how long to sleep when we do sleep. - delayed_work_time_ = next_work_info.delayed_run_time; - ScheduleWork(); } void MessagePumpIOStarboard::WillProcessIOEvent() { diff --git a/base/message_loop/message_pump_io_starboard.h b/base/message_loop/message_pump_io_starboard.h index b09fd1a98ce..b7d807dec91 100644 --- a/base/message_loop/message_pump_io_starboard.h +++ b/base/message_loop/message_pump_io_starboard.h @@ -59,9 +59,12 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump { // Object returned by WatchSocket to manage further watching. class SocketWatcher { public: - SocketWatcher(); + SocketWatcher(const Location& from_here); ~SocketWatcher(); // Implicitly calls StopWatchingSocket. + SocketWatcher(const SocketWatcher&) = delete; + SocketWatcher& operator=(const SocketWatcher&) = delete; + // NOTE: These methods aren't called StartWatching()/StopWatching() to avoid // confusion with the win32 ObjectWatcher class. @@ -90,14 +93,13 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump { void OnSocketReadyToRead(SbSocket socket, MessagePumpIOStarboard* pump); void OnSocketReadyToWrite(SbSocket socket, MessagePumpIOStarboard* pump); + const Location created_from_location_; int interests_; SbSocket socket_; bool persistent_; MessagePumpIOStarboard* pump_; Watcher* watcher_; base::WeakPtrFactory weak_factory_; - - // DISALLOW_COPY_AND_ASSIGN(SocketWatcher); }; enum Mode { @@ -107,6 +109,10 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump { }; MessagePumpIOStarboard(); + virtual ~MessagePumpIOStarboard(); + + MessagePumpIOStarboard(const MessagePumpIOStarboard&) = delete; + MessagePumpIOStarboard& operator=(const MessagePumpIOStarboard&) = delete; // Have the current thread's message loop watch for a a situation in which // reading/writing to the socket can be performed without blocking. Callers @@ -135,9 +141,6 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump { virtual void ScheduleWork() override; virtual void ScheduleDelayedWork(const Delegate::NextWorkInfo& next_work_info) override; -// protected: - virtual ~MessagePumpIOStarboard(); - private: friend class MessagePumpIOStarboardTest; @@ -151,25 +154,20 @@ class BASE_EXPORT MessagePumpIOStarboard : public MessagePump { void* context, int ready_interests); + bool should_quit() const { return !keep_running_; } + // This flag is set to false when Run should return. bool keep_running_; - // This flag is set when inside Run. - bool in_run_; - // This flag is set if the Socket Waiter has processed I/O events. bool processed_io_events_; - // The time at which we should call DoDelayedWork. - TimeTicks delayed_work_time_; - // Starboard socket waiter dispatcher. Waits for all sockets registered with // it, and sends readiness callbacks when a socket is ready for I/O. SbSocketWaiter waiter_; ObserverList io_observers_; THREAD_CHECKER(watch_socket_caller_checker_); - // DISALLOW_COPY_AND_ASSIGN(MessagePumpIOStarboard); }; using MessagePumpForIO = MessagePumpIOStarboard; diff --git a/base/message_loop/message_pump_io_starboard_unittest.cc b/base/message_loop/message_pump_io_starboard_unittest.cc new file mode 100644 index 00000000000..c6231f70d88 --- /dev/null +++ b/base/message_loop/message_pump_io_starboard_unittest.cc @@ -0,0 +1,284 @@ +// Copyright 2024 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "base/message_loop/message_pump_io_starboard.h" + +#include + +#include + +#include "base/functional/bind.h" +#include "base/run_loop.h" +#include "base/synchronization/waitable_event.h" +#include "base/synchronization/waitable_event_watcher.h" +#include "base/task/single_thread_task_executor.h" +#include "base/task/single_thread_task_runner.h" +#include "base/test/gtest_util.h" +#include "base/test/task_environment.h" +#include "base/threading/thread.h" +#include "testing/gtest/include/gtest/gtest.h" + +namespace base { + +class MessagePumpIOStarboardTest : public testing::Test { + protected: + MessagePumpIOStarboardTest() + : task_environment_(std::make_unique( + test::SingleThreadTaskEnvironment::MainThreadType::DEFAULT)), + io_thread_("MessagePumpIOStarboardTestIOThread") {} + ~MessagePumpIOStarboardTest() override = default; + + void SetUp() override { + Thread::Options options(MessagePumpType::IO, 0); + ASSERT_TRUE(io_thread_.StartWithOptions(std::move(options))); + socket_ = SbSocketCreate(SbSocketAddressType::kSbSocketAddressTypeIpv4, SbSocketProtocol::kSbSocketProtocolTcp); + SbSocketIsValid(socket_); + } + + void TearDown() override { + // Some tests watch |pipefds_| from the |io_thread_|. The |io_thread_| must + // thus be joined to ensure those watches are complete before closing the + // pipe. + io_thread_.Stop(); + + SbSocketDestroy(socket_); + } + + std::unique_ptr CreateMessagePump() { + return std::make_unique(); + } + + SbSocket socket() { + return socket_; + } + + scoped_refptr io_runner() const { + return io_thread_.task_runner(); + } + + void SimulateIOEvent(MessagePumpIOStarboard::SocketWatcher* controller) { + MessagePumpIOStarboard::OnSocketWaiterNotification(nullptr, + nullptr, + controller, + (kSbSocketWaiterInterestRead | kSbSocketWaiterInterestWrite)); + } + + std::unique_ptr task_environment_; + + private: + Thread io_thread_; + SbSocket socket_; +}; + +namespace { + +// Concrete implementation of MessagePumpIOStarboard::Watcher that does +// nothing useful. +class StupidWatcher : public MessagePumpIOStarboard::Watcher { + public: + ~StupidWatcher() override = default; + + // MessagePumpIOStarboard::Watcher interface + void OnSocketReadyToRead(SbSocket socket) override {} + void OnSocketReadyToWrite(SbSocket socket) override {} +}; + +// Death tests not supported. +TEST_F(MessagePumpIOStarboardTest, DISABLED_QuitOutsideOfRun) { + std::unique_ptr pump = CreateMessagePump(); + ASSERT_DCHECK_DEATH(pump->Quit()); +} + +class BaseWatcher : public MessagePumpIOStarboard::Watcher { + public: + BaseWatcher() = default; + ~BaseWatcher() override = default; + + // MessagePumpIOStarboard::Watcher interface + void OnSocketReadyToRead(SbSocket socket) override { NOTREACHED(); } + void OnSocketReadyToWrite(SbSocket socket) override { NOTREACHED(); } +}; + +class DeleteWatcher : public BaseWatcher { + public: + explicit DeleteWatcher( + std::unique_ptr controller) + : controller_(std::move(controller)) {} + + ~DeleteWatcher() override { DCHECK(!controller_); } + + MessagePumpIOStarboard::SocketWatcher* controller() { + return controller_.get(); + } + + void OnSocketReadyToWrite(SbSocket socket) override { + DCHECK(controller_); + controller_.reset(); + } + + private: + std::unique_ptr controller_; +}; + +// Fails on some platforms. +TEST_F(MessagePumpIOStarboardTest, DISABLED_DeleteWatcher) { + DeleteWatcher delegate( + std::make_unique(FROM_HERE)); + std::unique_ptr pump = CreateMessagePump(); + pump->Watch(socket(), + /*persistent=*/false, + MessagePumpIOStarboard::WATCH_READ_WRITE, + delegate.controller(), + &delegate); + SimulateIOEvent(delegate.controller()); +} + +class StopWatcher : public BaseWatcher { + public: + explicit StopWatcher(MessagePumpIOStarboard::SocketWatcher* controller) + : controller_(controller) {} + + ~StopWatcher() override = default; + + void OnSocketReadyToWrite(SbSocket socket) override { + controller_->StopWatchingSocket(); + } + + private: + raw_ptr controller_ = nullptr; +}; + +// Fails on some platforms. +TEST_F(MessagePumpIOStarboardTest, DISABLED_StopWatcher) { + std::unique_ptr pump = CreateMessagePump(); + MessagePumpIOStarboard::SocketWatcher controller(FROM_HERE); + StopWatcher delegate(&controller); + pump->Watch(socket(), + /*persistent=*/false, + MessagePumpIOStarboard::WATCH_READ_WRITE, + &controller, + &delegate); + SimulateIOEvent(&controller); +} + +void QuitMessageLoopAndStart(OnceClosure quit_closure) { + std::move(quit_closure).Run(); + + RunLoop runloop(RunLoop::Type::kNestableTasksAllowed); + SingleThreadTaskRunner::GetCurrentDefault()->PostTask(FROM_HERE, + runloop.QuitClosure()); + runloop.Run(); +} + +class NestedPumpWatcher : public MessagePumpIOStarboard::Watcher { + public: + NestedPumpWatcher() = default; + ~NestedPumpWatcher() override = default; + + void OnSocketReadyToRead(SbSocket socket) override { + RunLoop runloop; + SingleThreadTaskRunner::GetCurrentDefault()->PostTask( + FROM_HERE, BindOnce(&QuitMessageLoopAndStart, runloop.QuitClosure())); + runloop.Run(); + } + + void OnSocketReadyToWrite(SbSocket socket) override {} +}; + +// Fails on some platforms. +TEST_F(MessagePumpIOStarboardTest, DISABLED_NestedPumpWatcher) { + NestedPumpWatcher delegate; + std::unique_ptr pump = CreateMessagePump(); + MessagePumpIOStarboard::SocketWatcher controller(FROM_HERE); + pump->Watch(socket(), + /*persistent=*/false, + MessagePumpIOStarboard::WATCH_READ, + &controller, + &delegate); + SimulateIOEvent(&controller); +} + +void FatalClosure() { + FAIL() << "Reached fatal closure."; +} + +class QuitWatcher : public BaseWatcher { + public: + QuitWatcher(base::OnceClosure quit_closure) + : quit_closure_(std::move(quit_closure)) {} + + void OnSocketReadyToRead(SbSocket socket) override { + // Post a fatal closure to the MessageLoop before we quit it. + SingleThreadTaskRunner::GetCurrentDefault()->PostTask( + FROM_HERE, BindOnce(&FatalClosure)); + + if (quit_closure_) + std::move(quit_closure_).Run(); + } + + private: + base::OnceClosure quit_closure_; +}; + +void WriteSocketWrapper(MessagePumpIOStarboard* pump, + WaitableEvent* event) { + pump->ScheduleWork(); +} + +// Fails on some platforms. +TEST_F(MessagePumpIOStarboardTest, DISABLED_QuitWatcher) { + // Delete the old TaskEnvironment so that we can manage our own one here. + task_environment_.reset(); + + std::unique_ptr executor_pump = CreateMessagePump(); + MessagePumpIOStarboard* pump = executor_pump.get(); + SingleThreadTaskExecutor executor(std::move(executor_pump)); + RunLoop run_loop; + QuitWatcher delegate(run_loop.QuitClosure()); + MessagePumpIOStarboard::SocketWatcher controller(FROM_HERE); + WaitableEvent event(WaitableEvent::ResetPolicy::AUTOMATIC, + WaitableEvent::InitialState::NOT_SIGNALED); + std::unique_ptr watcher(new WaitableEventWatcher); + + // Tell the pump to watch the pipe. + pump->Watch(socket(), + /*persistent=*/false, + MessagePumpIOStarboard::WATCH_READ, + &controller, + &delegate); + + // Make the IO thread wait for |event| before writing to pipefds[1]. + const char buf = 0; + WaitableEventWatcher::EventCallback write_socket_task = + BindOnce(&WriteSocketWrapper, base::Unretained(pump)); + io_runner()->PostTask( + FROM_HERE, BindOnce(IgnoreResult(&WaitableEventWatcher::StartWatching), + Unretained(watcher.get()), &event, + std::move(write_socket_task), io_runner())); + + // Queue |event| to signal on |sequence_manager|. + SingleThreadTaskRunner::GetCurrentDefault()->PostTask( + FROM_HERE, BindOnce(&WaitableEvent::Signal, Unretained(&event))); + + // Now run the MessageLoop. + run_loop.Run(); + + // StartWatching can move |watcher| to IO thread. Release on IO thread. + io_runner()->PostTask(FROM_HERE, BindOnce(&WaitableEventWatcher::StopWatching, + Owned(watcher.release()))); +} + +} // namespace + +} // namespace base diff --git a/base/message_loop/message_pump_ui_starboard.cc b/base/message_loop/message_pump_ui_starboard.cc index cc74fcd167d..142be66d536 100644 --- a/base/message_loop/message_pump_ui_starboard.cc +++ b/base/message_loop/message_pump_ui_starboard.cc @@ -27,41 +27,68 @@ void CallMessagePumpImmediate(void* context) { DCHECK(context); MessagePumpUIStarboard* pump = reinterpret_cast(context); - pump->RunOneAndReschedule(false /*delayed*/); + pump->CancelImmediate(); + pump->RunUntilIdle(); } void CallMessagePumpDelayed(void* context) { DCHECK(context); MessagePumpUIStarboard* pump = reinterpret_cast(context); - pump->RunOneAndReschedule(true /*delayed*/); + pump->CancelDelayed(); + pump->RunUntilIdle(); } } // namespace -MessagePumpUIStarboard::MessagePumpUIStarboard() : delegate_(NULL) {} +MessagePumpUIStarboard::MessagePumpUIStarboard() : delegate_(nullptr) {} -void MessagePumpUIStarboard::RunOneAndReschedule(bool delayed) { +void MessagePumpUIStarboard::CancelDelayed() { + base::AutoLock auto_lock(outstanding_events_lock_); + CancelDelayedLocked(); +} + +void MessagePumpUIStarboard::CancelImmediate() { + base::AutoLock auto_lock(outstanding_events_lock_); + CancelImmediateLocked(); +} + +void MessagePumpUIStarboard::RunUntilIdle() { DCHECK(delegate_); - if (delayed) { - CancelDelayed(); - } else { - CancelImmediate(); - } +#if !defined(COBALT_BUILD_TYPE_GOLD) + // Abort if this is a QA build to signal that this is unexpected. + CHECK(delegate_); +#endif + + if (should_quit()) + return; - TimeTicks delayed_work_time; for (;;) { - TimeTicks next_time; - if (!RunOne(&next_time)) { - delayed_work_time = next_time; + // Do some work and see if the next task is ready right away. + Delegate::NextWorkInfo next_work_info = delegate_->DoWork(); + bool attempt_more_work = next_work_info.is_immediate(); + + if (should_quit()) break; + + if (attempt_more_work) + continue; + + attempt_more_work = delegate_->DoIdleWork(); + + if (should_quit()) + break; + + if (attempt_more_work) + continue; + + // If there is delayed work. + if (!next_work_info.delayed_run_time.is_max()) { + ScheduleDelayedWork(next_work_info); } - } - if (!delayed_work_time.is_null()) { - Delegate::NextWorkInfo next_work_info; - next_work_info.delayed_run_time = delayed_work_time; - ScheduleDelayedWork(next_work_info); + // Idle. + break; } } @@ -83,111 +110,60 @@ void MessagePumpUIStarboard::Attach(Delegate* delegate) { } void MessagePumpUIStarboard::Quit() { - delegate_ = NULL; + delegate_ = nullptr; CancelAll(); } void MessagePumpUIStarboard::ScheduleWork() { - base::AutoLock auto_lock(outstanding_events_lock_); - if (!outstanding_events_.empty()) { - // No need, already an outstanding event. + // Check if outstanding event already exists. + if (outstanding_event_) return; - } - outstanding_events_.insert( - SbEventSchedule(&CallMessagePumpImmediate, this, 0)); + base::AutoLock auto_lock(outstanding_events_lock_); + outstanding_event_ = + SbEventSchedule(&CallMessagePumpImmediate, this, 0); } void MessagePumpUIStarboard::ScheduleDelayedWork( const Delegate::NextWorkInfo& next_work_info) { - base::TimeDelta delay; - if (!next_work_info.delayed_run_time.is_null()) { - delay = next_work_info.delayed_run_time - base::TimeTicks::Now(); - - if (delay <= base::TimeDelta()) { - delay = base::TimeDelta(); - } + if (next_work_info.is_immediate() || next_work_info.delayed_run_time.is_max()) { + return; } - { - base::AutoLock auto_lock(outstanding_events_lock_); - // Make sure any outstanding delayed event is canceled. - CancelDelayedLocked(); - - outstanding_delayed_events_.insert( - SbEventSchedule(&CallMessagePumpDelayed, this, delay.InMicroseconds())); + TimeDelta delay = next_work_info.remaining_delay(); + if (delay.is_negative()) { + delay = base::TimeDelta(); } -} -void MessagePumpUIStarboard::CancelAll() { base::AutoLock auto_lock(outstanding_events_lock_); - CancelImmediateLocked(); + // Make sure any outstanding delayed event is canceled. CancelDelayedLocked(); + outstanding_delayed_event_ = + SbEventSchedule(&CallMessagePumpDelayed, this, delay.InMicroseconds()); } -void MessagePumpUIStarboard::CancelImmediate() { +void MessagePumpUIStarboard::CancelAll() { base::AutoLock auto_lock(outstanding_events_lock_); CancelImmediateLocked(); -} - -void MessagePumpUIStarboard::CancelDelayed() { - base::AutoLock auto_lock(outstanding_events_lock_); CancelDelayedLocked(); } void MessagePumpUIStarboard::CancelImmediateLocked() { outstanding_events_lock_.AssertAcquired(); - for (SbEventIdSet::iterator it = outstanding_events_.begin(); - it != outstanding_events_.end(); ++it) { - SbEventCancel(*it); - } - outstanding_events_.erase(outstanding_events_.begin(), - outstanding_events_.end()); + if (!outstanding_event_) + return; + + SbEventCancel(*outstanding_event_); + outstanding_event_.reset(); } void MessagePumpUIStarboard::CancelDelayedLocked() { outstanding_events_lock_.AssertAcquired(); - for (SbEventIdSet::iterator it = outstanding_delayed_events_.begin(); - it != outstanding_delayed_events_.end(); ++it) { - SbEventCancel(*it); - } - outstanding_delayed_events_.erase(outstanding_delayed_events_.begin(), - outstanding_delayed_events_.end()); -} - -bool MessagePumpUIStarboard::RunOne(TimeTicks* out_delayed_work_time) { - DCHECK(out_delayed_work_time); - - // We expect to start with a delegate, so we can DCHECK it, but any task we - // run could call Quit and remove it. - DCHECK(delegate_); - if (!delegate_) { -#if !defined(COBALT_BUILD_TYPE_GOLD) - // Abort if this is a QA build to signal that this is unexpected. - CHECK(delegate_); -#endif - // Drop the work if there is no delegate for it. - return false; - } - - // Do immediate work. - Delegate::NextWorkInfo next_work_info = delegate_->DoWork(); - - // If we did work, and we still have a delegate, return true, so we will be - // called again. - if (next_work_info.is_immediate()) { - return !!delegate_; - } - - // If the delegate has been removed, Quit() has been called, so no more work. - if (!delegate_) { - return false; - } + if (!outstanding_delayed_event_) + return; - // No work was done, so only call back if there was idle work done, otherwise - // go to sleep. ScheduleWork or ScheduleDelayedWork will be called if new work - // is scheduled. - return delegate_->DoIdleWork(); + SbEventCancel(*outstanding_delayed_event_); + outstanding_delayed_event_.reset(); } MessagePump::Delegate* MessagePumpForUI::SetDelegate(Delegate* delegate) { diff --git a/base/message_loop/message_pump_ui_starboard.h b/base/message_loop/message_pump_ui_starboard.h index 6c0123453fe..a60b11c0547 100644 --- a/base/message_loop/message_pump_ui_starboard.h +++ b/base/message_loop/message_pump_ui_starboard.h @@ -22,6 +22,7 @@ #include "base/synchronization/lock.h" #include "base/time/time.h" #include "starboard/event.h" +#include "third_party/abseil-cpp/absl/types/optional.h" namespace base { @@ -41,9 +42,18 @@ class BASE_EXPORT MessagePumpUIStarboard : public MessagePump { MessagePumpUIStarboard(); virtual ~MessagePumpUIStarboard() { Quit(); } - // Runs one iteration of the run loop, and reschedules another call, if + MessagePumpUIStarboard(const MessagePumpUIStarboard&) = delete; + MessagePumpUIStarboard& operator=(const MessagePumpUIStarboard&) = delete; + + // Cancels delayed schedule callback events. + void CancelDelayed(); + + // Cancels immediate schedule callback events. + void CancelImmediate(); + + // Runs one iteration of the run loop, and schedules another iteration if // necessary. - void RunOneAndReschedule(bool delayed); + void RunUntilIdle(); // --- MessagePump Implementation --- @@ -64,21 +74,14 @@ class BASE_EXPORT MessagePumpUIStarboard : public MessagePump { // Cancels all outstanding scheduled callback events, if any. void CancelAll(); - // Cancels immediate schedule callback events. - void CancelImmediate(); - - // Cancels delayed schedule callback events. - void CancelDelayed(); - // Cancel workhorse that assumes |outstanding_events_lock_| is locked. void CancelImmediateLocked(); // Cancel delayed workhorse that assumes |outstanding_events_lock_| is locked. void CancelDelayedLocked(); - // Runs one iteration of the run loop, returning whether to schedule another - // iteration or not. Places the delay, if any, in |out_delayed_work_time|. - bool RunOne(base::TimeTicks* out_delayed_work_time); + // If the delegate has been removed, Quit() has been called. + bool should_quit() const { return delegate_ == nullptr; } // The MessagePump::Delegate configured in Start(). Delegate* delegate_; @@ -86,16 +89,11 @@ class BASE_EXPORT MessagePumpUIStarboard : public MessagePump { // Lock protecting outstanding scheduled callback events. base::Lock outstanding_events_lock_; - // A set of scheduled callback event IDs. - typedef std::set SbEventIdSet; - // The set of outstanding scheduled callback events for immediate work. - SbEventIdSet outstanding_events_; + absl::optional outstanding_event_; // The set of outstanding scheduled callback events for delayed work. - SbEventIdSet outstanding_delayed_events_; - - // DISALLOW_COPY_AND_ASSIGN(MessagePumpUIStarboard); + absl::optional outstanding_delayed_event_; }; using MessagePumpForUI = MessagePumpUIStarboard; diff --git a/base/synchronization/waitable_event_watcher_starboard.cc b/base/synchronization/waitable_event_watcher_starboard.cc new file mode 100644 index 00000000000..518f9296633 --- /dev/null +++ b/base/synchronization/waitable_event_watcher_starboard.cc @@ -0,0 +1,16 @@ +// Copyright 2024 Google Inc. All Rights Reserved. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +// We use the same code as the POSIX version. +#include "base/synchronization/waitable_event_watcher_posix.cc" diff --git a/cobalt/black_box_tests/black_box_tests.py b/cobalt/black_box_tests/black_box_tests.py index f5ba4be4ace..5e2843e81fd 100755 --- a/cobalt/black_box_tests/black_box_tests.py +++ b/cobalt/black_box_tests/black_box_tests.py @@ -88,7 +88,7 @@ 'service_worker_test', 'service_worker_persist_test', 'soft_mic_platform_service_test', - # 'telemetry_test', + 'telemetry_test', 'text_encoding_test', 'wasm_basic_test', 'web_debugger', diff --git a/cobalt/black_box_tests/testdata/telemetry_test.html b/cobalt/black_box_tests/testdata/telemetry_test.html index 01ccf90bd20..801245512e2 100644 --- a/cobalt/black_box_tests/testdata/telemetry_test.html +++ b/cobalt/black_box_tests/testdata/telemetry_test.html @@ -16,57 +16,65 @@ --> - - Cobalt Telemetry Test - + Cobalt Telemetry Test + - - + setupFinished(); +}); + - diff --git a/cobalt/browser/metrics/cobalt_metrics_services_manager.cc b/cobalt/browser/metrics/cobalt_metrics_services_manager.cc index f66d3e583a1..893c2252fc8 100644 --- a/cobalt/browser/metrics/cobalt_metrics_services_manager.cc +++ b/cobalt/browser/metrics/cobalt_metrics_services_manager.cc @@ -15,9 +15,11 @@ #include "cobalt/browser/metrics/cobalt_metrics_services_manager.h" #include +#include #include "base/logging.h" #include "cobalt/base/event_dispatcher.h" +#include "cobalt/base/task_runner_util.h" #include "cobalt/browser/metrics/cobalt_metrics_service_client.h" #include "cobalt/browser/metrics/cobalt_metrics_services_manager_client.h" #include "components/metrics_services_manager/metrics_services_manager.h" @@ -33,7 +35,6 @@ CobaltMetricsServicesManager::CobaltMetricsServicesManager() metrics_services_manager::MetricsServicesManager( std::make_unique()) {} - // Static Singleton getter for metrics services manager. CobaltMetricsServicesManager* CobaltMetricsServicesManager::GetInstance() { if (instance_ == nullptr) { @@ -65,12 +66,22 @@ void CobaltMetricsServicesManager::SetEventDispatcherInternal( client->SetEventDispatcher(event_dispatcher); } -void CobaltMetricsServicesManager::ToggleMetricsEnabled(bool is_enabled) { - instance_->task_runner_->PostTask( - FROM_HERE, - base::Bind(&CobaltMetricsServicesManager::ToggleMetricsEnabledInternal, - base::Unretained(instance_), is_enabled)); +void CobaltMetricsServicesManager::ToggleMetricsEnabled( + bool is_enabled, base::OnceClosure done_callback) { + if (done_callback) { + instance_->task_runner_->PostTaskAndReply( + FROM_HERE, + base::Bind(&CobaltMetricsServicesManager::ToggleMetricsEnabledInternal, + base::Unretained(instance_), is_enabled), + std::move(done_callback)); + } else { + instance_->task_runner_->PostTask( + FROM_HERE, + base::Bind(&CobaltMetricsServicesManager::ToggleMetricsEnabledInternal, + base::Unretained(instance_), is_enabled)); + } } + void CobaltMetricsServicesManager::ToggleMetricsEnabledInternal( bool is_enabled) { CobaltMetricsServicesManagerClient* client = @@ -102,7 +113,6 @@ void CobaltMetricsServicesManager::SetUploadIntervalInternal( << interval_seconds; } - } // namespace metrics } // namespace browser } // namespace cobalt diff --git a/cobalt/browser/metrics/cobalt_metrics_services_manager.h b/cobalt/browser/metrics/cobalt_metrics_services_manager.h index 0a9e4c630cc..5ae64819174 100644 --- a/cobalt/browser/metrics/cobalt_metrics_services_manager.h +++ b/cobalt/browser/metrics/cobalt_metrics_services_manager.h @@ -18,7 +18,7 @@ #include -#include "base//memory/scoped_refptr.h" +#include "base/memory/scoped_refptr.h" #include "base/single_thread_task_runner.h" #include "cobalt/base/event_dispatcher.h" #include "cobalt/browser/metrics/cobalt_metrics_services_manager_client.h" @@ -60,7 +60,8 @@ class CobaltMetricsServicesManager // Toggles whether metric reporting is enabled via // CobaltMetricsServicesManager. - static void ToggleMetricsEnabled(bool is_enabled); + static void ToggleMetricsEnabled( + bool is_enabled, base::OnceClosure done_callback = base::OnceClosure()); // Sets the upload interval for metrics reporting. That is, how often are // metrics snapshotted and attempted to upload. diff --git a/cobalt/h5vcc/h5vcc_metrics.cc b/cobalt/h5vcc/h5vcc_metrics.cc index be02577846e..0b22b176983 100644 --- a/cobalt/h5vcc/h5vcc_metrics.cc +++ b/cobalt/h5vcc/h5vcc_metrics.cc @@ -15,6 +15,7 @@ #include "cobalt/h5vcc/h5vcc_metrics.h" #include +#include #include "base/values.h" #include "cobalt/base/event.h" @@ -24,6 +25,7 @@ #include "cobalt/browser/metrics/cobalt_metrics_services_manager.h" #include "cobalt/h5vcc/h5vcc_metric_type.h" #include "cobalt/h5vcc/metric_event_handler_wrapper.h" +#include "cobalt/web/environment_settings_helper.h" namespace cobalt { namespace h5vcc { @@ -79,16 +81,49 @@ void H5vccMetrics::RunEventHandlerInternal( } } -void H5vccMetrics::Enable() { ToggleMetricsEnabled(true); } +script::HandlePromiseVoid H5vccMetrics::Enable( + script::EnvironmentSettings* environment_settings) { + auto* global_wrappable = web::get_global_wrappable(environment_settings); + script::HandlePromiseVoid promise = + web::get_script_value_factory(environment_settings) + ->CreateBasicPromise(); + auto promise_reference = + std::make_unique(global_wrappable, + promise); + ToggleMetricsEnabled( + true, + base::BindOnce( + [](std::unique_ptr + promise_reference) { promise_reference->value().Resolve(); }, + std::move(promise_reference))); + return promise; +} -void H5vccMetrics::Disable() { ToggleMetricsEnabled(false); } +script::HandlePromiseVoid H5vccMetrics::Disable( + script::EnvironmentSettings* environment_settings) { + auto* global_wrappable = web::get_global_wrappable(environment_settings); + script::HandlePromiseVoid promise = + web::get_script_value_factory(environment_settings) + ->CreateBasicPromise(); + auto promise_reference = + std::make_unique(global_wrappable, + promise); + ToggleMetricsEnabled( + false, + base::BindOnce( + [](std::unique_ptr + promise_reference) { promise_reference->value().Resolve(); }, + std::move(promise_reference))); + return promise; +} -void H5vccMetrics::ToggleMetricsEnabled(bool is_enabled) { +void H5vccMetrics::ToggleMetricsEnabled(bool is_enabled, + base::OnceClosure done_callback) { persistent_settings_->SetPersistentSetting( browser::metrics::kMetricEnabledSettingName, std::make_unique(is_enabled)); browser::metrics::CobaltMetricsServicesManager::GetInstance() - ->ToggleMetricsEnabled(is_enabled); + ->ToggleMetricsEnabled(is_enabled, std::move(done_callback)); } bool H5vccMetrics::IsEnabled() { diff --git a/cobalt/h5vcc/h5vcc_metrics.h b/cobalt/h5vcc/h5vcc_metrics.h index 351a06d7023..62694741d06 100644 --- a/cobalt/h5vcc/h5vcc_metrics.h +++ b/cobalt/h5vcc/h5vcc_metrics.h @@ -26,7 +26,9 @@ #include "cobalt/h5vcc/metric_event_handler_wrapper.h" #include "cobalt/persistent_storage/persistent_settings.h" #include "cobalt/script/callback_function.h" +#include "cobalt/script/environment_settings.h" #include "cobalt/script/script_value.h" +#include "cobalt/script/script_value_factory.h" #include "cobalt/script/wrappable.h" @@ -58,10 +60,12 @@ class H5vccMetrics : public script::Wrappable { const MetricEventHandlerWrapper::ScriptValue& event_handler); // Enable Cobalt metrics logging. - void Enable(); + script::HandlePromiseVoid Enable( + script::EnvironmentSettings* environment_settings); // Disable Cobalt metrics logging. - void Disable(); + script::HandlePromiseVoid Disable( + script::EnvironmentSettings* environment_settings); // Returns current enabled state of metrics logging/reporting. bool IsEnabled(); @@ -74,7 +78,8 @@ class H5vccMetrics : public script::Wrappable { private: // Internal convenience method for toggling enabled/disabled state. - void ToggleMetricsEnabled(bool is_enabled); + void ToggleMetricsEnabled( + bool is_enabled, base::OnceClosure done_callback = base::OnceClosure()); void RunEventHandler(const cobalt::h5vcc::H5vccMetricType& metric_type, const std::string& serialized_proto); diff --git a/cobalt/h5vcc/h5vcc_metrics.idl b/cobalt/h5vcc/h5vcc_metrics.idl index 1d7292c4dbd..60625c2dfb7 100644 --- a/cobalt/h5vcc/h5vcc_metrics.idl +++ b/cobalt/h5vcc/h5vcc_metrics.idl @@ -38,11 +38,11 @@ interface H5vccMetrics { // are persistent or "sticky". That is, you only have to call them once // and that setting will persist through multiple app lifecycles until the // enable/disable APIs are explicitly called again. - void enable(); + [CallWith=EnvironmentSettings, NewObject] Promise enable(); // Disable Cobalt metrics logging. If disabled, the metric event handler // should never get called afterward. - void disable(); + [CallWith=EnvironmentSettings, NewObject] Promise disable(); // Returns the current enabled state of metrics reporting. Note, the enable() // and disable() APIs are asynchronous under the hood. This means if you call diff --git a/components/metrics/metrics_upload_scheduler.cc b/components/metrics/metrics_upload_scheduler.cc index b7b7364452a..5992eadf0c5 100644 --- a/components/metrics/metrics_upload_scheduler.cc +++ b/components/metrics/metrics_upload_scheduler.cc @@ -61,7 +61,6 @@ base::TimeDelta GetInitialBackoffInterval() { MetricsUploadScheduler::MetricsUploadScheduler( const base::Closure& upload_callback) : MetricsScheduler(upload_callback), - unsent_logs_interval_(GetUnsentLogsInterval()), initial_backoff_interval_(GetInitialBackoffInterval()), backoff_interval_(initial_backoff_interval_) {} @@ -76,13 +75,13 @@ void MetricsUploadScheduler::UploadFinished(bool server_is_healthy) { backoff_interval_ = BackOffUploadInterval(backoff_interval_); } else { backoff_interval_ = initial_backoff_interval_; - TaskDone(unsent_logs_interval_); + TaskDone(GetUnsentLogsInterval()); } } void MetricsUploadScheduler::StopAndUploadCancelled() { Stop(); - TaskDone(unsent_logs_interval_); + TaskDone(GetUnsentLogsInterval()); } void MetricsUploadScheduler::UploadOverDataUsageCap() { diff --git a/components/metrics/metrics_upload_scheduler.h b/components/metrics/metrics_upload_scheduler.h index a97b7cb1f1a..f04a76ec1be 100644 --- a/components/metrics/metrics_upload_scheduler.h +++ b/components/metrics/metrics_upload_scheduler.h @@ -34,9 +34,6 @@ class MetricsUploadScheduler : public MetricsScheduler { void UploadOverDataUsageCap(); private: - // Time to wait between uploads on success. - const base::TimeDelta unsent_logs_interval_; - // Initial time to wait between upload retry attempts. const base::TimeDelta initial_backoff_interval_; diff --git a/net/socket/tcp_socket_starboard.cc b/net/socket/tcp_socket_starboard.cc index c26489c68b3..e87f81520d6 100644 --- a/net/socket/tcp_socket_starboard.cc +++ b/net/socket/tcp_socket_starboard.cc @@ -34,6 +34,7 @@ TCPSocketStarboard::TCPSocketStarboard( const NetLogSource& source) : socket_performance_watcher_(std::move(socket_performance_watcher)), socket_(kSbSocketInvalid), + socket_watcher_(FROM_HERE), family_(ADDRESS_FAMILY_UNSPECIFIED), logging_multiple_connect_attempts_(false), net_log_(NetLogWithSource::Make(net_log, NetLogSourceType::SOCKET)), diff --git a/net/socket/udp_socket_starboard.cc b/net/socket/udp_socket_starboard.cc index 373c69cd597..7e51b873276 100644 --- a/net/socket/udp_socket_starboard.cc +++ b/net/socket/udp_socket_starboard.cc @@ -43,6 +43,7 @@ UDPSocketStarboard::UDPSocketStarboard(DatagramSocket::BindType bind_type, socket_(kSbSocketInvalid), socket_options_(0), bind_type_(bind_type), + socket_watcher_(FROM_HERE), read_buf_len_(0), recv_from_address_(NULL), write_buf_len_(0),