-
Notifications
You must be signed in to change notification settings - Fork 16
/
createregistry.py
116 lines (101 loc) · 3.74 KB
/
createregistry.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
"""Kopf handler for the creation of a StrimziSchemaRegistry.
"""
import kopf
from ..k8s import create_k8sclient, get_deployment, get_service, get_secret
from ..certprocessor import create_secret
from ..deployments import (get_cluster_tls_listener, create_deployment,
create_service)
from .. import state
@kopf.on.create('roundtable.lsst.codes', 'v1beta1', 'strimzischemaregistries')
def create_registry(spec, meta, namespace, name, uid, logger, body, **kwargs):
    """Deploy a Schema Registry when a StrimziSchemaRegistry is created.

    The handler performs these steps in order:

    1. Fetch the KafkaUser whose name equals the StrimziSchemaRegistry
       name and read its ``strimzi.io/cluster`` label.
    2. Fetch that Kafka resource and obtain the bootstrap server from
       ``get_cluster_tls_listener``.
    3. Create the truststore/keystore secret and re-read it to learn its
       ``resourceVersion``.
    4. Create the deployment and the service, each only when the
       corresponding lookup raised; both are adopted by ``body``.
    5. Add the registry name to ``state.registry_names``.
    """
    logger.info(f'Creating a new registry deployment: "{name}"')

    k8s_client = create_k8sclient()
    apps_api = k8s_client.AppsV1Api()
    custom_objects_api = k8s_client.CustomObjectsApi()
    core_api = k8s_client.CoreV1Api()

    # Keyword arguments common to both Strimzi custom-object lookups.
    strimzi_lookup = {
        'group': 'kafka.strimzi.io',
        'version': 'v1beta1',
        'namespace': namespace,
    }

    # The KafkaUser tells us which cluster the registry belongs to. The
    # KafkaUser is assumed to share the StrimziSchemaRegistry's name.
    kafka_user = custom_objects_api.get_namespaced_custom_object(
        plural='kafkausers',
        name=name,
        **strimzi_lookup
    )
    cluster_name = kafka_user['metadata']['labels']['strimzi.io/cluster']

    # The Kafka resource provides the listener to connect to.
    kafka_resource = custom_objects_api.get_namespaced_custom_object(
        plural='kafkas',
        name=cluster_name,
        **strimzi_lookup
    )
    bootstrap_server = get_cluster_tls_listener(kafka_resource)

    # Build the JKS-formatted truststore/keystore secret. The Kafka
    # username is assumed to match the StrimziSchemaRegistry name.
    created_secret = create_secret(
        kafka_username=name,
        namespace=namespace,
        cluster=cluster_name,
        owner=body,
        k8s_client=k8s_client,
        logger=logger
    )
    secret_name = created_secret['metadata']['name']

    # Read the secret back so that resourceVersion is populated.
    stored_secret = get_secret(
        name=secret_name,
        namespace=namespace,
        k8s_client=k8s_client
    )
    secret_version = stored_secret['metadata']['resourceVersion']

    # Probe for an existing deployment; any failure is treated as "absent".
    try:
        get_deployment(
            name=name,
            namespace=namespace,
            k8s_client=k8s_client
        )
    except Exception:
        logger.exception('Did not retrieve existing deployment')
        deployment_missing = True
    else:
        deployment_missing = False

    if deployment_missing:
        # Create the Schema Registry deployment, owned by the
        # StrimziSchemaRegistry resource.
        deployment_manifest = create_deployment(
            name=name,
            bootstrap_server=bootstrap_server,
            secret_name=secret_name,
            secret_version=secret_version
        )
        kopf.adopt(deployment_manifest, owner=body)
        deployment_result = apps_api.create_namespaced_deployment(
            body=deployment_manifest,
            namespace=namespace
        )
        logger.debug(str(deployment_result))
    else:
        logger.info('Deployment already exists')

    # Probe for an existing service in the same way.
    try:
        get_service(
            name=name,
            namespace=namespace,
            k8s_client=k8s_client
        )
    except Exception:
        logger.exception('Did not retrieve existing service')
        service_missing = True
    else:
        service_missing = False

    if service_missing:
        # Create the HTTP service exposing the Schema Registry REST API,
        # owned by the StrimziSchemaRegistry resource.
        service_manifest = create_service(name=name)
        kopf.adopt(service_manifest, owner=body)
        service_result = core_api.create_namespaced_service(
            body=service_manifest,
            namespace=namespace
        )
        logger.debug(str(service_result))
    else:
        logger.info('Service already exists')

    # Remember this registry's name in the shared cache.
    state.registry_names.add(name)