-
Notifications
You must be signed in to change notification settings - Fork 3k
/
chrome_android.py
124 lines (96 loc) · 4.54 KB
/
chrome_android.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
import subprocess
from .base import Browser, ExecutorBrowser, require_arg
from .base import get_timeout_multiplier # noqa: F401
from .chrome import executor_kwargs as chrome_executor_kwargs
from ..webdriver_server import ChromeDriverServer
from ..executors.executorwebdriver import (WebDriverTestharnessExecutor, # noqa: F401
WebDriverRefTestExecutor) # noqa: F401
from ..executors.executorchrome import ChromeDriverWdspecExecutor # noqa: F401
# Product description for "chrome_android". Apart from "product", every value
# is the name of a callable or class that is defined in, or imported into,
# this module (presumably resolved by name by the wptrunner product loader --
# confirm against the loader).
__wptrunner__ = {
    "product": "chrome_android",
    "check_args": "check_args",
    "browser": "ChromeAndroidBrowser",
    "executor": {
        "testharness": "WebDriverTestharnessExecutor",
        "reftest": "WebDriverRefTestExecutor",
        "wdspec": "ChromeDriverWdspecExecutor",
    },
    "browser_kwargs": "browser_kwargs",
    "executor_kwargs": "executor_kwargs",
    "env_extras": "env_extras",
    "env_options": "env_options",
    "timeout_multiplier": "get_timeout_multiplier",
}

# Server ports collected by executor_kwargs() and later read by
# ChromeAndroidBrowser.setup_adb_reverse() to set up "adb reverse" rules.
_wptserve_ports = set()
def check_args(**kwargs):
    """Check that the arguments this product cannot run without are present.

    Delegates to ``require_arg`` for ``package_name`` first and then for
    ``webdriver_binary``; what happens on a missing argument is decided by
    ``require_arg`` itself.
    """
    for required in ("package_name", "webdriver_binary"):
        require_arg(kwargs, required)
def browser_kwargs(test_type, run_info_data, config, **kwargs):
    """Build the keyword arguments used to construct ``ChromeAndroidBrowser``.

    ``test_type``, ``run_info_data`` and ``config`` are accepted for interface
    compatibility and are not used here.

    :param kwargs: command-line derived options. ``package_name`` and
        ``webdriver_binary`` must be present (``check_args`` requires them);
        ``device_serial`` and ``webdriver_args`` are optional.
    :returns: dict with the keys ``package_name``, ``device_serial``,
        ``webdriver_binary`` and ``webdriver_args``; the optional entries are
        ``None`` when not supplied.
    :raises KeyError: if ``package_name`` or ``webdriver_binary`` is missing.
    """
    return {
        "package_name": kwargs["package_name"],
        # The device serial is optional everywhere else in this module
        # (executor_kwargs() uses .get() and the browser defaults it to None),
        # so a missing key must not raise here either.
        "device_serial": kwargs.get("device_serial"),
        "webdriver_binary": kwargs["webdriver_binary"],
        "webdriver_args": kwargs.get("webdriver_args"),
    }
def executor_kwargs(test_type, server_config, cache_manager, run_info_data,
                    **kwargs):
    """Build executor keyword arguments for Chrome on Android.

    Starts from the desktop Chrome executor kwargs and adapts the
    ``goog:chromeOptions`` capability: the ``prefs`` entry is removed,
    ``androidPackage`` is set from ``kwargs["package_name"]`` and, when a
    truthy ``device_serial`` is given, ``androidDeviceSerial`` is set too.

    As a side effect the http, https, ws and wss ports found in
    ``server_config['ports']`` are added to the module-level
    ``_wptserve_ports`` set.

    :returns: the adapted executor kwargs dict.
    """
    ports = server_config['ports']
    # update() mutates the module-level set in place, so the name is never
    # rebound and no ``global`` statement is required.
    _wptserve_ports.update(
        ports['http'] + ports['https'] + ports['ws'] + ports['wss'])

    result = chrome_executor_kwargs(test_type, server_config,
                                    cache_manager, run_info_data,
                                    **kwargs)
    chrome_options = result["capabilities"]["goog:chromeOptions"]

    # Remove unsupported options on mobile.
    del chrome_options["prefs"]

    assert kwargs["package_name"], "missing --package-name"
    chrome_options["androidPackage"] = kwargs["package_name"]

    serial = kwargs.get("device_serial")
    if serial:
        chrome_options["androidDeviceSerial"] = serial

    return result
