Skip to content

Commit

Permalink
Update Kafka protocol support through 1.0 (#84)
Browse files Browse the repository at this point in the history
* Add new response classes, excluding fetch, produce, and transaction support

* Simplify schema test for response classes

* Clean up some inheritance for existing requests

* Add the ability to have requests that are not exposed in the CLI tool

* Add additional error codes

* Rename v1 of the GroupCoordinator response to FindCoordinator

* Remove support for controller-only requests from the CLI

* Additional changes for renaming FindCoordinator

* Add new requests through 1.0

* Add tests for new requests
  • Loading branch information
toddpalino committed Jan 11, 2018
1 parent 79639e5 commit 55993ff
Show file tree
Hide file tree
Showing 133 changed files with 3,344 additions and 609 deletions.
3 changes: 3 additions & 0 deletions kafka/tools/protocol/__main__.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
def _get_request_classes():
request_classes = {}
for cls in get_modules(kafka.tools.protocol.requests, kafka.tools.protocol.requests.BaseRequest):
if not cls.supports_cli:
continue

if cls.cmd.lower() not in request_classes:
request_classes[cls.cmd.lower()] = {}
request_classes[cls.cmd.lower()][cls.api_version] = cls
Expand Down
48 changes: 47 additions & 1 deletion kafka/tools/protocol/errors.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,53 @@
'long': "This is not the correct controller for this cluster."},
42: {'short': 'INVALID_REQUEST',
'long': "This most likely occurs because of a request being malformed by the client library or" +
" the message was sent to an incompatible broker. See the broker logs for more details."}}
" the message was sent to an incompatible broker. See the broker logs for more details."},
43: {'short': 'UNSUPPORTED_FOR_MESSAGE_FORMAT',
'long': "The message format version on the broker does not support the request."},
44: {'short': 'POLICY_VIOLATION',
'long': "Request parameters do not satisfy the configured policy."},
45: {'short': 'OUT_OF_ORDER_SEQUENCE_NUMBER',
'long': "The broker received an out of order sequence number"},
46: {'short': 'DUPLICATE_SEQUENCE_NUMBER',
'long': "The broker received a duplicate sequence number"},
47: {'short': 'INVALID_PRODUCER_EPOCH',
'long': "Producer attempted an operation with an old epoch. Either there is a newer producer " +
"with the same transactionalId, or the producer's transaction has been expired by the broker."},
48: {'short': 'INVALID_TXN_STATE',
'long': "The producer attempted a transactional operation in an invalid state"},
49: {'short': 'INVALID_PRODUCER_ID_MAPPING',
'long': "The producer attempted to use a producer id which is not currently assigned to its transactional id"},
50: {'short': 'INVALID_TRANSACTION_TIMEOUT',
'long': "The transaction timeout is larger than the maximum value allowed by " +
"the broker (as configured by max.transaction.timeout.ms)."},
51: {'short': 'CONCURRENT_TRANSACTIONS',
'long': "The producer attempted to update a transaction " +
"while another concurrent operation on the same transaction was ongoing"},
52: {'short': 'TRANSACTION_COORDINATOR_FENCED',
'long': "Indicates that the transaction coordinator sending a WriteTxnMarker " +
"is no longer the current coordinator for a given producer"},
53: {'short': 'TRANSACTIONAL_ID_AUTHORIZATION_FAILED',
'long': "Transactional Id authorization failed"},
54: {'short': 'SECURITY_DISABLED',
'long': "Security features are disabled."},
55: {'short': 'OPERATION_NOT_ATTEMPTED',
'long': "The broker did not attempt to execute this operation. This may happen for batched RPCs " +
"where some operations in the batch failed, causing the broker to respond without trying the rest."},
56: {'short': 'KAFKA_STORAGE_ERROR',
'long': "Disk error when trying to access log file on the disk."},
57: {'short': 'LOG_DIR_NOT_FOUND',
'long': "The user-specified log directory is not found in the broker config."},
58: {'short': 'SASL_AUTHENTICATION_FAILED',
'long': "SASL Authentication failed."},
59: {'short': 'UNKNOWN_PRODUCER_ID',
'long': "This exception is raised by the broker if it could not locate the producer metadata " +
"associated with the producerId in question. This could happen if, for instance, the producer's records " +
"were deleted because their retention time had elapsed. Once the last records of the producerId are " +
"removed, the producer's metadata is removed from the broker, and future appends by the producer will " +
"return this exception."},
60: {'short': 'REASSIGNMENT_IN_PROGRESS',
'long': "A partition reassignment is in progress"},
}


