Skip to content

Commit

Permalink
rgw/notification: add rgw notification specific debug log subsystem.
Browse files Browse the repository at this point in the history
Decorate the logged events with event details (notification id, topic, endpoint, bucket owner, bucket, object and event type).

Signed-off-by: kchheda3 <kchheda3@bloomberg.net>
  • Loading branch information
kchheda3 committed Mar 19, 2024
1 parent 395ff95 commit 4299679
Show file tree
Hide file tree
Showing 9 changed files with 58 additions and 36 deletions.
2 changes: 2 additions & 0 deletions doc/rados/troubleshooting/log-and-debug.rst
Original file line number Diff line number Diff line change
Expand Up @@ -302,6 +302,8 @@ to their default level or to a level suitable for normal operations.
+--------------------------+-----------+--------------+
| ``rgw lifecycle`` | 1 | 5 |
+--------------------------+-----------+--------------+
| ``rgw notification`` | 1 | 5 |
+--------------------------+-----------+--------------+
| ``javaclient`` | 1 | 5 |
+--------------------------+-----------+--------------+
| ``asok`` | 1 | 5 |
Expand Down
1 change: 1 addition & 0 deletions qa/suites/crimson-rados/perf/settings/optimized.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ overrides:
debug rgw: "0/0"
debug rgw sync: "0/0"
debug rgw lifecycle: "0/0"
debug rgw notification: "0/0"
debug civetweb: "0/0"
debug javaclient: "0/0"
debug asok: "0/0"
Expand Down
1 change: 1 addition & 0 deletions qa/suites/perf-basic/settings/optimized.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ overrides:
debug rgw: "0/0"
debug rgw sync: "0/0"
debug rgw lifecycle: "0/0"
debug rgw notification: "0/0"
debug civetweb: "0/0"
debug javaclient: "0/0"
debug asok: "0/0"
Expand Down
1 change: 1 addition & 0 deletions qa/suites/rados/perf/settings/optimized.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ overrides:
debug rgw: "0/0"
debug rgw sync: "0/0"
debug rgw lifecycle: "0/0"
debug rgw notification: "0/0"
debug civetweb: "0/0"
debug javaclient: "0/0"
debug asok: "0/0"
Expand Down
1 change: 1 addition & 0 deletions qa/suites/rgw/multisite/overrides.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ overrides:
debug rgw: 20
debug rgw sync: 20
debug rgw lifecycle: 20
debug rgw notification: 20
rgw crypt s3 kms backend: testing
rgw crypt s3 kms encryption keys: testkey-1=YmluCmJvb3N0CmJvb3N0LWJ1aWxkCmNlcGguY29uZgo=
rgw crypt require ssl: false
Expand Down
1 change: 1 addition & 0 deletions src/common/subsys.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ SUBSYS(rgw_access, 1, 5)
SUBSYS(rgw_dbstore, 1, 5)
SUBSYS(rgw_flight, 1, 5)
SUBSYS(rgw_lifecycle, 1, 5)
SUBSYS(rgw_notification, 1, 5)
SUBSYS(javaclient, 1, 5)
SUBSYS(asok, 1, 5)
SUBSYS(throttle, 1, 1)
Expand Down
83 changes: 49 additions & 34 deletions src/rgw/driver/rados/rgw_notify.cc
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
#include "common/dout.h"
#include <chrono>

#define dout_subsys ceph_subsys_rgw
#define dout_subsys ceph_subsys_rgw_notification

