Skip to content
This repository has been archived by the owner on Aug 1, 2019. It is now read-only.

Commit

Permalink
Replace use of aiohttp with cherrypy
Browse files Browse the repository at this point in the history
* Aiohttp (and related libraries) have a python support policy
  which is causing us problems.
* Cherrypy supports threads which integrates well with the rest
  of Zuul.

Change-Id: Ib611df06035890d3e87fc5ad92fdfc7ac441edce
  • Loading branch information
James E. Blair committed May 31, 2018
1 parent 47f2004 commit 0eeceba
Show file tree
Hide file tree
Showing 13 changed files with 347 additions and 555 deletions.
4 changes: 2 additions & 2 deletions doc/source/developer/ansible.rst
Original file line number Diff line number Diff line change
Expand Up @@ -43,15 +43,15 @@ that starts a log streaming daemon on the build node.
All jobs run with the :py:mod:`zuul.ansible.callback.zuul_stream` callback
plugin enabled, which writes the build log to a file so that the
:py:class:`zuul.lib.log_streamer.LogStreamer` can provide the data on demand
over the finger protocol. Finally, :py:class:`zuul.web.LogStreamingHandler`
over the finger protocol. Finally, :py:class:`zuul.web.LogStreamHandler`
exposes that log stream over a websocket connection as part of
:py:class:`zuul.web.ZuulWeb`.

.. autoclass:: zuul.ansible.callback.zuul_stream.CallbackModule
:members:

.. autoclass:: zuul.lib.log_streamer.LogStreamer
.. autoclass:: zuul.web.LogStreamingHandler
.. autoclass:: zuul.web.LogStreamHandler
.. autoclass:: zuul.web.ZuulWeb

In addition to real-time streaming, Zuul also installs another callback module,
Expand Down
3 changes: 3 additions & 0 deletions requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -29,3 +29,6 @@ uvloop;python_version>='3.5'
psutil
fb-re2>=1.0.6
paho-mqtt
cherrypy
ws4py
routes
15 changes: 3 additions & 12 deletions tests/base.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
# License for the specific language governing permissions and limitations
# under the License.

import asyncio
import configparser
from contextlib import contextmanager
import datetime
Expand Down Expand Up @@ -1894,22 +1893,14 @@ def _setUp(self):
listen_address='127.0.0.1', listen_port=0,
gear_server='127.0.0.1', gear_port=self.gearman_server_port,
info=zuul.model.WebInfo(),
_connections=self.connections)
loop = asyncio.new_event_loop()
loop.set_debug(True)
ws_thread = threading.Thread(target=self.web.run, args=(loop,))
ws_thread.start()
self.addCleanup(loop.close)
self.addCleanup(ws_thread.join)
connections=self.connections)
self.web.start()
self.addCleanup(self.web.stop)

self.host = 'localhost'
# Wait until web server is started
while True:
time.sleep(0.1)
if self.web.server is None:
continue
self.port = self.web.server.sockets[0].getsockname()[1]
self.port = self.web.port
try:
with socket.create_connection((self.host, self.port)):
break
Expand Down
17 changes: 3 additions & 14 deletions tests/unit/test_github_driver.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.

import asyncio
import threading
import os
import re
from testtools.matchers import MatchesRegex, StartsWith
Expand Down Expand Up @@ -789,23 +787,14 @@ def setUp(self):
self.web = zuul.web.ZuulWeb(
listen_address='127.0.0.1', listen_port=0,
gear_server='127.0.0.1', gear_port=self.gearman_server.port,
connections=[self.fake_github],
_connections=self.connections)
loop = asyncio.new_event_loop()
loop.set_debug(True)
ws_thread = threading.Thread(target=self.web.run, args=(loop,))
ws_thread.start()
self.addCleanup(loop.close)
self.addCleanup(ws_thread.join)
connections=self.connections)
self.web.start()
self.addCleanup(self.web.stop)

host = '127.0.0.1'
# Wait until web server is started
while True:
time.sleep(0.1)
if self.web.server is None:
continue
port = self.web.server.sockets[0].getsockname()[1]
port = self.web.port
try:
with socket.create_connection((host, port)):
break
Expand Down
28 changes: 6 additions & 22 deletions tests/unit/test_streaming.py
Original file line number Diff line number Diff line change
Expand Up @@ -283,21 +283,13 @@ def test_decode_boundaries(self):
listen_address='::', listen_port=0,
gear_server='127.0.0.1', gear_port=self.gearman_server.port,
static_path=tempfile.gettempdir(),
_connections=self.connections)
loop = asyncio.new_event_loop()
loop.set_debug(True)
ws_thread = threading.Thread(target=web_server.run, args=(loop,))
ws_thread.start()
self.addCleanup(loop.close)
self.addCleanup(ws_thread.join)
connections=self.connections)
web_server.start()
self.addCleanup(web_server.stop)