def error_short(err_num):
Expand Down
6 changes: 5 additions & 1 deletion kafka/tools/protocol/requests/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,14 +130,18 @@ def response(self):
def help_string(self):
raise NotImplementedError

# By default, requests are not used in the CLI
supports_cli = False

def __init__(self, value):
_evaluate_sequence(value, self.schema)
self._request = value

def encode(self, buf):
_encode_sequence(self._request, self.schema, buf)

@abc.abstractmethod
# This is not an abstract method because requests that are not supported in the CLI will not override it
@classmethod
def process_arguments(cls, cmd_args):
"""IMPORTANT: this is class method, override it with @classmethod!"""
raise NotImplementedError
Expand Down
44 changes: 44 additions & 0 deletions kafka/tools/protocol/requests/alter_configs_v0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from kafka.tools.protocol.requests import BaseRequest
from kafka.tools.protocol.responses.alter_configs_v0 import AlterConfigsV0Response


class AlterConfigsV0Request(BaseRequest):
api_key = 33
api_version = 0
response = AlterConfigsV0Response

cmd = "AlterConfigs"
help_string = ''

schema = [
{'name': 'resources',
'type': 'array',
'item_type': [
{'name': 'resource_type', 'type': 'int8'},
{'name': 'resource_name', 'type': 'string'},
{'name': 'config_entries',
'type': 'array',
'item_type': [
{'name': 'config_name', 'type': 'string'},
{'name': 'config_value', 'type': 'string'},
]},
]},
{'name': 'validate_only', 'type': 'boolean'}
]
42 changes: 42 additions & 0 deletions kafka/tools/protocol/requests/alter_replica_log_dirs_v0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from kafka.tools.protocol.requests import BaseRequest
from kafka.tools.protocol.responses.alter_replica_log_dirs_v0 import AlterReplicaLogDirsV0Response


class AlterReplicaLogDirsV0Request(BaseRequest):
api_key = 34
api_version = 0
response = AlterReplicaLogDirsV0Response

cmd = "AlterReplicaLogDirs"
help_string = ''

schema = [
{'name': 'log_dirs',
'type': 'array',
'item_type': [
{'name': 'log_dir', 'type': 'string'},
{'name': 'topics',
'type': 'array',
'item_type': [
{'name': 'topic', 'type': 'string'},
{'name': 'partitions', 'type': 'array', 'item_type': 'int32'}
]},
]},
]
1 change: 1 addition & 0 deletions kafka/tools/protocol/requests/api_versions_v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ class ApiVersionsV0Request(BaseRequest):
cmd = "ApiVersions"
response = ApiVersionsV0Response

supports_cli = True
help_string = ("Request: {0}V{1}\n".format(cmd, api_version) +
"Format: {0}V{1}\n".format(cmd, api_version) +
"Description: API versions supported by the broker.\n")
Expand Down
24 changes: 24 additions & 0 deletions kafka/tools/protocol/requests/api_versions_v1.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from kafka.tools.protocol.requests.api_versions_v0 import ApiVersionsV0Request
from kafka.tools.protocol.responses.api_versions_v1 import ApiVersionsV1Response


class ApiVersionsV1Request(ApiVersionsV0Request):
api_version = 1
response = ApiVersionsV1Response
33 changes: 33 additions & 0 deletions kafka/tools/protocol/requests/controlled_shutdown_v0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.


