Skip to content

Commit

Permalink
refactor methods and param names
Browse files Browse the repository at this point in the history
  • Loading branch information
mabulgu committed Jul 7, 2020
1 parent 837580d commit f2fddac
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 97 deletions.
165 changes: 91 additions & 74 deletions kfk/topics_command.py
Expand Up @@ -15,101 +15,118 @@
@click.option('--delete-config', help='A topic configuration override to be removed for an existing topic',
multiple=True)
@click.option('--config', help='A topic configuration override for the topic being created or altered.', multiple=True)
@click.option('--alter', help='Alter the number of partitions, replica assignment, and/or configuration of the topic.',
@click.option('--alter', 'is_alter', help='Alter the number of partitions, replica assignment, and/or configuration of the topic.',
is_flag=True)
@click.option('--delete', help='Delete a topic.', is_flag=True)
@click.option('--delete', 'is_delete', help='Delete a topic.', is_flag=True)
@click.option('--native', help='List details for the given topic natively.', is_flag=True, cls=RequiredIf,
required_if=['describe'])
required_if=['is_describe'])
@click.option('-o', '--output',
help='Output format. One of: json|yaml|name|go-template|go-template-file|template|templatefile|jsonpath'
'|jsonpath-file.')
@click.option('--describe', help='List details for the given topic.', is_flag=True)
@click.option('--describe', 'is_describe', help='List details for the given topic.', is_flag=True)
@click.option('--replication-factor', help='The replication factor for each partition in the topic being created.',
cls=RequiredIf, required_if=['create'])
cls=RequiredIf, required_if=['is_create'])
@click.option('--partitions', help='The number of partitions for the topic being created or altered ', cls=RequiredIf,
required_if=['create'])
@click.option('--create', help='Create a new topic.', is_flag=True)
@click.option('--list', help='List all available topics.', is_flag=True)
@click.option('--topic', help='Topic Name', required=True, cls=NotRequiredIf, not_required_if='list')
required_if=['is_create'])
@click.option('--create', 'is_create', help='Create a new topic.', is_flag=True)
@click.option('--list', 'is_list', help='List all available topics.', is_flag=True)
@click.option('--topic', help='Topic Name', required=True, cls=NotRequiredIf, not_required_if='is_list')
@kfk.command()
def topics(topic, list, create, partitions, replication_factor, describe, output, native, delete, alter, config,
def topics(topic, is_list, is_create, partitions, replication_factor, is_describe, output, native, is_delete, is_alter, config,
delete_config, cluster, namespace):
"""The kafka topic(s) to be created, altered or described."""
if list:
if is_list:
list(cluster, namespace)
elif is_create:
create(topic, partitions, replication_factor, config, cluster, namespace)
elif is_describe:
describe(topic, output, native, cluster, namespace)
elif is_delete:
delete(topic, cluster, namespace)
elif is_alter:
alter(topic, partitions, replication_factor, config, delete_config, cluster, namespace)
else:
print_missing_options_for_command("topics")


def list(cluster, namespace):
os.system(
Kubectl().get().kafkatopics().label("strimzi.io/cluster={cluster}").namespace(namespace).build().format(
cluster=cluster))


def create(topic, partitions, replication_factor, config, cluster, namespace):
with open('{strimzi_path}/examples/topic/kafka-topic.yaml'.format(strimzi_path=STRIMZI_PATH).format(
version=STRIMZI_VERSION)) as file:
topic_dict = yaml.full_load(file)

topic_dict["metadata"]["name"] = topic
topic_dict["metadata"]["labels"]["strimzi.io/cluster"] = cluster
topic_dict["spec"]["partitions"] = int(partitions)
topic_dict["spec"]["replicas"] = int(replication_factor)

if len(config) > 0:
if topic_dict["spec"].get("config") is None:
topic_dict["spec"]["config"] = {}
add_resource_kv_config(config, topic_dict["spec"]["config"])

topic_yaml = yaml.dump(topic_dict)
topic_temp_file = create_temp_file(topic_yaml)
os.system(
Kubectl().get().kafkatopics().label("strimzi.io/cluster={cluster}").namespace(namespace).build().format(
cluster=cluster))
elif create:
with open('{strimzi_path}/examples/topic/kafka-topic.yaml'.format(strimzi_path=STRIMZI_PATH).format(
version=STRIMZI_VERSION)) as file:
topic_dict = yaml.full_load(file)

topic_dict["metadata"]["name"] = topic
topic_dict["metadata"]["labels"]["strimzi.io/cluster"] = cluster
topic_dict["spec"]["partitions"] = int(partitions)
topic_dict["spec"]["replicas"] = int(replication_factor)
Kubectl().create().from_file("{topic_temp_file_path}").namespace(namespace).build().format(
topic_temp_file_path=topic_temp_file.name))
topic_temp_file.close()

