-
Notifications
You must be signed in to change notification settings - Fork 11
/
deployment_utils.py
253 lines (236 loc) · 11.1 KB
/
deployment_utils.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
import os,yaml,logging,datetime,json
from kubernetes.client.exceptions import ApiException
def validate_spec(op_spec: dict) -> dict:
'''
Function to validate the deployment yaml spec
Param:
op_spec : dict -> operator yaml file spec
'''
spec = op_spec['new'].get('spec',None)
config = op_spec['new'].get('config',None)
return_data = {}
if spec:
if spec.get('resources'):
if spec.get('requests'):
if not spec.get('memory') or spec.get('cpu'):
raise Exception("Memory or cpu is missing under requests")
if spec.get('limits'):
if not spec.get('memory') or spec.get('cpu'):
raise Exception("Memory or cpu is missing under limits")
return_data['namespace'] = spec.get('namespace','typesense')
return_data['image'] = spec.get('image','typesense/typesense')
return_data['resources'] = spec.get('resources',None)
return_data['nodeSelector'] = spec.get('nodeSelector',None)
return_data['replicas'] = spec.get('replicas',3)
if spec.get('storageClass'):
if not spec['storageClass'].get('name') or not spec['storageClass'].get('size'):
raise Exception('Missing storageClass name or size')
return_data['storageClassName'] = spec['storageClass']['name']
return_data['storage'] = spec['storageClass']['size']
if spec.get('startupProbe'):
if not spec['startupProbe'].get('failureThreshold') or not spec['startupProbe'].get('periodSeconds'):
raise Exception('Missing startupProbe properties. Required: failureThreshold, periodSeconds')
return_data['startupProbe_failureThreshold'] = spec['startupProbe']['failureThreshold']
return_data['startupProbe_periodSeconds'] = spec['startupProbe']['periodSeconds']
if spec.get('livenessProbe'):
if not spec['livenessProbe'].get('failureThreshold') or not spec['livenessProbe'].get('periodSeconds'):
raise Exception('Missing livenessProbe properties. Required: failureThreshold, periodSeconds')
return_data['livenessProbe_failureThreshold'] = spec['livenessProbe']['failureThreshold']
return_data['livenessProbe_periodSeconds'] = spec['livenessProbe']['periodSeconds']
if config:
return_data['password'] = config.get('password','297beb01dd21c')
return return_data
def create_modify_namespace(core_obj: object,namespace='default') -> None:
'''
Function to create or modify namespace
Params:
core_obj: kubernetes CoreV1Api object
'''
try:
path = os.path.join(
os.path.dirname(__file__),
'templates/namespace.yaml'
)
configuration = None
with open(path,'r') as _file:
configuration = yaml.safe_load(_file)
if namespace !='default':
configuration['metadata']['name'] = namespace
try:
resp = core_obj.create_namespace(body=configuration)
logging.info(f"Created namespace {resp.metadata.name} successfully")
except ApiException as e:
e.body = json.loads(e.body)
if e.body['reason'] == "AlreadyExists":
logging.info(f"Namespace exists, Skipping namespace creation!")
else:
logging.error(f"Kubernets Api Exception - Namespace: {e.body} ")
raise Exception(f"Kubernets Api Exception - Namespace: {e.body}")
except Exception as e:
logging.error(f"Exception namespace: {e}")
raise Exception(f"Exception namespace: {e}")
def deploy_typesense_statefulset(apps_obj: object,spec: dict,update=False) -> None:
'''
Function to deploy Typesense statefulset
Params:
apps_obj: kubernetes AppsV1Api object
'''
try:
path = os.path.join(
os.path.dirname(__file__),
'templates/statefulset.yaml'
)
configuration = None
with open(path,'r') as _file:
configuration = yaml.safe_load(_file)
if spec.get('storageClassName'):
template = {
"volumeClaimTemplates": [
{
"metadata": {
"name": "data"
},
"spec": {
"accessModes": [
"ReadWriteOnce"
],
"storageClassName": spec['storageClassName'],
"resources": {
"requests": {
"storage": spec['storage']
}
}
}
}
]
}
configuration['spec']['volumeClaimTemplates'] = template['volumeClaimTemplates']
else:
# Use empty dir mount
configuration['spec']['template']['spec']['volumes'].append({"name":"data","emptyDir":{"sizeLimit":"500Mi"}})
configuration['metadata']['namespace'] = spec['namespace']
if spec.get('image'):
configuration['spec']['template']['spec']['containers'][0]['image'] = spec['image']
if spec.get('resources'):
configuration['spec']['template']['spec']['containers'][0]['resources'] = spec['resources']
if spec.get('nodeSelector'):
configuration['spec']['template']['spec']['nodeSelector'] = spec['nodeSelector']
if spec.get('password'):
configuration['spec']['template']['spec']['containers'][0]['command'][4] = spec['password']
if spec.get('replicas'):
configuration['spec']['replicas'] = spec['replicas']
if spec.get('startupProbe_failureThreshold') and spec.get('startupProbe_periodSeconds'):
configuration['spec']['template']['spec']['containers'][0]['startupProbe']['failureThreshold'] = spec['startupProbe_failureThreshold']
configuration['spec']['template']['spec']['containers'][0]['startupProbe']['periodSeconds'] = spec['startupProbe_periodSeconds']
if spec.get('livenessProbe_failureThreshold') and spec.get('livenessProbe_periodSeconds'):
configuration['spec']['template']['spec']['containers'][0]['livenessProbe']['failureThreshold'] = spec['livenessProbe_failureThreshold']
configuration['spec']['template']['spec']['containers'][0]['livenessProbe']['periodSeconds'] = spec['livenessProbe_periodSeconds']
if update:
configuration["spec"]["template"]["metadata"]["annotations"] = {
"kubectl.kubernetes.io/restartedAt": datetime.datetime.utcnow().isoformat()
}
apps_obj.patch_namespaced_stateful_set(
body=configuration,name="typesense", namespace=spec['namespace'])
else:
apps_obj.create_namespaced_stateful_set(
body=configuration, namespace=spec['namespace'])
except ApiException as e:
logging.error(f"Kubernets Api Exception - Statefulset: {e.body} ")
raise Exception(f"Kubernets Api Exception - Statefulset: {e.body} ")
except Exception as e:
logging.error(f"Exception statefulset: {e}")
raise Exception(f"Exception statefulset: {e}")
def deploy_configmap(core_obj: object,replicas=None,namespace='default',update=False) -> None:
'''
Function to create configmap used by Typesense
Params:
core_obj: kubernetes CoreV1Api object
'''
try:
nodes = []
path = os.path.join(
os.path.dirname(__file__),
'templates/configmap.yaml'
)
configuration = None
with open(path,'r') as _file:
configuration = yaml.safe_load(_file)
configuration['metadata']['namespace'] = namespace
if replicas:
for count in range(0,int(replicas)):
nodes.append(('typesense-{}.ts.{}.svc.cluster.local:8107:8108').format(str(count),namespace))
configuration['data']['nodes'] = ','.join(nodes)
if update:
core_obj.patch_namespaced_config_map(body=configuration,namespace=namespace,name="nodeslist")
else:
core_obj.create_namespaced_config_map(body=configuration,namespace=namespace)
logging.info(f"Created Configmap nodeslist successfully")
except ApiException as e:
logging.error(f"Kubernets Api Exception - Configmap: {e.body} ")
raise Exception(f"Kubernets Api Exception - Configmap: {e.body} ")
except Exception as e:
logging.error(f"Exception configmap: {e}")
raise Exception(f"Exception configmap: {e}")
def deploy_service(core_obj: object,namespace='default') -> None:
'''
Function to deploy service mapping to connect with Typesense
Params:
core_obj: kubernetes CoreV1Api object
'''
try:
'''
----Deploy service---
'''
service_path = os.path.join(
os.path.dirname(__file__),
'templates/service.yaml'
)
configuration = None
with open(service_path,'r') as _file:
configuration = yaml.safe_load(_file)
configuration['metadata']['namespace'] = namespace
resp = core_obj.create_namespaced_service(body=configuration,namespace=namespace)
logging.info(f"Created Service {resp.metadata.name} successfully")
'''
----Deploy headless service---
'''
headless_service_path = os.path.join(
os.path.dirname(__file__),
'templates/headless-service.yaml'
)
with open(headless_service_path,'r') as _file:
configuration = yaml.safe_load(_file)
configuration['metadata']['namespace'] = namespace
resp = core_obj.create_namespaced_service(body=configuration,namespace=namespace)
logging.info(f"Created Headless Service {resp.metadata.name} successfully")
except ApiException as e:
logging.error(f"Kubernets Api Exception - Service: {e.body} ")
raise Exception(f"Kubernets Api Exception - Service: {e.body} ")
except Exception as e:
logging.error(f"Exception service: {e}")
raise Exception(f"Exception service: {e}")
def cleanup(apps_obj: object,core_obj: object,namespace='default') -> None:
'''
Function to cleanup all resources
Params:
core_obj: kubernetes CoreV1Api object
'''
try:
# Delete configmap
core_obj.delete_namespaced_config_map('nodeslist',namespace)
# Delete headless service
core_obj.delete_namespaced_service('ts',namespace)
# Delete service
core_obj.delete_namespaced_service('typesense-svc',namespace)
# Delete statefulset
apps_obj.delete_namespaced_stateful_set('typesense',namespace)
except ApiException as e:
e.body = json.loads(e.body)
if e.body['reason'] == "NotFound":
logging.info(f"Skipping Cleanup as {e.body['message']}")
else:
logging.error(f"Kubernets Api Exception - Cleanup: {e.body} ")
raise Exception(f"Kubernets Api Exception - Cleanup: {e.body} ")
except Exception as e:
logging.error(f"Exception Cleanup: {e}")
raise Exception(f"Exception Cleanup: {e}")