From 557e2416acedae581b8683f2070e808f266047a8 Mon Sep 17 00:00:00 2001 From: "Jeremy R. Gray" Date: Fri, 20 Sep 2013 23:13:30 -0400 Subject: [PATCH] TESTS --- pyfilesec/__init__.py | 173 +++++++++++++-------------------------- pyfilesec/constants.py | 7 +- tests/openssl_version_97 | 1 + tests/test_pyfilesec.py | 56 ++++++++++--- 4 files changed, 109 insertions(+), 128 deletions(-) diff --git a/pyfilesec/__init__.py b/pyfilesec/__init__.py index ca24bbf..57602b1 100755 --- a/pyfilesec/__init__.py +++ b/pyfilesec/__init__.py @@ -163,28 +163,12 @@ def register(self, new_functions, test_keys=None): sf = SecFile(test_file, codec=test_co) sf.encrypt(keep=True, **enc_kwargs) sf.decrypt(**dec_kwargs) - except EncryptError: - fatal('Codec registration: Encrypt test failed', - CodecRegistryError) - except DecryptError: - fatal('Codec registration: Decrypt test failed', - CodecRegistryError) finally: - try: - os.unlink(test_file) - except: - pass - try: - if sf.file: - recovered = open(sf.file, 'rb').read() - os.unlink(sf.file) - except: - pass - try: - assert recovered == test_datum.str - except: - fatal('Codec registration: enc-dec self-test failed', - CodecRegistryError) + os.unlink(test_file) + if sf.file: + recovered = open(sf.file, 'rb').read() + os.unlink(sf.file) + assert recovered == test_datum.str and 'Codec reg: enc-dec failed' for key, fxn in list(new_functions.items()): try: @@ -402,6 +386,7 @@ def load_metadata(self): """ if hasattr(self, 'meta') and self.meta: return json.load(open(self.meta, 'rb')) + return NO_META_DATA @property def metadataf(self): @@ -513,7 +498,7 @@ def is_tracked(self): self._get_git_info(self.file), self._get_hg_info(self.file)]) - def _get_git_info(self, path): + def _get_git_info(self, path, git='git'): """Report whether a directory or file is tracked in a git repo. Can test any generic filename, not just current file:: @@ -527,11 +512,11 @@ def _get_git_info(self, path): if not path or not exists(path): return False try: - sys_call(['git']) + sys_call([git]) except OSError: # no git, not call-able return False - cmd = ['git', 'ls-files', abspath(path)] + cmd = [git, 'ls-files', abspath(path)] is_tracked = bool(sys_call(cmd, ignore_error=True)) logging.debug('path %s tracked in git repo: %s' % (path, is_tracked)) @@ -1080,8 +1065,7 @@ def decrypt(self, priv=None, pphr=None, keep_meta=False, keep_enc=False, return self - def rotate(self, pub=None, priv=None, pphr=None, - hmac_key=None, pad=None, keep_meta=True): + def rotate(self, pub=None, priv=None, pphr=None, hmac_key=None, pad=None): """Swap old encryption for new: decrypt-then-re-encrypt. Conceptually there are three separate steps: decrypt with ``priv`` @@ -1109,8 +1093,6 @@ def rotate(self, pub=None, priv=None, pphr=None, passphrase for the private key (as a string, or filename) `pub` : path to the new public key to be used for the new encryption. - `keep_meta` : - if False, unlink the meta file after decrypt `hmac_key` : key (string) to use for an HMAC to be saved in the meta-data """ @@ -1137,36 +1119,21 @@ def rotate(self, pub=None, priv=None, pphr=None, self.set_file(sf.file) # decrypted file name logging.debug('rotate self.meta = %s' % self.meta) - if isfile(self.meta): - try: - md = self.load_metadata() - logging.debug(name + ': read metadata from file') - except: - logging.error(name + ': failed to read metadata') - md = self.NO_META_DATA - else: - logging.debug(name + ': self.meta no such file') - md = self.NO_META_DATA + md = self.load_metadata() # get NO_META_DATA if missing pad = max(0, pad) # disallow -1, don't want errors mid-rotate - if pad is not None and pad > -1: + if pad == 0 or self._ok_to_pad(pad): SecFile(self.file).pad(pad) file_dec = self.file # track file names so can destroy if needed sf = SecFile(self.file).encrypt(pub=enc_rsakeys.pub, date=True, meta=md, keep=False, hmac_key=hmac_key) self.set_file(sf.file) # newly encrypted file name - except FileNotEncryptedError: - fatal(name + ': not given an encrypted file', - FileNotEncryptedError) finally: # generally rotate must not leave any decrypted stuff. exception: # decrypt, destroy orig.enc, *then* get exception --> now what? # unlikely situation: require_keys(pub) before starting, if dec # works then directory write permissions are ok - if file_dec and isfile(file_dec): - SecFile(file_dec).destroy() - if hasattr(self, 'meta') and isfile(self.meta): - os.unlink(self.meta) # not sensitive + file_dec and isfile(file_dec) and SecFile(file_dec).destroy() unset_umask() self.result.update({'file': self.file, @@ -1290,24 +1257,19 @@ def destroy(self): cmd_Destroy = (DESTROY_EXE,) + DESTROY_OPTS + (target_file,) good_sys_call = False - try: - __, err = sys_call(cmd_Destroy, stderr=True) - good_sys_call = not err - # mac srm will warn about multiple links via stderr -> disp unknown - except OSError as e: - logging.warning(name + ': %s, %s' % (e, ' '.join(cmd_Destroy))) + __, err = sys_call(cmd_Destroy, stderr=True) + good_sys_call = not err + # mac srm will warn about multiple links via stderr -> disp unknown disposition = pfs_UNKNOWN if not isfile(target_file): - disposition = (pfs_UNKNOWN, pfs_DESTROYED)[good_sys_call] + if good_sys_call: + disposition = pfs_DESTROYED else: logging.error(name + ': falling through to trying 1 pass of zeros') with open(target_file, 'wb') as fd: fd.write(chr(0) * getsize(target_file)) - shutil.rmtree(target_file, ignore_errors=True) # should be gone - if isfile(target_file): # yikes, its an undead file - fatal(name + ': %s remains after destroy()' % (target_file, - DestroyError)) + shutil.rmtree(target_file) duration = round(get_time() - destroy_t0, 4) self.reset() # clear everything, including self.result @@ -1427,8 +1389,7 @@ def _check(self): if f.name[0] in ['.', os.sep] or f.name[1:3] == ':\\'] tar.close() if badNames: - fatal('bad/dubious internal file names', - SecFileArchiveFormatError) + fatal('bad/dubious internal file names', SecFileArchiveFormatError) return True def unpack(self): @@ -1466,12 +1427,10 @@ def unpack(self): elif fname.endswith(META_EXT): self.meta = os.path.join(tmp_dir, fname) else: - fatal(name + ': unexpected file in cipher_text archive', - SecFileArchiveFormatError) - + logging.error(name + ': unexpected file %s in archive' % fname) + # seems better to allow unpack to proceed anyway, eg, to rotate if not all([self.data_aes, self.pwd_rsa, self.meta]): - logging.warn('did not find 3 files in archive %s' % - self.name) + logging.error('did not find 3 files in archive %s' % self.name) unset_umask() return self.data_aes, self.pwd_rsa, self.meta @@ -1528,14 +1487,6 @@ def test(self): """ self.require(req=NEED_PUB | NEED_PRIV) pubk, pub_bits = self.sniff(self.pub) - if pubk != 'pub': - fatal('RsaKeys test: pub is not a public key', PublicKeyError) - privk, enc = self.sniff(self.priv) - if privk != 'priv': - fatal('RsaKeys test: priv is not a private key', PrivateKeyError) - if enc != bool(self.pphr): - fatal('RsaKeys test: no passphrase for encrypted private key', - PassphraseError) # compare pub against pub as extracted from priv: cmdEXTpub = [OPENSSL, 'rsa', '-in', self.priv, '-pubout'] @@ -1546,9 +1497,11 @@ def test(self): if test_pub not in open(self.pub, 'rb').read(): fatal('public key not paired with private key', PublicKeyError) - if pub_bits < RSA_MODULUS_MIN: - fatal('public key too short; no real security below 1024 bits', - PublicKeyTooShortError) + # .update() will detect and fail before we get here, so use assert: + #if pub_bits < RSA_MODULUS_MIN: + # fatal('public key too short; no real security below 1024 bits', + # PublicKeyTooShortError) + assert pub_bits >= RSA_MODULUS_MIN if pub_bits < RSA_MODULUS_WARN: logging.warning('short RSA key') @@ -1586,13 +1539,12 @@ def sniff(self, key): return keytype, modulus if '-----BEGIN' in line and 'PRIVATE KEY-----' in line: keytype = 'priv' - if len(line) >= 64: - enc = False - return keytype, enc # hit end of header info + #if len(line) >= 64: + # enc = False + # return keytype, enc # hit end of header info if 'ENCRYPTED' in line and keytype == 'priv': - enc = True - return keytype, enc - return None, None + return keytype, True + return keytype, enc def require(self, req): """Raise error if key requirement(s) ``req`` are not met; assert-like. @@ -1709,10 +1661,9 @@ def check_entropy(self): # SecurityServer daemon is supposed to ensure entropy is available: ps = sys_call(['ps', '-e']) securityd = sys_call(['which', 'securityd']) # full path + e = 'securityd NOT running (= bad)' if securityd in ps: e = securityd + ' running (= good)' - else: - e = 'securityd NOT running (= bad)' rdrand = sys_call(['sysctl', 'hw.optional.rdrand']) e += '\n rdrand: ' + rdrand if rdrand == 'hw.optional.rdrand: 1': @@ -1720,8 +1671,6 @@ def check_entropy(self): elif sys.platform.startswith('linux'): avail = sys_call(['cat', '/proc/sys/kernel/random/entropy_avail']) e = 'entropy_avail: ' + avail - if int(avail) < 50: - e += ' (= not so much...)' return e def generate(self, pub='pub.pem', priv='priv.pem', pphr=None, bits=4096): @@ -1741,8 +1690,7 @@ def generate(self, pub='pub.pem', priv='priv.pem', pphr=None, bits=4096): sys_call(cmdGEN + [str(bits)], stdin=pphr) # Extract pub from priv: - cmdEXTpub = [OPENSSL, 'rsa', '-in', priv, - '-pubout', '-out', pub] + cmdEXTpub = [OPENSSL, 'rsa', '-in', priv, '-pubout', '-out', pub] if pphr: cmdEXTpub += ['-passin', 'stdin'] sys_call(cmdEXTpub, stdin=pphr) @@ -1752,9 +1700,8 @@ def generate(self, pub='pub.pem', priv='priv.pem', pphr=None, bits=4096): RsaKeys(pub=pub, priv=priv, pphr=pphr).test() except: msg = 'new RSA keys failed to validate; removing them' - logging.error(msg) self._cleanup(msg, priv, pub) - raise + fatal(msg, RuntimeError) return _abspath(pub), _abspath(priv) def _cleanup(self, msg, pub='', priv='', pphr=None): @@ -1850,30 +1797,29 @@ def dialog(self, interactive=True, args=None): if args.passfile: pphrout_msg = ' pphr = %s' % pphr_out print(pphrout_msg) + print('\nEnter a passphrase for the private key (16 or more chars)' + '\n or press to auto-generate a passphrase') pphr_auto = True + bits = RSA_BITS_DEFAULT if interactive: import _getpass - print('\nEnter a passphrase for the private key (16 or more chars)' - '\n or press to auto-generate a passphrase') try: pphr = SecStr(_getpass.getpass('Passphrase: ')) except ValueError: pass # hit return, == want auto-generate else: + if len(pphr.str) < 16: + return self._cleanup('\n > passphrase too short, exiting <') pphr_auto = False pphr2 = SecStr(_getpass.getpass('same again: ')) if pphr.str != pphr2.str: return self._cleanup(' > Passphrase mismatch. Exiting. <') pphr2.zero() + bits = int(input23('\nKey length (2048, 4096, 8192): [%d] ' % + RSA_BITS_DEFAULT)) if pphr_auto: print('(auto-generating a passphrase)') pphr = printable_pwd(PPHR_BITS_DEFAULT) - if len(pphr.str) < 16: - return self._cleanup('\n > passphrase too short, exiting <') - bits = RSA_BITS_DEFAULT - if interactive: - bits = int(input23('\nKey length (2048, 4096, 8192): [%d] ' % - RSA_BITS_DEFAULT)) bits_msg = ' using %i' % bits bit = max(bits, RSA_MODULUS_WARN) print(bits_msg) @@ -1886,16 +1832,14 @@ def dialog(self, interactive=True, args=None): sys.stdout.flush() try: time.sleep(nap) - except KeyboardInterrupt: + except: # eg KeyboardInterrupt return self._cleanup(' > cancelled, exiting <', pub, priv, pphr) msg = '\nGenerating RSA keys (using %s)\n' % openssl_version print(msg) try: self.generate(pub, priv, pphr.str, bits) - except KeyboardInterrupt: - return self._cleanup(' > cancelled, exiting <', pub, priv, pphr) - except: + except: # eg KeyboardInterrupt return self._cleanup('\n > exception in generate(), exiting <', pub, priv, pphr) @@ -1907,13 +1851,10 @@ def dialog(self, interactive=True, args=None): if pphr_auto: if args.passfile: pphr_msg = 'passphrase: %s' % pphr_out - try: - set_umask() - with open(pphr_out, 'wb') as fd: - fd.write(pphr.str) - unset_umask() - except: - return self._cleanup('', pub, priv, pphr) + set_umask() + with open(pphr_out, 'wb') as fd: + fd.write(pphr.str) + unset_umask() if (not isfile(pphr_out) or not getsize(pphr_out) == PPHR_OUT_SIZE): return self._cleanup(' > failed to save passphrase file <', @@ -1921,10 +1862,10 @@ def dialog(self, interactive=True, args=None): elif args.clipboard: try: import _pyperclip + except ImportError: + fatal("can't use clipboard: no getclip etc?", ImportError) except RuntimeError: fatal("can't use clipboard: no display?", RuntimeError) - except ImportError: - fatal("can't use clipboard: no xclip / getclip?", ImportError) _pyperclip.copy(pphr.str) pphr_msg = ('passphrase: saved to clipboard only... ' 'paste it somewhere safe!!\n' @@ -1935,7 +1876,7 @@ def dialog(self, interactive=True, args=None): pphr_out = pphr.str print(pphr_msg) warn_msg = (' > Keep the private key private! <\n' - ' > Do not lose the passphrase! <') + ' > Do not lose the passphrase! <') print(warn_msg) del(pphr) @@ -2101,10 +2042,10 @@ def _decrypt_rsa_aes256cbc(data_enc, pwd_rsa, priv, pphr=None, openssl=None): try: pwd, se_RSA = sys_call(cmdRSA, stdin=pphr, stderr=True, sec_str=True) pwd_zero = threading.Timer(0.02, pwd.zero) - pwd_zero.start() # AES decrypt can be slow, best to clear pwd + pwd_zero.start() # AES decrypt can be minutes, best to clear pwd ___, se_AES = sys_call(cmdAES, stdin=pwd, stderr=True) except: - if isfile(data_dec): + if isfile(data_dec) and isfile(data_enc): SecFile(data_dec).destroy() fatal('%s: Could not decrypt (exception in RSA or AES step)' % name, DecryptError) @@ -2306,6 +2247,7 @@ def set_destroy(): 'linux2': ('-f', '-u', '-n', '7'), 'win32': ('-q', '-p', '7')} DESTROY_OPTS = opts[sys.platform] + global DESTROY_EXE try: # darwin DESTROY_EXE = which('srm') except WhichError: @@ -2661,9 +2603,8 @@ def _parse_args(): args = (__name__ == "__main__") and _parse_args() -logging = set_logging(args and bool(args.verbose)) -openssl_path_wanted = args and args.openssl -OPENSSL, openssl_version = set_openssl(openssl_path_wanted) +logging = set_logging(args and args.verbose) +OPENSSL, openssl_version = set_openssl(args and args.openssl) DESTROY_EXE, DESTROY_OPTS = set_destroy() py64bit = (sys.maxsize == 9223372036854775807) diff --git a/pyfilesec/constants.py b/pyfilesec/constants.py index f60ce11..bfe2f1b 100644 --- a/pyfilesec/constants.py +++ b/pyfilesec/constants.py @@ -69,13 +69,16 @@ UMASK = 0o077 # need u+x permission for directories old_umask = None # set as global in set_umask, unset_umask +lib_path = os.path.abspath(__file__).strip('co') # .py not .pyc, .pyo +lib_dir = os.path.split(lib_path)[0] + # do win32 stuff to improve test coverage % when tested on linux: # string to help be sure a .bat file belongs to pfs (win32, set_openssl): bat_identifier = '-- pyFileSec .bat file --' appdata_lib_dir = '' if sys.platform == 'win32': - appdata_lib_dir = os.path.join(os.environ['APPDATA'], split(lib_dir)[-1]) - if not isdir(appdata_lib_dir): + appdata_lib_dir = os.path.join(os.environ['APPDATA'], os.path.split(lib_dir)[-1]) + if not os.path.isdir(appdata_lib_dir): os.mkdir(appdata_lib_dir) DESTROY_EXE = os.path.join(appdata_lib_dir, '_sdelete.bat') sd_bat_template = """@echo off diff --git a/tests/openssl_version_97 b/tests/openssl_version_97 index 1cb2bda..aa0ce3f 100755 --- a/tests/openssl_version_97 +++ b/tests/openssl_version_97 @@ -1 +1,2 @@ +#!/bin/sh echo 'OpenSSL 0.9.7 version (for testing)' diff --git a/tests/test_pyfilesec.py b/tests/test_pyfilesec.py index 6278e97..5a1eb26 100644 --- a/tests/test_pyfilesec.py +++ b/tests/test_pyfilesec.py @@ -121,7 +121,7 @@ def test_import_getpass_which(self): import which os.chdir(self.tmp) - def test_secfile_base(self): + def test_SecFileBase(self): test_file = 'tf' with open(test_file, 'wb') as fd: fd.write('a') @@ -145,6 +145,7 @@ def test_secfile_base(self): sf.hardlinks sf.is_in_dropbox sf._get_git_info(None) + sf._get_git_info('.', git='notgit') sf._get_svn_info(None) sf._get_hg_info(None) @@ -163,7 +164,17 @@ def test_secfile_base(self): sf.metadata # non-writeable dir: - # TO-DO + no_write = 'non_writeable_test' + os.makedirs(no_write) + f = os.path.join(no_write, 'tmp_no_write') + with open(f, 'wb') as fd: + fd.write('x') + sf = SecFile(f) + assert sf.is_in_writeable_dir + os.chmod(no_write, stat.S_IREAD) + assert sf.is_in_writeable_dir == False + os.chmod(no_write, stat.S_IRWXU) + shutil.rmtree(no_write) # bad file name sf._file = test_file + 'xyz' @@ -177,6 +188,7 @@ def test_secfile_base(self): sf.read() assert sf.metadata != {} assert sf.metadataf != '{}' + sf.snippet # .pem file (warns) sf.set_file(pub) @@ -191,9 +203,9 @@ def test_misc_helper(self): good_path = OPENSSL with pytest.raises(RuntimeError): set_openssl('junk.glop') - #with pytest.raises(RuntimeError): - # p = os.path.join(os.path.split(__file__)[0], 'openssl_version_97') - # set_openssl(p) + with pytest.raises(RuntimeError): + p = os.path.join(os.path.split(__file__)[0], 'openssl_version_97') + set_openssl(p) set_openssl(good_path) if sys.platform in ['win32']: # exercise more code by forcing a reconstructon of the .bat files: @@ -222,6 +234,7 @@ def test_SecFile_basics(self): fd.write('a') sf = SecFile(test_file) str(sf) + repr(sf) # encrypt-encrypted warning: pub, priv, pphr = self._known_values()[:3] @@ -264,7 +277,6 @@ def test_SecFile_basics(self): shutil.rmtree('.svn') def test_RsaKeys(self): - # placeholder for more tests pub, priv, pphr, bits = self._known_values()[:4] # test individual keys: @@ -290,16 +302,25 @@ def test_RsaKeys(self): # test integrity of the set of keys: rk = RsaKeys(pub, priv, pphr).test() + rk.sniff(priv) # same again, no passphrase: pub_no, priv_no, bits_no = self._known_values_no_pphr()[:3] rk_no = RsaKeys(pub_no, priv_no).test() + # mismatched pub priv: + with pytest.raises(PublicKeyError): + rk = RsaKeys(pub, priv_no).test() + + # make short keys, test if pub_bits < RSA_MODULUS_MIN + #sys_call([OPENSSL, 'genrsa', '-out', priv, str(RSA_MODULUS_MIN)]) + #sys_call([OPENSSL, 'rsa', '-in', priv, '-pubout', '-out', pub]) + #rk = RsaKeys(pub, priv).test() + # test get_key_length function klen = get_key_length(pub) - cmdGETMOD = [OPENSSL, 'rsa', '-modulus', '-in', - pub, '-pubin', '-noout'] - modulus = sys_call(cmdGETMOD).replace('Modulus=', '') + cmdMOD = [OPENSSL, 'rsa', '-modulus', '-in', pub, '-pubin', '-noout'] + modulus = sys_call(cmdMOD).replace('Modulus=', '') assert hexdigits_re.match(modulus) # test sniff @@ -340,6 +361,20 @@ def test_SecFileArchive(self): s.name = 'ttt' s.unpack() + # construct an archive with a bad file name: + sfa = SecFileArchive() + sfa.name = 'sfa_name' + ENC_EXT + aes = 'aes' + AES_EXT + #pwd = 'pwd' + RSA_EXT + md = 'md' + META_EXT + bad = md + 'BAD' + files = [aes, md, bad] # omit pwd to test + for f in files: + with open(f, 'wb') as fd: + fd.write('x') + sfa._make_tar(files, keep=True) + sfa.unpack() + ''' # test fall-through decryption method: with open('abc' + AES_EXT, 'wb') as fd: @@ -953,7 +988,6 @@ def test_rotate(self): prvTmp2 = 'prvkey2 no unicode.pem ' # file names pwd = printable_pwd(180) pphr2 = ' ' + pwd.str + ' ' # spaces in pphr - #print pwd, pphr2 pub2, priv2 = GenRSA().generate(pubTmp2, prvTmp2, pphr2, 1024) # Rotate encryption including padding change: @@ -1086,6 +1120,8 @@ def test_rename(self): tmp.write('x') secure_rename(tmp.name, 'abc') + + def test_hmac(self): # verify pfs hmac implementation against a widely used example: key = 'key'