Permalink
Browse files

It works!

  • Loading branch information...
1 parent 8ed28f4 commit 10353f158b13201f589126b3977f8b008715accd @mricon mricon committed Feb 28, 2012
Showing with 532 additions and 0 deletions.
  1. +193 −0 test.py
  2. +10 −0 test/secrets/invalid.totp
  3. +10 −0 test/secrets/valid.totp
  4. +67 −0 totp.fcgi
  5. BIN totpcgi/.__init__.py.swp
  6. +252 −0 totpcgi/__init__.py
  7. BIN totpcgi/__init__.pyc
View
193 test.py
@@ -0,0 +1,193 @@
+#!/usr/bin/python -tt
+import unittest
+import totpcgi
+import pyotp
+import time
+import json
+import sys
+import os
+
+secrets_dir = 'test/secrets'
+status_dir = 'test/status'
+
+class GATest(unittest.TestCase):
+
+ def cleanStatus(self, user='valid'):
+ status_file = os.path.join(status_dir, '%s.json' % user)
+ if os.access(status_file, os.R_OK):
+ os.unlink(status_file)
+
+ def setCustomStatus(self, status, user='valid'):
+ status_file = os.path.join(status_dir, '%s.json' % user)
+ fh = open(status_file, 'w')
+ json.dump(status, fh, indent=4)
+ fh.close()
+
+ def setUp(self):
+ # Remove any existing status files for user "valid"
+ self.cleanStatus()
+
+ def tearDown(self):
+ self.cleanStatus()
+
+ def getValidUser(self):
+ return totpcgi.GAUser('valid', secrets_dir, status_dir)
+
+ def testValidSecretParsing(self):
+ gau = self.getValidUser()
+
+ self.assertEqual(gau.totp.secret, 'VN7J5UVLZEP7ZAGM',
+ 'Secret read from valid.totp did not match')
+ self.assertEqual(gau.user, 'valid',
+ 'User did not match')
+ self.assertEqual(gau.rate_limit, (4, 40),
+ 'RATE_LIMIT did not parse correctly')
+ self.assertEqual(gau.window_size, 18,
+ 'WINDOW_SIZE did not parse correctly')
+
+ scratch_tokens = [88709766,11488461,27893432,60474774,10449492]
+
+ self.assertItemsEqual(scratch_tokens, gau.scratch_tokens)
+
+ def testInvalidSecretParsing(self):
+ with self.assertRaises(totpcgi.UserFileError):
+ totpcgi.GAUser('invalid', secrets_dir, status_dir)
+
+ def testInvalidUsername(self):
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed,
+ 'invalid characters'):
+ gau = totpcgi.GAUser('../../etc/passwd', secrets_dir, status_dir)
+
+ def testNonExistentValidUser(self):
+ with self.assertRaises(totpcgi.UserNotFound):
+ gau = totpcgi.GAUser('bob@example.com', secrets_dir, status_dir)
+
+ def testValidToken(self):
+ gau = self.getValidUser()
+ totp = pyotp.TOTP(gau.totp.secret)
+ token = totp.now()
+ self.assertEqual(gau.verify_token(token), 'Valid token used')
+
+ # try using it again
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed, 'been used once'):
+ gau.verify_token(token)
+
+ # and again, to make sure it is preserved in status
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed, 'been used once'):
+ gau.verify_token(token)
+
+ def testWindowSize(self):
+ gau = self.getValidUser()
+ totp = pyotp.TOTP(gau.totp.secret)
+ # get a token from 60 seconds ago
+ past_token = totp.at(int(time.time())-60)
+ future_token = totp.at(int(time.time())+60)
+
+ # this should fail
+ gau.window_size = 0
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed, 'Not a valid token'):
+ gau.verify_token(past_token)
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed, 'Not a valid token'):
+ gau.verify_token(future_token)
+
+ # this should work
+ gau.window_size = 10
+ self.assertEqual(gau.verify_token(past_token),
+ 'Valid token within window size used')
+ self.assertEqual(gau.verify_token(future_token),
+ 'Valid token within window size used')
+
+ def testRateLimit(self):
+ gau = self.getValidUser()
+
+ # just in case the lightning strikes at that very number
+ if gau.now_token == 555555:
+ token = '555556'
+ else:
+ token = '555555'
+
+ # We now fail 4 times consecutively
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed, 'Not a valid token'):
+ gau.verify_token(token)
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed, 'Not a valid token'):
+ gau.verify_token(token)
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed, 'Not a valid token'):
+ gau.verify_token(token)
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed, 'Not a valid token'):
+ gau.verify_token(token)
+
+ # We should now get a rate-limited error
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed, 'Rate-limit'):
+ gau.verify_token(token)
+
+ # Same with a valid token
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed, 'Rate-limit'):
+ gau.verify_token(gau.now_token)
+
+ # Make sure we recover from rate-limiting correctly
+ old_timestamp = gau.now_timestamp-(31+(gau.rate_limit[1]*10))
+ status = {
+ 'fail_timestamps': [
+ old_timestamp,
+ old_timestamp,
+ old_timestamp,
+ old_timestamp
+ ],
+ 'success_timestamps': [],
+ 'used_scratch_tokens': []
+ }
+ self.setCustomStatus(status)
+
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed, 'Not a valid token'):
+ gau.verify_token(token)
+
+ # Valid token should work, too
+ self.setCustomStatus(status)
+ self.assertEqual(gau.verify_token(gau.now_token), 'Valid token used')
+
+ def testInvalidToken(self):
+ gau = self.getValidUser()
+ # just in case the lightning strikes at that very number
+ if gau.now_token == 555555:
+ token = '555556'
+ else:
+ token = '555555'
+
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed, 'Not a valid token'):
+ gau.verify_token(token)
+
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed, 'too long'):
+ self.cleanStatus()
+ gau.verify_token('12345678910')
+
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed, 'not an integer'):
+ self.cleanStatus()
+ gau.verify_token('WAKKA')
+
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed,
+ 'Not a valid scratch-token'):
+ gau.verify_token('11112222')
+
+ def testScratchTokens(self):
+ gau = self.getValidUser()
+
+ ret = gau.verify_token('88709766')
+ self.assertEqual(ret, 'Scratch-token used')
+
+ # try using it again
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed,
+ 'Scratch-token already used once'):
+ gau.verify_token('88709766')
+
+ # try using another token
+ ret = gau.verify_token('11488461')
+ self.assertEqual(ret, 'Scratch-token used')
+
+ # use first one again to make sure it's preserved in the status file
+ with self.assertRaisesRegexp(totpcgi.VerifyFailed,
+ 'Scratch-token already used once'):
+ gau.verify_token('88709766')
+
+
+if __name__ == '__main__':
+ unittest.main()
View
@@ -0,0 +1,10 @@
+WAKKAWAKKA
+" RATE_LIMIT 3 30
+" WINDOW_SIZE 17
+" DISALLOW_REUSE
+" TOTP_AUTH
+88709766
+11488461
+27893432
+60474774
+10449492
View
@@ -0,0 +1,10 @@
+VN7J5UVLZEP7ZAGM
+" RATE_LIMIT 4 40
+" WINDOW_SIZE 18
+" DISALLOW_REUSE
+" TOTP_AUTH
+88709766
+11488461
+27893432
+60474774
+10449492
View
@@ -0,0 +1,67 @@
+#!/usr/bin/python -tt
+from flup.server import fcgi
+from cgi import parse_qs
+import syslog
+import logging
+import ConfigParser
+
+import totpcgi
+
+SECRETS_DIR = '/var/lib/totpcgi/secrets'
+STATUS_DIR = '/var/lib/totpcgi/status'
+
+syslog.openlog('totp.fcgi', logoption=syslog.LOG_PID, facility=syslog.LOG_AUTH)
+
+def bad_request(start_response, why):
+ output = 'ERR\n' + why + '\n'
+ start_response('400 BAD REQUEST', [('Content-Type', 'text/plain'),
+ ('Content-Length', str(len(output)))])
+ return output
+
+def webapp(environ, start_response):
+ if environ['REQUEST_METHOD'] != 'POST':
+ return bad_request(start_response, "Missing post data")
+
+ rq_len = int(environ.get('CONTENT_LENGTH', 0))
+ rq_data = environ['wsgi.input'].read(rq_len)
+
+ form = parse_qs(rq_data)
+
+ must_keys = ('user', 'token', 'mode')
+
+ for must_key in must_keys:
+ if must_key not in form.keys():
+ return bad_request(start_response, "Missing field: %s" % must_key)
+
+ user = form['user'][0]
+ token = form['token'][0]
+ mode = form['mode'][0]
+
+ remote_host = environ.get('REMOTE_ADDR')
+
+ if mode != 'PAM_SM_AUTH':
+ return bad_request(start_response, "We only support PAM_SM_AUTH")
+
+ ga = totpcgi.GoogleAuthenticator(SECRETS_DIR, STATUS_DIR)
+
+ try:
+ status = ga.verify_user_token(user, token)
+ except Exception, ex:
+ syslog.syslog(syslog.LOG_NOTICE,
+ 'Failure: user=%s, mode=%s, host=%s, message=%s' % (user, mode,
+ remote_host, ex.message))
+ return bad_request(start_response, ex.message)
+
+ syslog.syslog(syslog.LOG_NOTICE,
+ 'Success: user=%s, mode=%s, host=%s, message=%s' % (user, mode,
+ remote_host, status))
+
+ start_response('200 OK', [('Content-type', 'text/plain'),
+ ('Content-Length', str(len(status)))])
+ #
+ # pam_url wants to see a PSK
+ return status
+
+
+
+fcgi.WSGIServer(webapp).run()
View
Binary file not shown.
Oops, something went wrong.

0 comments on commit 10353f1

Please sign in to comment.