from kafka.tools.protocol.requests import BaseRequest
from kafka.tools.protocol.responses.controlled_shutdown_v0 import ControlledShutdownV0Response


class ControlledShutdownV0Request(BaseRequest):
api_key = 7
api_version = 0
cmd = "ControlledShutdown"
response = ControlledShutdownV0Response

help_string = ''

schema = [
{'name': 'broker_id', 'type': 'int32'}
]
23 changes: 2 additions & 21 deletions kafka/tools/protocol/requests/controlled_shutdown_v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,29 +15,10 @@
# specific language governing permissions and limitations
# under the License.

import six

from kafka.tools.protocol.requests import BaseRequest, ArgumentError
from kafka.tools.protocol.requests.controlled_shutdown_v0 import ControlledShutdownV0Request
from kafka.tools.protocol.responses.controlled_shutdown_v1 import ControlledShutdownV1Response


class ControlledShutdownV1Request(BaseRequest):
api_key = 7
class ControlledShutdownV1Request(ControlledShutdownV0Request):
api_version = 1
cmd = "ControlledShutdown"
response = ControlledShutdownV1Response

help_string = ("Request: {0}V{1}\n".format(cmd, api_version) +
"Format: {0}V{1} broker_id\n".format(cmd, api_version) +
"Description: Request controlled shutdown for the specified broker ID\n")

schema = [
{'name': 'broker_id', 'type': 'int32'}
]

@classmethod
def process_arguments(cls, cmd_args):
if (len(cmd_args) != 1) or (not isinstance(cmd_args[0], six.integer_types)):
raise ArgumentError("ControlledShutdown requires exactly one argument that is a number")

return {'broker_id': cmd_args[0]}
41 changes: 41 additions & 0 deletions kafka/tools/protocol/requests/create_acls_v0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from kafka.tools.protocol.requests import BaseRequest
from kafka.tools.protocol.responses.create_acls_v0 import CreateAclsV0Response


class CreateAclsV0Request(BaseRequest):
api_key = 30
api_version = 0
response = CreateAclsV0Response

cmd = "CreateAcls"
help_string = ''

schema = [
{'name': 'creations',
'type': 'array',
'item_type': [
{'name': 'resource_type', 'type': 'int8'},
{'name': 'resource_name', 'type': 'string'},
{'name': 'principal', 'type': 'string'},
{'name': 'host', 'type': 'string'},
{'name': 'operation', 'type': 'int8'},
{'name': 'permission_type', 'type': 'int8'},
]},
]
44 changes: 44 additions & 0 deletions kafka/tools/protocol/requests/create_partitions_v0.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from kafka.tools.protocol.requests import BaseRequest
from kafka.tools.protocol.responses.create_partitions_v0 import CreatePartitionsV0Response


class CreatePartitionsV0Request(BaseRequest):
api_key = 37
api_version = 0
response = CreatePartitionsV0Response

cmd = "CreatePartitions"
help_string = ''

schema = [
{'name': 'topic_partitions',
'type': 'array',
'item_type': [
{'name': 'topic', 'type': 'string'},
{'name': 'count', 'type': 'int32'},
{'name': 'replicas',
'type': 'array',
'item_type': [
{'name': 'broker_id', 'type': 'array', 'item_type': 'int32'},
]},
]},
{'name': 'timeout', 'type': 'int32'},
{'name': 'validate_only', 'type': 'boolean'},
]
1 change: 1 addition & 0 deletions kafka/tools/protocol/requests/create_topics_v0.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class CreateTopicsV0Request(BaseRequest):
cmd = "CreateTopics"
response = CreateTopicsV0Response

supports_cli = True
help_string = ("Request: {0}V{1}\n".format(cmd, api_version) +
"Format: {0}V{1} timeout (topic_name,num_partitions,replication_factor[,config=value]... )...\n".format(cmd, api_version) +
" {0}V{1} timeout (topic_name,(partition_id=replica_id[|replica_id]...)...[,config=value]... )...\n".format(cmd, api_version) +
Expand Down

0 comments on commit 55993ff

Please sign in to comment.