Skip to content

Commit

Permalink
rgw/pubsub: allow pubsub REST API on master
Browse files Browse the repository at this point in the history
Signed-off-by: Yuval Lifshitz <yuvalif@yahoo.com>
  • Loading branch information
yuvalif committed Sep 10, 2019
1 parent f7cef51 commit a143534
Show file tree
Hide file tree
Showing 31 changed files with 1,667 additions and 1,007 deletions.
2 changes: 1 addition & 1 deletion src/common/options.cc
Expand Up @@ -5716,7 +5716,7 @@ std::vector<Option> get_rgw_options() {
"will be located in the path that is specified here. "),

Option("rgw_enable_apis", Option::TYPE_STR, Option::LEVEL_ADVANCED)
.set_default("s3, s3website, swift, swift_auth, admin, sts, iam")
.set_default("s3, s3website, swift, swift_auth, admin, sts, iam, pubsub")
.set_description("A list of set of RESTful APIs that rgw handles."),

Option("rgw_cache_enabled", Option::TYPE_BOOL, Option::LEVEL_ADVANCED)
Expand Down
8 changes: 7 additions & 1 deletion src/mrun
Expand Up @@ -21,4 +21,10 @@ fi

shift 2

$CEPH_BIN/$command -c $CEPH_CONF_PATH/ceph.conf "$@"
if [ "$RGW_VALGRIND" == 'yes' ] && [ $command == 'radosgw' ]; then
valgrind --trace-children=yes --tool=memcheck --max-threads=1024 $CEPH_BIN/$command -c $CEPH_CONF_PATH/ceph.conf "$@"
sleep 10
else
$CEPH_BIN/$command -c $CEPH_CONF_PATH/ceph.conf "$@"
fi

