Skip to content

Commit

Permalink
Full coverage for the browser components.
Browse files Browse the repository at this point in the history
Notably this means dropping the custom BasicAuthTransport for XMLRPC
in synchronize.py because it relied on classes that don't exist in
Python 3. But the built-in transports handle Basic Auth if the info is
encoded in the URL, so do that instead.
  • Loading branch information
jamadden committed May 24, 2017
1 parent d69875e commit 71dcce0
Show file tree
Hide file tree
Showing 10 changed files with 370 additions and 108 deletions.
3 changes: 3 additions & 0 deletions CHANGES.rst
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ CHANGES

- Drop test dependency on ``zope.app.testing``.

- The synchronization view now uses Python's built-in transport for
handling Basic Authentication. This means that the username and
password should not contain a ``:``.

3.6.4 (2012-12-14)
------------------
Expand Down
22 changes: 11 additions & 11 deletions src/zope/app/i18n/browser/configure.zcml
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
<zope:configure
<zope:configure
xmlns:zope="http://namespaces.zope.org/zope"
xmlns="http://namespaces.zope.org/browser">

<pages
permission="zope.ManageServices"
<pages
permission="zope.ManageServices"
for="zope.i18n.interfaces.ITranslationDomain"
class=".translate.Translate">

<page name="translate.html" template="translate.pt"
<page name="translate.html" template="translate.pt"
menu="zmi_views" title="Translate" />
<page name="translateMessage.html" template="translatemessage.pt" />
<page name="editMessages.html" attribute="editMessages" />
Expand All @@ -20,11 +20,11 @@
</pages>


<pages
permission="zope.ManageServices"
<pages
permission="zope.ManageServices"
for="zope.i18n.interfaces.ITranslationDomain"
class=".exportimport.ExportImport">

<page name="exportImportForm.html" template="exportimport.pt"
menu="zmi_views" title="Import/Export" />
<page name="export.html" attribute="exportMessages" />
Expand All @@ -34,10 +34,10 @@


<pages
permission="zope.ManageServices"
permission="zope.ManageServices"
for="zope.i18n.interfaces.ITranslationDomain"
class=".synchronize.Synchronize">

<page name="synchronizeForm.html" template="synchronize.pt"
menu="zmi_views" title="Synchronize" />
<page name="synchronize.html" attribute="synchronize" />
Expand All @@ -58,8 +58,8 @@
permission="zope.ManageServices"
/>

<icon
name="zmi_icon"
<icon
name="zmi_icon"
for="zope.i18n.interfaces.ITranslationDomain"
file="./i18n_domain.gif" />

Expand Down
6 changes: 3 additions & 3 deletions src/zope/app/i18n/browser/exportimport.pt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
<div>
<select name="languages:list" size="3" style="width: 80%"
multiple="multiple">
<option value=""
<option value=""
tal:attributes="value language"
tal:content="language"
tal:repeat="language view/getAllLanguages"></option>
Expand All @@ -35,9 +35,9 @@
<input type="file" name="file" size="20" value="" />
</div>
<div>
<input type="submit" name="@@import.html:method" value="Import"
<input type="submit" name="@@import.html:method" value="Import"
i18n:attributes="value import-button"/>
<input type="submit" name="@@export.html:method" value="Export"
<input type="submit" name="@@export.html:method" value="Export"
i18n:attributes="value export-button"/>
</div>
</td>
Expand Down
113 changes: 36 additions & 77 deletions src/zope/app/i18n/browser/synchronize.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,14 @@

try:
from urllib import unquote
from urllib import quote
from urlparse import urlparse
from urlparse import urlunparse
except ImportError:
from urllib.parse import unquote
from urllib.parse import quote
from urllib.parse import urlparse
from urllib.parse import urlunparse

from base64 import encodestring

Expand All @@ -49,7 +55,8 @@ class Synchronize(BaseView):
_('Newer Local'), _('Does not exist')]