def env_extras(**kwargs):
    """Return the extra environment objects for this product: none.

    Any keyword arguments are accepted and ignored; a new empty list is
    returned on every call.
    """
    extras = []
    return extras
def env_options():
    """Return the test-environment options for this product.

    The server host is pinned to the loopback address, which allows the use
    of host-resolver-rules in lieu of modifying the /etc/hosts file.
    """
    return dict(server_host="127.0.0.1")
class ChromeAndroidBrowser(Browser):
    """Chrome on an Android device, driven through chromedriver.

    The chromedriver process is managed by a ``ChromeDriverServer`` (imported
    from the sibling ``webdriver_server`` module). ``adb`` is used to set up
    reverse port forwarding for the ports recorded in ``_wptserve_ports``.
    """

    def __init__(self, logger, package_name, webdriver_binary="chromedriver",
                 device_serial=None, webdriver_args=None):
        """Create the chromedriver server wrapper and configure adb.

        :param logger: logger handed to the ``Browser`` base class.
        :param package_name: Android package name; stored on the instance.
        :param webdriver_binary: chromedriver binary passed to the server.
        :param device_serial: optional serial passed to ``adb -s``.
        :param webdriver_args: extra arguments passed to the server.

        Note that construction runs adb commands via ``setup_adb_reverse``.
        """
        Browser.__init__(self, logger)
        self.package_name = package_name
        self.device_serial = device_serial
        self.server = ChromeDriverServer(self.logger,
                                         binary=webdriver_binary,
                                         args=webdriver_args)
        self.setup_adb_reverse()

    def _adb_run(self, args):
        """Log and run ``adb`` with ``args``, adding ``-s <serial>`` if set.

        Uses ``subprocess.check_call``, so a non-zero exit status raises
        ``subprocess.CalledProcessError``.
        """
        if self.device_serial:
            prefix = ['adb', '-s', self.device_serial]
        else:
            prefix = ['adb']
        command = prefix + list(args)
        self.logger.info(' '.join(command))
        subprocess.check_call(command)

    def setup_adb_reverse(self):
        """Wait for the device, clear old rules and reverse the server ports."""
        self._adb_run(['wait-for-device'])
        for direction in ('forward', 'reverse'):
            self._adb_run([direction, '--remove-all'])
        # "adb reverse" forwards network connection from device to host.
        for port in _wptserve_ports:
            endpoint = 'tcp:%d' % port
            self._adb_run(['reverse', endpoint, endpoint])

    def start(self, **kwargs):
        """Start the chromedriver server without blocking; kwargs are unused."""
        self.server.start(block=False)

    def stop(self, force=False):
        """Stop the chromedriver server, forwarding ``force``."""
        self.server.stop(force=force)

    def pid(self):
        """Return the pid reported by the chromedriver server."""
        return self.server.pid

    def is_alive(self):
        """Return whether the chromedriver server reports itself alive."""
        # TODO(ato): This only indicates the driver is alive,
        # and doesn't say anything about whether a browser session
        # is active.
        return self.server.is_alive()

    def cleanup(self):
        """Stop the server, then remove all adb forward and reverse rules."""
        self.stop()
        for direction in ('forward', 'reverse'):
            self._adb_run([direction, '--remove-all'])

    def executor_browser(self):
        """Return the executor browser class and its constructor kwargs."""
        connection_kwargs = {"webdriver_url": self.server.url}
        return ExecutorBrowser, connection_kwargs