Permalink
Browse files

support password logins, involves rejigging commandline options

  • Loading branch information...
1 parent 68c19a7 commit 74a83283f75a6c90315f6df575b8b8984a06b2df Marc Sibson committed Feb 24, 2011
Showing with 168 additions and 95 deletions.
  1. +98 −79 tests/unit/test_client.py
  2. +47 −2 tests/unit/test_command.py
  3. +7 −0 vncdotool/client.py
  4. +16 −14 vncdotool/command.py
View
@@ -1,13 +1,15 @@
import mock
from nose.plugins.skip import SkipTest
-from vncdotool.client import VNCDoToolClient, VNCDoToolFactory
+from vncdotool import client
from vncdotool import rfb
+@mock.isolate(client.VNCDoToolClient,
+ excludes='math.sqrt,operator.add,client.KEYMAP')
class TestVNCDoToolClient(object):
def setUp(self):
- self.client = VNCDoToolClient()
+ self.client = client.VNCDoToolClient()
self.client.transport = mock.Mock()
self.client.factory = mock.Mock()
@@ -23,140 +25,157 @@ def _tryPIL(self):
raise SkipTest
def test_vncConnectionMade(self):
- client = self.client
- client.vncConnectionMade()
- factory = client.factory
- factory.clientConnectionMade.assert_called_once_with(client)
+ cli = self.client
+ cli.vncConnectionMade()
+ factory = cli.factory
+ factory.clientConnectionMade.assert_called_once_with(cli)
def test_keyPress_single_alpha(self):
- client = self.client
- client.keyPress('a')
- client.keyEvent.assert_a_call_exists_with(ord('a'), down=1)
- client.keyEvent.assert_a_call_exists_with(ord('a'), down=0)
+ cli = self.client
+ cli.keyPress('a')
+ cli.keyEvent.assert_a_call_exists_with(ord('a'), down=1)
+ cli.keyEvent.assert_a_call_exists_with(ord('a'), down=0)
def test_keyPress_multiple(self):
- client = self.client
- client.keyPress('ctrl-alt-del')
+ cli = self.client
+ cli.keyPress('ctrl-alt-del')
# XXX doesn't ensure correct order
- client.keyEvent.assert_a_call_exists_with(rfb.KEY_ControlLeft, down=1)
- client.keyEvent.assert_a_call_exists_with(rfb.KEY_AltLeft, down=1)
- client.keyEvent.assert_a_call_exists_with(rfb.KEY_Delete, down=1)
- client.keyEvent.assert_a_call_exists_with(rfb.KEY_ControlLeft, down=0)
- client.keyEvent.assert_a_call_exists_with(rfb.KEY_AltLeft, down=0)
- client.keyEvent.assert_a_call_exists_with(rfb.KEY_Delete, down=0)
+ cli.keyEvent.assert_a_call_exists_with(rfb.KEY_ControlLeft, down=1)
+ cli.keyEvent.assert_a_call_exists_with(rfb.KEY_AltLeft, down=1)
+ cli.keyEvent.assert_a_call_exists_with(rfb.KEY_Delete, down=1)
+ cli.keyEvent.assert_a_call_exists_with(rfb.KEY_ControlLeft, down=0)
+ cli.keyEvent.assert_a_call_exists_with(rfb.KEY_AltLeft, down=0)
+ cli.keyEvent.assert_a_call_exists_with(rfb.KEY_Delete, down=0)
def test_captureScreen(self):
- client = self.client
- client.vncConnectionMade()
- client.updates = mock.Mock()
+ cli = self.client
+ cli.vncConnectionMade()
+ cli.updates = mock.Mock()
fname = 'foo.png'
- d = client.captureScreen(fname)
- d.addCallback.assert_called_once_with(client._captureSave, fname)
- assert client.framebufferUpdateRequest.called
+ d = cli.captureScreen(fname)
+ d.addCallback.assert_called_once_with(cli._captureSave, fname)
+ assert cli.framebufferUpdateRequest.called
def test_captureSave(self):
- client = self.client
- client.screen = mock.Mock()
+ cli = self.client
+ cli.screen = mock.Mock()
fname = 'foo.png'
- r = client._captureSave(client.screen, fname)
- client.screen.save.assert_called_once_with(fname)
- assert r == client
+ r = cli._captureSave(cli.screen, fname)
+ cli.screen.save.assert_called_once_with(fname)
+ assert r == cli
def test_expectScreen(self):
self._tryPIL()
- client = self.client
- client.vncConnectionMade()
- client.updates = mock.Mock()
- client.image = mock.Mock()
+ cli = self.client
+ cli.vncConnectionMade()
+ cli.updates = mock.Mock()
+ cli.image = mock.Mock()
fname = 'something.png'
- d = client.expectScreen(fname, 5)
- assert client.framebufferUpdateRequest.called
- client.image.return_value.open.assert_called_once_with(fname)
- assert client.expected == client.image.return_value.open.return_value.histogram.return_value
- assert client.updates.get.called
- update = client.updates.get.return_value
- update.addCallback.assert_called_once_with(client._expectCompare, 5)
+ d = cli.expectScreen(fname, 5)
+ assert cli.framebufferUpdateRequest.called
+ cli.image.return_value.open.assert_called_once_with(fname)
+ assert cli.expected == cli.image.return_value.open.return_value.histogram.return_value
+ assert cli.updates.get.called
+ update = cli.updates.get.return_value
+ update.addCallback.assert_called_once_with(cli._expectCompare, 5)
assert d != update
def test_expectCompareSuccess(self):
- client = self.client
- d = client.deferred = mock.Mock()
- client.expected = [2, 2, 2]
+ cli = self.client
+ d = cli.deferred = mock.Mock()
+ cli.expected = [2, 2, 2]
image = mock.Mock()
image.histogram.return_value = [1, 2, 3]
- client._expectCompare(image, 5)
+ cli._expectCompare(image, 5)
- d.callback.assert_called_once_with(client)
- assert client.deferred is None
+ d.callback.assert_called_once_with(cli)
+ assert cli.deferred is None
def test_expectCompareExactSuccess(self):
- client = self.client
- d = client.deferred = mock.Mock()
- client.expected = [2, 2, 2]
+ cli = self.client
+ d = cli.deferred = mock.Mock()
+ cli.expected = [2, 2, 2]
image = mock.Mock()
image.histogram.return_value = [2, 2, 2]
- client._expectCompare(image, 0)
+ cli._expectCompare(image, 0)
- d.callback.assert_called_once_with(client)
- assert client.deferred is None
+ d.callback.assert_called_once_with(cli)
+ assert cli.deferred is None
def test_expectCompareFails(self):
- client = self.client
- client.deferred = mock.Mock()
- client.expected = [2, 2, 2]
- client.updates = mock.Mock()
+ cli = self.client
+ cli.deferred = mock.Mock()
+ cli.expected = [2, 2, 2]
+ cli.updates = mock.Mock()
image = mock.Mock()
image.histogram.return_value = [1, 1, 1]
- client._expectCompare(image, 0)
-
- assert not client.deferred.callback.called
- assert client.updates.get.called
- update = client.updates.get.return_value
- update.addCallback.assert_called_once_with(client._expectCompare, 0)
+ cli._expectCompare(image, 0)
+ assert not cli.deferred.callback.called
+ assert cli.updates.get.called
+ update = cli.updates.get.return_value
+ update.addCallback.assert_called_once_with(cli._expectCompare, 0)
def test_updateRectangeFirst(self):
- client = self.client
- client.updates = mock.Mock()
- client.image = mock.Mock()
+ cli = self.client
+ cli.updates = mock.Mock()
+ cli.image = mock.Mock()
data = mock.Mock()
- client.updateRectangle(0, 0, 100, 200, data)
+ cli.updateRectangle(0, 0, 100, 200, data)
- image = client.image.return_value
+ image = cli.image.return_value
image.fromstring.assert_called_once_with('RGB', (100, 200), data, 'raw', 'RGBX')
- assert client.updates.put(client.screen)
- assert client.screen == image.fromstring.return_value
+ assert cli.updates.put(cli.screen)
+ assert cli.screen == image.fromstring.return_value
def test_updateRectangeRegion(self):
- client = self.client
- client.updates = mock.Mock()
- client.image = mock.Mock()
- client.screen = mock.Mock()
- client.screen.size = (100, 100)
+ cli = self.client
+ cli.updates = mock.Mock()
+ cli.image = mock.Mock()
+ cli.screen = mock.Mock()
+ cli.screen.size = (100, 100)
data = mock.Mock()
- client.updateRectangle(20, 10, 50, 40, data)
+ cli.updateRectangle(20, 10, 50, 40, data)
- image = client.image.return_value
+ image = cli.image.return_value
image.fromstring.assert_called_once_with('RGB', (50, 40), data, 'raw', 'RGBX')
- assert client.updates.put(client.screen)
- paste = client.screen.paste
+ assert cli.updates.put(cli.screen)
+ paste = cli.screen.paste
paste.assert_called_once_with(image.fromstring.return_value, (20, 10))
+ def test_vncRequestPassword_prompt(self):
+ cli = self.client
+ cli.factory.password = None
+ cli.sendPassword = mock.Mock()
+ cli.vncRequestPassword()
+
+ password = client.getpass.getpass.return_value
+ assert client.getpass.getpass.called
+ assert cli.factory.password == password
+ cli.sendPassword.assert_called_once_with(password)
+
+ def test_vncRequestPassword_attribute(self):
+ cli = self.client
+ cli.sendPassword = mock.Mock()
+ cli.factory.password = 'mushroommushroom'
+ cli.vncRequestPassword()
+ cli.sendPassword.assert_called_once_with(cli.factory.password)
+
class TestVNCDoToolFactory(object):
def setUp(self):
- self.factory = VNCDoToolFactory()
+ self.factory = client.VNCDoToolFactory()
def test_init(self):
assert self.factory.deferred
View
@@ -15,14 +15,15 @@ def assertCalled(self, fn, *args):
def call_build_commands_list(self, commands):
command.build_command_list(self.factory, commands.split())
+
def test_alphanum_key(self):
self.call_build_commands_list('key a')
self.assertCalled(self.client.keyPress, 'a')
-
+
def test_control_key(self):
self.call_build_commands_list('key ctrl-c')
self.assertCalled(self.client.keyPress, 'ctrl-c')
-
+
def test_key_missing(self):
pass
@@ -83,3 +84,47 @@ def test_chain_type_expect(self):
call.assert_calls_exist_with(self.client.keyPress, key)
call.assert_calls_exist_with(self.client.expectScreen, 'password.png', 0)
+
+
+
+@mock.isolate(command.main)
+class TestMain(object):
+ def setUp(self):
+ self.factory = command.VNCDoToolFactory.return_value
+ self.options = mock.Mock()
+ self.options.display = 0
+ self.options.server = '127.0.0.1'
+ parse_args = command.VNCDoToolOptionParser.return_value.parse_args
+ parse_args.return_value = (self.options, [])
+
+ def test_single_host_name(self):
+ self.options.server = '10.11.12.13'
+ command.main()
+ connectTCP = command.reactor.connectTCP
+ connectTCP.assert_called_once_with('10.11.12.13', 5900, self.factory)
+ def test_host_port(self):
+ self.options.server = '10.11.12.13:4444'
+ command.main()
+ connectTCP = command.reactor.connectTCP
+ connectTCP.assert_called_once_with('10.11.12.13', 4444, self.factory)
+ def test_localhost_display(self):
+ self.options.display = 10
+ command.main()
+ connectTCP = command.reactor.connectTCP
+ connectTCP.assert_called_once_with('127.0.0.1', 5910, self.factory)
+
+ def test_host_display(self):
+ self.options.server = '10.11.12.13'
+ self.options.display = 10
+ command.main()
+ connectTCP = command.reactor.connectTCP
+ connectTCP.assert_called_once_with('10.11.12.13', 5910, self.factory)
+ def test_host_default(self):
+ command.main()
+ connectTCP = command.reactor.connectTCP
+ connectTCP.assert_called_once_with('127.0.0.1', 5900, self.factory)
+
+ def test_password(self):
+ self.options.password = 'openseseame'
+ command.main()
+ assert self.factory.password == 'openseseame'
View
@@ -9,6 +9,7 @@
import rfb
from twisted.internet.defer import Deferred, DeferredQueue
+import getpass
import math
import operator
@@ -222,6 +223,12 @@ def updateRectangle(self, x, y, width, height, data):
self.updates.put(self.screen)
+ def vncRequestPassword(self):
+ if self.factory.password is None:
+ self.factory.password = getpass.getpass()
+
+ self.sendPassword(self.factory.password)
+
class VNCDoToolFactory(rfb.RFBFactory):
logger = None
View
@@ -89,29 +89,31 @@ def build_command_list(factory, args):
def main():
usage = '%prog [options] CMD CMDARGS'
description = 'Command line interaction with a VNC server'
- op = VNCDoToolOptionParser(usage=usage, description=description,
- add_help_option=False)
+ op = VNCDoToolOptionParser(usage=usage, description=description)
op.add_option('-d', '--display', action='store', metavar='DISPLAY',
type='int', default=0,
- help='connect to vnc server on DISPLAY [%default]')
- op.add_option('--help', action='help',
- help='show this help message')
- op.add_option('-h', '--host', action='store', metavar='HOST',
+ help='connect to vnc server display :DISPLAY [%default]')
+ op.add_option('-p', '--password', action='store', metavar='PASSwORD',
+ help='use password to access server')
+ op.add_option('-s', '--server', action='store', metavar='ADDRESS',
default='127.0.0.1',
- help='connect to vnc server at HOST [%default]')
- op.add_option('-p', '--port', action='store', metavar='PORT',
- type='int',
- help='connect to vnc server on PORT')
+ help='connect to vnc server at ADDRESS[:PORT] [%default]')
op.add_option('-v', '--verbose', action='store_true')
opts, args = op.parse_args()
- if opts.port is None:
- opts.port = opts.display + 5900
-
if not len(args):
op.error('no command provided')
factory = VNCDoToolFactory()
+ try:
+ host, port = opts.server.split(':')
+ except ValueError:
+ host = opts.server
+ port = opts.display + 5900
+
+ if opts.password:
+ factory.password = opts.password
+
if opts.verbose:
print 'connecting to %s:%d' % (opts.host, opts.port)
factory.logger = logger
@@ -124,7 +126,7 @@ def main():
factory.deferred.addCallback(stop)
factory.deferred.addErrback(error)
- reactor.connectTCP(opts.host, opts.port, factory)
+ reactor.connectTCP(host, int(port), factory)
reactor.exit_status = 1
reactor.run()

0 comments on commit 74a8328

Please sign in to comment.