# Wait until web server is started
while True:
if web_server.server is None:
time.sleep(0.1)
continue
port = web_server.server.sockets[0].getsockname()[1]
port = web_server.port
try:
with socket.create_connection((self.host, port)):
break
Expand Down Expand Up @@ -374,21 +366,13 @@ def test_websocket_streaming(self):
listen_address='::', listen_port=0,
gear_server='127.0.0.1', gear_port=self.gearman_server.port,
static_path=tempfile.gettempdir(),
_connections=self.connections)
loop = asyncio.new_event_loop()
loop.set_debug(True)
ws_thread = threading.Thread(target=web_server.run, args=(loop,))
ws_thread.start()
self.addCleanup(loop.close)
self.addCleanup(ws_thread.join)
connections=self.connections)
web_server.start()
self.addCleanup(web_server.stop)

# Wait until web server is started
while True:
if web_server.server is None:
time.sleep(0.1)
continue
port = web_server.server.sockets[0].getsockname()[1]
port = web_server.port
try:
with socket.create_connection((self.host, port)):
break
Expand Down
19 changes: 3 additions & 16 deletions tests/unit/test_web.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,9 @@
# License for the specific language governing permissions and limitations
# under the License.

import asyncio
import json
import threading
import os
import urllib.parse
import time
import socket

import requests
Expand Down Expand Up @@ -63,25 +60,15 @@ def setUp(self):
listen_address='127.0.0.1', listen_port=0,
gear_server='127.0.0.1', gear_port=self.gearman_server.port,
info=zuul.model.WebInfo.fromConfig(self.zuul_ini_config),
connections=self.connections.connections.values(),
_connections=self.connections
connections=self.connections
)
loop = asyncio.new_event_loop()
loop.set_debug(True)
ws_thread = threading.Thread(target=self.web.run, args=(loop,))
ws_thread.start()
self.addCleanup(loop.close)
self.addCleanup(ws_thread.join)
self.web.start()
self.addCleanup(self.web.stop)

self.host = 'localhost'
self.port = self.web.port
# Wait until web server is started
while True:
time.sleep(0.1)
if self.web.server is None:
continue
self.port = self.web.server.sockets[0].getsockname()[1]
print(self.host, self.port)
try:
with socket.create_connection((self.host, self.port)):
break
Expand Down
18 changes: 4 additions & 14 deletions zuul/cmd/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,9 @@
# License for the specific language governing permissions and limitations
# under the License.

import asyncio
import logging
import signal
import sys
import threading

import zuul.cmd
import zuul.model
Expand Down Expand Up @@ -55,13 +53,11 @@ def _run(self):
params['ssl_cert'] = get_default(self.config, 'gearman', 'ssl_cert')
params['ssl_ca'] = get_default(self.config, 'gearman', 'ssl_ca')

params['_connections'] = self.connections
params['connections'] = []
params['connections'] = self.connections
# Validate config here before we spin up the ZuulWeb object
for conn_name, connection in self.connections.connections.items():
try:
if connection.validateWebConfig(self.config, self.connections):
params['connections'].append(connection)
connection.validateWebConfig(self.config, self.connections)
except Exception:
self.log.exception("Error validating config")
sys.exit(1)
Expand All @@ -72,25 +68,19 @@ def _run(self):
self.log.exception("Error creating ZuulWeb:")
sys.exit(1)

loop = asyncio.get_event_loop()
signal.signal(signal.SIGUSR1, self.exit_handler)
signal.signal(signal.SIGTERM, self.exit_handler)

self.log.info('Zuul Web Server starting')
self.thread = threading.Thread(target=self.web.run,
args=(loop,),
name='web')
self.thread.start()
self.web.start()

try:
signal.pause()
except KeyboardInterrupt:
print("Ctrl + C: asking web server to exit nicely...\n")
self.exit_handler(signal.SIGINT, None)

self.thread.join()
loop.stop()
loop.close()
self.web.stop()
self.log.info("Zuul Web Server stopped")

def run(self):
Expand Down
16 changes: 5 additions & 11 deletions zuul/connection/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,26 +75,20 @@ def maintainCache(self, relevant):
still in use. Anything in our cache that isn't in the supplied
list should be safe to remove from the cache."""

def getWebHandlers(self, zuul_web, info):
"""Return a list of web handlers to register with zuul-web.
def getWebController(self, zuul_web, info):
"""Return a cherrypy web controller to register with zuul-web.
:param zuul.web.ZuulWeb zuul_web:
Zuul Web instance.
:param zuul.model.WebInfo info:
The WebInfo object for the Zuul Web instance. Can be used by
plugins to toggle API capabilities.
:returns: List of `zuul.web.handler.BaseWebHandler` instances.
:returns: A `zuul.web.handler.BaseWebController` instance.
"""
return []
return None

