Skip to content

Commit

Permalink
Merge pull request #18391 from BenPope/schema_registry_post_subjects_…
Browse files Browse the repository at this point in the history
…subject_deleted

[CORE-2831] schema_registry: Allow deleted=true for post_subject
  • Loading branch information
michael-redpanda committed May 13, 2024
2 parents 18a4197 + e881a8e commit 334220b
Show file tree
Hide file tree
Showing 5 changed files with 81 additions and 13 deletions.
12 changes: 12 additions & 0 deletions src/v/pandaproxy/api/api-doc/schema_registry.json
Original file line number Diff line number Diff line change
Expand Up @@ -437,6 +437,12 @@
"required": true,
"type": "string"
},
{
"name": "deleted",
"in": "query",
"required": false,
"type": "boolean"
},
{
"name": "schema_def",
"in": "body",
Expand All @@ -453,6 +459,12 @@
"$ref": "#/definitions/subject_schema"
}
},
"404": {
"description": "Not found",
"schema": {
"$ref": "#/definitions/error_body"
}
},
"409": {
"description": "Incompatible schema",
"schema": {
Expand Down
10 changes: 7 additions & 3 deletions src/v/pandaproxy/schema_registry/handlers.cc
Original file line number Diff line number Diff line change
Expand Up @@ -373,12 +373,15 @@ post_subject(server::request_t rq, server::reply_t rp) {
parse_content_type_header(rq);
parse_accept_header(rq, rp);
auto sub = parse::request_param<subject>(*rq.req, "subject");
vlog(plog.debug, "post_subject subject='{}'", sub);
auto inc_del{
parse::query_param<std::optional<include_deleted>>(*rq.req, "deleted")
.value_or(include_deleted::no)};
vlog(plog.debug, "post_subject subject='{}', deleted='{}'", sub, inc_del);
// We must sync
co_await rq.service().writer().read_sync();

// Force 40401 if no subject
co_await rq.service().schema_store().get_versions(sub, include_deleted::no);
co_await rq.service().schema_store().get_versions(sub, inc_del);

canonical_schema schema;
try {
Expand All @@ -397,7 +400,8 @@ post_subject(server::request_t rq, server::reply_t rp) {

rq.req.reset();

auto sub_schema = co_await rq.service().schema_store().has_schema(schema);
auto sub_schema = co_await rq.service().schema_store().has_schema(
schema, inc_del);

auto json_rslt{json::rjson_serialize(post_subject_versions_version_response{
.schema{std::move(sub_schema.schema)},
Expand Down
8 changes: 4 additions & 4 deletions src/v/pandaproxy/schema_registry/sharded_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -263,8 +263,9 @@ ss::future<bool> sharded_store::has_schema(schema_id id) {
});
}

ss::future<subject_schema> sharded_store::has_schema(canonical_schema schema) {
auto versions = co_await get_versions(schema.sub(), include_deleted::no);
ss::future<subject_schema>
sharded_store::has_schema(canonical_schema schema, include_deleted inc_del) {
auto versions = co_await get_versions(schema.sub(), inc_del);

try {
co_await validate_schema(schema);
Expand All @@ -275,8 +276,7 @@ ss::future<subject_schema> sharded_store::has_schema(canonical_schema schema) {
std::optional<subject_schema> sub_schema;
for (auto ver : versions) {
try {
auto res = co_await get_subject_schema(
schema.sub(), ver, include_deleted::no);
auto res = co_await get_subject_schema(schema.sub(), ver, inc_del);
if (schema.def() == res.schema.def()) {
sub_schema.emplace(std::move(res));
break;
Expand Down
3 changes: 2 additions & 1 deletion src/v/pandaproxy/schema_registry/sharded_store.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ class sharded_store {
is_deleted deleted);

ss::future<bool> has_schema(schema_id id);
ss::future<subject_schema> has_schema(canonical_schema schema);
ss::future<subject_schema> has_schema(
canonical_schema schema, include_deleted inc_del = include_deleted::no);

///\brief Return a schema definition by id.
ss::future<canonical_schema_definition> get_schema_definition(schema_id id);
Expand Down
61 changes: 56 additions & 5 deletions tests/rptest/tests/schema_registry_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -355,13 +355,15 @@ def _get_subjects(self, deleted=False, headers=HTTP_GET_HEADERS, **kwargs):
def _post_subjects_subject(self,
subject,
data,
deleted=False,
headers=HTTP_POST_HEADERS,
**kwargs):
return self._request("POST",
f"subjects/{subject}",
headers=headers,
data=data,
**kwargs)
return self._request(
"POST",
f"subjects/{subject}{'?deleted=true' if deleted else ''}",
headers=headers,
data=data,
**kwargs)

def _post_subjects_subject_versions(self,
subject,
Expand Down Expand Up @@ -827,6 +829,55 @@ def test_post_subjects_subject(self):
assert result["error_code"] == 40403
assert result["message"] == f"Schema not found"

self.logger.info("Soft deleting the schema")
result_raw = self._delete_subject_version(subject=subject,
version=1,
permanent=False)
assert result_raw.status_code == requests.codes.ok

self.logger.info(
"Posting deleted existing schema should be fail (no subject)")
result_raw = self._post_subjects_subject(subject=subject,
data=json.dumps(
{"schema": schema1_def}))
self.logger.info(result_raw)
self.logger.info(result_raw.content)
assert result_raw.status_code == requests.codes.not_found
result = result_raw.json()
assert result["error_code"] == 40401
assert result["message"] == f"Subject '{subject}' not found."

self.logger.info("Posting deleted existing schema should be success")
result_raw = self._post_subjects_subject(
subject=subject,
data=json.dumps({"schema": schema1_def}, ),
deleted=True)
self.logger.info(result_raw)
self.logger.info(result_raw.content)
assert result_raw.status_code == requests.codes.ok
result = result_raw.json()
assert result["subject"] == subject
assert result["id"] == 1
assert result["version"] == 1
assert result["schema"]

self.logger.info("Posting compatible schema should be success")
result_raw = self._post_subjects_subject_versions(
subject=subject, data=json.dumps({"schema": schema2_def}))
assert result_raw.status_code == requests.codes.ok

self.logger.info(
"Posting deleted existing schema should be fail (no schema)")
result_raw = self._post_subjects_subject(subject=subject,
data=json.dumps(
{"schema": schema1_def}))
self.logger.info(result_raw)
self.logger.info(result_raw.content)
assert result_raw.status_code == requests.codes.not_found
result = result_raw.json()
assert result["error_code"] == 40403
assert result["message"] == f"Schema not found"

@cluster(num_nodes=3)
def test_config(self):
"""
Expand Down

0 comments on commit 334220b

Please sign in to comment.