def __init__(self, context, request):
super(Synchronize, self).__init__(context, request)
self.context = context
self.request = request

self.sync_url = self.request.cookies.get(
'sync_url', DEFAULT)
Expand All @@ -60,25 +67,32 @@ def __init__(self, context, request):
'sync_languages', '').split(','))
self._connection = None

def _make_sync_url(self):
# make sure the URL contains the http:// prefix
url = self.sync_url
if not url.startswith(('http://', 'https://')):
url = 'http://' + url

# Add username and password to the url.
parts = urlparse(url)
if '@' not in parts.netloc:
parts = list(parts)
parts[1] = self.sync_username + ':' + self.sync_password + '@' + parts[1]
url = urlunparse(parts)

return url

def _connect(self):
'''Connect to the remote server via XML-RPC HTTP; return status'''
# make sure the URL contains the http:// prefix
if not self.sync_url.startswith('http://'):
url = 'http://' + self.sync_url
else:
url = self.sync_url

# Now try to connect
self._connection = Server(url,
transport=BasicAuthTransport(self.sync_username,
self.sync_password))
self._connection = Server(self._make_sync_url())

# check whether the connection was made and the Master Babel Tower
# exists
try:
self._connection.getAllLanguages()
return 1
return 1 # pragma: no cover
except Exception:
self._connection = None
return 0
Expand All @@ -91,22 +105,15 @@ def _isConnected(self):
'''Check whether we are currently connected to the server; return
boolean'''

if self._connection is not None and self._connection.getAllLanguages():
return True
return False

return bool(self._connection is not None and self._connection.getAllLanguages())

def canConnect(self):
'''Checks whether we can connect using this server and user data;
return boolean'''
if self._isConnected():
return 1

try:
return self._connect()
except Exception:
return 0
return True

return self._connect()

def getAllLanguages(self):
connected = self._isConnected()
Expand All @@ -117,24 +124,21 @@ def getAllLanguages(self):
return []



def queryMessages(self):
connected = self._isConnected()
if not connected: connected = self._connect()

fmsgs = []
if connected:
fmsgs = self._connection.getMessagesFor(self.sync_languages)
else:
fmsgs = []

return self.context.getMessagesMapping(self.sync_languages,
fmsgs)

def queryMessageItems(self):
items = self.queryMessages().items()
items = removeSecurityProxy(items)
items.sort(key=lambda x: x[0][0] + x[0][1])
return items
return sorted(items, key=lambda x: x[0][0] + x[0][1])

def getStatus(self, fmsg, lmsg, verbose=1):
state = 0
Expand All @@ -149,24 +153,21 @@ def getStatus(self, fmsg, lmsg, verbose=1):
elif fmsg['mod_time'] == lmsg['mod_time']:
state = 0

if verbose:
return self.messageStatus[state]
return state

return self.messageStatus[state] if verbose else state

def saveSettings(self):
self.sync_languages = self.request.form.get('sync_languages', [])
self.request.response.setCookie('sync_languages',
','.join(self.sync_languages))
','.join(self.sync_languages))
self.request.response.setCookie('sync_url',
urllib.quote(self.request['sync_url']).strip())
quote(self.request['sync_url']).strip())
self.request.response.setCookie('sync_username',
self.request['sync_username'])
self.request['sync_username'])
self.request.response.setCookie('sync_password',
self.request['sync_password'])
self.request['sync_password'])

return self.request.response.redirect(self.request.URL[-1]+
'/@@synchronizeForm.html')
return self.request.response.redirect(self.request.URL[-1] +
'/@@synchronizeForm.html')


def synchronize(self):
Expand All @@ -192,45 +193,3 @@ def synchronizeMessages(self):
self.context.synchronize(new_mapping)
return self.request.response.redirect(self.request.URL[-1]+
'/@@synchronizeForm.html')



