diff --git a/.gitattributes b/.gitattributes new file mode 100644 index 0000000..d8d5679 --- /dev/null +++ b/.gitattributes @@ -0,0 +1,9 @@ +# Auto detect text files and perform LF normalization +* text=auto + +/.gitattributes export-ignore +/.gitignore export-ignore +/.travis.yml export-ignore +/.travis.sh export-ignore +/composer.lock export-ignore +/tests/ export-ignore diff --git a/composer.json b/composer.json index aebe85d..55ae8c7 100644 --- a/composer.json +++ b/composer.json @@ -16,6 +16,7 @@ "require": { "php": "^7.1", "ext-curl": "*", + "ext-json": "*", "guzzlehttp/guzzle": "^6.3.3", "fig/http-message-util": "^1.1" }, diff --git a/phpunit.xml b/phpunit.xml index 4933a3f..e8facf0 100644 --- a/phpunit.xml +++ b/phpunit.xml @@ -8,7 +8,6 @@ convertWarningsToExceptions="true" processIsolation="false" stopOnFailure="false" - syntaxCheck="false" > diff --git a/src/ClientInterface.php b/src/ClientInterface.php new file mode 100644 index 0000000..f329c25 --- /dev/null +++ b/src/ClientInterface.php @@ -0,0 +1,42 @@ +id = $id->getId(); + } + + /** + * @return string + */ + public function getId(): string + { + return $this->id; + } +} diff --git a/src/Entity/FieldInfo.php b/src/Entity/FieldInfo.php new file mode 100644 index 0000000..ecf0545 --- /dev/null +++ b/src/Entity/FieldInfo.php @@ -0,0 +1,71 @@ +name = $name; + $this->schemaInfo = $schemaInfo; + } + + /** + * @return string + */ + public function getName(): string + { + return $this->name; + } + + /** + * @return SchemaInfo|null + */ + public function getSchema(): ?SchemaInfo + { + return $this->schemaInfo; + } + + /** + * @return string + */ + public function __toString(): string + { + return sprintf( + "FieldInfo{name='%s',schema=%s}", + $this->name, + !is_null($this->schemaInfo) ? spl_object_hash($this->schemaInfo) : '' + ); + } +} diff --git a/src/Entity/KafkaTopicInfo.php b/src/Entity/KafkaTopicInfo.php index 6e9af70..d331041 100644 --- a/src/Entity/KafkaTopicInfo.php +++ b/src/Entity/KafkaTopicInfo.php @@ -28,7 +28,7 @@ class KafkaTopicInfo implements EntityInterface /** @var string */ private $registered; - /** @var array */ + /** @var array */ private $replicaInfo; /** @var int */ @@ -38,15 +38,15 @@ class KafkaTopicInfo implements EntityInterface private $consumerGroupCount; /** - * @param string $name - * @param string $registered - * @param array $replicaInfo - * @param int $consumerCount - * @param int $consumerGroupCount + * @param string $name + * @param bool $registered + * @param array $replicaInfo + * @param int $consumerCount + * @param int $consumerGroupCount */ public function __construct( string $name, - string $registered, + bool $registered, array $replicaInfo, int $consumerCount, int $consumerGroupCount @@ -67,15 +67,15 @@ public function getName(): string } /** - * @return string + * @return bool */ - public function getRegistered(): string + public function getRegistered(): bool { return $this->registered; } /** - * @return array + * @return array */ public function getReplicaInfo(): array { diff --git a/src/Entity/KafkaTopics.php b/src/Entity/KafkaTopics.php index 8d0e307..00723a4 100644 --- a/src/Entity/KafkaTopics.php +++ b/src/Entity/KafkaTopics.php @@ -20,7 +20,7 @@ /** * Class KafkaTopics */ -class KafkaTopics extends AbstractKsql +class KafkaTopics extends KsqlEntity { /** @var array */ private $kafkaTopicInfoList; diff --git a/src/Entity/KsqlCollection.php b/src/Entity/KsqlCollection.php index 91b1572..dbb1644 100644 --- a/src/Entity/KsqlCollection.php +++ b/src/Entity/KsqlCollection.php @@ -22,19 +22,19 @@ */ class KsqlCollection implements EntityInterface { - /** @var AbstractKsql[] */ + /** @var EntityInterface[] */ protected $ksql = []; /** - * @param AbstractKsql $ksql + * @param EntityInterface $ksql */ - public function addKsql(AbstractKsql $ksql): void + public function addKsql(EntityInterface $ksql): void { $this->ksql[] = $ksql; } /** - * @return AbstractKsql[] + * @return EntityInterface[] */ public function getKsql(): array { diff --git a/src/Entity/AbstractKsql.php b/src/Entity/KsqlEntity.php similarity index 92% rename from src/Entity/AbstractKsql.php rename to src/Entity/KsqlEntity.php index 5c12705..274d9e0 100644 --- a/src/Entity/AbstractKsql.php +++ b/src/Entity/KsqlEntity.php @@ -18,12 +18,12 @@ namespace Istyle\KsqlClient\Entity; /** - * Class AbstractKsql + * Class KsqlEntity */ -abstract class AbstractKsql +abstract class KsqlEntity implements EntityInterface { /** @var string */ - protected $statementText; + private $statementText; /** * @param string $statementText diff --git a/src/Entity/KsqlErrorMessage.php b/src/Entity/KsqlErrorMessage.php index 7d37d71..87e8e00 100644 --- a/src/Entity/KsqlErrorMessage.php +++ b/src/Entity/KsqlErrorMessage.php @@ -17,11 +17,16 @@ namespace Istyle\KsqlClient\Entity; +use function sprintf; + /** * Class KsqlErrorMessage */ -class KsqlErrorMessage implements EntityInterface +class KsqlErrorMessage extends KsqlEntity { + /** @var int */ + protected $errorCode; + /** @var string */ protected $message; @@ -29,11 +34,16 @@ class KsqlErrorMessage implements EntityInterface protected $stackTrace; /** + * @param int $errorCode * @param string $message * @param array $stackTrace */ - public function __construct(string $message, array $stackTrace) - { + public function __construct( + int $errorCode, + string $message, + array $stackTrace = [] + ) { + $this->errorCode = $errorCode; $this->message = $message; $this->stackTrace = $stackTrace; } @@ -45,7 +55,7 @@ public function getMessage(): string { return $this->message; } - + /** * @return array */ @@ -53,4 +63,20 @@ public function getStackTrace(): array { return $this->stackTrace; } + + /** + * @return int + */ + public function getErrorCode(): int + { + return $this->errorCode; + } + + /** + * @return string + */ + public function __toString(): string + { + return sprintf("%s\n%s", $this->getMessage(), implode("\n", $this->getStackTrace())); + } } diff --git a/src/Entity/KsqlStatementErrorMessage.php b/src/Entity/KsqlStatementErrorMessage.php index 043f47d..a891692 100644 --- a/src/Entity/KsqlStatementErrorMessage.php +++ b/src/Entity/KsqlStatementErrorMessage.php @@ -20,19 +20,31 @@ /** * Class KsqlStatementErrorMessage */ -class KsqlStatementErrorMessage extends AbstractKsql +class KsqlStatementErrorMessage extends KsqlErrorMessage { - /** @var KsqlErrorMessage */ - private $errorMessage; + /** @var string */ + protected $statementText; + + /** @var array */ + private $entities = []; /** - * @param string $statementText - * @param KsqlErrorMessage $errorMessage + * @param string $statementText + * @param int $errorCode + * @param string $message + * @param array $stackTrace + * @param array $entities */ - public function __construct(string $statementText, KsqlErrorMessage $errorMessage) - { - parent::__construct($statementText); - $this->errorMessage = $errorMessage; + public function __construct( + string $statementText, + int $errorCode, + string $message, + array $stackTrace, + array $entities + ) { + $this->statementText = $statementText; + $this->entities = $entities; + parent::__construct($errorCode, $message, $stackTrace); } /** @@ -44,10 +56,10 @@ public function getStatementText(): string } /** - * @return KsqlErrorMessage + * @return array */ - public function getErrorMessage(): KsqlErrorMessage + public function getEntities(): array { - return $this->errorMessage; + return $this->entities; } } diff --git a/src/Entity/Properties.php b/src/Entity/Properties.php new file mode 100644 index 0000000..4807f77 --- /dev/null +++ b/src/Entity/Properties.php @@ -0,0 +1,47 @@ +properties = $properties; + } + + /** + * @return array + */ + public function getProperties(): array + { + return $this->properties; + } +} diff --git a/src/Entity/Queries.php b/src/Entity/Queries.php index 4430e9b..2d9ecc4 100644 --- a/src/Entity/Queries.php +++ b/src/Entity/Queries.php @@ -20,7 +20,7 @@ /** * Class Queries */ -final class Queries extends AbstractKsql +final class Queries extends KsqlEntity { /** @var RunningQuery[] */ private $queries; diff --git a/src/Entity/QueryDescription.php b/src/Entity/QueryDescription.php new file mode 100644 index 0000000..29e6c4f --- /dev/null +++ b/src/Entity/QueryDescription.php @@ -0,0 +1,142 @@ +entityQueryId = $entityQueryId; + $this->statementText = $statementText; + $this->fields = $fields; + $this->sources = $sources; + $this->sinks = $sinks; + $this->topology = $topology; + $this->executionPlan = $executionPlan; + $this->overriddenProperties = $overriddenProperties; + } + + /** + * @return EntityQueryId + */ + public function getEntityQueryId(): EntityQueryId + { + return $this->entityQueryId; + } + + /** + * @return string + */ + public function getStatementText(): string + { + return $this->statementText; + } + + /** + * @return array + */ + public function getFields(): array + { + return $this->fields; + } + + /** + * @return array + */ + public function getSources(): array + { + return $this->sources; + } + + /** + * @return array + */ + public function getSinks(): array + { + return $this->sinks; + } + + /** + * @return string + */ + public function getTopology(): string + { + return $this->topology; + } + + /** + * @return string + */ + public function getExecutionPlan(): string + { + return $this->executionPlan; + } + + /** + * @return array + */ + public function getOverriddenProperties(): array + { + return $this->overriddenProperties; + } +} diff --git a/src/Entity/QueryDescriptionEntity.php b/src/Entity/QueryDescriptionEntity.php new file mode 100644 index 0000000..cfd89f6 --- /dev/null +++ b/src/Entity/QueryDescriptionEntity.php @@ -0,0 +1,47 @@ +queryDescription = $queryDescription; + } + + /** + * @return QueryDescription + */ + public function getQueryDescription(): QueryDescription + { + return $this->queryDescription; + } +} diff --git a/src/Entity/RunningQuery.php b/src/Entity/RunningQuery.php index b431d8b..730a3bb 100644 --- a/src/Entity/RunningQuery.php +++ b/src/Entity/RunningQuery.php @@ -20,44 +20,31 @@ /** * Class RunningQuery */ -final class RunningQuery implements EntityInterface +final class RunningQuery extends KsqlEntity { - /** @var string */ - protected $queryString; - - /** @var array */ + /** @var string[] */ protected $sinks = []; - /** @var string */ + /** @var EntityQueryId */ protected $id; /** - * RunningQuery constructor. - * - * @param string $queryString - * @param array $sinks - * @param string $id + * @param string $statementText + * @param string[] $sinks + * @param EntityQueryId $id */ public function __construct( - string $queryString, + string $statementText, array $sinks, - string $id + EntityQueryId $id ) { - $this->queryString = $queryString; + parent::__construct($statementText); $this->sinks = $sinks; $this->id = $id; } /** - * @return string - */ - public function getQueryString(): string - { - return $this->queryString; - } - - /** - * @return array + * @return string[] */ public function getSinks(): array { @@ -65,9 +52,9 @@ public function getSinks(): array } /** - * @return string + * @return EntityQueryId */ - public function getId(): string + public function getId(): EntityQueryId { return $this->id; } diff --git a/src/Entity/SchemaInfo.php b/src/Entity/SchemaInfo.php new file mode 100644 index 0000000..7f0ce46 --- /dev/null +++ b/src/Entity/SchemaInfo.php @@ -0,0 +1,72 @@ +type = $type; + $this->fieldInfo = $fieldInfo; + $this->schemaInfo = $schemaInfo; + } + + /** + * @return string + */ + public function getType(): string + { + return $this->type; + } + + /** + * @return FieldInfo[]|null + */ + public function getFieldInfo(): ?array + { + return $this->fieldInfo; + } + + /** + * @return SchemaInfo|null + */ + public function getSchemaInfo(): ?SchemaInfo + { + return $this->schemaInfo; + } +} diff --git a/src/Entity/Description.php b/src/Entity/SourceDescription.php similarity index 75% rename from src/Entity/Description.php rename to src/Entity/SourceDescription.php index 2dd9f28..d7641ee 100644 --- a/src/Entity/Description.php +++ b/src/Entity/SourceDescription.php @@ -18,9 +18,9 @@ namespace Istyle\KsqlClient\Entity; /** - * Class Description + * Class SourceDescription */ -final class Description extends AbstractKsql +final class SourceDescription implements EntityInterface { /** @var string */ private $name; @@ -31,8 +31,8 @@ final class Description extends AbstractKsql /** @var RunningQuery[] */ private $writeQueries; - /** @var FieldSchema[] */ - private $schema; + /** @var FieldInfo[] */ + private $fields; /** @var string */ private $type; @@ -52,6 +52,12 @@ final class Description extends AbstractKsql /** @var bool */ private $extended; + /** @var string */ + private $format; + + /** @var string */ + private $topic; + /** @var int */ private $partitions; @@ -59,48 +65,49 @@ final class Description extends AbstractKsql private $replication; /** - * Description constructor. - * - * @param string $statementText - * @param string $name - * @param array $readQueries - * @param array $writeQueries - * @param FieldSchema[] $schema - * @param string $type - * @param string $key - * @param string $timestamp - * @param string $statistics - * @param string $errorStats - * @param bool $extended - * @param int $partitions - * @param int $replication + * @param string $name + * @param array $readQueries + * @param array $writeQueries + * @param array $fields + * @param string $type + * @param string $key + * @param string $timestamp + * @param string $statistics + * @param string $errorStats + * @param bool $extended + * @param string $format + * @param string $topic + * @param int $partitions + * @param int $replication */ public function __construct( - string $statementText, string $name, array $readQueries, array $writeQueries, - array $schema, + array $fields, string $type, string $key, string $timestamp, string $statistics, string $errorStats, bool $extended, + string $format, + string $topic, int $partitions, int $replication ) { - parent::__construct($statementText); $this->name = $name; $this->readQueries = $readQueries; $this->writeQueries = $writeQueries; - $this->schema = $schema; + $this->fields = $fields; $this->type = $type; $this->key = $key; $this->timestamp = $timestamp; $this->statistics = $statistics; $this->errorStats = $errorStats; $this->extended = $extended; + $this->format = $format; + $this->topic = $topic; $this->partitions = $partitions; $this->replication = $replication; } @@ -130,11 +137,11 @@ public function getWriteQueries(): array } /** - * @return FieldSchema[] + * @return FieldInfo[] */ - public function getSchema(): array + public function getFields(): array { - return $this->schema; + return $this->fields; } /** @@ -177,6 +184,22 @@ public function getErrorStats(): string return $this->errorStats; } + /** + * @return string + */ + public function getFormat(): string + { + return $this->format; + } + + /** + * @return string + */ + public function getTopic(): string + { + return $this->topic; + } + /** * @return bool */ diff --git a/src/Entity/SourceDescriptionEntity.php b/src/Entity/SourceDescriptionEntity.php new file mode 100644 index 0000000..bb8c8b7 --- /dev/null +++ b/src/Entity/SourceDescriptionEntity.php @@ -0,0 +1,47 @@ +sourceDescription = $sourceDescription; + } + + /** + * @return SourceDescription + */ + public function getSourceDescription(): SourceDescription + { + return $this->sourceDescription; + } +} diff --git a/src/Entity/SourceInfo.php b/src/Entity/SourceInfo.php new file mode 100644 index 0000000..39bfab4 --- /dev/null +++ b/src/Entity/SourceInfo.php @@ -0,0 +1,72 @@ +name = $name; + $this->topic = $topic; + $this->format = $format; + } + + /** + * @return string + */ + public function getName(): string + { + return $this->name; + } + + /** + * @return string + */ + public function getTopic(): string + { + return $this->topic; + } + + /** + * @return string + */ + public function getFormat(): string + { + return $this->format; + } +} diff --git a/src/Entity/SourceInfoTable.php b/src/Entity/SourceInfoTable.php new file mode 100644 index 0000000..1f6be84 --- /dev/null +++ b/src/Entity/SourceInfoTable.php @@ -0,0 +1,51 @@ +isWindowed = $isWindowed; + } + + /** + * @return bool + */ + public function getIsWindowed(): bool + { + return $this->isWindowed; + } +} diff --git a/src/Entity/StreamedRows.php b/src/Entity/StreamedRows.php index 41fc3db..598aa93 100644 --- a/src/Entity/StreamedRows.php +++ b/src/Entity/StreamedRows.php @@ -20,7 +20,7 @@ /** * Class StreamedRow */ -class StreamedRows implements EntityInterface +final class StreamedRows implements EntityInterface { /** @var StreamedRow[] */ protected $rows = []; diff --git a/src/Entity/StreamsList.php b/src/Entity/StreamsList.php new file mode 100644 index 0000000..d877bb8 --- /dev/null +++ b/src/Entity/StreamsList.php @@ -0,0 +1,47 @@ +sourceInfoList = $sourceInfoList; + } + + /** + * @return SourceInfo[] + */ + public function getSourceInfoList(): array + { + return $this->sourceInfoList; + } +} diff --git a/src/Entity/TablesList.php b/src/Entity/TablesList.php new file mode 100644 index 0000000..4ba3fe8 --- /dev/null +++ b/src/Entity/TablesList.php @@ -0,0 +1,47 @@ +sourceInfoTable = $sourceInfoTable; + } + + /** + * @return SourceInfoTable[] + */ + public function getSourceInfoList(): array + { + return $this->sourceInfoTable; + } +} diff --git a/src/Mapper/AbstractMapper.php b/src/Mapper/AbstractMapper.php index 726283f..31f41dc 100644 --- a/src/Mapper/AbstractMapper.php +++ b/src/Mapper/AbstractMapper.php @@ -18,13 +18,12 @@ namespace Istyle\KsqlClient\Mapper; use Psr\Http\Message\ResponseInterface; -use Istyle\KsqlClient\Entity\EntityInterface; /** * should use extends * Class AbstractMapper */ -abstract class AbstractMapper +abstract class AbstractMapper implements ResultInterface { /** @var ResponseInterface */ protected $response; @@ -36,10 +35,4 @@ public function __construct(ResponseInterface $response) { $this->response = $response; } - - /** - * return Result Entity - * @return EntityInterface - */ - abstract public function result(): EntityInterface; } diff --git a/src/Mapper/EntityManager.php b/src/Mapper/EntityManager.php new file mode 100644 index 0000000..cc4437d --- /dev/null +++ b/src/Mapper/EntityManager.php @@ -0,0 +1,68 @@ + KafkaTopicMapper::class, + 'streams' => StreamsListMapper::class, + 'generic_error' => KsqlErrorMapper::class, + 'statement_error' => KsqlStatementErrorMapper::class, + 'tables' => TablesListMapper::class, + 'queries' => QueriesMapper::class, + 'properties' => PropertiesMapper::class, + 'sourceDescription' => SourceDescriptionMapper::class, + 'queryDescription' => QueryDescriptionMapper::class, + ]; + + /** + * @param array $row + */ + public function __construct(array $row) + { + $this->row = $row; + } + + /** + * @return EntityInterface + */ + public function map(): EntityInterface + { + $type = $this->row['@type'] ?? ''; + if (array_key_exists($type, $this->map)) { + /** @var ResultInterface $mapper */ + $mapper = new $this->map[$type]($this->row); + return $mapper->result(); + } + throw new UnknownJsonObjectsException('Unknown json objects.'); + } +} diff --git a/src/Mapper/KafkaTopicMapper.php b/src/Mapper/KafkaTopicMapper.php new file mode 100644 index 0000000..55e8ed4 --- /dev/null +++ b/src/Mapper/KafkaTopicMapper.php @@ -0,0 +1,57 @@ +rows = $rows; + } + + /** + * @return EntityInterface + */ + public function result(): EntityInterface + { + $topics = []; + foreach ($this->rows['topics'] as $topic) { + $topics[] = new KafkaTopicInfo( + $topic['name'], + $topic['registered'], + (is_array($topic['replicaInfo'])) ? $topic['replicaInfo'] : [$topic['replicaInfo']], + $topic['consumerCount'], + $topic['consumerGroupCount'] + ); + } + return new KafkaTopics($this->rows['statementText'], $topics); + } +} diff --git a/src/Mapper/ErrorMapper.php b/src/Mapper/KsqlErrorMapper.php similarity index 73% rename from src/Mapper/ErrorMapper.php rename to src/Mapper/KsqlErrorMapper.php index f57e2ff..31eaf02 100644 --- a/src/Mapper/ErrorMapper.php +++ b/src/Mapper/KsqlErrorMapper.php @@ -21,21 +21,30 @@ use Istyle\KsqlClient\Entity\KsqlErrorMessage; /** - * Class ErrorResult + * Class KsqlErrorMapper */ -class ErrorMapper extends AbstractMapper +class KsqlErrorMapper implements ResultInterface { + /** @var array */ + protected $rows; + /** - * @return EntityInterface|KsqlErrorMessage + * @param array $rows + */ + public function __construct(array $rows) + { + $this->rows = $rows; + } + + /** + * @return EntityInterface */ public function result(): EntityInterface { - $decode = \GuzzleHttp\json_decode( - $this->response->getBody()->getContents(), true - ); return new KsqlErrorMessage( - $decode['message'] ?? '', - $decode['stackTrace'] ?? [] + $this->rows['error_code'], + $this->rows['message'] ?? '', + $this->rows['stackTrace'] ?? [] ); } } diff --git a/src/Mapper/KsqlMapper.php b/src/Mapper/KsqlMapper.php index cc17ad9..8e300d3 100644 --- a/src/Mapper/KsqlMapper.php +++ b/src/Mapper/KsqlMapper.php @@ -17,18 +17,11 @@ namespace Istyle\KsqlClient\Mapper; -use Istyle\KsqlClient\Entity\AbstractKsql; -use Istyle\KsqlClient\Entity\Description; use Istyle\KsqlClient\Entity\EntityInterface; -use Istyle\KsqlClient\Entity\FieldSchema; -use Istyle\KsqlClient\Entity\KafkaTopicInfo; -use Istyle\KsqlClient\Entity\KafkaTopics; use Istyle\KsqlClient\Entity\KsqlCollection; -use Istyle\KsqlClient\Entity\KsqlErrorMessage; -use Istyle\KsqlClient\Entity\KsqlStatementErrorMessage; -use Istyle\KsqlClient\Entity\Queries; -use Istyle\KsqlClient\Entity\RunningQuery; -use Istyle\KsqlClient\Exception\UnknownJsonObjectsException; + +use function GuzzleHttp\json_decode; +use function array_key_exists; /** * Class KsqlResult @@ -40,100 +33,18 @@ class KsqlMapper extends AbstractMapper */ public function result(): EntityInterface { - $decode = \GuzzleHttp\json_decode( + $decode = json_decode( $this->response->getBody()->getContents(), true ); + if (array_key_exists('@type', $decode)) { + return (new EntityManager($decode))->map(); + } $collect = new KsqlCollection(); foreach ($decode as $row) { - $collect->addKsql($this->detectEntity($row)); + $em = new EntityManager($row); + $collect->addKsql($em->map()); } return $collect; } - - /** - * @param array $row - * - * @return AbstractKsql - */ - protected function detectEntity(array $row): AbstractKsql - { - if (isset($row['queries'])) { - $queries = []; - foreach ($row['queries']['queries'] as $query) { - $queries[] = new RunningQuery( - $query['statementText'], - $query['sinks'], - $query['id'] - ); - } - - return new Queries($row['queries']['statementText'], $queries); - } - if (isset($row['description'])) { - $read = $write = $schema = []; - foreach ($row['description']['readQueries'] as $query) { - $read[] = new RunningQuery( - $query['statementText'], - $query['sinks'], - $query['id'] - ); - } - foreach ($row['description']['writeQueries'] as $query) { - $write[] = new RunningQuery( - $query['statementText'], - $query['sinks'], - $query['id'] - ); - } - foreach ($row['description']['schema'] as $query) { - $schema[] = new FieldSchema( - $query['name'], - $query['type'] - ); - } - $description = $row['description']; - - return new Description( - $row['description']['statementText'], - $description['name'], - $read, - $write, - $schema, - $description['type'], - $description['key'], - $description['timestamp'], - $description['statistics'], - $description['errorStats'], - $description['extended'], - $description['replication'], - $description['partitions'] - ); - } - if (isset($row['error'])) { - if (isset($row['error']['errorMessage'])) { - $errorMessage = $row['error']['errorMessage']; - - return new KsqlStatementErrorMessage( - $row['error']['statementText'], - new KsqlErrorMessage($errorMessage['message'], $errorMessage['stackTrace']) - ); - } - } - if (isset($row['kafka_topics'])) { - $topics = []; - foreach ($row['kafka_topics']['topics'] as $topic) { - $topics[] = new KafkaTopicInfo( - $topic['name'], - $topic['registered'], - (is_array($topic['replicaInfo'])) ? $topic['replicaInfo'] : [$topic['replicaInfo']], - $topic['consumerCount'], - $topic['consumerGroupCount'] - ); - } - - return new KafkaTopics($row['kafka_topics']['statementText'], $topics); - } - throw new UnknownJsonObjectsException('Unknown json objects.'); - } } diff --git a/src/Mapper/KsqlStatementErrorMapper.php b/src/Mapper/KsqlStatementErrorMapper.php new file mode 100644 index 0000000..3db7ac4 --- /dev/null +++ b/src/Mapper/KsqlStatementErrorMapper.php @@ -0,0 +1,52 @@ +rows = $rows; + } + + /** + * @return EntityInterface + */ + public function result(): EntityInterface + { + return new KsqlStatementErrorMessage( + $this->rows['statementText'], + $this->rows['error_code'], + $this->rows['message'], + $this->rows['stackTrace'], + $this->rows['entities'] + ); + } +} diff --git a/src/Mapper/PropertiesMapper.php b/src/Mapper/PropertiesMapper.php new file mode 100644 index 0000000..899f836 --- /dev/null +++ b/src/Mapper/PropertiesMapper.php @@ -0,0 +1,46 @@ +rows = $rows; + } + + /** + * @return EntityInterface + */ + public function result(): EntityInterface + { + return new Properties($this->rows['statementText'], $this->rows['properties']); + } +} diff --git a/src/Mapper/QueriesMapper.php b/src/Mapper/QueriesMapper.php new file mode 100644 index 0000000..2cc3b0d --- /dev/null +++ b/src/Mapper/QueriesMapper.php @@ -0,0 +1,60 @@ +rows = $rows; + } + + /** + * @return EntityInterface + */ + public function result(): EntityInterface + { + $queries = []; + foreach ($this->rows['queries'] as $row) { + $queries[] = new RunningQuery( + $row['queryString'], + $row['sinks'], + new EntityQueryId( + new QueryId($row['id']) + ) + ); + } + return new Queries($this->rows['statementText'], $queries); + } +} diff --git a/src/Mapper/QueryDescriptionMapper.php b/src/Mapper/QueryDescriptionMapper.php new file mode 100644 index 0000000..1996ba8 --- /dev/null +++ b/src/Mapper/QueryDescriptionMapper.php @@ -0,0 +1,66 @@ +rows = $rows; + } + + /** + * @return EntityInterface + */ + public function result(): EntityInterface + { + $description = $this->rows['queryDescription']; + $queryDescription = new QueryDescription( + new EntityQueryId(new QueryId($description['id'])), + $description['statementText'], + $this->parentFields($description['fields'] ?? []), + $description['sources'], + $description['sinks'], + $description['topology'], + $description['executionPlan'], + $description['overriddenProperties'] + ); + return new QueryDescriptionEntity( + $this->rows['statementText'], + $queryDescription + ); + } +} diff --git a/src/Mapper/RecursiveFieldTrait.php b/src/Mapper/RecursiveFieldTrait.php new file mode 100644 index 0000000..bcbf86c --- /dev/null +++ b/src/Mapper/RecursiveFieldTrait.php @@ -0,0 +1,84 @@ +recursiveFields($schema['fields']), + $this->recursiveSchemaInfo($schema['memberSchema']) + ) + ); + } + return $fields; + } + + /** + * @param array|null $rows + * + * @return FieldInfo|null + */ + private function recursiveFields(?array $rows): ?array + { + if (!is_null($rows)) { + $fields = []; + foreach ($rows as $row) { + if (!is_null($row['memberSchema'])) { + $fields[] = new FieldInfo( + $row['name'], + $this->recursiveSchemaInfo($row['memberSchema'] ?? []) + ); + } + } + return $fields; + } + return null; + } + + /** + * @param array|null $rows + * + * @return SchemaInfo|null + */ + private function recursiveSchemaInfo(?array $rows): ?SchemaInfo + { + if (is_null($rows)) { + return null; + } + return new SchemaInfo($rows['type'], $this->recursiveFields($rows['fields']), $rows['memberSchema']); + } +} diff --git a/src/Mapper/ResultInterface.php b/src/Mapper/ResultInterface.php new file mode 100644 index 0000000..dd6b5c4 --- /dev/null +++ b/src/Mapper/ResultInterface.php @@ -0,0 +1,31 @@ +rows = $rows; + } + + /** + * @return EntityInterface + */ + public function result(): EntityInterface + { + $description = $this->rows['sourceDescription']; + $sourceDescription = new SourceDescription( + $description['name'], + $this->getQueries($description['readQueries'] ?? []), + $this->getQueries($description['writeQueries'] ?? []), + $this->parentFields($description['fields'] ?? []), + $description['type'], + $description['key'], + $description['timestamp'], + $description['statistics'], + $description['errorStats'], + $description['extended'], + $description['format'], + $description['topic'], + $description['partitions'], + $description['replication'] + ); + return new SourceDescriptionEntity( + $this->rows['statementText'], + $sourceDescription + ); + } + + /** + * @param array $rows + * + * @return array + */ + private function getQueries(array $rows): array + { + $queries = []; + foreach ($rows as $row) { + $queries[] = new RunningQuery( + $row['queryString'], + $row['sinks'], + new EntityQueryId( + new QueryId($row['id']) + ) + ); + } + return $queries; + } +} diff --git a/src/Mapper/StreamMapper.php b/src/Mapper/StreamMapper.php index 4866bed..ba85a37 100644 --- a/src/Mapper/StreamMapper.php +++ b/src/Mapper/StreamMapper.php @@ -52,12 +52,11 @@ public function result(): EntityInterface $line = trim(\GuzzleHttp\Psr7\readline($stream)); if (!empty($line)) { $decode = \GuzzleHttp\json_decode($line, true); - $errorMessage = $decode['errorMessage']; $row = new StreamedRow( $decode['row']['columns'], new KsqlErrorMessage( - $errorMessage['message'] ?? '', - $errorMessage['stackTrace'] ?? [] + 0, + strval($decode['errorMessage']) ) ); call_user_func_array($this->callback, [$row]); diff --git a/src/Mapper/StreamsListMapper.php b/src/Mapper/StreamsListMapper.php new file mode 100644 index 0000000..53f0dbc --- /dev/null +++ b/src/Mapper/StreamsListMapper.php @@ -0,0 +1,56 @@ +rows = $rows; + } + + /** + * @return EntityInterface + */ + public function result(): EntityInterface + { + $streams = []; + foreach ($this->rows['streams'] as $stream) { + $streams[] = new SourceInfo( + $stream['name'], + $stream['topic'], + $stream['format'] + ); + } + return new StreamsList($this->rows['statementText'], $streams); + } +} diff --git a/src/Mapper/TablesListMapper.php b/src/Mapper/TablesListMapper.php new file mode 100644 index 0000000..53b0b02 --- /dev/null +++ b/src/Mapper/TablesListMapper.php @@ -0,0 +1,57 @@ +rows = $rows; + } + + /** + * @return EntityInterface + */ + public function result(): EntityInterface + { + $tables = []; + foreach ($this->rows['tables'] as $row) { + $tables[] = new SourceInfoTable( + $row['name'], + $row['topic'], + $row['format'], + $row['isWindowed'] + ); + } + + return new TablesList($this->rows['statementText'], $tables); + } +} diff --git a/src/Query/AbstractStreamQuery.php b/src/Query/AbstractStreamQuery.php index d68e1fe..55eca78 100644 --- a/src/Query/AbstractStreamQuery.php +++ b/src/Query/AbstractStreamQuery.php @@ -31,8 +31,6 @@ abstract class AbstractStreamQuery implements StreamQueryInterface protected $callback; /** - * StreamQuery constructor. - * * @param string $query * @param StreamConsumable $callback */ diff --git a/src/Query/CommandStatus.php b/src/Query/CommandStatus.php index dcf64b9..e482007 100644 --- a/src/Query/CommandStatus.php +++ b/src/Query/CommandStatus.php @@ -18,8 +18,8 @@ namespace Istyle\KsqlClient\Query; use Fig\Http\Message\RequestMethodInterface; +use Istyle\KsqlClient\Mapper\ResultInterface; use Psr\Http\Message\ResponseInterface; -use Istyle\KsqlClient\Mapper\AbstractMapper; use Istyle\KsqlClient\Mapper\CommandStatusMapper; /** @@ -65,9 +65,9 @@ public function toArray(): array /** * @param ResponseInterface $response * - * @return AbstractMapper + * @return ResultInterface */ - public function queryResult(ResponseInterface $response): AbstractMapper + public function queryResult(ResponseInterface $response): ResultInterface { return new CommandStatusMapper($response); } diff --git a/src/Query/Ksql.php b/src/Query/Ksql.php index 07d79a0..3f4f489 100644 --- a/src/Query/Ksql.php +++ b/src/Query/Ksql.php @@ -19,13 +19,13 @@ use Fig\Http\Message\RequestMethodInterface; use Psr\Http\Message\ResponseInterface; -use Istyle\KsqlClient\Mapper\AbstractMapper; +use Istyle\KsqlClient\Mapper\ResultInterface; use Istyle\KsqlClient\Mapper\KsqlMapper; /** * Class Ksql */ -final class Ksql implements QueryInterface +class Ksql implements QueryInterface { /** @var string */ protected $query; @@ -63,15 +63,16 @@ public function toArray(): array { return [ 'ksql' => $this->query, + "streamsProperties" => [] ]; } /** * @param ResponseInterface $response * - * @return AbstractMapper + * @return ResultInterface */ - public function queryResult(ResponseInterface $response): AbstractMapper + public function queryResult(ResponseInterface $response): ResultInterface { return new KsqlMapper($response); } diff --git a/src/Entity/FieldSchema.php b/src/Query/QueryId.php similarity index 69% rename from src/Entity/FieldSchema.php rename to src/Query/QueryId.php index 40135bb..c34ef91 100644 --- a/src/Entity/FieldSchema.php +++ b/src/Query/QueryId.php @@ -15,42 +15,37 @@ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. */ -namespace Istyle\KsqlClient\Entity; +namespace Istyle\KsqlClient\Query; /** - * Class FieldSchema + * Class QueryId */ -final class FieldSchema +final class QueryId { /** @var string */ - private $name; - - /** @var string */ - private $type; + private $id; /** - * @param string $name - * @param string $type + * @param string $id */ - public function __construct(string $name, string $type) + public function __construct(string $id) { - $this->name = $name; - $this->type = $type; + $this->id = $id; } /** * @return string */ - public function getName(): string + public function getId(): string { - return $this->name; + return $this->id; } /** * @return string */ - public function getType(): string + public function __toString(): string { - return $this->type; + return $this->id; } } diff --git a/src/Query/QueryInterface.php b/src/Query/QueryInterface.php index 78b302b..9d60884 100644 --- a/src/Query/QueryInterface.php +++ b/src/Query/QueryInterface.php @@ -17,8 +17,8 @@ namespace Istyle\KsqlClient\Query; +use Istyle\KsqlClient\Mapper\ResultInterface; use Psr\Http\Message\ResponseInterface; -use Istyle\KsqlClient\Mapper\AbstractMapper; /** * Interface QueryInterface @@ -45,7 +45,7 @@ public function toArray(): array; /** * @param ResponseInterface $response * - * @return AbstractMapper + * @return ResultInterface */ - public function queryResult(ResponseInterface $response); + public function queryResult(ResponseInterface $response): ResultInterface; } diff --git a/src/Query/ServerInfo.php b/src/Query/ServerInfo.php index 173cbd9..5e80408 100644 --- a/src/Query/ServerInfo.php +++ b/src/Query/ServerInfo.php @@ -19,6 +19,7 @@ use Fig\Http\Message\RequestMethodInterface; use Psr\Http\Message\ResponseInterface; +use Istyle\KsqlClient\Mapper\ResultInterface; use Istyle\KsqlClient\Mapper\ServerInfoMapper; /** @@ -53,7 +54,7 @@ public function toArray(): array /** * {@inheritdoc} */ - public function queryResult(ResponseInterface $response) + public function queryResult(ResponseInterface $response): ResultInterface { return new ServerInfoMapper($response); } diff --git a/src/Query/Status.php b/src/Query/Status.php index aac0cfc..69af1dd 100644 --- a/src/Query/Status.php +++ b/src/Query/Status.php @@ -19,7 +19,7 @@ use Fig\Http\Message\RequestMethodInterface; use Psr\Http\Message\ResponseInterface; -use Istyle\KsqlClient\Mapper\AbstractMapper; +use Istyle\KsqlClient\Mapper\ResultInterface; use Istyle\KsqlClient\Mapper\StatusMapper; /** @@ -52,11 +52,9 @@ public function toArray(): array } /** - * @param ResponseInterface $response - * - * @return AbstractMapper + * {@inheritdoc} */ - public function queryResult(ResponseInterface $response): AbstractMapper + public function queryResult(ResponseInterface $response): ResultInterface { return new StatusMapper($response); } diff --git a/src/Query/Stream.php b/src/Query/Stream.php index 7afdf71..a191939 100644 --- a/src/Query/Stream.php +++ b/src/Query/Stream.php @@ -19,7 +19,7 @@ use Fig\Http\Message\RequestMethodInterface; use Psr\Http\Message\ResponseInterface; -use Istyle\KsqlClient\Mapper\AbstractMapper; +use Istyle\KsqlClient\Mapper\ResultInterface; use Istyle\KsqlClient\Mapper\StreamMapper; /** @@ -54,11 +54,9 @@ public function toArray(): array } /** - * @param ResponseInterface $response - * - * @return AbstractMapper + * {@inheritdoc} */ - public function queryResult(ResponseInterface $response): AbstractMapper + public function queryResult(ResponseInterface $response): ResultInterface { $stream = new StreamMapper($response); $stream->setCallback($this->callback); diff --git a/src/RestClient.php b/src/RestClient.php index 2cda719..88d3f37 100644 --- a/src/RestClient.php +++ b/src/RestClient.php @@ -23,17 +23,21 @@ use GuzzleHttp\Psr7\Request; use GuzzleHttp\Psr7\Uri; use GuzzleHttp\Psr7\UriNormalizer; +use GuzzleHttp\RequestOptions; use Istyle\KsqlClient\Exception\KsqlRestClientException; -use Istyle\KsqlClient\Mapper\AbstractMapper; -use Istyle\KsqlClient\Mapper\ErrorMapper; +use Istyle\KsqlClient\Mapper\KsqlErrorMapper; +use Istyle\KsqlClient\Mapper\ResultInterface; use Istyle\KsqlClient\Query\QueryInterface; +use Psr\Http\Message\RequestInterface; +use Psr\Http\Message\ResponseInterface; +use Psr\Http\Message\UriInterface; /** * Class RestClient */ -class RestClient +class RestClient implements \Istyle\KsqlClient\ClientInterface { - const VERSION = '0.1.0'; + const VERSION = '0.2.0'; /** @var string */ private $serverAddress; @@ -77,12 +81,9 @@ public function __construct( */ protected function buildClient(): ClientInterface { - return new GuzzleClient([ - 'headers' => [ - 'User-Agent' => $this->userAgent(), - 'Accept' => 'application/json', - ], - ]); + return new GuzzleClient( + $this->requestHeader() + ); } /** @@ -120,43 +121,68 @@ public function setServerAddress(string $serverAddress): void /** * @param QueryInterface $query - * @param int $timeout - * @param bool $debug * - * @return AbstractMapper + * @return RequestInterface */ - public function requestQuery( - QueryInterface $query, - int $timeout = 500000, - bool $debug = false - ): AbstractMapper { + protected function normalizeRequest(QueryInterface $query): RequestInterface + { $uri = new Uri($this->serverAddress); $uri = $uri->withPath($query->uri()); $normalize = UriNormalizer::normalize( $uri, UriNormalizer::REMOVE_DUPLICATE_SLASHES ); - $request = new Request($query->httpMethod(), $normalize); + + return new Request($query->httpMethod(), $normalize); + } + + /** + * @param QueryInterface $query + * @param int $timeout + * @param bool $debug + * + * @return ResultInterface + */ + public function requestQuery( + QueryInterface $query, + int $timeout = 500000, + bool $debug = false + ): ResultInterface { + $request = $this->normalizeRequest($query); try { - $options = $this->getOptions($query, $timeout, $debug); - if ($this->hasUserCredentials) { - $credentials = $this->getAuthCredentials(); - $options = array_merge($options, [ - 'auth' => [$credentials->getUserName(), $credentials->getPassword()], - ]); - } - $response = $this->client->send( - $request, - array_merge($options, $this->options) - ); + $response = $this->sendRequest($query, $timeout, $debug, $request); if ($response->getStatusCode() == StatusCodeInterface::STATUS_OK) { return $query->queryResult($response); } } catch (\GuzzleHttp\Exception\GuzzleException $e) { throw new KsqlRestClientException($e->getMessage(), $e->getCode()); } + return $this->createErrorResult($request->getUri(), $response->getStatusCode()); + } - return new ErrorMapper($response); + /** + * @param UriInterface $uri + * @param int $statusCode + * + * @return ResultInterface + */ + private function createErrorResult( + UriInterface $uri, + int $statusCode + ): ResultInterface { + $errorMessage['error_code'] = $statusCode; + $errorMessage['message'] = "The server returned an unexpected error."; + if ($statusCode === StatusCodeInterface::STATUS_NOT_FOUND) { + $errorMessage['message'] = "Path not found. Path='" . $uri->getPath() . "'. " + . "Check your ksql http url to make sure you are connecting to a ksql server."; + } + if ($statusCode === StatusCodeInterface::STATUS_UNAUTHORIZED) { + $errorMessage['message'] = "Could not authenticate successfully with the supplied credentials."; + } + if ($statusCode === StatusCodeInterface::STATUS_FORBIDDEN) { + $errorMessage['message'] = "You are forbidden from using this cluster."; + } + return new KsqlErrorMapper($errorMessage); } /** @@ -172,9 +198,9 @@ protected function getOptions( bool $debug = false ): array { return [ - 'timeout' => $timeout, - 'body' => json_encode($query->toArray()), - 'debug' => $debug, + RequestOptions::TIMEOUT => $timeout, + RequestOptions::BODY => json_encode($query->toArray()), + RequestOptions::DEBUG => $debug, ]; } @@ -202,4 +228,46 @@ protected function userAgent(): string { return 'PHP-KSQLClient/' . self::VERSION; } + + /** + * @return array + */ + protected function requestHeader(): array + { + return [ + RequestOptions::HEADERS => [ + 'User-Agent' => $this->userAgent(), + 'Accept' => \Istyle\KsqlClient\ClientInterface::REQUEST_ACCEPT, + ], + ]; + } + + /** + * @param QueryInterface $query + * @param int $timeout + * @param bool $debug + * @param RequestInterface $request + * + * @return ResponseInterface + * @throws \GuzzleHttp\Exception\GuzzleException + */ + protected function sendRequest( + QueryInterface $query, + int $timeout, + bool $debug, + RequestInterface $request + ): ResponseInterface { + $options = $this->getOptions($query, $timeout, $debug); + if ($this->hasUserCredentials) { + $credentials = $this->getAuthCredentials(); + $options = array_merge($options, [ + 'auth' => [$credentials->getUserName(), $credentials->getPassword()], + ]); + } + + return $this->client->send( + $request, + array_merge($options, $this->options) + ); + } } diff --git a/src/StreamClient.php b/src/StreamClient.php index bb26434..9af7d6c 100644 --- a/src/StreamClient.php +++ b/src/StreamClient.php @@ -19,44 +19,36 @@ use GuzzleHttp\Client as GuzzleClient; use GuzzleHttp\ClientInterface; +use GuzzleHttp\RequestOptions; use Istyle\KsqlClient\Exception\StreamQueryException; use Istyle\KsqlClient\Query\AbstractStreamQuery; use Istyle\KsqlClient\Query\QueryInterface; -use Istyle\KsqlClient\Mapper\AbstractMapper; +use Istyle\KsqlClient\Mapper\ResultInterface; /** * Class StreamClient */ -final class StreamClient extends RestClient +class StreamClient extends RestClient { /** - * build GuzzleHttp Client - * - * @return ClientInterface + * {@inheritdoc} */ protected function buildClient(): ClientInterface { - return new GuzzleClient([ - 'headers' => [ - 'User-Agent' => $this->userAgent(), - 'Accept' => 'application/json', - ], - 'stream' => true, - ]); + return new GuzzleClient( + array_merge($this->requestHeader(), [ + RequestOptions::STREAM => true, + ])); } /** - * @param QueryInterface $query - * @param int $timeout - * @param bool $debug - * - * @return AbstractMapper + * {@inheritdoc} */ public function requestQuery( QueryInterface $query, int $timeout = 500000, bool $debug = false - ): AbstractMapper { + ): ResultInterface { if ($query instanceof AbstractStreamQuery) { return parent::requestQuery($query, $timeout, $debug); } @@ -64,4 +56,18 @@ public function requestQuery( "You must extends " . AbstractStreamQuery::class ); } + + /** + * @return array + */ + protected function requestHeader(): array + { + return [ + RequestOptions::HEADERS => [ + 'User-Agent' => $this->userAgent(), + 'Accept' => \Istyle\KsqlClient\ClientInterface::REQUEST_ACCEPT, + 'Content-Type' => 'application/json; charset=utf-8', + ], + ]; + } } diff --git a/tests/Entity/FieldInfoTest.php b/tests/Entity/FieldInfoTest.php new file mode 100644 index 0000000..2105d20 --- /dev/null +++ b/tests/Entity/FieldInfoTest.php @@ -0,0 +1,25 @@ +assertInstanceOf(\Istyle\KsqlClient\Entity\EntityInterface::class, $info); + } + + public function testShouldReturnString(): void + { + $info = new FieldInfo('ROWTIME', null); + $this->assertSame("FieldInfo{name='ROWTIME',schema=}", strval($info)); + $info = new FieldInfo('ROWTIME', new SchemaInfo('BIGINT', null, null)); + $this->assertNotSame("FieldInfo{name='ROWTIME',schema=}", $info); + } +} diff --git a/tests/Entity/KsqlErrorMessageTest.php b/tests/Entity/KsqlErrorMessageTest.php new file mode 100644 index 0000000..a08e0da --- /dev/null +++ b/tests/Entity/KsqlErrorMessageTest.php @@ -0,0 +1,16 @@ +assertSame(strval($message), "testing +ksql +test"); + } +} diff --git a/tests/Mapper/KsqlMapperTest.php b/tests/Mapper/KsqlMapperTest.php index 14c4e11..3d651d5 100644 --- a/tests/Mapper/KsqlMapperTest.php +++ b/tests/Mapper/KsqlMapperTest.php @@ -4,7 +4,7 @@ use GuzzleHttp\Psr7\Response; use Istyle\KsqlClient\Mapper\KsqlMapper; use Istyle\KsqlClient\Entity\KsqlCollection; -use Istyle\KsqlClient\Entity\Description; +use Istyle\KsqlClient\Entity\SourceDescriptionEntity; use Istyle\KsqlClient\Entity\RunningQuery; class KsqlMapperTest extends \PHPUnit\Framework\TestCase @@ -15,56 +15,116 @@ public function testShouldReturnDescriptionEntity(): void /** @var KsqlCollection $result */ $result = $mapper->result(); $row = $result->getKsql()[0]; - /** @var Description $row */ - $this->assertInstanceOf(Description::class, $row); - $this->assertEmpty($row->getStatementText()); - $this->assertEmpty($row->getErrorStats()); - $this->assertSame('MESSAGE', $row->getKey()); - $this->assertSame('MSGS', $row->getName()); - $this->assertContainsOnlyInstancesOf(RunningQuery::class, $row->getReadQueries()); - $this->assertSame(0, $row->getPartitions()); - $this->assertSame(0, $row->getReplication()); - /** @var RunningQuery $query */ - $query = $row->getReadQueries()[0]; - $this->assertSame('testing', $query->getId()); - $this->assertSame('', $query->getQueryString()); - $this->assertSame([], $query->getSinks()); + /** @var SourceDescriptionEntity $row */ + $this->assertInstanceOf(SourceDescriptionEntity::class, $row); + $this->assertNotEmpty($row->getStatementText()); + $this->assertEmpty($row->getSourceDescription()->getErrorStats()); + $this->assertSame('USERID', $row->getSourceDescription()->getKey()); + $this->assertSame('USERS', $row->getSourceDescription()->getName()); + $this->assertContainsOnlyInstancesOf( + RunningQuery::class, + $row->getSourceDescription()->getReadQueries() + ); + $this->assertSame(0, $row->getSourceDescription()->getPartitions()); + $this->assertSame(0, $row->getSourceDescription()->getReplication()); } protected function json(): string { return '[ { - "description": { - "statementText": "", - "name": "MSGS", - "readQueries": [{ - "statementText": "", - "sinks": [], - "id": "testing" - }], + "@type": "sourceDescription", + "statementText": "DESCRIBE users;", + "sourceDescription": { + "name": "USERS", + "readQueries": [], "writeQueries": [], - "schema": [ + "fields": [ { "name": "ROWTIME", - "type": "BIGINT" + "schema": { + "type": "BIGINT", + "fields": null, + "memberSchema": null + } + }, + { + "name": "ROWKEY", + "schema": { + "type": "STRING", + "fields": null, + "memberSchema": null + } + }, + { + "name": "REGISTERTIME", + "schema": { + "type": "BIGINT", + "fields": null, + "memberSchema": null + } + }, + { + "name": "GENDER", + "schema": { + "type": "STRING", + "fields": null, + "memberSchema": null + } + }, + { + "name": "REGIONID", + "schema": { + "type": "STRING", + "fields": null, + "memberSchema": null + } + }, + { + "name": "USERID", + "schema": { + "type": "STRING", + "fields": null, + "memberSchema": null + } + }, + { + "name": "INTERESTS", + "schema": { + "type": "ARRAY", + "fields": null, + "memberSchema": { + "type": "STRING", + "fields": null, + "memberSchema": null + } + } + }, + { + "name": "CONTACTINFO", + "schema": { + "type": "MAP", + "fields": null, + "memberSchema": { + "type": "STRING", + "fields": null, + "memberSchema": null + } + } } ], "type": "TABLE", - "key": "MESSAGE", + "key": "USERID", "timestamp": "", "statistics": "", "errorStats": "", "extended": false, - "serdes": "JSON", - "kafkaTopic": "testing", - "topology": "", - "executionPlan": "", - "replication": 0, - "partitions": 0 + "format": "JSON", + "topic": "users", + "partitions": 0, + "replication": 0 } } -] -'; +]'; } } diff --git a/tests/RestClientTest.php b/tests/RestClientTest.php index 1d1b707..d2f4adb 100644 --- a/tests/RestClientTest.php +++ b/tests/RestClientTest.php @@ -5,11 +5,13 @@ use GuzzleHttp\Handler\MockHandler; use GuzzleHttp\HandlerStack; use GuzzleHttp\Psr7\Response; +use Istyle\KsqlClient\Mapper\ResultInterface; use Istyle\KsqlClient\Mapper\AbstractMapper; use Istyle\KsqlClient\Query\{ Ksql, Status }; use Istyle\KsqlClient\RestClient; +use Istyle\KsqlClient\Entity; /** * Class RestClientTest @@ -73,10 +75,10 @@ public function testShouldBeCommandStatusesEntity(): void ); } - public function testShouldBeKsqlEntity(): void + public function testShouldBeDescKsqlEntity(): void { $mock = new MockHandler([ - new Response(200, [], file_get_contents(realpath(__DIR__ . '/resources/ksql.json'))), + new Response(200, [], file_get_contents(realpath(__DIR__ . '/resources/desc.json'))), ]); $client = new RestClient( "http://localhost:8088", @@ -84,7 +86,7 @@ public function testShouldBeKsqlEntity(): void new Client(['handler' => HandlerStack::create($mock)]) ); - $result = $client->requestQuery(new Ksql('SHOW QUERIES;')); + $result = $client->requestQuery(new Ksql('DESCRIBE users;')); $this->assertInstanceOf(AbstractMapper::class, $result); /** @var \Istyle\KsqlClient\Entity\KsqlCollection $entity */ $entity = $result->result(); @@ -93,15 +95,15 @@ public function testShouldBeKsqlEntity(): void $entity ); $this->assertContainsOnlyInstancesOf( - \Istyle\KsqlClient\Entity\AbstractKsql::class, + \Istyle\KsqlClient\Entity\KsqlEntity::class, $entity->getKsql() ); } - public function testShouldBeDescKsqlEntity(): void + public function testShouldBeQueryDescriptionKsqlEntity(): void { $mock = new MockHandler([ - new Response(200, [], file_get_contents(realpath(__DIR__ . '/resources/desc.json'))), + new Response(200, [], file_get_contents(realpath(__DIR__ . '/resources/query_desc.json'))), ]); $client = new RestClient( "http://localhost:8088", @@ -109,7 +111,7 @@ public function testShouldBeDescKsqlEntity(): void new Client(['handler' => HandlerStack::create($mock)]) ); - $result = $client->requestQuery(new Ksql('SHOW QUERIES;')); + $result = $client->requestQuery(new Ksql('EXPLAIN CSAS_STREAM2_0;')); $this->assertInstanceOf(AbstractMapper::class, $result); /** @var \Istyle\KsqlClient\Entity\KsqlCollection $entity */ $entity = $result->result(); @@ -118,9 +120,22 @@ public function testShouldBeDescKsqlEntity(): void $entity ); $this->assertContainsOnlyInstancesOf( - \Istyle\KsqlClient\Entity\AbstractKsql::class, + \Istyle\KsqlClient\Entity\KsqlEntity::class, $entity->getKsql() ); + /** @var Entity\QueryDescriptionEntity $result */ + $result = $entity->getKsql()[0]; + $this->assertInstanceOf(Entity\QueryDescriptionEntity::class, $result); + $description = $result->getQueryDescription(); + $this->assertInstanceOf( + Entity\QueryDescription::class, + $description + ); + $this->assertInstanceOf( + Entity\EntityQueryId::class, + $description->getEntityQueryId() + ); + $this->assertSame('CSAS_STREAM2_0', $description->getEntityQueryId()->getId()); } public function testShouldBeCommandStatusEntity(): void @@ -163,32 +178,20 @@ public function testShouldBeErrorMessageEntity(): void new \Istyle\KsqlClient\Query\Ksql('MESSAGE_STREAM/create') ); $this->assertInstanceOf(AbstractMapper::class, $result); - /** @var \Istyle\KsqlClient\Entity\CommandStatus $entity */ + /** @var \Istyle\KsqlClient\Entity\KsqlStatementErrorMessage $entity */ $entity = $result->result(); $this->assertInstanceOf( - \Istyle\KsqlClient\Entity\KsqlCollection::class, - $entity - ); - /** @var \Istyle\KsqlClient\Entity\KsqlStatementErrorMessage[] $message */ - $message = $entity->getKsql(); - $this->assertContainsOnlyInstancesOf( \Istyle\KsqlClient\Entity\KsqlStatementErrorMessage::class, - $message - ); - $this->assertSame( - 'SELECT NOW();', - $message[0]->getStatementText() - ); - $this->assertSame( - "ServerError:io.confluent.ksql.parser.exception.ParseFailedException\r\nCaused by: null", - trim($message[0]->getErrorMessage()->getMessage()) + $entity ); + $this->assertSame($entity->getMessage(), 'SELECT and PRINT queries must use the /query endpoint'); + $this->assertSame(40002, $entity->getErrorCode()); } public function testShouldReturnErrorResult(): void { $mock = new MockHandler([ - new Response(201, [], file_get_contents(realpath(__DIR__ . '/resources/error.json'))), + new Response(201, [], file_get_contents(realpath(__DIR__ . '/resources/generic_error.json'))), ]); $client = new RestClient( "http://localhost:8088", @@ -199,14 +202,14 @@ public function testShouldReturnErrorResult(): void $result = $client->requestQuery( new \Istyle\KsqlClient\Query\CommandStatus('MESSAGE_STREAM/create') ); - $this->assertInstanceOf(AbstractMapper::class, $result); + $this->assertInstanceOf(ResultInterface::class, $result); /** @var \Istyle\KsqlClient\Entity\KsqlErrorMessage $entity */ $entity = $result->result(); $this->assertInstanceOf( \Istyle\KsqlClient\Entity\KsqlErrorMessage::class, $entity ); - $this->assertSame('HTTP 405 Method Not Allowed', $entity->getMessage()); + $this->assertSame('The server returned an unexpected error.', $entity->getMessage()); } /** @@ -215,7 +218,7 @@ public function testShouldReturnErrorResult(): void public function testShouldThrowClientException(): void { $mock = new MockHandler([ - new Response(405, [], file_get_contents(realpath(__DIR__ . '/resources/error.json'))), + new Response(405, [], file_get_contents(realpath(__DIR__ . '/resources/generic_error.json'))), ]); $client = new RestClient( "http://localhost:8088", @@ -246,9 +249,42 @@ public function testShouldReturnServerInfoEntity(): void \Istyle\KsqlClient\Entity\ServerInfo::class, $entity ); - $this->assertSame('4.1.0', $entity->getVersion()); - $this->assertEmpty($entity->getKafkaClusterId()); - $this->assertEmpty($entity->getKsqlServiceId()); + $this->assertSame('5.0.1', $entity->getVersion()); + $this->assertNotEmpty($entity->getKafkaClusterId()); + $this->assertNotEmpty($entity->getKsqlServiceId()); + } + + public function testShouldReturnTablesEntity(): void + { + $mock = new MockHandler([ + new Response(200, [], file_get_contents(realpath(__DIR__ . '/resources/tables.json'))), + ]); + $client = new RestClient( + "http://localhost:8088", + [], + new Client(['handler' => HandlerStack::create($mock)]) + ); + + $result = $client->requestQuery( + new \Istyle\KsqlClient\Query\Ksql('SHOW TABLES;') + ); + $this->assertInstanceOf(AbstractMapper::class, $result); + /** @var \Istyle\KsqlClient\Entity\KsqlCollection $entity */ + $entity = $result->result(); + $this->assertInstanceOf( + \Istyle\KsqlClient\Entity\KsqlCollection::class, + $entity + ); + /** @var \Istyle\KsqlClient\Entity\TablesList $table */ + $table = $entity->getKsql()[0]; + $this->assertInstanceOf( + \Istyle\KsqlClient\Entity\TablesList::class, + $table + ); + $this->assertContainsOnlyInstancesOf( + \Istyle\KsqlClient\Entity\SourceInfoTable::class, + $table->getSourceInfoList() + ); } public function testCanBeArrayForBasicAuth(): void @@ -312,11 +348,124 @@ public function testShouldReturnKafkaTopics(): void ); /** @var \Istyle\KsqlClient\Entity\KafkaTopicInfo $info */ $info = $list[0]; - $this->assertSame('__confluent.support.metrics', $info->getName()); - $this->assertSame('false', $info->getRegistered()); - $this->assertSame(['1'], $info->getReplicaInfo()); + $this->assertSame('_schemas', $info->getName()); + $this->assertSame(false, $info->getRegistered()); + $this->assertSame([1], $info->getReplicaInfo()); $this->assertSame(0, $info->getConsumerCount()); $this->assertSame(0, $info->getConsumerGroupCount()); + } + + public function testShouldReturnStreamsList(): void + { + $mock = new MockHandler([ + new Response(200, [], file_get_contents(realpath(__DIR__ . '/resources/streamslist.json'))), + ]); + $client = new RestClient( + "http://localhost:8088", + [], + new Client(['handler' => HandlerStack::create($mock)]) + ); + + $result = $client->requestQuery(new \Istyle\KsqlClient\Query\Ksql("LIST STREAMS;")); + $this->assertInstanceOf(AbstractMapper::class, $result); + /** @var \Istyle\KsqlClient\Entity\KsqlCollection $entity */ + $entity = $result->result(); + $this->assertInstanceOf( + \Istyle\KsqlClient\Entity\KsqlCollection::class, + $entity + ); + /** @var \Istyle\KsqlClient\Entity\StreamsList $topic */ + $topic = $entity->getKsql()[0]; + $this->assertInstanceOf( + \Istyle\KsqlClient\Entity\StreamsList::class, + $topic + ); + $list = $topic->getSourceInfoList(); + $this->assertContainsOnly(\Istyle\KsqlClient\Entity\SourceInfo::class, $list); + $this->assertCount(1, $list); + foreach ($list as $row) { + $this->assertSame($row->getTopic(), 'ksql-testing'); + $this->assertSame($row->getName(), 'KSQLTESTING'); + } + } + + public function testShouldReturnQueriesEntity(): void + { + $mock = new MockHandler([ + new Response(200, [], file_get_contents(realpath(__DIR__ . '/resources/queries.json'))), + ]); + $client = new RestClient( + "http://localhost:8088", + [], + new Client(['handler' => HandlerStack::create($mock)]) + ); + + $result = $client->requestQuery( + new \Istyle\KsqlClient\Query\Ksql('LIST QUERIES;') + ); + $this->assertInstanceOf(AbstractMapper::class, $result); + /** @var \Istyle\KsqlClient\Entity\KsqlCollection $entity */ + $entity = $result->result(); + $this->assertInstanceOf( + \Istyle\KsqlClient\Entity\KsqlCollection::class, + $entity + ); + /** @var \Istyle\KsqlClient\Entity\Queries $queries */ + $queries = $entity->getKsql()[0]; + $this->assertInstanceOf( + \Istyle\KsqlClient\Entity\Queries::class, + $queries + ); + $this->assertContainsOnlyInstancesOf( + \Istyle\KsqlClient\Entity\RunningQuery::class, + $queries->getQueries() + ); + } + + public function testShouldReturnPropertiesEntity(): void + { + $mock = new MockHandler([ + new Response(200, [], file_get_contents(realpath(__DIR__ . '/resources/properties.json'))), + ]); + $client = new RestClient( + "http://localhost:8088", + [], + new Client(['handler' => HandlerStack::create($mock)]) + ); + + $result = $client->requestQuery( + new \Istyle\KsqlClient\Query\Ksql('LIST PROPERTIES;') + ); + $this->assertInstanceOf(AbstractMapper::class, $result); + /** @var \Istyle\KsqlClient\Entity\KsqlCollection $entity */ + $entity = $result->result(); + $this->assertInstanceOf( + \Istyle\KsqlClient\Entity\KsqlCollection::class, + $entity + ); + /** @var \Istyle\KsqlClient\Entity\Properties $properties */ + $properties = $entity->getKsql()[0]; + $this->assertInstanceOf( + \Istyle\KsqlClient\Entity\Properties::class, + $properties + ); + } + public function testShouldReturnKsqlErrorMessage(): void + { + $mock = new MockHandler([ + new Response(201, [], file_get_contents(realpath(__DIR__ . '/resources/info.json'))), + ]); + $client = new RestClient( + "http://localhost:8088", + [], + new Client(['handler' => HandlerStack::create($mock)]) + ); + /** @var Entity\KsqlErrorMessage $result */ + $result = $client->requestQuery( + new \Istyle\KsqlClient\Query\ServerInfo() + )->result(); + $this->assertSame($result->getErrorCode(), 201); + $this->assertSame($result->getMessage(), 'The server returned an unexpected error.'); } } diff --git a/tests/resources/desc.json b/tests/resources/desc.json index 2d67fb2..fceccf0 100644 --- a/tests/resources/desc.json +++ b/tests/resources/desc.json @@ -1,36 +1,95 @@ [ { - "description": { - "statementText": "", - "name": "MSGS", + "@type": "sourceDescription", + "statementText": "DESCRIBE users;", + "sourceDescription": { + "name": "USERS", "readQueries": [], "writeQueries": [], - "schema": [ + "fields": [ { "name": "ROWTIME", - "type": "BIGINT" + "schema": { + "type": "BIGINT", + "fields": null, + "memberSchema": null + } }, { "name": "ROWKEY", - "type": "VARCHAR(STRING)" + "schema": { + "type": "STRING", + "fields": null, + "memberSchema": null + } }, { - "name": "MESSAGE", - "type": "VARCHAR(STRING)" + "name": "REGISTERTIME", + "schema": { + "type": "BIGINT", + "fields": null, + "memberSchema": null + } + }, + { + "name": "GENDER", + "schema": { + "type": "STRING", + "fields": null, + "memberSchema": null + } + }, + { + "name": "REGIONID", + "schema": { + "type": "STRING", + "fields": null, + "memberSchema": null + } + }, + { + "name": "USERID", + "schema": { + "type": "STRING", + "fields": null, + "memberSchema": null + } + }, + { + "name": "INTERESTS", + "schema": { + "type": "ARRAY", + "fields": null, + "memberSchema": { + "type": "STRING", + "fields": null, + "memberSchema": null + } + } + }, + { + "name": "CONTACTINFO", + "schema": { + "type": "MAP", + "fields": null, + "memberSchema": { + "type": "STRING", + "fields": null, + "memberSchema": null + } + } } ], "type": "TABLE", - "key": "MESSAGE", + "key": "USERID", "timestamp": "", "statistics": "", "errorStats": "", "extended": false, - "serdes": "JSON", - "kafkaTopic": "message.sender", - "topology": "", - "executionPlan": "", - "replication": 0, - "partitions": 0 + "format": "JSON", + "topic": "users", + "partitions": 0, + "replication": 0 } } ] diff --git a/tests/resources/error.json b/tests/resources/generic_error.json similarity index 58% rename from tests/resources/error.json rename to tests/resources/generic_error.json index 58d4f62..8cabab2 100644 --- a/tests/resources/error.json +++ b/tests/resources/generic_error.json @@ -1,48 +1,56 @@ { + "@type": "generic_error", + "error_code": 40500, "message": "HTTP 405 Method Not Allowed", "stackTrace": [ "org.glassfish.jersey.server.internal.routing.MethodSelectingRouter.getMethodRouter(MethodSelectingRouter.java:466)", "org.glassfish.jersey.server.internal.routing.MethodSelectingRouter.access$000(MethodSelectingRouter.java:94)", "org.glassfish.jersey.server.internal.routing.MethodSelectingRouter$4.apply(MethodSelectingRouter.java:779)", "org.glassfish.jersey.server.internal.routing.MethodSelectingRouter.apply(MethodSelectingRouter.java:371)", - "org.glassfish.jersey.server.internal.routing.RoutingStage._apply(RoutingStage.java:109)", - "org.glassfish.jersey.server.internal.routing.RoutingStage._apply(RoutingStage.java:112)", - "org.glassfish.jersey.server.internal.routing.RoutingStage._apply(RoutingStage.java:112)", - "org.glassfish.jersey.server.internal.routing.RoutingStage.apply(RoutingStage.java:92)", - "org.glassfish.jersey.server.internal.routing.RoutingStage.apply(RoutingStage.java:61)", + "org.glassfish.jersey.server.internal.routing.RoutingStage._apply(RoutingStage.java:110)", + "org.glassfish.jersey.server.internal.routing.RoutingStage._apply(RoutingStage.java:113)", + "org.glassfish.jersey.server.internal.routing.RoutingStage._apply(RoutingStage.java:113)", + "org.glassfish.jersey.server.internal.routing.RoutingStage.apply(RoutingStage.java:93)", + "org.glassfish.jersey.server.internal.routing.RoutingStage.apply(RoutingStage.java:62)", "org.glassfish.jersey.process.internal.Stages.process(Stages.java:197)", - "org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:318)", - "org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)", - "org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)", - "org.glassfish.jersey.internal.Errors.process(Errors.java:315)", - "org.glassfish.jersey.internal.Errors.process(Errors.java:297)", - "org.glassfish.jersey.internal.Errors.process(Errors.java:267)", - "org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)", - "org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:305)", - "org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1154)", - "org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:473)", - "org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:408)", - "org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:583)", - "org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:524)", - "org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:461)", - "org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652)", - "org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585)", - "org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)", - "org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)", - "org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)", - "org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)", - "org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)", - "org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)", - "org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)", - "org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)", - "org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)", - "org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)", - "org.eclipse.jetty.server.Server.handle(Server.java:499)", - "org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311)", - "org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:258)", - "org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)", - "org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)", - "org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)", + "org.glassfish.jersey.server.ServerRuntime$1.run(ServerRuntime.java:269)", + "org.glassfish.jersey.internal.Errors$1.call(Errors.java:272)", + "org.glassfish.jersey.internal.Errors$1.call(Errors.java:268)", + "org.glassfish.jersey.internal.Errors.process(Errors.java:316)", + "org.glassfish.jersey.internal.Errors.process(Errors.java:298)", + "org.glassfish.jersey.internal.Errors.process(Errors.java:268)", + "org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:289)", + "org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:256)", + "org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:703)", + "org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:416)", + "org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:409)", + "org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:584)", + "org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:525)", + "org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:462)", + "org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1642)", + "org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:533)", + "org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)", + "org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1595)", + "org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:255)", + "org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1317)", + "org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:203)", + "org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:473)", + "org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1564)", + "org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:201)", + "org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1219)", + "org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:144)", + "org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:126)", + "org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:169)", + "org.eclipse.jetty.server.handler.ContextHandlerCollection.handle(ContextHandlerCollection.java:219)", + "org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:132)", + "org.eclipse.jetty.server.Server.handle(Server.java:531)", + "org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:352)", + "org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:260)", + "org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:281)", + "org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:102)", + "org.eclipse.jetty.io.ChannelEndPoint$2.run(ChannelEndPoint.java:118)", + "org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:762)", + "org.eclipse.jetty.util.thread.QueuedThreadPool$2.run(QueuedThreadPool.java:680)", "java.lang.Thread.run(Thread.java:748)" ] } diff --git a/tests/resources/info.json b/tests/resources/info.json index c6865fc..b66a506 100644 --- a/tests/resources/info.json +++ b/tests/resources/info.json @@ -1,5 +1,7 @@ { "KsqlServerInfo": { - "version": "4.1.0" + "version": "5.0.1", + "kafkaClusterId": "VAW2HC3-Rz2n0dBbjuzYgw", + "ksqlServiceId": "default_" } } diff --git a/tests/resources/kafka_topics.json b/tests/resources/kafka_topics.json index 7c2773a..f4743d0 100644 --- a/tests/resources/kafka_topics.json +++ b/tests/resources/kafka_topics.json @@ -1,169 +1,81 @@ [ { - "kafka_topics": { - "statementText": "SHOW TOPICS;", - "topics": [ - { - "name": "__confluent.support.metrics", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_confluent-controlcenter-4-1-0-1-aggregate-topic-partition", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_confluent-controlcenter-4-1-0-1-aggregate-topic-partition-changelog", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_confluent-controlcenter-4-1-0-1-aggregatedTopicPartitionTableWindows-THREE_HOURS-changelog", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_confluent-controlcenter-4-1-0-1-error-topic", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_confluent-controlcenter-4-1-0-1-expected-group-consumption-rekey", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_confluent-controlcenter-4-1-0-1-group-aggregate-topic-ONE_MINUTE-changelog", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_confluent-controlcenter-4-1-0-1-Group-ONE_MINUTE-changelog", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_confluent-controlcenter-4-1-0-1-KSTREAM-OUTERTHIS-0000000095-store-changelog", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_confluent-controlcenter-4-1-0-1-monitoring-message-rekey", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_confluent-controlcenter-4-1-0-1-monitoring-trigger-event-rekey", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_confluent-controlcenter-4-1-0-1-MonitoringMessageAggregatorWindows-ONE_MINUTE-changelog", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_confluent-controlcenter-4-1-0-1-MonitoringMessageAggregatorWindows-THREE_HOURS-changelog", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_confluent-controlcenter-4-1-0-1-MonitoringStream-ONE_MINUTE-changelog", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_confluent-ksql-default__command_topic", - "registered": "true", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "_schemas", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "connect-configs", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "connect-offsets", - "registered": "false", - "partitionCount": 25, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "connect-statuses", - "registered": "false", - "partitionCount": 5, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - }, - { - "name": "ksql__commands", - "registered": "false", - "partitionCount": 1, - "replicaInfo": "1", - "consumerCount": 0, - "consumerGroupCount": 0 - } - ] - } + "@type": "kafka_topics", + "statementText": "SHOW TOPICS;", + "topics": [ + { + "name": "_schemas", + "registered": false, + "replicaInfo": [ + 1 + ], + "consumerCount": 0, + "consumerGroupCount": 0 + }, + { + "name": "connect-configs", + "registered": false, + "replicaInfo": [ + 1 + ], + "consumerCount": 0, + "consumerGroupCount": 0 + }, + { + "name": "connect-offsets", + "registered": false, + "replicaInfo": [ + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1, + 1 + ], + "consumerCount": 0, + "consumerGroupCount": 0 + }, + { + "name": "connect-status", + "registered": false, + "replicaInfo": [ + 1, + 1, + 1, + 1, + 1 + ], + "consumerCount": 0, + "consumerGroupCount": 0 + }, + { + "name": "ksql-testing", + "registered": true, + "replicaInfo": [ + 1 + ], + "consumerCount": 0, + "consumerGroupCount": 0 + } + ] } ] diff --git a/tests/resources/ksql.json b/tests/resources/ksql.json deleted file mode 100644 index 09dda05..0000000 --- a/tests/resources/ksql.json +++ /dev/null @@ -1,8 +0,0 @@ -[ - { - "queries": { - "statementText": "SHOW QUERIES;", - "queries": [] - } - } -] diff --git a/tests/resources/properties.json b/tests/resources/properties.json new file mode 100644 index 0000000..52f7c9b --- /dev/null +++ b/tests/resources/properties.json @@ -0,0 +1,44 @@ +[ + { + "@type": "properties", + "statementText": "SHOW PROPERTIES;", + "properties": { + "ksql.extension.dir": "ext", + "ksql.streams.cache.max.bytes.buffering": "10000000", + "ksql.transient.prefix": "transient_", + "ksql.schema.registry.url": "http://localhost:8081", + "ssl.secure.random.implementation": null, + "ksql.streams.default.deserialization.exception.handler": "io.confluent.ksql.errors.LogMetricAndContinueExceptionHandler", + "ksql.output.topic.name.prefix": "", + "ksql.streams.auto.offset.reset": "latest", + "ksql.sink.partitions": "4", + "ssl.keystore.type": "JKS", + "ssl.trustmanager.algorithm": "PKIX", + "ksql.statestore.suffix": "_ksql_statestore", + "ssl.key.password": null, + "ksql.service.id": "default_", + "ssl.truststore.password": null, + "ssl.endpoint.identification.algorithm": "https", + "ksql.streams.bootstrap.servers": "192.168.0.1:9092", + "ssl.protocol": "TLS", + "ksql.streams.commit.interval.ms": "2000", + "ksql.sink.replicas": "1", + "ssl.provider": null, + "ssl.enabled.protocols": "TLSv1.2,TLSv1.1,TLSv1", + "ssl.keystore.location": null, + "ksql.streams.num.stream.threads": "4", + "ssl.cipher.suites": null, + "ssl.truststore.type": "JKS", + "ksql.udfs.enabled": "true", + "ssl.truststore.location": null, + "ksql.udf.enable.security.manager": "true", + "ssl.keystore.password": null, + "ssl.keymanager.algorithm": "SunX509", + "ksql.streams.application.id": "KSQL_REST_SERVER_DEFAULT_APP_ID", + "ksql.sink.window.change.log.additional.retention": "1000000", + "ksql.udf.collect.metrics": "false", + "ksql.persistent.prefix": "query_" + }, + "overwrittenProperties": [] + } +] diff --git a/tests/resources/queries.json b/tests/resources/queries.json new file mode 100644 index 0000000..87ffc0d --- /dev/null +++ b/tests/resources/queries.json @@ -0,0 +1,15 @@ +[ + { + "@type": "queries", + "statementText": "LIST QUERIES;", + "queries": [ + { + "sinks": [ + "STREAM2" + ], + "id": "CSAS_STREAM2_0", + "queryString": "CREATE STREAM stream2 \tWITH (kafka_topic='output-topic' , value_format='DELIMITED') \tAS SELECT * FROM stream1 WHERE LEN(message) > 2;" + } + ] + } +] diff --git a/tests/resources/query_desc.json b/tests/resources/query_desc.json new file mode 100644 index 0000000..72406f2 --- /dev/null +++ b/tests/resources/query_desc.json @@ -0,0 +1,45 @@ +[ + { + "@type": "queryDescription", + "statementText": "EXPLAIN CSAS_STREAM2_0;", + "queryDescription": { + "id": "CSAS_STREAM2_0", + "statementText": "CREATE STREAM stream2 \tWITH (kafka_topic='output-topic' , value_format='DELIMITED') \tAS SELECT * FROM stream1 WHERE LEN(message) > 2;", + "fields": [ + { + "name": "ROWTIME", + "schema": { + "type": "BIGINT", + "fields": null, + "memberSchema": null + } + }, + { + "name": "ROWKEY", + "schema": { + "type": "STRING", + "fields": null, + "memberSchema": null + } + }, + { + "name": "MESSAGE", + "schema": { + "type": "STRING", + "fields": null, + "memberSchema": null + } + } + ], + "sources": [ + "STREAM1" + ], + "sinks": [ + "STREAM2" + ], + "topology": "Topologies:\n Sub-topology: 0\n Source: KSTREAM-SOURCE-0000000000 (topics: [input-topic])\n --> KSTREAM-MAPVALUES-0000000001\n Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])\n --> KSTREAM-TRANSFORMVALUES-0000000002\n <-- KSTREAM-SOURCE-0000000000\n Processor: KSTREAM-TRANSFORMVALUES-0000000002 (stores: [])\n --> KSTREAM-FILTER-0000000003\n <-- KSTREAM-MAPVALUES-0000000001\n Processor: KSTREAM-FILTER-0000000003 (stores: [])\n --> KSTREAM-MAPVALUES-0000000004\n <-- KSTREAM-TRANSFORMVALUES-0000000002\n Processor: KSTREAM-MAPVALUES-0000000004 (stores: [])\n --> KSTREAM-MAPVALUES-0000000005\n <-- KSTREAM-FILTER-0000000003\n Processor: KSTREAM-MAPVALUES-0000000005 (stores: [])\n --> KSTREAM-SINK-0000000006\n <-- KSTREAM-MAPVALUES-0000000004\n Sink: KSTREAM-SINK-0000000006 (topic: output-topic)\n <-- KSTREAM-MAPVALUES-0000000005\n\n", + "executionPlan": " > [ SINK ] Schema: [ROWTIME : BIGINT, ROWKEY : VARCHAR, MESSAGE : VARCHAR].\n\t\t > [ PROJECT ] Schema: [ROWTIME : BIGINT, ROWKEY : VARCHAR, MESSAGE : VARCHAR].\n\t\t\t\t > [ FILTER ] Schema: [STREAM1.ROWTIME : BIGINT, STREAM1.ROWKEY : VARCHAR, STREAM1.MESSAGE : VARCHAR].\n\t\t\t\t\t\t > [ SOURCE ] Schema: [STREAM1.ROWTIME : BIGINT, STREAM1.ROWKEY : VARCHAR, STREAM1.MESSAGE : VARCHAR].\n", + "overriddenProperties": {} + } + } +] diff --git a/tests/resources/statement_error.json b/tests/resources/statement_error.json index 8278f95..c458d42 100644 --- a/tests/resources/statement_error.json +++ b/tests/resources/statement_error.json @@ -1,59 +1,8 @@ -[ - { - "error": { - "statementText": "SELECT NOW();", - "errorMessage": { - "message": " ServerError:io.confluent.ksql.parser.exception.ParseFailedException\r\nCaused by: null", - "stackTrace": [ - "io.confluent.ksql.parser.KsqlParser.buildAst(KsqlParser.java:62)", - "io.confluent.ksql.KsqlEngine.getStatements(KsqlEngine.java:431)", - "io.confluent.ksql.rest.server.resources.KsqlResource.handleKsqlStatements(KsqlResource.java:135)", - "sun.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)", - "sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)", - "java.lang.reflect.Method.invoke(Method.java:498)", - "org.glassfish.jersey.server.model.internal.ResourceMethodInvocationHandlerFactory$1.invoke(ResourceMethodInvocationHandlerFactory.java:81)", - "org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher$1.run(AbstractJavaResourceMethodDispatcher.java:144)", - "org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.invoke(AbstractJavaResourceMethodDispatcher.java:161)", - "org.glassfish.jersey.server.model.internal.JavaResourceMethodDispatcherProvider$ResponseOutInvoker.doDispatch(JavaResourceMethodDispatcherProvider.java:160)", - "org.glassfish.jersey.server.model.internal.AbstractJavaResourceMethodDispatcher.dispatch(AbstractJavaResourceMethodDispatcher.java:99)", - "org.glassfish.jersey.server.model.ResourceMethodInvoker.invoke(ResourceMethodInvoker.java:389)", - "org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:347)", - "org.glassfish.jersey.server.model.ResourceMethodInvoker.apply(ResourceMethodInvoker.java:102)", - "org.glassfish.jersey.server.ServerRuntime$2.run(ServerRuntime.java:326)", - "org.glassfish.jersey.internal.Errors$1.call(Errors.java:271)", - "org.glassfish.jersey.internal.Errors$1.call(Errors.java:267)", - "org.glassfish.jersey.internal.Errors.process(Errors.java:315)", - "org.glassfish.jersey.internal.Errors.process(Errors.java:297)", - "org.glassfish.jersey.internal.Errors.process(Errors.java:267)", - "org.glassfish.jersey.process.internal.RequestScope.runInScope(RequestScope.java:317)", - "org.glassfish.jersey.server.ServerRuntime.process(ServerRuntime.java:305)", - "org.glassfish.jersey.server.ApplicationHandler.handle(ApplicationHandler.java:1154)", - "org.glassfish.jersey.servlet.WebComponent.serviceImpl(WebComponent.java:473)", - "org.glassfish.jersey.servlet.ServletContainer.serviceImpl(ServletContainer.java:408)", - "org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:583)", - "org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:524)", - "org.glassfish.jersey.servlet.ServletContainer.doFilter(ServletContainer.java:461)", - "org.eclipse.jetty.servlet.ServletHandler$CachedChain.doFilter(ServletHandler.java:1652)", - "org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:585)", - "org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:221)", - "org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1127)", - "org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:515)", - "org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:185)", - "org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1061)", - "org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)", - "org.eclipse.jetty.server.handler.HandlerCollection.handle(HandlerCollection.java:110)", - "org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)", - "org.eclipse.jetty.server.handler.StatisticsHandler.handle(StatisticsHandler.java:159)", - "org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:97)", - "org.eclipse.jetty.server.Server.handle(Server.java:499)", - "org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:311)", - "org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:258)", - "org.eclipse.jetty.io.AbstractConnection$2.run(AbstractConnection.java:544)", - "org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:635)", - "org.eclipse.jetty.util.thread.QueuedThreadPool$3.run(QueuedThreadPool.java:555)", - "java.lang.Thread.run(Thread.java:748)" - ] - } - } - } -] +{ + "@type": "statement_error", + "error_code": 40002, + "message": "SELECT and PRINT queries must use the /query endpoint", + "stackTrace": [], + "statementText": "SELECT * FROM ksqltesting;", + "entities": [] +} diff --git a/tests/resources/streamslist.json b/tests/resources/streamslist.json new file mode 100644 index 0000000..ec2a115 --- /dev/null +++ b/tests/resources/streamslist.json @@ -0,0 +1,14 @@ +[ + { + "@type": "streams", + "statementText": "LIST STREAMS;", + "streams": [ + { + "type": "STREAM", + "name": "KSQLTESTING", + "topic": "ksql-testing", + "format": "DELIMITED" + } + ] + } +] diff --git a/tests/resources/tables.json b/tests/resources/tables.json new file mode 100644 index 0000000..ddb697b --- /dev/null +++ b/tests/resources/tables.json @@ -0,0 +1,15 @@ +[ + { + "@type": "tables", + "statementText": "SHOW TABLES;", + "tables": [ + { + "type": "TABLE", + "name": "USERS", + "topic": "users", + "format": "JSON", + "isWindowed": false + } + ] + } +]