use aiohttp to rebuild RPC module
jadbin committed Nov 8, 2016
1 parent b3a2033 commit d781930
Showing 11 changed files with 85 additions and 156 deletions.
1 change: 1 addition & 0 deletions requirements.txt

@@ -1,4 +1,5 @@
 aiohttp
+async-timeout
 pykafka
 pymongo
 pyyaml
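Note: async-timeout is the standalone package for the timeout context manager formerly exposed as aiohttp.Timeout; it is added here because the hunks below switch every timeout block over to it. A minimal sketch of the migrated pattern (URL and timeout value are placeholders):

import aiohttp
import async_timeout


async def fetch(url, timeout=30, loop=None):
    # async_timeout.timeout behaves like the old aiohttp.Timeout context
    # manager: it cancels the enclosed request when the deadline passes.
    with aiohttp.ClientSession(loop=loop) as session:
        with async_timeout.timeout(timeout, loop=loop):
            async with session.get(url) as resp:
                return await resp.read()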
7 changes: 3 additions & 4 deletions tests/test_rpc.py

@@ -6,7 +6,7 @@
 import pytest
 
 from xpaw.rpc import RpcServer, RpcClient
-from xpaw.errors import RpcNotFound
+from xpaw.errors import RpcError
 
 from .helpers import wait_server_start
 
@@ -71,7 +71,6 @@ def run():
             loop.close()
 
         def stop_loop():
-            server.shutdown()
             loop.call_soon_threadsafe(loop.stop)
 
         loop = asyncio.new_event_loop()
@@ -130,10 +129,10 @@ async def _test():
     def test_method_not_found(self, rpc_server, rpc_client):
         async def _test():
             async_rpc_client = RpcClient(self.rpc_addr, loop=loop)
-            with pytest.raises(RpcNotFound):
+            with pytest.raises(RpcError):
                 await async_rpc_client.handle()
 
-            with pytest.raises(RpcNotFound):
+            with pytest.raises(RpcError):
                 rpc_client.handle()
         loop = asyncio.new_event_loop()
         loop.run_until_complete(_test())
24 changes: 12 additions & 12 deletions tests/test_selector.py

@@ -7,36 +7,36 @@
 
 
 class TestSelector:
-    def test_first_and_last(self):
+    def test_selector_list(self):
         html = """<li>a</li><div><ul><li>b</li><li>c</li></ul></div><ul><li>d</li></ul>"""
         s = Selector(html)
-        assert s.select("//li/text()").first.value == "a"
-        assert s.select("//li/text()").last.value == "d"
-        assert s.select("//div").select(".//li/text()").first.value == "b"
-        assert s.select("//div").select(".//li/text()").last.value == "c"
+        assert s.select("//li/text()")[0].value == "a"
+        assert s.select("//li/text()")[-1].value == "d"
+        assert s.select("//div").select(".//li/text()").value[0] == "b"
+        assert s.select("//div").select(".//li/text()").value[-1] == "c"
 
     def test_attribute_selection(self):
         html = """<div style=display:none><a href="http://example.com/" target=_blank></div>"""
         s = Selector(html)
-        assert s.select("//a/@href").first.value == "http://example.com/"
-        assert s.select("//a").select("@target").first.value == "_blank"
-        assert s.select("//div/@style").first.value == "display:none"
+        assert s.select("//a/@href")[0].value == "http://example.com/"
+        assert s.select("//a").select("@target")[0].value == "_blank"
+        assert s.select("//div/@style")[0].value == "display:none"
 
     def test_text_selection(self):
         html = """<div><p>expression: <var>x</var>+<var>y</var>=<var>z</var></p></div>"""
         s = Selector(html)
-        assert s.select("//var[last()-1]/text()").first.value == "y"
+        assert s.select("//var[last()-1]/text()")[0].value == "y"
         assert s.select("//var/text()").value == ["x", "y", "z"]
-        assert s.select("//p").first.value == "<p>expression: <var>x</var>+<var>y</var>=<var>z</var></p>"
+        assert s.select("//p")[0].value == "<p>expression: <var>x</var>+<var>y</var>=<var>z</var></p>"
 
     def test_encoding_detection(self):
         html = "<html lang=en><title>测试</title><meta charset=utf-8><meta http-equiv=Content-Type content='text/html; charset=gbk' /></html>"
         body = html.encode("gbk")
         with pytest.raises(UnicodeDecodeError):
             s = Selector(body.decode("utf-8"))
         s = Selector(body.decode("ascii", errors="ignore"))
-        assert s.select("//meta/@charset").first.value == "utf-8"
-        content_type = s.select("//meta[@http-equiv='Content-Type']").select("@content").first.value
+        assert s.select("//meta/@charset")[0].value == "utf-8"
+        content_type = s.select("//meta[@http-equiv='Content-Type']").select("@content")[0].value
         assert content_type == "text/html; charset=gbk"
         mtype, stype, _, params = parse_mimetype(content_type)
         assert mtype == "text" and stype == "html" and params.get("charset") == "gbk"
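Note: the selector tests now index into the result instead of using the removed first/last properties. Assuming select() returns a list-like object whose items expose .value and whose own .value collects all values — exactly what the assertions above exercise — usage looks like this (import path assumed):

from xpaw.selector import Selector

s = Selector("<ul><li>a</li><li>b</li><li>c</li></ul>")
items = s.select("//li/text()")
first, last = items[0].value, items[-1].value  # "a", "c"
all_values = items.value                       # ["a", "b", "c"]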
6 changes: 3 additions & 3 deletions xpaw/_patch.py

@@ -6,9 +6,9 @@
 _socket_init = socket.socket.__init__
 
 
-def socket_init(*args, **kw):
-    _socket_init(*args, **kw)
-    args[0].setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0))
+def socket_init(self, *args, **kw):
+    _socket_init(self, *args, **kw)
+    self.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0))
 
 
 socket.socket.__init__ = socket_init
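Note: passing self explicitly is equivalent to the old args[0] — the instance is always the first positional argument — but reads better and keeps the wrapper signature honest. For context, struct.pack("ii", 1, 0) encodes the linger structure l_onoff=1, l_linger=0, so close() resets the connection immediately instead of holding it in TIME_WAIT; the patch applies this to every socket at construction. The same option set on a single socket:

import socket
import struct

s = socket.socket()
# l_onoff=1, l_linger=0: close() sends RST and drops the socket at once,
# the same option the patched __init__ applies to all sockets.
s.setsockopt(socket.SOL_SOCKET, socket.SO_LINGER, struct.pack("ii", 1, 0))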
15 changes: 10 additions & 5 deletions xpaw/agent.py

@@ -11,6 +11,7 @@
 
 import aiohttp
 from aiohttp import web
+import async_timeout
 
 from xpaw.ds import PriorityQueue
 
@@ -56,7 +57,8 @@ def _start():
                resource.add_route("POST", lambda r, i=i: self._post_proxy_list(r, i))
            host, port = self._server_listen.split(":")
            port = int(port)
-           self._server_loop.run_until_complete(self._server_loop.create_server(app.make_handler(access_log=None), host, port))
+           self._server_loop.run_until_complete(
+               self._server_loop.create_server(app.make_handler(access_log=None), host, port))
        t = threading.Thread(target=_start)
        t.start()
@@ -93,7 +95,9 @@ async def _get_proxy_list(self, request, id_string):
            host, _ = peername
            log.debug("'{0}' request '/{1}', count={2}, detail={3}".format(host, id_string, count, detail))
        proxy_list = self._proxy_managers[id_string].get_proxy_list(count, detail=detail)
-       return web.Response(body=json.dumps(proxy_list).encode("utf-8"))
+       return web.Response(body=json.dumps(proxy_list).encode("utf-8"),
+                           charset="utf-8",
+                           content_type="application/json")
 
    async def _post_proxy_list(self, request, id_string):
        """
@@ -107,7 +111,7 @@ async def _post_proxy_list(self, request, id_string):
        peername = request.transport.get_extra_info("peername")
        if peername:
            host, _ = peername
-           log.debug("'{0}' post proxy list, the number of proxies: {1}".format(host, len(addr_list)))
+           log.debug("'{0}' post {1} proxies to '/{2}'".format(host, len(addr_list), id_string))
        self._proxy_managers[id_string].add_proxy(*addr_list)
        return web.Response(status=200)
 
@@ -322,7 +326,7 @@ async def _check():
            proxy = addr
            try:
                with aiohttp.ClientSession(loop=self._loop) as session:
-                   with aiohttp.Timeout(self._timeout, loop=self._loop):
+                   with async_timeout.timeout(self._timeout, loop=self._loop):
                        async with session.request("GET", self._url, proxy=proxy) as resp:
                            if resp.status != self._http_status:
                                return False
@@ -399,7 +403,8 @@ def update_timestamp(self, proxy):
        :param xpaw.agent.ProxyInfo proxy: proxy information
        """
-       self._conn.execute("REPLACE INTO {0} (addr, timestamp) VALUES ('{1}', {2})".format(self.TBL_NAME, proxy.addr, proxy.timestamp))
+       self._conn.execute(
+           "REPLACE INTO {0} (addr, timestamp) VALUES ('{1}', {2})".format(self.TBL_NAME, proxy.addr, proxy.timestamp))
        self._update_count += 1
        if self._update_count >= self.COMMIT_COUNT:
            self._conn.commit()
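Note: the proxy-list handler now declares its charset and content type instead of returning bare bytes. aiohttp also offers web.json_response, which serializes the payload and sets application/json in one call; a sketch of the equivalent (handler and payload are illustrative, not from this commit):

from aiohttp import web


async def get_proxy_list(request):
    proxy_list = ["127.0.0.1:8000", "127.0.0.1:8001"]  # placeholder payload
    # json_response dumps the object and sets Content-Type: application/json.
    return web.json_response(proxy_list)

Separately, update_timestamp still interpolates values into its SQL string; sqlite3's parameterized form, e.g. conn.execute("REPLACE INTO <table> (addr, timestamp) VALUES (?, ?)", (addr, ts)), would avoid quoting pitfalls (the table name itself cannot be a parameter).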
6 changes: 4 additions & 2 deletions xpaw/downloader.py

@@ -4,6 +4,7 @@
 import asyncio
 
 import aiohttp
+import async_timeout
 
 from xpaw.middleware import MiddlewareManager
 from xpaw.http import HttpRequest, HttpResponse
@@ -80,7 +81,8 @@ async def _handle_request(request, middleware):
        for method in middleware.request_handlers:
            res = await method(request)
            if not (res is None or isinstance(res, (HttpRequest, HttpResponse))):
-               raise TypeError("Request handler must return None, HttpRequest or HttpResponse, got {0}".format(type(res)))
+               raise TypeError(
+                   "Request handler must return None, HttpRequest or HttpResponse, got {0}".format(type(res)))
            if res:
                return res
@@ -105,7 +107,7 @@ async def _handle_error(request, error, middleware):
    async def _get(self, request, timeout):
        log.debug("HTTP request: {0} {1}".format(request.method, request.url))
        with aiohttp.ClientSession(cookies=request.cookies, loop=self._loop) as session:
-           with aiohttp.Timeout(timeout, loop=self._loop):
+           with async_timeout.timeout(timeout, loop=self._loop):
                async with session.request(request.method,
                                           request.url,
                                           headers=request.headers,
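Note: the re-wrapped TypeError enforces the request-middleware contract: a handler returns None to let the chain continue, or an HttpRequest/HttpResponse to short-circuit it. A minimal conforming handler (name and header mutation are illustrative):

async def set_user_agent(request):
    # Mutate the request in place and return None so the next handler runs;
    # returning an HttpRequest or HttpResponse would stop the chain instead.
    request.headers["User-Agent"] = "xpaw"
    return None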
3 changes: 2 additions & 1 deletion xpaw/downloadermws/proxy.py

@@ -6,6 +6,7 @@
 import logging
 
 import aiohttp
+import async_timeout
 
 log = logging.getLogger(__name__)
 
@@ -56,7 +57,7 @@ async def _update_proxy_list(self):
        log.debug("Update proxy list")
        try:
            with aiohttp.ClientSession(loop=self._loop) as session:
-               with aiohttp.Timeout(self._update_timeout, loop=self._loop):
+               with async_timeout.timeout(self._update_timeout, loop=self._loop):
                    async with session.get(self._agent_addr) as resp:
                        body = await resp.read()
                        proxy_list = json.loads(body.decode(encoding="utf-8"))
18 changes: 0 additions & 18 deletions xpaw/errors.py

@@ -25,24 +25,6 @@ class RpcError(Exception):
     """
 
 
-class RpcNotFound(RpcError):
-    """
-    RPC not found.
-    """
-
-
-class RpcTimeoutError(RpcError):
-    """
-    RPC timeout error.
-    """
-
-
-class RpcParsingError(RpcError):
-    """
-    Fail to parse data.
-    """
-
-
 class UsageError(Exception):
     """
     CLI usage error.
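Note: with RpcNotFound, RpcTimeoutError and RpcParsingError gone, callers catch the single RpcError base, as the updated tests above do. A sketch (server address and remote method are placeholders):

from xpaw.errors import RpcError
from xpaw.rpc import RpcClient

client = RpcClient("127.0.0.1:7310")
try:
    client.handle()
except RpcError:
    # Missing methods, timeouts and parsing failures all surface
    # as the one RpcError type now.
    pass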
(3 of the 11 changed files are not shown.)