if len(config) > 0:
if topic_dict["spec"].get("config") is None:
topic_dict["spec"]["config"] = {}
add_resource_kv_config(config, topic_dict["spec"]["config"])

topic_yaml = yaml.dump(topic_dict)
topic_temp_file = create_temp_file(topic_yaml)
def describe(topic, output, native, cluster, namespace):
if output is not None:
if resource_exists("kafkatopics", topic, cluster, namespace):
os.system(
Kubectl().create().from_file("{topic_temp_file_path}").namespace(namespace).build().format(
topic_temp_file_path=topic_temp_file.name))
topic_temp_file.close()

elif describe:
if output is not None:
if resource_exists("kafkatopics", topic, cluster, namespace):
os.system(
Kubectl().get().kafkatopics(topic).namespace(namespace).output(output).build())
Kubectl().get().kafkatopics(topic).namespace(namespace).output(output).build())
else:
if native:
native_command = "bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic {topic}"
os.system(
Kubectl().exec("-it", "{cluster}-kafka-0").container("kafka").namespace(namespace).exec_command(
native_command).build().format(topic=topic, cluster=cluster))
else:
if native:
native_command = "bin/kafka-topics.sh --bootstrap-server localhost:9092 --describe --topic {topic}"
if resource_exists("kafkatopics", topic, cluster, namespace):
os.system(
Kubectl().exec("-it", "{cluster}-kafka-0").container("kafka").namespace(namespace).exec_command(
native_command).build().format(topic=topic, cluster=cluster))
else:
if resource_exists("kafkatopics", topic, cluster, namespace):
os.system(
Kubectl().describe().kafkatopics(topic).namespace(namespace).build())

elif delete:
if resource_exists("kafkatopics", topic, cluster, namespace):
os.system(Kubectl().delete().kafkatopics(topic).namespace(namespace).build())
Kubectl().describe().kafkatopics(topic).namespace(namespace).build())


def delete(topic, cluster, namespace):
if resource_exists("kafkatopics", topic, cluster, namespace):
os.system(Kubectl().delete().kafkatopics(topic).namespace(namespace).build())

elif alter:
if resource_exists("kafkatopics", topic, cluster, namespace):
file = get_resource_as_file("kafkatopics", topic, namespace)
topic_dict = yaml.full_load(file)

if partitions is not None:
topic_dict["spec"]["partitions"] = int(partitions)
def alter(topic, partitions, replication_factor, config, delete_config, cluster, namespace):
if resource_exists("kafkatopics", topic, cluster, namespace):
file = get_resource_as_file("kafkatopics", topic, namespace)
topic_dict = yaml.full_load(file)

if replication_factor is not None:
topic_dict["spec"]["replicas"] = int(replication_factor)
if partitions is not None:
topic_dict["spec"]["partitions"] = int(partitions)

delete_last_applied_configuration(topic_dict)
if replication_factor is not None:
topic_dict["spec"]["replicas"] = int(replication_factor)

if len(config) > 0:
if topic_dict["spec"].get("config") is None:
topic_dict["spec"]["config"] = {}
add_resource_kv_config(config, topic_dict["spec"]["config"])
delete_last_applied_configuration(topic_dict)

if len(delete_config) > 0:
if topic_dict["spec"].get("config") is not None:
delete_resource_config(delete_config, topic_dict["spec"]["config"])
if len(config) > 0:
if topic_dict["spec"].get("config") is None:
topic_dict["spec"]["config"] = {}
add_resource_kv_config(config, topic_dict["spec"]["config"])

topic_yaml = yaml.dump(topic_dict)
topic_temp_file = create_temp_file(topic_yaml)
os.system(
Kubectl().apply().from_file("{topic_temp_file_path}").namespace(namespace).build().format(
topic_temp_file_path=topic_temp_file.name))
topic_temp_file.close()
else:
print_resource_found_msg(cluster, namespace)
if len(delete_config) > 0:
if topic_dict["spec"].get("config") is not None:
delete_resource_config(delete_config, topic_dict["spec"]["config"])

