Skip to content

Commit

Permalink
Merge pull request #17952 from BenPope/schema-registry-mode
Browse files Browse the repository at this point in the history
[CORE-60] Schema Registry: Support `/mode`
  • Loading branch information
BenPope committed May 14, 2024
2 parents c55d8d6 + 19e51c9 commit c1ec803
Show file tree
Hide file tree
Showing 31 changed files with 1,812 additions and 206 deletions.
209 changes: 201 additions & 8 deletions src/v/pandaproxy/api/api-doc/schema_registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -185,17 +185,210 @@
"application/json"
],
"parameters": [],
"produces": ["application/vnd.schemaregistry.v1+json"],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"type": "object",
"properties": {
"mode": {
"type": "string"
}
}
"$ref": "#/definitions/mode"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
},
"put": {
"summary": "Set the global mode.",
"operationId": "put_mode",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "mode",
"in": "body",
"schema": {
"$ref": "#/definitions/mode"
}
}
],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/mode"
}
},
"422": {
"description": "Unprocessable Entity",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
}
},
"/mode/{subject}": {
"get": {
"summary": "Get the mode for a subject.",
"operationId": "get_mode_subject",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "subject",
"description": "The subject to get the mode for.",
"in": "path",
"required": true,
"type": "string"
},
{
"name": "defaultToGlobal",
"description": "If true, return the global mode if the subject doesn't have a mode set.",
"in": "query",
"required": false,
"type": "boolean"
}
],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/mode"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
},
"put": {
"summary": "Set the mode for a subject.",
"operationId": "put_mode_subject",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "subject",
"description": "The subject to set the mode for.",
"in": "path",
"required": true,
"type": "string"
},
{
"name": "mode",
"in": "body",
"schema": {
"$ref": "#/definitions/mode"
}
}
],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/mode"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"422": {
"description": "Unprocessable Entity",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
"description": "Internal Server error",
"schema": {
"$ref": "#/definitions/error_body"
}
}
}
},
"delete": {
"summary": "Delete the mode for a subject.",
"operationId": "delete_mode_subject",
"consumes": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"parameters": [
{
"name": "subject",
"description": "The subject to delete the mode for.",
"in": "path",
"required": true,
"type": "string"
}
],
"produces": [
"application/vnd.schemaregistry.v1+json",
"application/vnd.schemaregistry+json",
"application/json"
],
"responses": {
"200": {
"description": "OK",
"schema": {
"$ref": "#/definitions/mode"
}
},
"404": {
"description": "Not Found",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"500": {
Expand Down Expand Up @@ -423,7 +616,7 @@
},
"/subjects/{subject}": {
"post": {
"summary": "Check if a schema is already registred for the subject.",
"summary": "Check if a schema is already registered for the subject.",
"operationId": "post_subject",
"consumes": [
"application/vnd.schemaregistry.v1+json",
Expand Down
12 changes: 12 additions & 0 deletions src/v/pandaproxy/api/api-doc/schema_registry_definitions.def.json
Original file line number Diff line number Diff line change
Expand Up @@ -89,5 +89,17 @@
"compatibility": {
"type": "string"
}
}
},
"mode": {
"type": "object",
"properties": {
"mode": {
"type": "string",
"enum": [
"READWRITE",
"READONLY"
]
},
}
}
2 changes: 1 addition & 1 deletion src/v/pandaproxy/api/api-doc/schema_registry_header.json
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
"swagger": "2.0",
"info": {
"title": "Pandaproxy Schema Registry",
"version": "1.0.3"
"version": "1.0.4"
},
"host": "{{Host}}",
"basePath": "/",
Expand Down
43 changes: 43 additions & 0 deletions src/v/pandaproxy/auth_utils.h
Original file line number Diff line number Diff line change
Expand Up @@ -39,4 +39,47 @@ inline credential_t maybe_authenticate_request(

return user;
}

enum class auth_level {
// Unauthenticated endpoint (not a typo, 'public' is a keyword)
publik = 0,
// Requires authentication (if enabled) but not superuser status
user = 1,
// Requires authentication (if enabled) and superuser status
superuser = 2
};

inline credential_t maybe_authorize_request(
config::rest_authn_method authn_method,
auth_level lvl,
request_authenticator& authenticator,
const ss::http::request& req) {
credential_t user;

if (authn_method != config::rest_authn_method::none) {
// Will throw 400 & 401 if auth fails
auto auth_result = authenticator.authenticate(req);
// Will throw 403 if user enabled HTTP Basic Auth but
// did not give the authorization header.
switch (lvl) {
case auth_level::superuser:
auth_result.require_superuser();
break;
case auth_level::user:
auth_result.require_authenticated();
break;
case auth_level::publik:
auth_result.pass();
break;
}

user = credential_t{
auth_result.get_username(),
auth_result.get_password(),
auth_result.get_sasl_mechanism()};
}

return user;
}

} // namespace pandaproxy
4 changes: 4 additions & 0 deletions src/v/pandaproxy/error.cc
Original file line number Diff line number Diff line change
Expand Up @@ -129,6 +129,8 @@ struct reply_error_category final : std::error_category {
return "subject_version_not_deleted";
case reply_error_code::compatibility_not_found:
return "compatibility_not_found";
case reply_error_code::mode_not_found:
return "mode_not_found";
case reply_error_code::serialization_error:
return "serialization_error";
case reply_error_code::consumer_already_exists:
Expand All @@ -140,6 +142,8 @@ struct reply_error_category final : std::error_category {
return "Invalid schema version";
case reply_error_code::compatibility_level_invalid:
return "Invalid compatibility level";
case reply_error_code::mode_invalid:
return "Invalid mode";
case reply_error_code::subject_version_operaton_not_permitted:
return "Overwrite new schema is not permitted.";
case reply_error_code::subject_version_has_references:
Expand Down
2 changes: 2 additions & 0 deletions src/v/pandaproxy/error.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,11 +78,13 @@ enum class reply_error_code : uint16_t {
subject_version_soft_deleted = 40406,
subject_version_not_deleted = 40407,
compatibility_not_found = 40408,
mode_not_found = 40409,
serialization_error = 40801,
consumer_already_exists = 40902,
schema_empty = 42201,
schema_version_invalid = 42202,
compatibility_level_invalid = 42203,
mode_invalid = 42204,
subject_version_operaton_not_permitted = 42205,
subject_version_has_references = 42206,
subject_version_schema_id_already_exists = 42207,
Expand Down
2 changes: 1 addition & 1 deletion src/v/pandaproxy/schema_registry/api.cc
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ api::~api() noexcept = default;

ss::future<> api::start() {
_store = std::make_unique<sharded_store>();
co_await _store->start(_sg);
co_await _store->start(is_mutable(_cfg.mode_mutability), _sg);
co_await _schema_id_validation_probe.start();
co_await _schema_id_validation_probe.invoke_on_all(
&schema_id_validation_probe::setup_metrics);
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/configuration.cc
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ configuration::configuration()
{},
{},
config::endpoint_tls_config::validate_many)
, mode_mutability(*this, "mode_mutability", "Allow modifying mode", {}, false)
, schema_registry_replication_factor(
*this,
"schema_registry_replication_factor",
Expand Down
1 change: 1 addition & 0 deletions src/v/pandaproxy/schema_registry/configuration.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ struct configuration final : public config::config_store {
schema_registry_api;
config::one_or_many_property<config::endpoint_tls_config>
schema_registry_api_tls;
config::property<bool> mode_mutability;
config::property<std::optional<int16_t>> schema_registry_replication_factor;
config::property<ss::sstring> api_doc_dir;
};
Expand Down
8 changes: 8 additions & 0 deletions src/v/pandaproxy/schema_registry/error.cc
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,8 @@ struct error_category final : std::error_category {
case error_code::compatibility_not_found:
return "Subject does not have subject-level compatibility "
"configured";
case error_code::mode_not_found:
return "Subject does not have subject-level mode configured";
case error_code::subject_version_operaton_not_permitted:
return "Overwrite new schema is not permitted.";
case error_code::subject_version_has_references:
Expand All @@ -69,6 +71,8 @@ struct error_category final : std::error_category {
return "Invalid compatibility level. Valid values are NONE, "
"BACKWARD, FORWARD, FULL, BACKWARD_TRANSITIVE, "
"FORWARD_TRANSITIVE, and FULL_TRANSITIVE";
case error_code::mode_invalid:
return "Invalid mode. Valid values are READWRITE, READONLY";
}
return "(unrecognized error)";
}
Expand All @@ -93,6 +97,8 @@ struct error_category final : std::error_category {
return reply_error_code::subject_version_not_deleted; // 40407
case error_code::compatibility_not_found:
return reply_error_code::compatibility_not_found; // 40408
case error_code::mode_not_found:
return reply_error_code::mode_not_found; // 40409
case error_code::subject_schema_invalid:
return reply_error_code::internal_server_error; // 500
case error_code::write_collision:
Expand All @@ -117,6 +123,8 @@ struct error_category final : std::error_category {
return reply_error_code::zookeeper_error; // 50001
case error_code::compatibility_level_invalid:
return reply_error_code::compatibility_level_invalid; // 42203
case error_code::mode_invalid:
return reply_error_code::mode_invalid; // 42204
}
return {};
}
Expand Down
Loading

0 comments on commit c1ec803

Please sign in to comment.