diff --git a/spyder_terminal/server/main.py b/spyder_terminal/server/main.py index 905daed2..f7b2ee48 100755 --- a/spyder_terminal/server/main.py +++ b/spyder_terminal/server/main.py @@ -36,11 +36,11 @@ clr = 'cls' -def create_app(shell): +def create_app(shell, close_future=None): """Create and return a tornado Web Application instance.""" settings = {"static_path": os.path.join( os.path.dirname(__file__), "static")} - application = tornado.web.Application(routes.ROUTES, + application = tornado.web.Application(routes.gen_routes(close_future), debug=True, serve_traceback=True, autoreload=True, **settings) diff --git a/spyder_terminal/server/rest/term_rest.py b/spyder_terminal/server/rest/term_rest.py index a5205d72..8d9d2ca9 100644 --- a/spyder_terminal/server/rest/term_rest.py +++ b/spyder_terminal/server/rest/term_rest.py @@ -9,10 +9,6 @@ class MainHandler(tornado.web.RequestHandler): """Handles creation of new terminals.""" - def initialize(self, db=None): - """Stump initialization function.""" - self.db = db - @tornado.gen.coroutine def get(self): """GET verb.""" @@ -30,10 +26,6 @@ def post(self): class ResizeHandler(tornado.web.RequestHandler): """Handles resizing of terminals.""" - def initialize(self, db=None): - """Stump initialization function.""" - self.db = db - @tornado.gen.coroutine def get(self): """GET verb: Forbidden.""" diff --git a/spyder_terminal/server/routes.py b/spyder_terminal/server/routes.py index 458cde59..88196cf4 100644 --- a/spyder_terminal/server/routes.py +++ b/spyder_terminal/server/routes.py @@ -36,3 +36,14 @@ ] ROUTES = REST + WS + WEB + + +def gen_routes(close_future): + """Return a list of HTML redirection routes.""" + if close_future is not None: + ws = [] + for route in WS: + ws.append((route[0], route[1], + dict(close_future=close_future))) + return REST + ws + WEB + return ROUTES diff --git a/spyder_terminal/server/tests/__init__.py b/spyder_terminal/server/tests/__init__.py index e69de29b..1af0ba8f 100644 --- a/spyder_terminal/server/tests/__init__.py +++ b/spyder_terminal/server/tests/__init__.py @@ -0,0 +1,16 @@ +""" +tests module. + +========= + +Provides: + 1. Websocket and HTTP server methods tests. + +How to use the documentation +---------------------------- +Documentation is available in one form: docstrings provided +with the code + +Copyright (c) 2016, Edgar A. Margffoy. +MIT, see LICENSE for more details. +""" diff --git a/spyder_terminal/server/tests/print_size.py b/spyder_terminal/server/tests/print_size.py new file mode 100644 index 00000000..2c1dc253 --- /dev/null +++ b/spyder_terminal/server/tests/print_size.py @@ -0,0 +1,99 @@ +#!/usr/bin/env python + +""" +Print console size on UNIX and Windows systems: +Taken from: https://gist.github.com/jtriley/1108174 +""" + +import os +import shlex +import struct +import platform +import subprocess + + +def get_terminal_size(): + """ getTerminalSize() + - get width and height of console + - works on linux,os x,windows,cygwin(windows) + originally retrieved from: + http://stackoverflow.com/questions/566746/how-to-get-console-window-width-in-python + """ + current_os = platform.system() + tuple_xy = None + if current_os == 'Windows': + tuple_xy = _get_terminal_size_windows() + if tuple_xy is None: + tuple_xy = _get_terminal_size_tput() + # needed for window's python in cygwin's xterm! + if current_os in ['Linux', 'Darwin'] or current_os.startswith('CYGWIN'): + tuple_xy = _get_terminal_size_linux() + if tuple_xy is None: + print("default") + tuple_xy = (80, 25) # default value + return tuple_xy + + +def _get_terminal_size_windows(): + try: + from ctypes import windll, create_string_buffer + # stdin handle is -10 + # stdout handle is -11 + # stderr handle is -12 + h = windll.kernel32.GetStdHandle(-12) + csbi = create_string_buffer(22) + res = windll.kernel32.GetConsoleScreenBufferInfo(h, csbi) + if res: + (bufx, bufy, curx, cury, wattr, + left, top, right, bottom, + maxx, maxy) = struct.unpack("hhhhHhhhhhh", csbi.raw) + sizex = right - left + 1 + sizey = bottom - top + 1 + return sizex, sizey + except: + pass + + +def _get_terminal_size_tput(): + """ + # get terminal width + src: http://stackoverflow.com/questions/263890/ + how-do-i-find-the-width-height-of-a-terminal-window + """ + try: + cols = int(subprocess.check_call(shlex.split('tput cols'))) + rows = int(subprocess.check_call(shlex.split('tput lines'))) + return (cols, rows) + except: + pass + + +def _get_terminal_size_linux(): + def ioctl_GWINSZ(fd): + try: + import fcntl + import termios + cr = struct.unpack('hh', + fcntl.ioctl(fd, termios.TIOCGWINSZ, '1234')) + return cr + except: + pass + cr = ioctl_GWINSZ(0) or ioctl_GWINSZ(1) or ioctl_GWINSZ(2) + if not cr: + try: + fd = os.open(os.ctermid(), os.O_RDONLY) + cr = ioctl_GWINSZ(fd) + os.close(fd) + except: + pass + if not cr: + try: + cr = (os.environ['LINES'], os.environ['COLUMNS']) + except: + return None + return int(cr[1]), int(cr[0]) + + +if __name__ == "__main__": + sizex, sizey = get_terminal_size() + print(sizex, sizey) diff --git a/spyder_terminal/server/tests/test_server.py b/spyder_terminal/server/tests/test_server.py index bd5d3895..fbc0e476 100644 --- a/spyder_terminal/server/tests/test_server.py +++ b/spyder_terminal/server/tests/test_server.py @@ -7,33 +7,57 @@ import os import sys +import urllib import os.path as osp -sys.path.append(osp.realpath(osp.dirname(__file__) + "/..")) +from tornado import testing, websocket, gen +from tornado.concurrent import Future from spyder.utils.programs import find_program + +sys.path.append(osp.realpath(osp.dirname(__file__) + "/..")) + from main import create_app -from tornado import testing, httpserver, gen, websocket +LOCATION = os.path.realpath(os.path.join(os.getcwd(), + os.path.dirname(__file__))) +LOCATION_SLASH = LOCATION.replace('\\', '/') +LINE_END = '\n' SHELL = '/usr/bin/env bash' WINDOWS = os.name == 'nt' if WINDOWS: + LINE_END = '\r\n' SHELL = find_program('cmd.exe') class TerminalServerTests(testing.AsyncHTTPTestCase): """Main server tests.""" def get_app(self): - return create_app(SHELL) + """Return HTTP/WS server.""" + self.close_future = Future() + return create_app(SHELL, self.close_future) - def _mk_connection(self): + def _mk_connection(self, pid): return websocket.websocket_connect( - 'ws://localhost:{}/'.format(self.port) + 'ws://127.0.0.1:{0}/terminals/{1}'.format( + self.get_http_port(), pid) ) + @gen.coroutine + def close(self, ws): + """ + Close a websocket connection and wait for the server side. + + If we don't wait here, there are sometimes leak warnings in the + tests. + """ + ws.close() + yield self.close_future + @testing.gen_test def test_main_get(self): + """Test if HTML source is rendered.""" response = yield self.http_client.fetch( self.get_url('/'), method="GET" @@ -42,6 +66,7 @@ def test_main_get(self): @testing.gen_test def test_main_post(self): + """Test that POST requests to root are forbidden.""" try: yield self.http_client.fetch( self.get_url('/'), @@ -50,4 +75,90 @@ def test_main_post(self): ) except Exception: pass - # self.assertEqual(response.code, 403) + + @testing.gen_test + def test_create_terminal(self): + """Test terminal creation.""" + data = {'rows': '25', 'cols': '80'} + response = yield self.http_client.fetch( + self.get_url('/api/terminals'), + method="POST", + body=urllib.urlencode(data) + ) + self.assertEqual(response.code, 200) + + @testing.gen_test + def test_terminal_communication(self): + """Test terminal creation.""" + data = {'rows': '25', 'cols': '80'} + response = yield self.http_client.fetch( + self.get_url('/api/terminals'), + method="POST", + body=urllib.urlencode(data) + ) + pid = response.body + sock = yield self._mk_connection(pid) + msg = yield sock.read_message() + test_msg = 'Ham, eggs and spam' + sock.write_message(' ' + test_msg) + msg = '' + while test_msg not in msg: + msg = yield sock.read_message() + self.assertTrue('Ham, eggs and spam' in msg) + + @testing.gen_test + def test_terminal_closing(self): + """Test terminal destruction.""" + data = {'rows': '25', 'cols': '80'} + response = yield self.http_client.fetch( + self.get_url('/api/terminals'), + method="POST", + body=urllib.urlencode(data) + ) + pid = response.body + sock = yield self._mk_connection(pid) + _ = yield sock.read_message() + yield self.close(sock) + try: + sock.write_message(' This shall not work') + except AttributeError: + pass + + @testing.gen_test + def test_terminal_resize(self): + """Test terminal resizing.""" + data = {'rows': '25', 'cols': '80'} + response = yield self.http_client.fetch( + self.get_url('/api/terminals'), + method="POST", + body=urllib.urlencode(data) + ) + + pid = response.body + sock = yield self._mk_connection(pid) + _ = yield sock.read_message() + + data = {'rows': '23', 'cols': '73'} + response = yield self.http_client.fetch( + self.get_url('/api/terminals/{0}/size'.format(pid)), + method="POST", + body=urllib.urlencode(data) + ) + + sock.write_message('cd {0}{1}'.format(LOCATION_SLASH, LINE_END)) + msg = '' + while LOCATION not in msg: + msg = yield sock.read_message() + + python_exec = 'python print_size.py' + sock.write_message(python_exec) + msg = '' + while '.py' not in msg: + msg = yield sock.read_message() + sock.write_message(LINE_END) + + expected_size = '(73, 23)' + msg = '' + while expected_size not in msg: + msg = yield sock.read_message() + self.assertIn(expected_size, msg) diff --git a/spyder_terminal/server/websockets/term_ws.py b/spyder_terminal/server/websockets/term_ws.py index 7d3b1530..09ab9b3c 100644 --- a/spyder_terminal/server/websockets/term_ws.py +++ b/spyder_terminal/server/websockets/term_ws.py @@ -9,7 +9,11 @@ class MainSocket(tornado.websocket.WebSocketHandler): """Handles long polling communication between xterm.js and server.""" - def open(self, pid, *args, **kwargs): + def initialize(self, close_future=None): + """Base class initialization.""" + self.close_future = close_future + + def open(self, pid): """Open a Websocket associated to a console.""" print("WebSocket opened") print(pid) @@ -22,6 +26,8 @@ def on_close(self): print('TTY Off!') print("WebSocket closed") self.application.term_manager.stop_term(self.pid) + if self.close_future is not None: + self.close_future.set_result(("Done!")) def on_message(self, message): """Execute a command on console."""