2 changes: 2 additions & 0 deletions src/rgw/CMakeLists.txt
Expand Up @@ -113,6 +113,8 @@ set(librgw_common_srcs
rgw_rest_conn.cc
rgw_rest_log.cc
rgw_rest_metadata.cc
rgw_rest_pubsub.cc
rgw_rest_pubsub_common.cc
rgw_rest_realm.cc
rgw_rest_role.cc
rgw_rest_s3.cc
Expand Down
12 changes: 8 additions & 4 deletions src/rgw/rgw_basic_types.cc
Expand Up @@ -6,21 +6,25 @@
#include <string>

#include "rgw_basic_types.h"
#include "rgw_xml.h"
#include "common/ceph_json.h"

using std::string;
using std::stringstream;

void decode_json_obj(rgw_user& val, JSONObj *obj)
{
string s = obj->get_data();
val.from_str(s);
val.from_str(obj->get_data());
}

void encode_json(const char *name, const rgw_user& val, Formatter *f)
{
string s = val.to_str();
f->dump_string(name, s);
f->dump_string(name, val.to_str());
}

void encode_xml(const char *name, const rgw_user& val, Formatter *f)
{
encode_xml(name, val.to_str(), f);
}

namespace rgw {
Expand Down
4 changes: 2 additions & 2 deletions src/rgw/rgw_basic_types.h
Expand Up @@ -13,8 +13,7 @@ struct rgw_user {
std::string id;

rgw_user() {}
// cppcheck-suppress noExplicitConstructor
rgw_user(const std::string& s) {
explicit rgw_user(const std::string& s) {
from_str(s);
}
rgw_user(const std::string& tenant, const std::string& id)
Expand Down Expand Up @@ -203,6 +202,7 @@ class JSONObj;

void decode_json_obj(rgw_user& val, JSONObj *obj);
void encode_json(const char *name, const rgw_user& val, Formatter *f);
void encode_xml(const char *name, const rgw_user& val, Formatter *f);

inline ostream& operator<<(ostream& out, const rgw_user &u) {
string s;
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_bucket.cc
Expand Up @@ -1389,7 +1389,7 @@ int RGWBucketAdminOp::limit_check(RGWRadosStore *store,
do {
RGWUserBuckets buckets;

ret = rgw_read_user_buckets(store, user_id, buckets,
ret = rgw_read_user_buckets(store, rgw_user(user_id), buckets,
marker, string(), max_entries, false,
&is_truncated);
if (ret < 0)
Expand Down
6 changes: 3 additions & 3 deletions src/rgw/rgw_cr_rados.cc
Expand Up @@ -589,7 +589,7 @@ int RGWAsyncFetchRemoteObj::_send_request()

std::optional<uint64_t> bytes_transferred;
int r = store->getRados()->fetch_remote_obj(obj_ctx,
user_id,
rgw_user(user_id),
NULL, /* req_info */
source_zone,
dest_obj,
Expand Down Expand Up @@ -646,7 +646,7 @@ int RGWAsyncStatRemoteObj::_send_request()
rgw_obj dest_obj(src_obj);

int r = store->getRados()->stat_remote_obj(obj_ctx,
user_id,
rgw_user(user_id),
nullptr, /* req_info */
source_zone,
src_obj,
Expand Down Expand Up @@ -722,7 +722,7 @@ int RGWAsyncRemoveObj::_send_request()
}
del_op.params.olh_epoch = versioned_epoch;
del_op.params.marker_version_id = marker_version_id;
del_op.params.obj_owner.set_id(owner);
del_op.params.obj_owner.set_id(rgw_user(owner));
del_op.params.obj_owner.set_name(owner_display_name);
del_op.params.mtime = timestamp;
del_op.params.high_precision_time = true;
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_file.h
Expand Up @@ -992,7 +992,7 @@ namespace rgw {
}
if (token.valid() && (ldh->auth(token.id, token.key) == 0)) {
/* try to store user if it doesn't already exist */
if (store->ctl()->user->get_info_by_uid(token.id, &user, null_yield) < 0) {
if (store->ctl()->user->get_info_by_uid(rgw_user(token.id), &user, null_yield) < 0) {
int ret = store->ctl()->user->store_info(user, null_yield,
RGWUserCtl::PutParams()
.set_exclusive(true));
Expand Down
3 changes: 2 additions & 1 deletion src/rgw/rgw_main.cc
Expand Up @@ -353,12 +353,13 @@ int main(int argc, const char **argv)
const bool s3website_enabled = apis_map.count("s3website") > 0;
const bool sts_enabled = apis_map.count("sts") > 0;
const bool iam_enabled = apis_map.count("iam") > 0;
const bool pubsub_enabled = apis_map.count("pubsub") > 0;
// Swift API entrypoint could placed in the root instead of S3
const bool swift_at_root = g_conf()->rgw_swift_url_prefix == "/";
if (apis_map.count("s3") > 0 || s3website_enabled) {
if (! swift_at_root) {
rest.register_default_mgr(set_logging(rest_filter(store->getRados(), RGW_REST_S3,
new RGWRESTMgr_S3(s3website_enabled, sts_enabled, iam_enabled))));
new RGWRESTMgr_S3(s3website_enabled, sts_enabled, iam_enabled, pubsub_enabled))));
} else {
derr << "Cannot have the S3 or S3 Website enabled together with "
<< "Swift API placed in the root of hierarchy" << dendl;
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_op.cc
Expand Up @@ -2887,7 +2887,7 @@ static int forward_request_to_master(struct req_state *s, obj_version *objv,
bufferlist response;
string uid_str = s->user->user_id.to_str();
#define MAX_REST_RESPONSE (128 * 1024) // we expect a very small response
int ret = store->svc()->zone->get_master_conn()->forward(uid_str, (forward_info ? *forward_info : s->info),
int ret = store->svc()->zone->get_master_conn()->forward(rgw_user(uid_str), (forward_info ? *forward_info : s->info),
objv, MAX_REST_RESPONSE, &in_data, &response);
if (ret < 0)
return ret;
Expand Down
22 changes: 22 additions & 0 deletions src/rgw/rgw_pubsub.cc
Expand Up @@ -116,6 +116,14 @@ void rgw_pubsub_topic::dump(Formatter *f) const
encode_json("arn", arn, f);
}

void rgw_pubsub_topic::dump_xml(Formatter *f) const
{
encode_xml("User", user, f);
encode_xml("Name", name, f);
encode_xml("EndPoint", dest, f);
encode_xml("TopicArn", arn, f);
}

void rgw_pubsub_topic_filter::dump(Formatter *f) const
{
encode_json("topic", topic, f);
Expand Down Expand Up @@ -144,6 +152,13 @@ void rgw_pubsub_user_topics::dump(Formatter *f) const
}
}

void rgw_pubsub_user_topics::dump_xml(Formatter *f) const
{
for (auto& t : topics) {
encode_xml("member", t.second.topic, f);
}
}

void rgw_pubsub_sub_dest::dump(Formatter *f) const
{
encode_json("bucket_name", bucket_name, f);
Expand All @@ -153,6 +168,13 @@ void rgw_pubsub_sub_dest::dump(Formatter *f) const
encode_json("arn_topic", arn_topic, f);
}

void rgw_pubsub_sub_dest::dump_xml(Formatter *f) const
{
encode_xml("EndpointAddress", push_endpoint, f);
encode_xml("EndpointArgs", push_endpoint_args, f);
encode_xml("TopicArn", arn_topic, f);
}

void rgw_pubsub_sub_config::dump(Formatter *f) const
{
encode_json("user", user, f);
Expand Down
3 changes: 3 additions & 0 deletions src/rgw/rgw_pubsub.h
Expand Up @@ -257,6 +257,7 @@ struct rgw_pubsub_sub_dest {
}

void dump(Formatter *f) const;
void dump_xml(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_sub_dest)

Expand Down Expand Up @@ -325,6 +326,7 @@ struct rgw_pubsub_topic {
}

void dump(Formatter *f) const;
void dump_xml(Formatter *f) const;

bool operator<(const rgw_pubsub_topic& t) const {
return to_str().compare(t.to_str());
Expand Down Expand Up @@ -413,6 +415,7 @@ struct rgw_pubsub_user_topics {
}

void dump(Formatter *f) const;
void dump_xml(Formatter *f) const;
};
WRITE_CLASS_ENCODER(rgw_pubsub_user_topics)

Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_quota.cc
Expand Up @@ -652,7 +652,7 @@ int RGWUserStatsCache::sync_user(const rgw_user& user)
ceph::real_time last_stats_sync;
ceph::real_time last_stats_update;

int ret = store->ctl()->user->read_stats(user_str, &stats, &last_stats_sync, &last_stats_update);
int ret = store->ctl()->user->read_stats(rgw_user(user_str), &stats, &last_stats_sync, &last_stats_update);
if (ret < 0) {
ldout(store->ctx(), 5) << "ERROR: can't read user header: ret=" << ret << dendl;
return ret;
Expand Down
2 changes: 1 addition & 1 deletion src/rgw/rgw_rest.h
Expand Up @@ -553,10 +553,10 @@ class RGWHandler_REST : public RGWHandler {
virtual RGWOp *op_copy() { return NULL; }
virtual RGWOp *op_options() { return NULL; }

public:
static int allocate_formatter(struct req_state *s, int default_formatter,
bool configurable);

public:
static constexpr int MAX_BUCKET_NAME_LEN = 255;
static constexpr int MAX_OBJ_NAME_LEN = 1024;

Expand Down
11 changes: 6 additions & 5 deletions src/rgw/rgw_rest_iam.cc
Expand Up @@ -26,12 +26,13 @@ void RGWHandler_REST_IAM::rgw_iam_parse_input()
for (const auto& t : tokens) {
auto pos = t.find("=");
if (pos != string::npos) {
std::string key = t.substr(0, pos);
std::string value = t.substr(pos + 1, t.size() - 1);
if (key == "AssumeRolePolicyDocument" || key == "Path" || key == "PolicyDocument") {
value = url_decode(value);
const auto key = t.substr(0, pos);
if (key == "Action") {
s->info.args.append(key, t.substr(pos + 1, t.size() - 1));
} else if (key == "AssumeRolePolicyDocument" || key == "Path" || key == "PolicyDocument") {
const auto value = url_decode(t.substr(pos + 1, t.size() - 1));
s->info.args.append(key, value);
}
s->info.args.append(key, value);
}
}
}
Expand Down

0 comments on commit a143534

Please sign in to comment.