class BasicAuthTransport(Transport):
def __init__(self, username=None, password=None, verbose=0):
self.username=username
self.password=password
self.verbose=verbose

def request(self, host, handler, request_body, verbose=0):
# issue XML-RPC request

self.verbose = verbose

h = httplib.HTTP(host)
h.putrequest("POST", handler)

# required by HTTP/1.1
h.putheader("Host", host)

# required by XML-RPC
h.putheader("User-Agent", self.user_agent)
h.putheader("Content-Type", "text/xml")
h.putheader("Content-Length", str(len(request_body)))

# basic auth
if self.username is not None and self.password is not None:
h.putheader("AUTHORIZATION", "Basic %s" %
encodestring("%s:%s" % (self.username, self.password)
).replace("\012", ""))
h.endheaders()

if request_body:
h.send(request_body)

errcode, errmsg, headers = h.getreply()

if errcode != 200:
raise ProtocolError(host + handler,
errcode, errmsg, headers)

return self.parse_response(h.getfile())
92 changes: 92 additions & 0 deletions src/zope/app/i18n/browser/tests/test_exportimport.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
##############################################################################
#
# Copyright (c) 2017 Zope Foundation and Contributors.
# All Rights Reserved.
#
# This software is subject to the provisions of the Zope Public License,
# Version 2.1 (ZPL). A copy of the ZPL should accompany this distribution.
# THIS SOFTWARE IS PROVIDED "AS IS" AND ANY AND ALL EXPRESS OR IMPLIED
# WARRANTIES ARE DISCLAIMED, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
# WARRANTIES OF TITLE, MERCHANTABILITY, AGAINST INFRINGEMENT, AND FITNESS
# FOR A PARTICULAR PURPOSE.
#
##############################################################################
"""This module tests the Gettext Export and Import funciotnality of the
Translation Domain.
"""
import unittest
import time
from io import BytesIO

from zope.component.testlayer import ZCMLFileLayer

from zope.app.i18n.translationdomain import TranslationDomain
from zope.app.i18n.browser.exportimport import ExportImport
import zope.app.i18n.tests
from zope.publisher.browser import TestRequest


class TestExportImport(unittest.TestCase):

layer = ZCMLFileLayer(zope.app.i18n.tests)

data = b'''
msgid ""
msgstr ""
"Project-Id-Version: Zope 3\\n"
"PO-Revision-Date: %s\\n"
"Last-Translator: Zope 3 Gettext Export Filter\\n"
"Zope-Language: de\\n"
"Zope-Domain: default\\n"
"MIME-Version: 1.0\\n"
"Content-Type: text/plain; charset=UTF-8\\n"
"Content-Transfer-Encoding: 8bit\\n"
msgid "Choose"
# comment
msgstr "Ausw\xc3\xa4hlen!"
# comment
# comment
msgid "greeting"
msgstr "hallo"
# comment
'''

def setUp(self):
super(TestExportImport, self).setUp()

self._domain = TranslationDomain()
self._domain.domain = 'default'

def testImportExport(self):
view = ExportImport()
view.context = self._domain
view.request = TestRequest()
view.request.getURL = lambda _x: 'url'

# Insert some extra lines and comments for the parser to skip
import_data = b'\n\n'.join(self.data.split(b'\n'))

view.importMessages(['de'],
BytesIO(import_data % b'2002/02/02 02:02'))

result = view.exportMessages('de')

dt = time.time()
dt = time.localtime(dt)
dt = time.strftime('%Y/%m/%d %H:%M', dt)
if not isinstance(dt, bytes):
dt = dt.encode("utf-8")

expected = self.data.replace(b'# comment\n', b'') % dt
self.assertEqual(result.strip(), expected.strip())

def test_suite():
return unittest.defaultTestLoader.loadTestsFromName(__name__)


if __name__ == '__main__':
unittest.main(defaultTest=test_suite)
Loading

0 comments on commit 71dcce0

Please sign in to comment.