diff --git a/README.md b/README.md index 2646809..f7be8ff 100644 --- a/README.md +++ b/README.md @@ -6,7 +6,7 @@ composer require jobcloud/php-console-kafka-schema-registry ``` ## Requirements -- php: >= 7.4 +- php: ^8.0 ## Register commands You can register each command separately like this: diff --git a/docker/docker-compose.ci.yml b/docker/docker-compose.ci.yml index bf9f675..b94051c 100644 --- a/docker/docker-compose.ci.yml +++ b/docker/docker-compose.ci.yml @@ -1,4 +1,3 @@ -version: '3.2' services: php: build: diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index bf9f675..b94051c 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -1,4 +1,3 @@ -version: '3.2' services: php: build: diff --git a/src/Command/SetAllSchemasCompatibilityModeCommand.php b/src/Command/SetAllSchemasCompatibilityModeCommand.php new file mode 100644 index 0000000..f04d99f --- /dev/null +++ b/src/Command/SetAllSchemasCompatibilityModeCommand.php @@ -0,0 +1,176 @@ +setName('kafka-schema-registry:set:compatibility:mode:all') + ->setDescription('Set compatibility modes for multiple schemas from a JSON configuration file') + ->setHelp($this->getHelpText()) + ->addArgument( + 'configFile', + InputArgument::REQUIRED, + 'Path to JSON configuration file containing schema-compatibility mappings' + ); + } + + private function getHelpText(): string + { + return <<<'HELP' +Set compatibility modes for multiple schemas based on configuration in a JSON file. + +JSON File Format: +The configuration file must be a valid JSON array with the following structure: + +[ + { + "schemaName": "schema-subject-name", + "compatibilityLevel": "COMPATIBILITY_LEVEL" + } +] + +Required Fields: +- schemaName: The subject name of the schema in the registry +- compatibilityLevel: One of the following compatibility levels: + * NONE + * BACKWARD + * BACKWARD_TRANSITIVE + * FORWARD + * FORWARD_TRANSITIVE + * FULL + * FULL_TRANSITIVE + +Example: +[ + { + "schemaName": "user-events", + "compatibilityLevel": "BACKWARD_TRANSITIVE" + }, + { + "schemaName": "order-events", + "compatibilityLevel": "FORWARD" + }, + { + "schemaName": "payment-events", + "compatibilityLevel": "FULL" + } +] + +The command will process each schema in the order specified and provide feedback +for each operation. If any schema update fails, the command will continue processing +the remaining schemas and return a non-zero exit code at the end. +HELP; + } + + public function execute(InputInterface $input, OutputInterface $output): int + { + $configFilePath = (string) $input->getArgument('configFile'); + + $config = $this->loadConfigFile($configFilePath, $output); + if (null === $config) { + return Command::FAILURE; + } + + $totalSchemas = count($config); + $successCount = 0; + $failureCount = 0; + + $output->writeln(sprintf('Processing %d schema compatibility configurations...', $totalSchemas)); + + foreach ($config as $index => $schemaConfig) { + if (false === $this->isValidSchemaConfig($schemaConfig)) { + $output->writeln( + sprintf('Invalid configuration at index %d: missing schemaName or compatibilityLevel', $index) + ); + $failureCount++; + + continue; + } + + $schemaName = $schemaConfig['schemaName']; + $compatibilityLevel = $schemaConfig['compatibilityLevel']; + + $output->write( + sprintf('Setting compatibility mode for schema "%s" to "%s"... ', $schemaName, $compatibilityLevel) + ); + + if ($this->setSchemaCompatibility($schemaName, $compatibilityLevel, $output)) { + $successCount++; + + continue; + } + + $failureCount++; + } + + $this->outputSummary($output, $totalSchemas, $successCount, $failureCount); + + return $failureCount > 0 ? Command::FAILURE : Command::SUCCESS; + } + + /** + * @return array>|null + */ + private function loadConfigFile(string $configFilePath, OutputInterface $output): ?array + { + $jsonContent = @file_get_contents($configFilePath); + if (false === $jsonContent) { + $output->writeln(sprintf('Could not read configuration file: %s', $configFilePath)); + + return null; + } + + $config = json_decode($jsonContent, true); + if (null === $config || false === is_array($config) || false === array_is_list($config)) { + $output->writeln('Configuration file must contain a JSON array of schema configurations'); + + return null; + } + + return $config; + } + + private function isValidSchemaConfig(mixed $schemaConfig): bool + { + return is_array($schemaConfig) + && isset($schemaConfig['schemaName']) + && isset($schemaConfig['compatibilityLevel']); + } + + private function setSchemaCompatibility( + string $schemaName, + string $compatibilityLevel, + OutputInterface $output + ): bool { + try { + $this->schemaRegistryApi->setSubjectCompatibilityLevel($schemaName, $compatibilityLevel); + } catch (\Exception $e) { + $output->writeln(sprintf('FAILED: %s', $e->getMessage())); + + return false; + } + + $output->writeln('SUCCESS'); + + return true; + } + + private function outputSummary(OutputInterface $output, int $total, int $success, int $failure): void + { + $output->writeln(''); + $output->writeln('=== Summary ==='); + $output->writeln(sprintf('Total schemas processed: %d', $total)); + $output->writeln(sprintf('Successful updates: %d', $success)); + $output->writeln(sprintf('Failed updates: %d', $failure)); + } +} diff --git a/src/Command/SetCompatibilityModeForSchemaCommand.php b/src/Command/SetCompatibilityModeForSchemaCommand.php new file mode 100644 index 0000000..c95e35a --- /dev/null +++ b/src/Command/SetCompatibilityModeForSchemaCommand.php @@ -0,0 +1,43 @@ +setName('kafka-schema-registry:set:schema:compatibility:mode') + ->setDescription('Set the compatibility mode for a given schema') + ->setHelp('Set the compatibility mode for a given schema') + ->addArgument('schemaName', InputArgument::REQUIRED, 'Name of the schema') + ->addArgument('compatibilityLevel', InputArgument::REQUIRED, 'Compatibility level to set'); + } + + public function execute(InputInterface $input, OutputInterface $output): int + { + $schemaName = (string) $input->getArgument('schemaName'); + $compatibilityLevel = (string) $input->getArgument('compatibilityLevel'); + + try { + $this->schemaRegistryApi->setSubjectCompatibilityLevel($schemaName, $compatibilityLevel); + } catch (\Exception $e) { + $output->writeln( + sprintf('Could not change compatibility mode for schema %s: %s', $schemaName, $e->getMessage()) + ); + + return Command::FAILURE; + } + + $output->writeln(sprintf('Successfully changed compatibility mode for schema: %s', $schemaName)); + + return Command::SUCCESS; + } +} diff --git a/src/ServiceProvider/CommandServiceProvider.php b/src/ServiceProvider/CommandServiceProvider.php index aba8dff..2792154 100644 --- a/src/ServiceProvider/CommandServiceProvider.php +++ b/src/ServiceProvider/CommandServiceProvider.php @@ -21,6 +21,8 @@ use Jobcloud\SchemaConsole\Command\ListVersionsForSchemaCommand; use Jobcloud\SchemaConsole\Command\RegisterChangedSchemasCommand; use Jobcloud\SchemaConsole\Command\RegisterSchemaVersionCommand; +use Jobcloud\SchemaConsole\Command\SetAllSchemasCompatibilityModeCommand; +use Jobcloud\SchemaConsole\Command\SetCompatibilityModeForSchemaCommand; use Jobcloud\SchemaConsole\Command\SetImportModeCommand; use Jobcloud\SchemaConsole\Command\SetReadOnlyModeCommand; use Jobcloud\SchemaConsole\Command\SetReadWriteModeCommand; @@ -51,6 +53,8 @@ public function register(Container $container) new GetCompatibilityModeCommand($schemaRegistryApi), new CheckAllSchemasCompatibilityCommand($schemaRegistryApi), new GetCompatibilityModeForSchemaCommand($schemaRegistryApi), + new SetAllSchemasCompatibilityModeCommand($schemaRegistryApi), + new SetCompatibilityModeForSchemaCommand($schemaRegistryApi), new GetLatestSchemaCommand($schemaRegistryApi), new GetSchemaByVersionCommand($schemaRegistryApi), new ListAllSchemasCommand($schemaRegistryApi), diff --git a/tests/Command/SetAllSchemasCompatibilityModeCommandTest.php b/tests/Command/SetAllSchemasCompatibilityModeCommandTest.php new file mode 100644 index 0000000..0baa421 --- /dev/null +++ b/tests/Command/SetAllSchemasCompatibilityModeCommandTest.php @@ -0,0 +1,309 @@ + 'schema1', 'compatibilityLevel' => 'BACKWARD'], + ['schemaName' => 'schema2', 'compatibilityLevel' => 'FORWARD'], + ])); + + /** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */ + $schemaRegistryApi = $this->makeMock(KafkaSchemaRegistryApiClient::class, [ + 'setSubjectCompatibilityLevel' => true, + ]); + + $commandTester = $this->createCommandTester($schemaRegistryApi); + $commandTester->execute(['configFile' => self::CONFIG_FILE]); + + $output = $commandTester->getDisplay(); + + self::assertStringContainsString('Processing 2 schema compatibility configurations...', $output); + self::assertStringContainsString( + 'Setting compatibility mode for schema "schema1" to "BACKWARD"... SUCCESS', + $output + ); + self::assertStringContainsString( + 'Setting compatibility mode for schema "schema2" to "FORWARD"... SUCCESS', + $output + ); + self::assertStringContainsString('Total schemas processed: 2', $output); + self::assertStringContainsString('Successful updates: 2', $output); + self::assertStringContainsString('Failed updates: 0', $output); + self::assertEquals(0, $commandTester->getStatusCode()); + } + + public function testCommandWithValidConfigFileAndSomeFailures(): void + { + file_put_contents(self::CONFIG_FILE, json_encode([ + ['schemaName' => 'schema1', 'compatibilityLevel' => 'BACKWARD'], + ['schemaName' => 'schema2', 'compatibilityLevel' => 'FORWARD'], + ['schemaName' => 'schema3', 'compatibilityLevel' => 'FULL'], + ])); + + /** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */ + $schemaRegistryApi = $this->makeMock(KafkaSchemaRegistryApiClient::class); + $schemaRegistryApi->method('setSubjectCompatibilityLevel') + ->willReturnCallback(function ($schema) { + return $schema === 'schema2' ? throw new \Exception('error') : true; + }); + + $commandTester = $this->createCommandTester($schemaRegistryApi); + $commandTester->execute(['configFile' => self::CONFIG_FILE]); + + $output = $commandTester->getDisplay(); + + self::assertStringContainsString('Processing 3 schema compatibility configurations...', $output); + self::assertStringContainsString( + 'Setting compatibility mode for schema "schema1" to "BACKWARD"... SUCCESS', + $output + ); + self::assertStringContainsString( + 'Setting compatibility mode for schema "schema2" to "FORWARD"... FAILED', + $output + ); + self::assertStringContainsString( + 'Setting compatibility mode for schema "schema3" to "FULL"... SUCCESS', + $output + ); + self::assertStringContainsString('Total schemas processed: 3', $output); + self::assertStringContainsString('Successful updates: 2', $output); + self::assertStringContainsString('Failed updates: 1', $output); + self::assertEquals(1, $commandTester->getStatusCode()); + } + + public function testCommandWithApiException(): void + { + file_put_contents(self::CONFIG_FILE, json_encode([ + ['schemaName' => 'schema1', 'compatibilityLevel' => 'BACKWARD'], + ['schemaName' => 'schema2', 'compatibilityLevel' => 'FORWARD'], + ])); + + /** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */ + $schemaRegistryApi = $this->makeMock(KafkaSchemaRegistryApiClient::class); + $schemaRegistryApi->method('setSubjectCompatibilityLevel') + ->willReturnOnConsecutiveCalls( + true, + $this->throwException(new \Exception('API Error: Schema not found')) + ); + + $commandTester = $this->createCommandTester($schemaRegistryApi); + $commandTester->execute(['configFile' => self::CONFIG_FILE]); + + $output = $commandTester->getDisplay(); + + self::assertStringContainsString( + 'Setting compatibility mode for schema "schema1" to "BACKWARD"... SUCCESS', + $output + ); + self::assertStringContainsString( + 'Setting compatibility mode for schema "schema2" to "FORWARD"... FAILED: API Error: Schema not found', + $output + ); + self::assertStringContainsString('Successful updates: 1', $output); + self::assertStringContainsString('Failed updates: 1', $output); + self::assertEquals(1, $commandTester->getStatusCode()); + } + + public function testCommandWithNonExistentConfigFile(): void + { + /** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */ + $schemaRegistryApi = $this->makeMock(KafkaSchemaRegistryApiClient::class); + + $commandTester = $this->createCommandTester($schemaRegistryApi); + $commandTester->execute(['configFile' => '/non/existent/file.json']); + + $output = $commandTester->getDisplay(); + + self::assertStringContainsString('Could not read configuration file: /non/existent/file.json', $output); + self::assertEquals(1, $commandTester->getStatusCode()); + } + + /** + * @dataProvider invalidJsonConfigurationProvider + */ + public function testCommandWithInvalidJsonConfiguration(mixed $configData): void + { + file_put_contents(self::CONFIG_FILE, json_encode($configData)); + + /** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */ + $schemaRegistryApi = $this->makeMock(KafkaSchemaRegistryApiClient::class); + + $commandTester = $this->createCommandTester($schemaRegistryApi); + $commandTester->execute(['configFile' => self::CONFIG_FILE]); + + $output = $commandTester->getDisplay(); + + self::assertStringContainsString( + 'Configuration file must contain a JSON array of schema configurations', + $output + ); + self::assertEquals(1, $commandTester->getStatusCode()); + } + + /** + * @return array + */ + public static function invalidJsonConfigurationProvider(): array + { + return [ + 'invalidJsonConfigurationProvider:1' => ['[invalid json}'], + 'invalidJsonConfigurationProvider:2' => [''], + 'invalidJsonConfigurationProvider:3' => [['invalid' => 'structure']], + 'invalidJsonConfigurationProvider:4' => ['not_an_array'], + 'invalidJsonConfigurationProvider:5' => [null], + 'invalidJsonConfigurationProvider:6' => [true], + 'invalidJsonConfigurationProvider:7' => [123], + ]; + } + + public function testCommandWithMissingSchemaNameField(): void + { + file_put_contents(self::CONFIG_FILE, json_encode([ + ['compatibilityLevel' => 'BACKWARD'], + ['schemaName' => 'valid-schema', 'compatibilityLevel' => 'FORWARD'], + ])); + + /** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */ + $schemaRegistryApi = $this->makeMock(KafkaSchemaRegistryApiClient::class, [ + 'setSubjectCompatibilityLevel' => true, + ]); + + $commandTester = $this->createCommandTester($schemaRegistryApi); + $commandTester->execute(['configFile' => self::CONFIG_FILE]); + + $output = $commandTester->getDisplay(); + + self::assertStringContainsString( + 'Invalid configuration at index 0: missing schemaName or compatibilityLevel', + $output + ); + self::assertStringContainsString( + 'Setting compatibility mode for schema "valid-schema" to "FORWARD"... SUCCESS', + $output + ); + self::assertStringContainsString('Successful updates: 1', $output); + self::assertStringContainsString('Failed updates: 1', $output); + self::assertEquals(1, $commandTester->getStatusCode()); + } + + public function testCommandWithMissingCompatibilityLevelField(): void + { + file_put_contents(self::CONFIG_FILE, json_encode([ + ['schemaName' => 'incomplete-schema'], + ['schemaName' => 'valid-schema', 'compatibilityLevel' => 'BACKWARD'], + ])); + + /** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */ + $schemaRegistryApi = $this->makeMock(KafkaSchemaRegistryApiClient::class, [ + 'setSubjectCompatibilityLevel' => true, + ]); + + $commandTester = $this->createCommandTester($schemaRegistryApi); + $commandTester->execute(['configFile' => self::CONFIG_FILE]); + + $output = $commandTester->getDisplay(); + + self::assertStringContainsString( + 'Invalid configuration at index 0: missing schemaName or compatibilityLevel', + $output + ); + self::assertStringContainsString( + 'Setting compatibility mode for schema "valid-schema" to "BACKWARD"... SUCCESS', + $output + ); + self::assertStringContainsString('Successful updates: 1', $output); + self::assertStringContainsString('Failed updates: 1', $output); + self::assertEquals(1, $commandTester->getStatusCode()); + } + + public function testCommandWithEmptyArray(): void + { + file_put_contents(self::CONFIG_FILE, json_encode([])); + + /** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */ + $schemaRegistryApi = $this->makeMock(KafkaSchemaRegistryApiClient::class); + + $commandTester = $this->createCommandTester($schemaRegistryApi); + $commandTester->execute(['configFile' => self::CONFIG_FILE]); + + $output = $commandTester->getDisplay(); + + self::assertStringContainsString('Processing 0 schema compatibility configurations...', $output); + self::assertStringContainsString('Total schemas processed: 0', $output); + self::assertStringContainsString('Successful updates: 0', $output); + self::assertStringContainsString('Failed updates: 0', $output); + self::assertEquals(0, $commandTester->getStatusCode()); + } + + public function testCommandWithAllValidCompatibilityLevels(): void + { + $compatibilityLevels = [ + 'NONE', 'BACKWARD', 'BACKWARD_TRANSITIVE', + 'FORWARD', 'FORWARD_TRANSITIVE', 'FULL', 'FULL_TRANSITIVE', + ]; + + $schemas = []; + foreach ($compatibilityLevels as $level) { + $schemas[] = ['schemaName' => "schema-{$level}", 'compatibilityLevel' => $level]; + } + + file_put_contents(self::CONFIG_FILE, json_encode($schemas)); + + /** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */ + $schemaRegistryApi = $this->makeMock(KafkaSchemaRegistryApiClient::class, [ + 'setSubjectCompatibilityLevel' => true, + ]); + + $commandTester = $this->createCommandTester($schemaRegistryApi); + $commandTester->execute(['configFile' => self::CONFIG_FILE]); + + $output = $commandTester->getDisplay(); + + self::assertStringContainsString('Processing 7 schema compatibility configurations...', $output); + + foreach ($compatibilityLevels as $level) { + self::assertStringContainsString( + "Setting compatibility mode for schema \"schema-{$level}\" to \"{$level}\"... SUCCESS", + $output + ); + } + + self::assertStringContainsString('Successful updates: 7', $output); + self::assertStringContainsString('Failed updates: 0', $output); + self::assertEquals(0, $commandTester->getStatusCode()); + } + + private function createCommandTester(MockObject $schemaRegistryApi): CommandTester + { + $application = new Application(); + $application->add(new SetAllSchemasCompatibilityModeCommand($schemaRegistryApi)); + $command = $application->find('kafka-schema-registry:set:compatibility:mode:all'); + + return new CommandTester($command); + } +} diff --git a/tests/Command/SetCompatibilityModeForSchemaCommandTest.php b/tests/Command/SetCompatibilityModeForSchemaCommandTest.php new file mode 100644 index 0000000..04b1de7 --- /dev/null +++ b/tests/Command/SetCompatibilityModeForSchemaCommandTest.php @@ -0,0 +1,75 @@ +makeMock(KafkaSchemaRegistryApiClient::class, [ + 'setSubjectCompatibilityLevel' => true, + ]); + + $application = new Application(); + $application->add(new SetCompatibilityModeForSchemaCommand($schemaRegistryApi)); + $command = $application->find('kafka-schema-registry:set:schema:compatibility:mode'); + $commandTester = new CommandTester($command); + + $commandTester->execute([ + 'schemaName' => $schemaName, + 'compatibilityLevel' => 'BACKWARD_TRANSITIVE', + ]); + + $commandOutput = trim($commandTester->getDisplay()); + + self::assertEquals( + sprintf('Successfully changed compatibility mode for schema: %s', $schemaName), + $commandOutput + ); + self::assertEquals(0, $commandTester->getStatusCode()); + } + + public function testCommandWhenCompatibilityIsNotChanged(): void + { + $schemaName = 'SomeSchemaName'; + $errorMessage = 'error'; + + /** @var MockObject|KafkaSchemaRegistryApiClient $schemaRegistryApi */ + $schemaRegistryApi = $this->makeMock(KafkaSchemaRegistryApiClient::class, [ + 'setSubjectCompatibilityLevel' => new \Exception($errorMessage), + ]); + + $application = new Application(); + $application->add(new SetCompatibilityModeForSchemaCommand($schemaRegistryApi)); + $command = $application->find('kafka-schema-registry:set:schema:compatibility:mode'); + $commandTester = new CommandTester($command); + + $commandTester->execute([ + 'schemaName' => $schemaName, + 'compatibilityLevel' => 'BACKWARD_TRANSITIVE', + ]); + + $commandOutput = trim($commandTester->getDisplay()); + + self::assertEquals( + sprintf('Could not change compatibility mode for schema %s: %s', $schemaName, $errorMessage), + $commandOutput + ); + self::assertEquals(1, $commandTester->getStatusCode()); + } +} diff --git a/tests/ServiceProvider/CommandServiceProviderTest.php b/tests/ServiceProvider/CommandServiceProviderTest.php index fa982c6..7af3997 100644 --- a/tests/ServiceProvider/CommandServiceProviderTest.php +++ b/tests/ServiceProvider/CommandServiceProviderTest.php @@ -20,6 +20,8 @@ use Jobcloud\SchemaConsole\Command\ListVersionsForSchemaCommand; use Jobcloud\SchemaConsole\Command\RegisterChangedSchemasCommand; use Jobcloud\SchemaConsole\Command\RegisterSchemaVersionCommand; +use Jobcloud\SchemaConsole\Command\SetAllSchemasCompatibilityModeCommand; +use Jobcloud\SchemaConsole\Command\SetCompatibilityModeForSchemaCommand; use Jobcloud\SchemaConsole\Command\SetImportModeCommand; use Jobcloud\SchemaConsole\Command\SetReadOnlyModeCommand; use Jobcloud\SchemaConsole\Command\SetReadWriteModeCommand; @@ -83,6 +85,8 @@ public function testHasAllOfTheCommands(): void self::assertArrayHasInstanceOf(DeleteAllSchemasCommand::class, $commands); self::assertArrayHasInstanceOf(GetCompatibilityModeCommand::class, $commands); self::assertArrayHasInstanceOf(GetCompatibilityModeForSchemaCommand::class, $commands); + self::assertArrayHasInstanceOf(SetAllSchemasCompatibilityModeCommand::class, $commands); + self::assertArrayHasInstanceOf(SetCompatibilityModeForSchemaCommand::class, $commands); self::assertArrayHasInstanceOf(GetLatestSchemaCommand::class, $commands); self::assertArrayHasInstanceOf(GetSchemaByVersionCommand::class, $commands); self::assertArrayHasInstanceOf(ListAllSchemasCommand::class, $commands); @@ -97,5 +101,7 @@ public function testHasAllOfTheCommands(): void self::assertArrayHasInstanceOf(CheckAllSchemaTemplatesDocCommentsCommand::class, $commands); self::assertArrayHasInstanceOf(CheckAllSchemaTemplatesDefaultTypeCommand::class, $commands); self::assertArrayHasInstanceOf(CheckAllSchemaTemplatesNamesCommand::class, $commands); + + self::assertCount(22, $commands); } }