Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master
Fetching contributors…

Cannot retrieve contributors at this time

324 lines (281 sloc) 22.533 kb
"""
Copyright (c) 2010 Sencha Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.
"""
import png
import base64
import errno
import os
import re
import socket
import struct
import sys
import tempfile
from subprocess import Popen, PIPE, STDOUT
# TCP port on 127.0.0.1 that the socket-based helpers in this file connect to.
_ADB_PORT = 5037
# logcat tag; used to build the logcat filter spec and the line-matching regexes.
_LOG_FILTER = 'RemoteJS'
# Component name passed to "am start -n" by the capture / evaluate / open helpers.
_TARGET_ACTIVITY = 'com.sencha.remotejs/.RemoteJS'
# On-device file that captureWindow() pulls once the log reports "Capture saved".
_REMOTE_CAPTURE_PATH = '/data/data/com.sencha.remotejs/cache/remotejs-capture.png'
# Device id used for "adb -s <id>" and "host:transport:<id>"; empty means none set.
_g_targetDevice = ""
# Base64 payload that installDeviceTool() decodes into a temporary file and
# installs; it is empty in this copy of the file.
_g_base64Apk = b""
def _isProcessRunning(pid):
    """Return True while the child process *pid* has not exited yet.

    Polls with os.waitpid(pid, os.WNOHANG), which yields (0, 0) as long as
    the child is still running.  Any other outcome -- the child has exited,
    *pid* is not a child of this process, or WNOHANG/waitpid is unavailable
    on this platform -- is reported as False.

    Note that a waitpid() call that finds an exited child also reaps it.
    """
    try:
        return os.waitpid(pid, os.WNOHANG) == (0, 0)
    except (OSError, AttributeError):
        # OSError: no such child process.
        # AttributeError: os.WNOHANG does not exist on non-Unix platforms.
        # A bare "except:" used to sit here; it also swallowed
        # KeyboardInterrupt and SystemExit, which must propagate.
        return False
def targetDevice():
    """Return the device id currently stored in the module-level selection."""
    selected = _g_targetDevice
    return selected
def setTargetDevice(id):
    """Remember *id* as the module-wide target device id.

    The stored value is what targetDevice() hands back afterwards.
    """
    global _g_targetDevice
    _g_targetDevice = id
def startConnection():
    """Connect to the local adb server and request the target device transport.

    Opens a TCP connection to 127.0.0.1:_ADB_PORT and sends
    "host:transport:<device id>", where the id comes from targetDevice().

    Returns a tuple (ok, sock): ok is the readOkay() result for the transport
    request and sock is the connected socket.  The socket is returned still
    open even when ok is False, so the caller is responsible for passing it to
    endConnection() in every case.

    If connecting or the handshake raises, the socket is closed before the
    exception propagates, so no descriptor is leaked.
    """
    conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        conn.connect(('127.0.0.1', _ADB_PORT))
        sendData(conn, 'host:transport:' + targetDevice())
        ok = readOkay(conn)
    except Exception:
        # Nobody else holds a reference yet, so release the socket here.
        conn.close()
        raise
    return ok, conn
def endConnection(socket):
    """Close the given socket object."""
    closeSocket = socket.close
    closeSocket()
def readData(socket, max = 4096):
    """Receive at most *max* bytes from *socket* with a single recv() call.

    Returns whatever recv() returned; an empty result means the peer has
    closed the connection.
    """
    received = socket.recv(max)
    return received
def readOkay(socket):
    """Read the 4-byte status word from *socket* and tell whether it is OKAY.

    recv() may legitimately deliver fewer bytes than requested, so this keeps
    reading until four bytes have arrived or the peer closes the connection.

    Returns True only when exactly the bytes "OKAY" were received.  A short
    reply (connection closed early) or any other status word gives False
    instead of raising IndexError.
    """
    status = b''
    while len(status) < 4:
        chunk = socket.recv(4 - len(status))
        if not chunk:  # peer closed the connection before sending 4 bytes
            break
        status += chunk
    # Compare as a whole byte string: indexing bytes yields ints on Python 3,
    # so a character-by-character comparison against 'O', 'K', ... never matches.
    return status == b'OKAY'
def sendData(socket, str):
    """Send *str* to *socket*, prefixed with its length as 4 hex digits.

    The message on the wire is "%04X" % length followed by the payload.
    Text payloads are encoded as UTF-8 first so that the prefix counts bytes
    (what actually travels over the socket) rather than characters; byte
    strings are sent unchanged.

    Returns the result of socket.sendall() (None on success); socket errors
    propagate to the caller.
    """
    payload = str
    if not isinstance(payload, bytes):
        payload = payload.encode('utf-8')
    prefix = ('%04X' % len(payload)).encode('ascii')
    return socket.sendall(prefix + payload)
