-
Notifications
You must be signed in to change notification settings - Fork 7
/
proxy.py
executable file
·165 lines (128 loc) · 5.42 KB
/
proxy.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
import json
import os
import time
from flask import Flask, Response, redirect, request, make_response, jsonify
import requests
import hashlib
# Flask application; static files live in ./public and are exposed without a URL prefix
app = Flask(__name__, static_folder='public', static_url_path='')


def _serveIndex():
    '''Return the static index.html file from the static folder.'''
    return app.send_static_file('index.html')


# the bare "/" URL is registered under the endpoint name "root"
app.add_url_rule('/', endpoint='root', view_func=_serveIndex)

# load the settings from the "config" object
app.config.from_object('config')

# connection settings; each one falls back to a default when the config omits it
baseUrl = app.config.get('CONCOURSE_DOMAIN', 'http://concourse.change.me.hostname')
ciUsername = app.config.get('CONCOURSE_USERNAME', 'admin')
ciPassword = app.config.get('CONCOURSE_PASSWORD', 'admin')
ciTeam = app.config.get('CONCOURSE_TEAM', 'main')

# bearer token cached between requests (empty until the first successful login)
bearerToken = ''
# usage counter for the cached token; 0 means that no valid token is cached
idx = 0
def _concourseErrorResponse(error):
    '''
    Build the HTTP 500 response that is returned when a call to the
    ConcourseCI fails.

    For an HTTP error the upstream status code and reason are appended to
    the message; any other failure produces the plain "not reachable" text.
    '''
    message = "The ConcourseCI is not reachable"
    upstream = getattr(error, 'response', None)
    if isinstance(error, requests.exceptions.HTTPError) and upstream is not None:
        message += ", status code: " + str(upstream.status_code) + ", reason: " + upstream.reason
    return Response(message, status=500, headers={'Etag': ''})


def _fetchJson(path, headers):
    '''
    GET baseUrl + path with the given headers and return the decoded JSON
    body. Raises requests.exceptions.HTTPError on an error status code.
    '''
    r = requests.get(baseUrl + path, headers=headers)
    r.raise_for_status()
    return r.json()


def _describeJob(job):
    '''
    Reduce a job entry to the status and id of its pending build or,
    if there is none, of its last finished build.
    '''
    for key in ('next_build', 'finished_build'):
        build = job.get(key)
        if build:
            return {'status': build['status'], 'id': build['id']}
    # the job has never been built
    return {'status': 'non-exist'}


@app.route('/api/v1/pipelines', methods=['GET'])
def redirectPipelines():
    '''
    Make requests to the concourseCI and collect easy-to-parse output
    about pipelines and job statuses.

    Returns a JSON list of pipelines sorted by name. Every entry has the
    keys "url", "name" and "paused"; pipelines that are not paused also
    have "jobs", a list of {"status", "id"} dictionaries.

    The response carries an Etag header (SHA1 of the JSON body). When the
    request's If-None-Match header equals that value, an empty 304 response
    is returned instead of the body. Any failure to talk to the ConcourseCI,
    or an empty list of workers, results in a 500 response with an
    explanatory text and an empty Etag.
    '''
    # Get fresh auth header
    tokenHeader = _getAuthenticationHeader()
    concourseErrors = (requests.exceptions.ConnectionError, requests.exceptions.HTTPError)

    # Get list of all the pipelines and check that at least one worker is available
    try:
        pipelines = _fetchJson('/api/v1/pipelines', tokenHeader)
        workers = _fetchJson('/api/v1/workers', tokenHeader)
    except concourseErrors as e:
        return _concourseErrorResponse(e)
    if not workers:
        return Response("There are no workers available!", status=500, headers={'Etag': ''})

    # iterate over pipelines and find the status for each
    lstPipelines = []
    for pipeline in pipelines:
        details = {
            'url': baseUrl + pipeline['url'],
            'name': pipeline['name'],
            'paused': pipeline['paused'],
        }
        if not pipeline['paused']:
            jobsPath = '/api/v1/teams/' + ciTeam + '/pipelines/' + pipeline['name'] + '/jobs'
            # a failing jobs request is reported like any other ConcourseCI failure
            try:
                jobs = _fetchJson(jobsPath, tokenHeader)
            except concourseErrors as e:
                return _concourseErrorResponse(e)
            details['jobs'] = [_describeJob(job) for job in jobs]
        lstPipelines.append(details)

    # sort pipelines by name
    lstPipelines.sort(key=lambda entry: entry['name'])
    jsonResponse = json.dumps(lstPipelines)

    # SHA1 should generate well-behaved etags; hashlib needs bytes, not text
    etag = hashlib.sha1(jsonResponse.encode('utf-8')).hexdigest()
    responseHeaders = {
        'Cache-Control': 'public',
        'Access-Control-Allow-Origin': '*',
        'Etag': etag
    }

    if request.headers.get('If-None-Match', '') == etag:
        # the concourse status wasn't modified. Return only the "not modified"
        # status code, so the page does not have to be refreshed
        return Response(status=304, mimetype='application/json', headers=responseHeaders)

    # there were changes since the last call. Return the full response
    return Response(jsonResponse, mimetype='application/json', headers=responseHeaders)
def _getAuthenticationHeader():
    '''
    Method that returns the cached header for an authentication
    and updates the bearer token periodically, because token
    can be expired.

    A new token is requested with the configured username and password
    when no token is cached yet (idx == 0) or after the cached one has been
    handed out more than 5000 times. If the token cannot be obtained, the
    cache is reset and a header with a deliberately invalid token is
    returned, so the caller's own request fails and gets reported there.

    Returns a dictionary holding a single "Authorization" header.
    '''
    global idx
    global bearerToken
    if idx == 0 or idx > 5000:
        # get the Bearer Token for the given team avoiding to request it again and again
        try:
            r = requests.get(baseUrl + '/api/v1/teams/' + ciTeam + '/auth/token', auth=(ciUsername, ciPassword))
            r.raise_for_status()
            # remember the new token and restart the usage counter
            bearerToken = r.json()['value']
            idx = 1
        except (requests.exceptions.RequestException, KeyError, ValueError):
            # covers an unreachable server, an error status code and a reply
            # without a usable token; force a new attempt on the next call
            idx = 0
            return {"Authorization": "Bearer nonsence"}
    idx += 1
    return {"Authorization": "Bearer " + bearerToken}
if __name__ == '__main__':
    import logging

    # only messages of level ERROR and above are logged
    logging.basicConfig(level=logging.ERROR)

    # the listening port comes from the PORT environment variable, 3001 when unset
    listenPort = int(os.getenv('PORT', 3001))
    app.run(debug=False, port=listenPort, host='0.0.0.0')