-
Notifications
You must be signed in to change notification settings - Fork 47
/
HttpRequest.py
109 lines (88 loc) · 3.36 KB
/
HttpRequest.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
#!/usr/bin/env python
# -*- coding: utf-8 -*-
#
# This file is subject to the terms and conditions defined in
# file 'LICENSE.md', which is part of this source code package.
#
import re
import base64
import json
import os
import tempfile
import requests
import urllib3
from kubernetes.utils.ConvertData import convert
from six.moves.urllib.parse import urlencode
# One decimal IPv4 octet (0-255), kept as a capturing group so the group
# layout of RE_VALID_SSL_IP is unchanged.
_IPV4_OCTET = r'([0-9]|[1-9][0-9]|1[0-9]{2}|2[0-4][0-9]|25[0-5])'

# Matches an ``https://`` host whose authority is a literal IPv4 address.
# The trailing lookahead requires the address to be followed by the end of the
# string, a port separator or a path separator, so that a DNS name which merely
# starts with IP-looking labels (e.g. ``https://1.2.3.4.example.com``) is not
# mistaken for an IP address.
RE_VALID_SSL_IP = re.compile(
    r'^https://(' + _IPV4_OCTET + r'\.){3}' + _IPV4_OCTET + r'(?=[:/]|$)')
class HttpRequest:
    """A single HTTP(S) request, sent through ``requests.request``.

    The constructor only stores its arguments; all network activity happens
    in :meth:`send`.

    :param method: HTTP verb, e.g. ``'GET'`` or ``'POST'``.
    :param host: Prefix prepended verbatim to ``url`` (scheme, host and port).
    :param url: Path part of the request URL.
    :param data: Payload. It is JSON-encoded into the request body and, for
        ``GET`` requests, additionally URL-encoded into the query string.
    :param auth: Passed through as the ``auth`` argument of ``requests``.
    :param cert: Passed through as the ``cert`` argument of ``requests``.
    :param ca_cert: Path of a CA bundle used to verify the server.
    :param ca_cert_data: Base64-encoded CA bundle; takes precedence over
        ``ca_cert`` when both are given.
    :param token: Bearer token sent in the ``Authorization`` header.
    """

    def __init__(self, method='GET', host='localhost:80', url='/', data=None, auth=None,
                 cert=None, ca_cert=None, ca_cert_data=None, token=None):
        self.http_method = method
        self.http_host = host
        self.url = url
        self.data = data
        self.auth = auth
        self.cert = cert
        self.ca_cert = ca_cert
        self.ca_cert_data = ca_cert_data
        self.token = token

    def _build_headers(self):
        """Return the request headers derived from the method and token."""
        http_headers = {'Accept': 'application/json'}
        if self.http_method in ['PUT', 'POST', 'PATCH']:
            http_headers['Content-type'] = 'application/json'
        if self.token is not None:
            http_headers['Authorization'] = 'Bearer {token}'.format(token=self.token)
        return http_headers

    def _build_url(self):
        """Return the full request URL without modifying the instance.

        Keeping ``self.url`` untouched makes :meth:`send` safe to call more
        than once on the same object.
        """
        url = self.url
        if self.data is not None and self.http_method in ['GET']:
            url = "{0}?{1}".format(url, urlencode(self.data))
        return self.http_host + url

    def send(self):
        """Send the request and summarise the response.

        :return: A dict with the keys ``success`` (``True`` for a 2xx
            status), ``status`` (HTTP status code), ``reason`` (HTTP reason
            phrase) and ``data``. ``data`` is the JSON-decoded body passed
            through ``convert`` when that works, the raw body otherwise, and
            ``None`` when the body is empty.
        :raises Exception: Whatever ``requests.request`` or the preparation
            of the CA bundle raises is propagated unchanged.
        """
        state = dict(success=False, reason=None, status=None, data=None)
        http_headers = self._build_headers()
        full_url = self._build_url()

        temp_path = None
        try:
            verify = False
            if self.ca_cert is not None:
                verify = self.ca_cert

            if self.ca_cert_data is not None:
                # requests needs a file path, so the inline CA bundle is
                # written to a temporary file that is removed in ``finally``
                # below, even if decoding or writing fails.
                temp = tempfile.NamedTemporaryFile(delete=False)
                temp_path = temp.name
                try:
                    temp.write(base64.b64decode(self.ca_cert_data))
                finally:
                    temp.close()
                verify = temp_path

            # TODO: TLS issue with Python 2.7 and urllib3 when hostname is an IP address
            # A better fix should be found but I can't think of anything else for now.
            if RE_VALID_SSL_IP.search(self.http_host):
                verify = False
                urllib3.disable_warnings()

            # NOTE(review): for GET requests the payload is sent both in the
            # query string and as a JSON body; kept as-is to preserve the
            # existing wire behaviour.
            response = requests.request(
                method=self.http_method,
                url=full_url,
                auth=self.auth,
                cert=self.cert,
                headers=http_headers,
                data="" if self.data is None else json.dumps(self.data),
                verify=verify
            )
        finally:
            if temp_path is not None:
                os.unlink(temp_path)

        state['status'] = response.status_code
        state['reason'] = response.reason

        # There was an issue with "kubectl logs" type requests where returned content is
        # "text/plain" and contains bytes that are not valid UTF-8; fall back to the raw bytes.
        try:
            resp_data = response.content.decode('utf-8')
        except UnicodeDecodeError:
            resp_data = response.content

        if resp_data:
            try:
                state['data'] = convert(data=json.loads(resp_data))
            except Exception:
                # Deliberate best effort: a body that is not JSON (or that
                # ``convert`` cannot handle) is returned as received.
                state['data'] = resp_data

        state['success'] = 200 <= state['status'] <= 299
        return state