def execute(cmd):
    """Run "adb [-s <device>] <cmd>" through the shell and wait until it exits.

    The "-s <device>" part is added only when targetDevice() returns a
    non-empty id.  The command's output (stdout and stderr merged) is read and
    discarded; nothing is returned.

    NOTE: the command line is handed to the shell as one string because
    callers in this file embed shell quoting in *cmd*.  *cmd* must therefore
    come from trusted code, never from unchecked external input.
    """
    fullCmd = 'adb '
    device = targetDevice()
    if device:
        fullCmd += '-s ' + device + ' '
    fullCmd += cmd
    proc = Popen(fullCmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    # communicate() closes stdin, drains stdout and then waits.  A plain
    # wait() with an unread stdout pipe can block forever once the child has
    # filled the pipe buffer.
    proc.communicate()
def query(cmd):
    """Run "adb [-s <device>] <cmd>" through the shell and return its output.

    The "-s <device>" part is added only when targetDevice() returns a
    non-empty id.  Returns everything the command wrote to stdout and stderr
    (merged), exactly as read from the pipe.

    NOTE: the command line is handed to the shell as one string, so *cmd*
    must come from trusted code, never from unchecked external input.
    """
    fullCmd = 'adb '
    device = targetDevice()
    if device:
        fullCmd += '-s ' + device + ' '
    fullCmd += cmd
    proc = Popen(fullCmd, shell=True, stdin=PIPE, stdout=PIPE, stderr=STDOUT)
    # communicate() closes stdin *before* collecting the output, so a child
    # that waits for end-of-input cannot stall the read, and it reaps the
    # process afterwards.
    output = proc.communicate()[0]
    return output
def devices():
    """Ask the local adb server for the attached devices.

    Sends "host:devices" to 127.0.0.1:_ADB_PORT and parses the reply.

    Returns a list with the ids of all entries whose state is "device", or
    None when the server does not answer the request with OKAY.  Connection
    errors propagate to the caller; the socket is closed in every case.
    """
    sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    try:
        sock.connect(('127.0.0.1', _ADB_PORT))
        sendData(sock, 'host:devices')
        if not readOkay(sock):  # request was not acknowledged
            return None
        readData(sock, 4)  # payload size in hex; unused, we read until EOF
        chunks = []
        data = readData(sock)
        while data:
            chunks.append(data)
            data = readData(sock)
    finally:
        endConnection(sock)

    reply = b''.join(chunks)
    if not isinstance(reply, str):
        # Python 3: the socket delivers bytes, the parsing below needs text.
        reply = reply.decode('utf-8', 'replace')

    reply = re.sub(r'List of devices attached\s+', '', reply)
    serials = []
    for entry in reply.splitlines():
        fields = entry.split()
        # Compare the state column itself.  A substring test for "device"
        # (and stripping every "device" occurrence) corrupts ids that happen
        # to contain that word and accepts entries in other states.
        if len(fields) >= 2 and fields[-1] == 'device':
            serials.append(fields[0])
    return serials
def shell(cmd):
    """Run *cmd* through the adb "shell:" service and return its output.

    Returns the complete output as a string (read until the server closes the
    connection), or None when either the device transport or the shell
    request is not acknowledged with OKAY.  The socket obtained from
    startConnection() is closed on every path, including errors.
    """
    ok, conn = startConnection()
    try:
        if not ok:
            return None
        sendData(conn, 'shell:' + cmd)
        if not readOkay(conn):
            return None
        chunks = []
        data = readData(conn)
        while data:
            chunks.append(data)
            data = readData(conn)
    finally:
        # startConnection() hands back an open socket even when ok is False.
        endConnection(conn)

    result = b''.join(chunks)
    if not isinstance(result, str):
        # Python 3: sockets deliver bytes; callers search the reply as text.
        result = result.decode('utf-8', 'replace')
    return result
def reboot():
    """Send the adb "reboot:" request for the target device.

    Returns True when both the device transport and the reboot request are
    acknowledged with OKAY, False otherwise.  The socket obtained from
    startConnection() is closed on every path, including errors.
    """
    ok, conn = startConnection()
    try:
        if not ok:
            return False
        sendData(conn, 'reboot:')
        return readOkay(conn)
    finally:
        # startConnection() hands back an open socket even when ok is False.
        endConnection(conn)
def framebuffer():
    """Fetch the raw framebuffer of the target device via "framebuffer:".

    Returns a tuple (header, data):

    * header is a dict.  For the 12-field header it has the keys 'bpp',
      'size', 'width', 'height' and 'red' / 'green' / 'blue' / 'alpha', each
      of the latter being {'offset': ..., 'length': ...}.  In compatibility
      mode (leading word == 16) only 'bpp', 'size', 'width' and 'height' are
      present.
    * data is the pixel data, read until the server closes the connection.

    (None, None) is returned when the transport or the request is not
    acknowledged with OKAY, or when the reply ends before the header is
    complete.  The socket is closed on every path, including errors.
    """
    def headerMap(ints):
        if len(ints) == 12:
            return {'bpp': ints[0], 'size': ints[1], 'width': ints[2], 'height': ints[3],
                    'red': {'offset': ints[4], 'length': ints[5]},
                    'blue': {'offset': ints[6], 'length': ints[7]},
                    'green': {'offset': ints[8], 'length': ints[9]},
                    'alpha': {'offset': ints[10], 'length': ints[11]}}
        else:
            # NOTE(review): this branch is only reached when the leading word
            # is 16, which presumably is the colour depth of the legacy
            # header -- confirm against the adb protocol notes.  'bpp' is
            # provided so consumers can read header['bpp'] in both modes.
            return {'bpp': 16, 'size': ints[0], 'width': ints[1], 'height': ints[2]}

    def readExactly(conn, count):
        # recv() may return fewer bytes than asked for; collect until we have
        # *count* bytes or the peer closes the connection.
        buf = b''
        while len(buf) < count:
            chunk = readData(conn, count - len(buf))
            if not chunk:
                break
            buf += chunk
        return buf

    ok, conn = startConnection()
    try:
        if not ok:
            return None, None
        sendData(conn, 'framebuffer:')
        if not readOkay(conn):
            return None, None

        raw = readExactly(conn, 4)
        if len(raw) < 4:
            return None, None
        # '<' = little-endian with standard sizes, independent of the host.
        version = struct.unpack('<I', raw)[0]
        if version == 16:  # compatibility mode
            headerFields = 3  # size, width, height
        else:
            headerFields = 12  # bpp, size, width, height, 4*(offset, length)

        raw = readExactly(conn, headerFields * 4)
        if len(raw) < headerFields * 4:
            return None, None
        # The format must match the number of fields actually read; a fixed
        # 12-int format raises struct.error for the 3-field header.
        header = headerMap(struct.unpack('<%dI' % headerFields, raw))

        sendData(conn, '\x00')
        chunks = []
        data = readData(conn)
        while data:
            chunks.append(data)
            data = readData(conn)
    finally:
        # startConnection() hands back an open socket even when ok is False.
        endConnection(conn)
    # One join instead of repeated concatenation of a multi-megabyte buffer.
    return header, b''.join(chunks)  # pass size returned in header
def captureScreen(localFileName):
    """Grab the device framebuffer and save it as a PNG file.

    The pixel data returned by framebuffer() is converted to RGBA (for a
    32 bpp header) or RGB (otherwise, assuming 16 bpp RGB565) and written to
    *localFileName* with png.Writer.

    Returns True when the file was written and False when framebuffer()
    could not deliver an image.  The output file is only opened once the
    pixel data is ready and is always closed again.
    """
    def normalizeFrom8888(data):
        # Byte position of each channel inside a 4-byte pixel.  Floor
        # division keeps the indices integers on Python 3 as well.
        red = header['red']['offset'] // 8
        green = header['green']['offset'] // 8
        blue = header['blue']['offset'] // 8
        alpha = header['alpha']['offset'] // 8
        hasAlphaChannel = header['alpha']['length'] != 0
        # Stop before a trailing partial pixel instead of indexing past it.
        for i in range(0, len(data) - 3, 4):
            color = data[i:i+4]
            data[i] = color[red]
            data[i+1] = color[green]
            data[i+2] = color[blue]
            # No alpha bits in the source format: make the pixel opaque.
            data[i+3] = color[alpha] if hasAlphaChannel else 255
        return data

    def normalizeFrom565(data):
        result = []
        for i in range(0, len(data) - 1, 2):
            # isolate color components, assume RGB565 stored low byte first
            pixel = data[i] | (data[i+1] << 8)
            c1 = (pixel & 0b1111100000000000) >> 11
            c2 = (pixel & 0b0000011111100000) >> 5
            c3 = (pixel & 0b11111)
            # scale each component to the 0..255 range
            result.append(c1 * 255 // 31)
            result.append(c2 * 255 // 63)
            result.append(c3 * 255 // 31)
        return result

    header, data = framebuffer()
    if header is None or data is None:
        # framebuffer() reports failure as (None, None).
        return False

    # bytearray() yields integer byte values on both Python 2 and 3.
    pixels = list(bytearray(data))
    # .get(): tolerate a header without a 'bpp' entry and treat it as 16 bpp.
    if header.get('bpp') == 32:
        withAlpha = True
        pixels = normalizeFrom8888(pixels)
    else:  # assuming 16bpp 565 format
        withAlpha = False
        pixels = normalizeFrom565(pixels)

    pngWriter = png.Writer(size=(header['width'], header['height']), alpha=withAlpha)
    with open(localFileName, 'wb') as out:
        pngWriter.write_array(out, pixels)
    return True
def captureWindow(localFileName):
    """Ask the on-device tool for a capture and pull it to *localFileName*.

    Clears the device log, starts "adb logcat" filtered on _LOG_FILTER, sends
    the ACTION_CAPTURE intent to _TARGET_ACTIVITY and then watches the log:

    * an info line containing "Capture saved" -> the file at
      _REMOTE_CAPTURE_PATH is pulled to *localFileName*, returns True;
    * an info line containing "Capture error" -> returns False;
    * logcat ends without either message -> returns False.

    The logcat child process is stopped before returning, also on errors.
    """
    def stopProcess(proc):
        # Best effort: with shell=True this signals the shell that runs adb.
        if proc.poll() is None:
            try:
                proc.terminate()
            except OSError:
                pass  # the process exited in the meantime
        proc.stdout.close()
        proc.wait()

    adbPrefix = 'adb '
    device = targetDevice()
    if device:
        adbPrefix += '-s ' + device + ' '

    # flush log; communicate() drains the pipe while waiting for the exit
    flush = Popen(adbPrefix + ' logcat -c', shell=True, stdout=PIPE, stderr=STDOUT)
    flush.communicate()

    proc = Popen(adbPrefix + ' logcat ' + _LOG_FILTER + ':V *:S',
                 shell=True, stdout=PIPE, stderr=STDOUT)
    try:
        execute("shell am start -a com.sencha.remotejs.ACTION_CAPTURE -n " + _TARGET_ACTIVITY)
        while True:
            line = proc.stdout.readline()
            if not line:
                # End of output: logcat is gone, no result will arrive.
                return False
            if not isinstance(line, str):
                # Python 3: pipes deliver bytes, the matching below needs text.
                line = line.decode('utf-8', 'replace')
            if re.match(r'^I/' + _LOG_FILTER, line):
                if 'Capture saved' in line:
                    execute('pull ' + _REMOTE_CAPTURE_PATH + ' ' + localFileName)
                    return True
                elif 'Capture error' in line:
                    return False
    finally:
        stopProcess(proc)
def isAvailable():
    """Return True when "adb version" output starts with "Android Debug Bridge"."""
    banner = query('version')
    if not isinstance(banner, str):
        # Python 3: query() returns what was read from a pipe, i.e. bytes;
        # calling startswith() with a text prefix on bytes raises TypeError.
        banner = banner.decode('utf-8', 'replace')
    return banner.startswith('Android Debug Bridge')
def installDeviceTool():
    """(Re)install the bundled device tool on the target device.

    Uninstalls the package 'com.sencha.remotejs', decodes the base64 payload
    in _g_base64Apk into a temporary file and installs that file.

    Returns the (ok, reply) tuple produced by install().  The temporary file
    is closed (and thereby removed) even when writing or installing raises.
    """
    uninstall('com.sencha.remotejs')
    # The ".apk" suffix matters: newer adb releases refuse to install a file
    # whose name does not end in .apk.
    apkFile = tempfile.NamedTemporaryFile(suffix='.apk')
    try:
        apkFile.write(base64.b64decode(_g_base64Apk))
        apkFile.flush()  # make sure adb sees the complete file on disk
        return install(apkFile.name)
    finally:
        apkFile.close()
def uninstall(apk):
    """Remove the package *apk* from the device via "pm uninstall".

    Returns True when the shell reply contains "Success"; an empty or missing
    reply counts as failure.
    """
    reply = shell('pm uninstall ' + apk)
    if not reply:
        return False
    return 'Success' in reply
def install(apk):
    """Install the package file *apk* with "adb install".

    Returns a tuple (ok, reply) where reply is the last line of the command
    output and ok is True exactly when that line is "Success".
    """
    output = query('install ' + apk)
    if not isinstance(output, str):
        # Python 3: query() returns bytes read from a pipe; splitting on and
        # comparing with text would raise TypeError / never match.
        output = output.decode('utf-8', 'replace')
    reply = output.strip().split('\n')[-1]
    return reply == 'Success', reply
def evaluateJS(js):
    """Send *js* to the on-device tool for evaluation.

    The code is wrapped as 'javascript:(function() { <js>; })()', base64
    encoded and passed as the data URI of a VIEW intent for _TARGET_ACTIVITY.
    Nothing is returned.
    """
    source = 'javascript:(function() { ' + js + '; })()'
    if not isinstance(source, bytes):
        # base64 works on bytes: encode text explicitly so that non-ASCII
        # script text (and any text at all on Python 3) is accepted.
        source = source.encode('utf-8')
    expr = base64.b64encode(source)
    if not isinstance(expr, str):
        # Python 3: turn the base64 bytes back into text for the command line.
        expr = expr.decode('ascii')
    cmd = 'shell am start -a android.intent.action.VIEW -n ' + _TARGET_ACTIVITY \
          + " -d '" + expr + "'"
    execute(cmd)
def openUrl(url):
    """Ask the on-device tool to open *url*.

    The URL is base64 encoded and passed as the data URI of a VIEW intent for
    _TARGET_ACTIVITY.  Nothing is returned.
    """
    rawUrl = url
    if not isinstance(rawUrl, bytes):
        # base64 works on bytes: encode text explicitly so that non-ASCII
        # URLs (and any text at all on Python 3) are accepted.
        rawUrl = rawUrl.encode('utf-8')
    encodedUrl = base64.b64encode(rawUrl)
    if not isinstance(encodedUrl, str):
        # Python 3: turn the base64 bytes back into text for the command line.
        encodedUrl = encodedUrl.decode('ascii')
    cmd = 'shell am start -a android.intent.action.VIEW -n ' + _TARGET_ACTIVITY \
          + " -d '" + encodedUrl + "'"
    execute(cmd)
def filterLogcat(line):
    """Strip logcat decoration from *line* and return the remaining text.

    Removes, in this order:
    1. the "<level>/<_LOG_FILTER>( <pid>): " prefix,
    2. every "Console: " marker,
    3. every ":<digits>" sequence,
    4. every CR LF pair.
    """
    # All patterns are raw strings.  In a plain string literal "\b" is a
    # backspace character rather than a regex word boundary, and "\(", "\s"
    # and "\d" are invalid string escapes.  The zero-width "(\b)*" groups had
    # no effect on what gets removed and are dropped.
    line = re.sub(r'[A-Z]/' + _LOG_FILTER + r'\(\s*\d+\): ', '', line)
    line = re.sub(r'Console: ', '', line)
    line = re.sub(r':\d+', '', line)
    line = re.sub(r'\r\n', '', line)
    return line
def startServer():
    """Run "adb start-server" through execute()."""
    command = 'start-server'
    execute(command)
# lineHandler must return a string
def readLogcat(lineHandler = filterLogcat):
    """Follow the device log and feed matching lines to *lineHandler*.

    Clears the device log, starts "adb logcat" filtered on _LOG_FILTER and
    calls lineHandler(line) for every line that starts with
    "<level>/<_LOG_FILTER>".  Runs until logcat stops producing output and
    returns nothing.  The logcat child process is stopped on the way out,
    also when the handler raises or the loop is interrupted.
    """
    def stopProcess(proc):
        # Best effort: with shell=True this signals the shell that runs adb.
        if proc.poll() is None:
            try:
                proc.terminate()
            except OSError:
                pass  # the process exited in the meantime
        proc.stdout.close()
        proc.wait()

    adbPrefix = 'adb '
    device = targetDevice()
    if device:
        adbPrefix += '-s ' + device + ' '

    # flush log; communicate() drains the pipe while waiting for the exit
    flush = Popen(adbPrefix + ' logcat -c', shell=True, stdout=PIPE, stderr=STDOUT)
    flush.communicate()

    proc = Popen(adbPrefix + ' logcat ' + _LOG_FILTER + ':V *:S',
                 shell=True, stdout=PIPE, stderr=STDOUT)
    try:
        while True:
            line = proc.stdout.readline()
            if not line:
                break  # end of output: logcat has stopped
            if not isinstance(line, str):
                # Python 3: pipes deliver bytes, the matching below needs text.
                line = line.decode('utf-8', 'replace')
            if re.match(r'^[A-Z]/' + _LOG_FILTER, line):
                # NOTE(review): the handler's return value is not used here
                # (unchanged behavior) -- confirm whether it was meant to be
                # printed or collected.
                lineHandler(line)
    finally:
        stopProcess(proc)
Jump to Line
Something went wrong with that request. Please try again.