def validateWebConfig(self, config, connections):
"""Validate config and determine whether to register web handlers.
By default this method returns False, which means this connection
has no web handlers to register.
If the method returns True, then its `getWebHandlers` method
should be called during route registration.
"""Validate web config.
If there is a fatal error, the method should raise an exception.
Expand Down
54 changes: 23 additions & 31 deletions zuul/driver/github/githubconnection.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@
# License for the specific language governing permissions and limitations
# under the License.

import asyncio
import collections
import datetime
import logging
Expand All @@ -25,7 +24,7 @@
import json
import traceback

from aiohttp import web
import cherrypy
import cachecontrol
from cachecontrol.cache import DictCache
from cachecontrol.heuristics import BaseHeuristic
Expand All @@ -39,7 +38,7 @@
import gear

from zuul.connection import BaseConnection
from zuul.web.handler import BaseDriverWebHandler
from zuul.web.handler import BaseWebController
from zuul.lib.config import get_default
from zuul.model import Ref, Branch, Tag, Project
from zuul.exceptions import MergeFailure
Expand Down Expand Up @@ -1149,8 +1148,8 @@ def _get_statuses(self, project, sha):

return statuses

def getWebHandlers(self, zuul_web, info):
return [GithubWebhookHandler(self, zuul_web, 'POST', 'payload')]
def getWebController(self, zuul_web, info):
return GithubWebController(zuul_web, self)

def validateWebConfig(self, config, connections):
if 'webhook_token' not in self.connection_config:
Expand All @@ -1160,38 +1159,37 @@ def validateWebConfig(self, config, connections):
return True


class GithubWebhookHandler(BaseDriverWebHandler):
class GithubWebController(BaseWebController):

log = logging.getLogger("zuul.GithubWebhookHandler")
log = logging.getLogger("zuul.GithubWebController")

def __init__(self, connection, zuul_web, method, path):
super(GithubWebhookHandler, self).__init__(
connection=connection, zuul_web=zuul_web, method=method, path=path)
def __init__(self, zuul_web, connection):
self.connection = connection
self.zuul_web = zuul_web
self.token = self.connection.connection_config.get('webhook_token')

def _validate_signature(self, body, headers):
try:
request_signature = headers['x-hub-signature']
except KeyError:
raise web.HTTPUnauthorized(
reason='X-Hub-Signature header missing.')
raise cherrypy.HTTPError(401, 'X-Hub-Signature header missing.')

payload_signature = _sign_request(body, self.token)

self.log.debug("Payload Signature: {0}".format(str(payload_signature)))
self.log.debug("Request Signature: {0}".format(str(request_signature)))
if not hmac.compare_digest(
str(payload_signature), str(request_signature)):
raise web.HTTPUnauthorized(
reason=('Request signature does not match calculated payload '
'signature. Check that secret is correct.'))
raise cherrypy.HTTPError(
401,
'Request signature does not match calculated payload '
'signature. Check that secret is correct.')

return True

def setEventLoop(self, event_loop):
self.event_loop = event_loop

async def handleRequest(self, request):
@cherrypy.expose
@cherrypy.tools.json_out(content_type='application/json; charset=utf-8')
def payload(self):
# Note(tobiash): We need to normalize the headers. Otherwise we will
# have trouble to get them from the dict afterwards.
# e.g.
Expand All @@ -1202,28 +1200,22 @@ async def handleRequest(self, request):
# modifies the header casing in its own way and by specification http
# headers are case insensitive so just lowercase all so we don't have
# to take care later.
# Note(corvus): Don't use cherrypy's json_in here so that we
# can validate the signature.
headers = dict()
for key, value in request.headers.items():
for key, value in cherrypy.request.headers.items():
headers[key.lower()] = value
body = await request.read()
body = cherrypy.request.body.read()
self._validate_signature(body, headers)
# We cannot send the raw body through gearman, so it's easy to just
# encode it as json, after decoding it as utf-8
json_body = json.loads(body.decode('utf-8'))

gear_task = self.event_loop.run_in_executor(
None, self.zuul_web.rpc.submitJob,
job = self.zuul_web.rpc.submitJob(
'github:%s:payload' % self.connection.connection_name,
{'headers': headers, 'body': json_body})

try:
job = await asyncio.wait_for(gear_task, 300)
except asyncio.TimeoutError:
self.log.exception("Gearman timeout:")
return web.json_response({'error_description': 'Internal error'},
status=500)

return web.json_response(json.loads(job.data[0]))
return json.loads(job.data[0])


def _status_as_tuple(status):
Expand Down
Loading

1 comment on commit 0eeceba

@webknjaz
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It's amusing to see one of frameworks I maintain being replaced by other framework I also maintain 🤣 🚀

Please sign in to comment.