Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Verify TLS certificate during XMPP connection. #1084

Closed
Changes from 1 commit
Commits
File filter...
Filter file types
Jump to…
Jump to file or symbol
Failed to load files and symbols.

Always

Just for now

Next

Verify TLS certificate during XMPP connection.

A new "check_certificate" attribute has been introduced in t.w.p.j.xmlstream.TLSInitiatingInitializer.
A new "check_certificate" init argument has been introduced in
t.w.p.j.client.BasicAuthenticator and t.w.p.j.client.XMPPAuthenticator.
  • Loading branch information...
goffi-contrib committed Nov 6, 2018
commit adbe5db7334885a8dd4a12ce3f2d8bbda4c72635
@@ -0,0 +1,2 @@
t.w.p.j.xmlstream.TLSInitiatingInitializer now has a "check_certificate" attribute (default to True) to reject connection if server certificate is invalid.
t.w.p.j.client.BasicAuthenticator and t.w.p.j.client.XMPPAuthenticator now have a "check_certificate" init argument.
@@ -180,6 +180,9 @@ class BasicAuthenticator(xmlstream.ConnectAuthenticator):
errors will be generated (again).
@ivar check_certificate: indicates if TLS certificate must be verified
@type check_certificate: C{bool}
@cvar INVALID_USER_EVENT: See L{IQAuthInitializer.INVALID_USER_EVENT}.
@type INVALID_USER_EVENT: L{str}
@@ -197,21 +200,25 @@ class BasicAuthenticator(xmlstream.ConnectAuthenticator):
AUTH_FAILED_EVENT = IQAuthInitializer.AUTH_FAILED_EVENT
REGISTER_FAILED_EVENT = "//event/client/basicauth/registerfailed"

def __init__(self, jid, password):
def __init__(self, jid, password, check_certificate=True):
xmlstream.ConnectAuthenticator.__init__(self, jid.host)
self.jid = jid
self.password = password
self.check_certificate = check_certificate

def associateWithStream(self, xs):
xs.version = (0, 0)
xmlstream.ConnectAuthenticator.associateWithStream(self, xs)

inits = [ (xmlstream.TLSInitiatingInitializer, False),
(IQAuthInitializer, True),
]
inits = [(xmlstream.TLSInitiatingInitializer, False,
{"check_certificate": self.check_certificate}),
(IQAuthInitializer, True, {}),
]

for initClass, required in inits:
for initClass, required, attribs in inits:
init = initClass(xs)
for attrib, value in attribs.items():
setattr(init, attrib, value)
init.required = required
xs.initializers.append(init)

@@ -302,7 +309,7 @@ def start(self):



def XMPPClientFactory(jid, password):
def XMPPClientFactory(jid, password, check_certificate=True):
"""
Client factory for XMPP 1.0 (only).
@@ -316,10 +323,12 @@ def XMPPClientFactory(jid, password):
@type jid: L{jid.JID}
@param password: password to authenticate with.
@type password: L{unicode}
@param check_certificate: indicates if TLS certificate must be verified
@type check_certificate: L{bool}
@return: XML stream factory.
@rtype: L{xmlstream.XmlStreamFactory}
"""
a = XMPPAuthenticator(jid, password)
a = XMPPAuthenticator(jid, password, check_certificate)
return xmlstream.XmlStreamFactory(a)