namespace rgw::notify {

Expand Down Expand Up @@ -66,6 +66,16 @@ struct event_entry_t {
};
WRITE_CLASS_ENCODER(event_entry_t)

// Writes a single-line, human-readable description of an event entry to
// `out`, intended for embedding in log messages.
//
// The fields are emitted as "label: 'value'" pairs separated by ", ", in this
// order: notification id (the event's configurationId), topic, endpoint,
// bucket_owner, bucket, object and event type.
//
// Returns `out` so the call can be chained with further stream insertions.
static inline std::ostream& operator<<(std::ostream& out,
                                       const event_entry_t& e) {
  const auto& ev = e.event;
  // Each insert closes the previous value's quote and opens the next one.
  out << "notification id: '" << ev.configurationId;
  out << "', topic: '" << e.arn_topic;
  out << "', endpoint: '" << e.push_endpoint;
  out << "', bucket_owner: '" << ev.bucket_ownerIdentity;
  out << "', bucket: '" << ev.bucket_name;
  out << "', object: '" << ev.object_key;
  out << "', event type: '" << ev.eventName << "'";
  return out;
}

struct persistency_tracker {
ceph::coarse_real_time last_retry_time {ceph::coarse_real_clock::zero()};
Expand Down Expand Up @@ -244,15 +254,12 @@ class Manager : public DoutPrefixProvider {
if ( (topic_persistency_ttl != 0 && event_entry.creation_time != ceph::coarse_real_clock::zero() &&
time_now - event_entry.creation_time > std::chrono::seconds(topic_persistency_ttl))
|| ( topic_persistency_max_retries != 0 && entry_persistency_tracker.retires_num > topic_persistency_max_retries) ) {
ldpp_dout(this, 1) << "Expiring entry for topic= "
<< event_entry.arn_topic << " bucket_owner= "
<< event_entry.event.bucket_ownerIdentity
<< " bucket= " << event_entry.event.bucket_name
<< " object_name= " << event_entry.event.object_key
<< " entry retry_number="
ldpp_dout(this, 1) << "WARNING: Expiring entry marker: " << entry.marker
<< " for event with " << event_entry
<< " entry retry_number: "
<< entry_persistency_tracker.retires_num
<< " creation_time=" << event_entry.creation_time
<< " time_now=" << time_now << dendl;
<< " creation_time: " << event_entry.creation_time
<< " time_now: " << time_now << dendl;
return EntryProcessingResult::Expired;
}
if (time_now - entry_persistency_tracker.last_retry_time < std::chrono::seconds(topic_persistency_sleep_duration) ) {
Expand All @@ -261,7 +268,10 @@ class Manager : public DoutPrefixProvider {

++entry_persistency_tracker.retires_num;
entry_persistency_tracker.last_retry_time = time_now;
ldpp_dout(this, 20) << "Processing entry retry_number=" << entry_persistency_tracker.retires_num << " time=" << dendl;
ldpp_dout(this, 20) << "Processing event entry with " << event_entry
<< " retry_number: "
<< entry_persistency_tracker.retires_num
<< " current time: " << time_now << dendl;
try {
// TODO move endpoint creation to queue level
const auto push_endpoint = RGWPubSubEndpoint::create(event_entry.push_endpoint, event_entry.arn_topic,
Expand All @@ -271,12 +281,14 @@ class Manager : public DoutPrefixProvider {
" for entry: " << entry.marker << dendl;
const auto ret = push_endpoint->send_to_completion_async(cct, event_entry.event, optional_yield(io_context, yield));
if (ret < 0) {
ldpp_dout(this, 5) << "WARNING: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
<< " failed. error: " << ret << " (will retry)" << dendl;
ldpp_dout(this, 5) << "WARNING: push entry marker: " << entry.marker
<< " failed. error: " << ret
<< " (will retry) for event with " << event_entry
<< dendl;
return EntryProcessingResult::Failure;
} else {
ldpp_dout(this, 20) << "INFO: push entry: " << entry.marker << " to endpoint: " << event_entry.push_endpoint
<< " ok" << dendl;
ldpp_dout(this, 5) << "INFO: push entry marker: " << entry.marker
<< " ok for event with " << event_entry << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
return EntryProcessingResult::Successful;
}
Expand Down Expand Up @@ -304,12 +316,15 @@ class Manager : public DoutPrefixProvider {
auto ret = rgw_rados_operate(this, rados_store.getRados()->get_notif_pool_ctx(), queue_name, &op, optional_yield(io_context, yield));
if (ret == -ENOENT) {
// queue was deleted
ldpp_dout(this, 5) << "INFO: queue: "
<< queue_name << ". was removed. cleanup will stop" << dendl;
ldpp_dout(this, 10) << "INFO: queue: " << queue_name
<< ". was removed. cleanup will stop" << dendl;
return;
}
if (ret == -EBUSY) {
ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
ldpp_dout(this, 10)
<< "WARNING: queue: " << queue_name
<< " ownership moved to another daemon. processing will stop"
<< dendl;
return;
}
if (ret < 0) {
Expand Down Expand Up @@ -367,13 +382,16 @@ class Manager : public DoutPrefixProvider {
if (ret == -ENOENT) {
// queue was deleted
topics_persistency_tracker.erase(queue_name);
ldpp_dout(this, 5) << "INFO: queue: "
<< queue_name << ". was removed. processing will stop" << dendl;
ldpp_dout(this, 10) << "INFO: queue: " << queue_name
<< ". was removed. processing will stop" << dendl;
return;
}
if (ret == -EBUSY) {
topics_persistency_tracker.erase(queue_name);
ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
ldpp_dout(this, 10)
<< "WARNING: queue: " << queue_name
<< " ownership moved to another daemon. processing will stop"
<< dendl;
return;
}
if (ret < 0) {
Expand Down Expand Up @@ -468,11 +486,15 @@ class Manager : public DoutPrefixProvider {
auto ret = rgw_rados_operate(this, rados_ioctx, queue_name, &op, optional_yield(io_context, yield));
if (ret == -ENOENT) {
// queue was deleted
ldpp_dout(this, 5) << "INFO: queue: " << queue_name << ". was removed. processing will stop" << dendl;
ldpp_dout(this, 10) << "INFO: queue: " << queue_name
<< ". was removed. processing will stop" << dendl;
return;
}
if (ret == -EBUSY) {
ldpp_dout(this, 5) << "WARNING: queue: " << queue_name << " ownership moved to another daemon. processing will stop" << dendl;
ldpp_dout(this, 10)
<< "WARNING: queue: " << queue_name
<< " ownership moved to another daemon. processing will stop"
<< dendl;
return;
}
if (ret < 0) {
Expand Down Expand Up @@ -1133,23 +1155,16 @@ int publish_commit(rgw::sal::Object* obj,
dpp->get_cct(), event_entry.event, res.yield);
if (ret < 0) {
ldpp_dout(dpp, 1)
<< "ERROR: push to endpoint " << topic.cfg.dest.push_endpoint
<< " bucket: " << event_entry.event.bucket_name
<< " bucket_owner: " << event_entry.event.bucket_ownerIdentity
<< " object_name: " << event_entry.event.object_key
<< " failed. error: " << ret << dendl;
<< "ERROR: failed to push sync notification event with error: "
<< ret << " for event with " << event_entry << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
return ret;
}
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_ok);
} catch (const RGWPubSubEndpoint::configuration_error& e) {
ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint: "
<< topic.cfg.dest.push_endpoint
<< " bucket: " << event_entry.event.bucket_name
<< " bucket_owner: "
<< event_entry.event.bucket_ownerIdentity
<< " object_name: " << event_entry.event.object_key
<< ". error: " << e.what() << dendl;
ldpp_dout(dpp, 1) << "ERROR: failed to create push endpoint for sync "
"notification event with error: "
<< e.what() << " event with " << event_entry << dendl;
if (perfcounter) perfcounter->inc(l_rgw_pubsub_push_failed);
return -EINVAL;
}
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_amqp.cc
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "common/dout.h"
#include <openssl/ssl.h>

#define dout_subsys ceph_subsys_rgw
#define dout_subsys ceph_subsys_rgw_notification

// TODO investigation, not necessarily issues:
// (1) in case of single threaded writer context use spsc_queue
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_kafka.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
#include <boost/lockfree/queue.hpp>
#include "common/dout.h"

#define dout_subsys ceph_subsys_rgw
#define dout_subsys ceph_subsys_rgw_notification

// TODO investigation, not necessarily issues:
// (1) in case of single threaded writer context use spsc_queue
Expand Down

0 comments on commit 4299679

Please sign in to comment.