topic_yaml = yaml.dump(topic_dict)
topic_temp_file = create_temp_file(topic_yaml)
os.system(
Kubectl().apply().from_file("{topic_temp_file_path}").namespace(namespace).build().format(
topic_temp_file_path=topic_temp_file.name))
topic_temp_file.close()
else:
print_missing_options_for_command("topics")
print_resource_found_msg(cluster, namespace)
47 changes: 24 additions & 23 deletions kfk/users_command.py
Expand Up @@ -26,44 +26,45 @@
@click.option('--add-acl', help='Add ACL to User', is_flag=True)
@click.option('--authorization-type', help='Authorization type for user',
type=click.Choice(['simple'], case_sensitive=True))
@click.option('--alter', help='Alter authentication-type, quotas, etc. of the user.', is_flag=True)
@click.option('--delete', help='Delete a user.', is_flag=True)
@click.option('--alter', 'is_alter', help='Alter authentication-type, quotas, etc. of the user.', is_flag=True)
@click.option('--delete', 'is_delete', help='Delete a user.', is_flag=True)
@click.option('-o', '--output',
help='Output format. One of: json|yaml|name|go-template|go-template-file|template|templatefile|jsonpath'
'|jsonpath-file.')
@click.option('--describe', help='List details for the given user.', is_flag=True)
@click.option('--describe', 'is_describe', help='List details for the given user.', is_flag=True)
@click.option('--authentication-type', help='Authentication type for user',
type=click.Choice(['tls', 'scram-sha-512'], case_sensitive=True), cls=RequiredIf, required_if=['create'])
@click.option('--create', help='Create a new user.', is_flag=True)
@click.option('--list', help='List all available users.', is_flag=True)
@click.option('--user', help='User Name', required=True, cls=NotRequiredIf, not_required_if='list')
type=click.Choice(['tls', 'scram-sha-512'], case_sensitive=True), cls=RequiredIf, required_if=['is_create'])
@click.option('--create', 'is_create', help='Create a new user.', is_flag=True)
@click.option('--list', 'is_list', help='List all available users.', is_flag=True)
@click.option('--user', help='User Name', required=True, cls=NotRequiredIf, not_required_if='is_list')
@kfk.command()
def users(user, list, create, authentication_type, describe, output, delete, alter, authorization_type, add_acl,
def users(user, is_list, is_create, authentication_type, is_describe, output, is_delete, is_alter, authorization_type, add_acl,
delete_acl, operation, host, resource_type, resource_name, resource_pattern_type, quota, delete_quota,
cluster, namespace):
"""The kafka user(s) to be created, altered or described."""
if list:
list_option(cluster, namespace)
elif create:
create_option(user, authentication_type, quota, cluster, namespace)
elif describe:
describe_option(user, output, cluster, namespace)
elif delete:
delete_option(cluster, namespace, user)
elif alter:
alter_option(user, authentication_type, authorization_type, add_acl, delete_acl, operation, host, resource_type,
if is_list:
list(cluster, namespace)

elif is_create:
create(user, authentication_type, quota, cluster, namespace)
elif is_describe:
describe(user, output, cluster, namespace)
elif is_delete:
delete(cluster, namespace, user)
elif is_alter:
alter(user, authentication_type, authorization_type, add_acl, delete_acl, operation, host, resource_type,
resource_name, resource_pattern_type, quota, delete_quota, cluster, namespace)
else:
print_missing_options_for_command("users")


def list_option(cluster, namespace):
def list(cluster, namespace):
os.system(
Kubectl().get().kafkausers().label("strimzi.io/cluster={cluster}").namespace(namespace).build().format(
cluster=cluster))


def create_option(user, authentication_type, quota, cluster, namespace):
def create(user, authentication_type, quota, cluster, namespace):
with open('{strimzi_path}/examples/user/kafka-user.yaml'.format(strimzi_path=STRIMZI_PATH).format(
version=STRIMZI_VERSION)) as file:
user_dict = yaml.full_load(file)
Expand All @@ -87,7 +88,7 @@ def create_option(user, authentication_type, quota, cluster, namespace):
user_temp_file.close()


def describe_option(user, output, cluster, namespace):
def describe(user, output, cluster, namespace):
if output is not None:
if resource_exists("kafkausers", user, cluster, namespace):
os.system(Kubectl().get().kafkausers(user).namespace(namespace).output(output).build())
Expand All @@ -96,12 +97,12 @@ def describe_option(user, output, cluster, namespace):
os.system(Kubectl().describe().kafkausers(user).namespace(namespace).build())


def delete_option(cluster, namespace, user):
def delete(cluster, namespace, user):
if resource_exists("kafkausers", user, cluster, namespace):
os.system(Kubectl().delete().kafkausers(user).namespace(namespace).build())


def alter_option(user, authentication_type, authorization_type, add_acl, delete_acl, operation, host, resource_type,
def alter(user, authentication_type, authorization_type, add_acl, delete_acl, operation, host, resource_type,
resource_name, resource_pattern_type, quota, delete_quota, cluster, namespace):
if resource_exists("kafkausers", user, cluster, namespace):
file = get_resource_as_file("kafkausers", user, namespace)
Expand Down

0 comments on commit f2fddac

Please sign in to comment.