diff --git a/tests/rptest/tests/schema_registry_test.py b/tests/rptest/tests/schema_registry_test.py index da4508ad25b65..76a1dc831c6be 100644 --- a/tests/rptest/tests/schema_registry_test.py +++ b/tests/rptest/tests/schema_registry_test.py @@ -13,6 +13,7 @@ from typing import Optional import uuid import re +import urllib.parse import requests import time import random @@ -79,6 +80,7 @@ def get_subject_name(sns: str, topic: str, field: MessageField, schema2_def = '{"type":"record","name":"myrecord","fields":[{"name":"f1","type":["null","string"]},{"name":"f2","type":"string","default":"foo"}]}' # Schema 3 is not backwards compatible schema3_def = '{"type":"record","name":"myrecord","fields":[{"name":"f1","type":"string"},{"name":"f2","type":"string"}]}' + invalid_avro = '{"type":"notatype","name":"myrecord","fields":[{"name":"f1","type":"string"}]}' simple_proto_def = """ @@ -97,6 +99,95 @@ def get_subject_name(sns: str, topic: str, field: MessageField, Simple id = 1; }""" +validation_schemas = dict( + proto3=""" +syntax = "proto3"; + +message myrecord { + message Msg1 { + int32 f1 = 1; + } + Msg1 m1 = 1; + Msg1 m2 = 2; +} +""", + proto3_incompat=""" +syntax = "proto3"; + +message myrecord { + // MESSAGE_REMOVED + message Msg1d { + int32 f1 = 1; + } + // FIELD_NAMED_TYPE_CHANGED + Msg1d m1 = 1; +} +""", + proto2=""" +syntax = "proto2"; + +message myrecord { + message Msg1 { + required int32 f1 = 1; + } + required Msg1 m1 = 1; + required Msg1 m2 = 2; +} +""", + proto2_incompat=""" +syntax = "proto2"; + +message myrecord { + // MESSAGE_REMOVED + message Msg1d { + required int32 f1 = 1; + } + // FIELD_NAMED_TYPE_CHANGED + required Msg1d m1 = 1; +} +""", + avro=""" +{ + "type": "record", + "name": "myrecord", + "fields": [ + { + "name": "f1", + "type": "string" + }, + { + "name": "enumF", + "type": { + "name": "ABorC", + "type": "enum", + "symbols": ["a", "b", "c"] + } + } + ] +} +""", + avro_incompat=""" +{ + "type": "record", + "name": "myrecord", + "fields": [ + { + "name": "f1", + "type": "int" + }, + { + "name": "enumF", + "type": { + "name": "ABorC", + "type": "enum", + "symbols": ["a"] + } + } + ] +} +""", +) + log_config = LoggingConfig('info', logger_levels={ 'security': 'trace', @@ -438,10 +529,15 @@ def _post_compatibility_subject_version(self, version, data, headers=HTTP_POST_HEADERS, + verbose: bool | None = None, **kwargs): + params = '' + if verbose is not None: + params = f"?{urllib.parse.urlencode({'verbose': str(verbose).lower()})}" + return self._request( "POST", - f"compatibility/subjects/{subject}/versions/{version}", + f"compatibility/subjects/{subject}/versions/{version}{params}", headers=headers, data=data, **kwargs) @@ -974,6 +1070,7 @@ def test_post_compatibility_subject_version(self): subject=f"{topic}-key", version=1, data=schema_2_data) assert result_raw.status_code == requests.codes.ok assert result_raw.json()["is_compatible"] == True + assert len(result_raw.json()["messages"]) == 0 self.logger.debug("Set subject config - BACKWARD") result_raw = self._set_config_subject( @@ -986,14 +1083,32 @@ def test_post_compatibility_subject_version(self): subject=f"{topic}-key", version=1, data=schema_2_data) assert result_raw.status_code == requests.codes.ok assert result_raw.json()["is_compatible"] == True + assert len(result_raw.json()["messages"]) == 0 - self.logger.debug("Check compatibility backward, no default") + self.logger.debug("Check compatibility backward, no default, verbose") result_raw = self._post_compatibility_subject_version( - subject=f"{topic}-key", version=1, data=schema_3_data) + subject=f"{topic}-key", + version=1, + data=schema_3_data, + verbose=True) assert result_raw.status_code == requests.codes.ok assert result_raw.json()["is_compatible"] == False + self.logger.debug( + "Check compatibility backward, no default, not verbose") + result_raw = self._post_compatibility_subject_version( + subject=f"{topic}-key", + version=1, + data=schema_3_data, + verbose=False) + assert result_raw.status_code == requests.codes.ok + assert result_raw.json()["is_compatible"] == False + assert not result_raw.json().get( + "messages", + None), f"Expected no messages, got {result_raw.json()['messages']}" + self.logger.debug("Posting incompatible schema 3 as a subject key") + result_raw = self._post_subjects_subject_versions( subject=f"{topic}-key", data=schema_3_data) assert result_raw.status_code == requests.codes.conflict @@ -1023,6 +1138,79 @@ def test_post_compatibility_subject_version(self): assert result_raw.status_code == requests.codes.ok assert result_raw.json()["id"] == v1_id + p3_errs: list[tuple] = [ + ("MESSAGE_REMOVED", "#/myrecord/Msg1"), + ("FIELD_NAMED_TYPE_CHANGED", "#/myrecord/1"), + ] + p2_errs: list[tuple] = p3_errs + [ + ("REQUIRED_FIELD_REMOVED", "#/myrecord/2"), + ] + av_errs: list[tuple] = [ + ("TYPE_MISMATCH", "/fields/0/type"), + ("MISSING_ENUM_SYMBOLS", "/fields/1/type/symbols"), + ] + EXPECTED_INCOMPATIBILITIES: dict[str, list[tuple]] = { + "proto3_incompat": p3_errs, + "proto2_incompat": p2_errs, + "avro_incompat": av_errs, + } + + @cluster(num_nodes=3) + @parametrize(schemas=("avro", "avro_incompat", "AVRO")) + @parametrize(schemas=("proto3", "proto3_incompat", "PROTOBUF")) + @parametrize(schemas=("proto2", "proto2_incompat", "PROTOBUF")) + def test_compatibility_messages(self, schemas): + """ + Verify compatibility messages + """ + + topic = create_topic_names(1)[0] + + self.logger.debug(f"Register a schema against a subject") + schema_data = json.dumps({ + "schema": validation_schemas[schemas[0]], + "schemaType": schemas[2], + }) + incompatible_data = json.dumps({ + "schema": validation_schemas[schemas[1]], + "schemaType": schemas[2], + }) + + super_username, super_password, _ = self.redpanda.SUPERUSER_CREDENTIALS + + self.logger.debug("Posting schema as a subject key") + result_raw = self._post_subjects_subject_versions( + subject=f"{topic}-key", data=schema_data) + self.logger.debug(result_raw) + assert result_raw.status_code == requests.codes.ok + v1_id = result_raw.json()["id"] + + self.logger.debug("Set subject config - BACKWARD") + result_raw = self._set_config_subject( + subject=f"{topic}-key", + data=json.dumps({"compatibility": "BACKWARD"})) + assert result_raw.status_code == requests.codes.ok + + self.logger.debug("Check compatibility full") + result_raw = self._post_compatibility_subject_version( + subject=f"{topic}-key", + version=1, + data=incompatible_data, + verbose=True) + + assert result_raw.status_code == requests.codes.ok + assert result_raw.json()["is_compatible"] == False + msgs = result_raw.json()["messages"] + for message in ["oldSchemaVersion", "oldSchema", "compatibility"]: + assert any( + message in m for m in msgs + ), f"Expected to find an instance of '{message}', got {msgs}" + + expected_errs = self.EXPECTED_INCOMPATIBILITIES[schemas[1]] + for e in expected_errs: + assert any(e[0] in m and e[1] in m + for m in msgs), f"Expected {e} in messages, got {msgs}" + @cluster(num_nodes=3) def test_delete_subject(self): """