@@ -356,14 +365,17 @@ class XMPPAuthenticator(xmlstream.ConnectAuthenticator):
@type jid: L{jid.JID}
@ivar password: password to be used during SASL authentication.
@type password: L{unicode}
@ivar check_certificate: indicates if TLS certificate must be verified
@type check_certificate: C{bool}
"""

namespace = 'jabber:client'

def __init__(self, jid, password):
def __init__(self, jid, password, check_certificate=True):
xmlstream.ConnectAuthenticator.__init__(self, jid.host)
self.jid = jid
self.password = password
self.check_certificate = check_certificate


def associateWithStream(self, xs):
@@ -378,13 +390,16 @@ def associateWithStream(self, xs):
xmlstream.ConnectAuthenticator.associateWithStream(self, xs)

xs.initializers = [CheckVersionInitializer(xs)]
inits = [ (xmlstream.TLSInitiatingInitializer, False),
(sasl.SASLInitiatingInitializer, True),
(BindInitializer, False),
(SessionInitializer, False),
]

for initClass, required in inits:
inits = [(xmlstream.TLSInitiatingInitializer, False,
{"check_certificate": self.check_certificate}),
(sasl.SASLInitiatingInitializer, True, {}),
(BindInitializer, False, {}),
(SessionInitializer, False, {}),
]

for initClass, required, attribs in inits:
init = initClass(xs)
for attrib, value in attribs.items():
setattr(init, attrib, value)
init.required = required
xs.initializers.append(init)
@@ -402,19 +402,22 @@ class TLSInitiatingInitializer(BaseFeatureInitiatingInitializer):
@cvar wanted: indicates if TLS negotiation is wanted.
@type wanted: C{bool}
@cvar check_certificate: indicates if TLS certificate must be verified
@type check_certificate: C{bool}
"""

feature = (NS_XMPP_TLS, 'starttls')
wanted = True
check_certificate = True
_deferred = None

def onProceed(self, obj):
"""
Proceed with TLS negotiation and reset the XML stream.
"""

self.xmlstream.removeObserver('/failure', self.onFailure)
ctx = ssl.CertificateOptions()
trustRoot = ssl.platformTrust() if self.check_certificate else None
ctx = ssl.CertificateOptions(trustRoot=trustRoot)
self.xmlstream.transport.startTLS(ctx)
self.xmlstream.reset()
self.xmlstream.sendHeader()
@@ -413,6 +413,13 @@ def testBasic(self):
self.assertIsInstance(session, client.SessionInitializer)

self.assertFalse(tls.required)
self.assertTrue(tls.check_certificate)
self.assertTrue(sasl.required)
self.assertFalse(bind.required)
self.assertFalse(session.required)

# we now check that check_certificate can be disabled
xs = client.XMPPClientFactory(self.client_jid,
'secret', False).buildProtocol(None)
_, tls, _, _, _ = xs.initializers
self.assertFalse(tls.check_certificate)
@@ -759,6 +759,45 @@ def testNotWantedNotRequired(self):
return d


def _assertCheckCertificate(self, ctx):
self.assertTrue(ctx.verify)
self.assertIsNotNone(ctx.trustRoot)
self.done.append('TLS')


def testCheckCertificate(self):
"""
Check that check_certificate enable certificate validation
"""
self.xmlstream.transport = proto_helpers.StringTransport()
self.xmlstream.transport.startTLS = self._assertCheckCertificate
self.xmlstream.reset = lambda: self.done.append('reset')
self.xmlstream.sendHeader = lambda: self.done.append('header')
d = self.init.start()
self.xmlstream.dataReceived("<proceed xmlns='%s'/>" % NS_XMPP_TLS)
return d


def _assertNoCheckCertificate(self, ctx):
self.assertFalse(ctx.verify)
self.assertIsNone(ctx.trustRoot)
self.done.append('TLS')


def testNoCheckCertificate(self):
"""
Check that check_certificate can be disabled
"""
self.xmlstream.transport = proto_helpers.StringTransport()
self.xmlstream.transport.startTLS = self._assertNoCheckCertificate
self.xmlstream.reset = lambda: self.done.append('reset')
self.xmlstream.sendHeader = lambda: self.done.append('header')
self.init.check_certificate = False
d = self.init.start()
self.xmlstream.dataReceived("<proceed xmlns='%s'/>" % NS_XMPP_TLS)
return d


def testFailed(self):
"""
Test failed TLS negotiation.
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.