Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions .codecov.yml
Original file line number Diff line number Diff line change
Expand Up @@ -5,18 +5,36 @@ coverage:
piaplib:
target: 45%
flags: piaplib
base: auto
paths:
- piaplib/
- piaplib/lint
- piaplib/pku
- piaplib/keyring
- piaplib/book
piaplib.book:
target: 70%
flags: book
paths:
- piaplib/book
piaplib.pku:
target: 50%
flags: pku
piaplib.pku:
- piaplib/pku
piaplib.lint:
target: 50%
flags: lint
- piaplib/lint
piaplib.keyring:
target: 75%
flags: keyring
paths:
- piaplib/keyring
tests:
target: 50%
flags: tests
paths:
- tests

flags:
piaplib:
Expand All @@ -40,4 +58,5 @@ flags:
- piaplib/lint
tests:
paths:
- tests/
- tests
joined: false
2 changes: 1 addition & 1 deletion .coveragerc
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
parallel = True

[report]
include = piaplib*
include = piaplib*,tests*
# Regexes for lines to exclude from consideration
exclude_lines =
# Have to re-enable the standard pragma
Expand Down
121 changes: 33 additions & 88 deletions piaplib/keyring/clearify.py
Original file line number Diff line number Diff line change
Expand Up @@ -42,12 +42,6 @@
"""


try:
from . import saltify as saltify
except Exception:
import saltify as saltify


try:
from . import rand as rand
except Exception:
Expand Down Expand Up @@ -165,6 +159,10 @@ def makeKeystoreFile(theKey=str(rand.randPW(16)), somePath=None):

@remediation.error_handling
def packForRest(message=None, keyStore=None):
"""Serializes the given cleartext.
param ciphertext - str the encrypted data.
param keyStore - str the path to this file with the key.
"""
if keyStore is None:
keyStore = getKeyFilePath()
if hasBackendCommand():
Expand Down Expand Up @@ -193,13 +191,15 @@ def packForRest(message=None, keyStore=None):
# ciphertext = str(ciphertext).replace(str("\\n"), str(""))
return ciphertext
else:
UFsx2Kb_WrkG3LR = utils.readFile(keyStore)
seed = saltify.saltify(message, UFsx2Kb_WrkG3LR)
return packForRest_junk(message, UFsx2Kb_WrkG3LR, seed)
raise NotImplementedError("No Implemented Backend - BUG")


@remediation.error_handling
def unpackFromRest(ciphertext=None, keyStore=None):
"""Deserializes the given ciphertext.
param ciphertext - str the encrypted data.
param keyStore - str the path to this file with the key.
"""
if keyStore is None:
keyStore = getKeyFilePath()
if hasBackendCommand():
Expand Down Expand Up @@ -228,63 +228,7 @@ def unpackFromRest(ciphertext=None, keyStore=None):
# cleartext = str(cleartext).replace(str("\\n"), str(""))
return str(cleartext)
else:
raise NotImplementedError("BUG")
UFsx2Kb_WrkG3LR = utils.readFile(keyStore)
seed = saltify.saltify(ciphertext, UFsx2Kb_WrkG3LR)
return unpackFromRest_junk(ciphertext, UFsx2Kb_WrkG3LR, seed)


@remediation.bug_handling
def packForRest_junk(message=None, key='static key CHANGEME', seed='a static IV SEED'):
import six
if six.PY2:
from Crypto.Cipher import AES
obj = AES.new(
str(key.join("0123456789abcdef")).encode('utf8')[:16],
AES.MODE_CBC,
str(seed.join("0123456789ABCDEFG")).encode('utf8')[:16]
)
pad_text = str(bytes(b'\0' * (16 - (len(message) % 16))).decode('utf8'))
pad_message = str("{}{}").format(message, pad_text).encode('utf8')
ciphertext = obj.encrypt(pad_message)
del(obj)
return str(base64.standard_b64encode(ciphertext).decode('utf8'))
else:
from cryptography.fernet import Fernet
f = Fernet(base64.urlsafe_b64encode(
bytes(key.join(str("0123456789abcdefg")).encode('utf8'))[:32]
)[:KEY_BLOCK_SIZE])
ciphertext = f.encrypt(message.encode('utf8'))
return str(ciphertext.decode('utf8'))


@remediation.bug_handling
def unpackFromRest_junk(ciphertext=None, key='static key CHANGEME', seed='a static IV SEED'):
import six
if six.PY2:
from Crypto.Cipher import AES
obj = AES.new(
str(key.join("0123456789abcdef")).encode('utf8')[:16],
AES.MODE_CBC,
str(seed.join("0123456789ABCDEFG")).encode('utf8')[:16]
)
cleartext = obj.decrypt(base64.standard_b64decode(ciphertext.encode('utf8')))
for pad_len in range(16, 0, -1):
pad_text = str(bytes(b'\0' * (16 - pad_len)).decode('utf8'))
cleartext_unpad = cleartext.decode('utf8').rstrip(pad_text)
if (len(cleartext_unpad.encode('utf8')) is len(cleartext)):
continue
else:
cleartext = cleartext_unpad.encode('utf8')
del(obj)
return cleartext.decode('utf8')
else:
from cryptography.fernet import Fernet
f = Fernet(base64.urlsafe_b64encode(
bytes(key.join(str("0123456789abcdefg")).encode('utf8'))[:32]
)[:KEY_BLOCK_SIZE])
cleartext = f.decrypt(ciphertext.encode('utf8'))
return str(cleartext.decode('utf8'))
raise NotImplementedError("No Implemented Backend - BUG")


WEAK_ACTIONS = {u'pack': packForRest, u'unpack': unpackFromRest}
Expand All @@ -296,7 +240,7 @@ def unpackFromRest_junk(ciphertext=None, key='static key CHANGEME', seed='a stat

@remediation.error_handling
def parseArgs(arguments=None):
theArgs = None
theArgs = argparse.Namespace()
try:
parser = argparse.ArgumentParser(
prog=__prog__,
Expand Down Expand Up @@ -355,36 +299,37 @@ def parseArgs(arguments=None):
print(str(err.args))
err = None
del err
theArgs = None
theArgs = argparse.Namespace()
return theArgs


@remediation.bug_handling
def main(argv=None):
"""The main event"""
args = parseArgs(argv)
if args.msg is None:
return 2
theFile = None
output = None
if args.keystore is not None:
theFile = args.keystore
else:
theFile = None
output = None
if args.keystore is not None:
theFile = args.keystore
else:
theFile = str("""/tmp/.beta_PiAP_weak_key""")
if args.key is not None:
theFile = makeKeystoreFile(str(args.key), theFile)
try:
output = str(WEAK_ACTIONS[args.clear_action](str(args.msg), theFile))
theFile = str("""/tmp/.beta_PiAP_weak_key""")
if args.key is not None:
theFile = makeKeystoreFile(str(args.key), theFile)
try:
output = str(WEAK_ACTIONS[args.clear_action](str(args.msg), theFile))
if __name__ in u'__main__':
print(output)
except Exception as err:
print(str("FAILED DURRING CLEARIFY. ABORT."))
print(str(type(err)))
print(str(err))
print(str(err.args))
err = None
del err
output = None
else:
return output
except Exception as err:
print(str("FAILED DURRING CLEARIFY. ABORT."))
print(str(type(err)))
print(str(err))
print(str(err.args))
err = None
del err
output = None
del output
return 0


Expand Down
3 changes: 2 additions & 1 deletion piaplib/keyring/rand.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,8 @@ def randPW(count=None):
else:
x_count = count
try:
return str(randStr((x_count + 4))).replace(str("""\\x"""), str(""))[2:-1]
import string
return str("").join([string.printable[randInt(1, 1, 95)] for _ in range(x_count)])
except Exception as err:
print(str(u'FAILED DURRING RAND-PW. ABORT.'))
print(str(type(err)))
Expand Down
26 changes: 6 additions & 20 deletions piaplib/pku/upgrade.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,6 @@
import remediation as remediation
except Exception:
raise ImportError("Error Importing remediation")
except PendingDeprecationWarning as junkErr:
"""mute junk errors"""
junkErr = None
del(junkErr)
except Exception:
try:
try:
Expand Down Expand Up @@ -168,12 +164,7 @@ def upgradepip():
def upgradePiAPlib():
"""Upgrade piaplib via pip."""
upsream_repo = str("git+https://github.com/reactive-firewall/PiAP-python-tools.git")
try:
pip.main(args=["install", "--upgrade", upsream_repo])
except PendingDeprecationWarning as junkErr:
"""mute junk errors"""
junkErr = None
del(junkErr)
pip.main(args=["install", "--upgrade", upsream_repo])
return None


Expand All @@ -185,16 +176,11 @@ def upgradePiAPlib_depends():
"/PiAP-python-tools/master/requirements.txt"
)
utils.getFileResource(upsream_repo_depends, "temp_req.txt")
try:
pip.main(args=[
"install", "--upgrade-strategy",
"only-if-needed", "--upgrade",
"-r", "temp_req.txt"
])
except PendingDeprecationWarning as junkErr:
"""mute junk errors"""
junkErr = None
del(junkErr)
pip.main(args=[
"install", "--upgrade-strategy",
"only-if-needed", "--upgrade",
"-r", "temp_req.txt"
])
utils.cleanFileResource("temp_req.txt")
return None

Expand Down
6 changes: 5 additions & 1 deletion piaplib/pku/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class unicode(str):
raise ImportError("Error Importing utils")


__prog__ = str("""piaplib.pku.utils""")
"""The name of this program is piaplib.pku.utils"""


@remediation.error_handling
def literal_code(raw_input=None):
"""A simple attempt at validating raw python unicode. Always expect CWE-20.
Expand Down Expand Up @@ -162,7 +166,7 @@ def compactSpace(theInput_Str):
"""Try to remove the spaces from the input string."""
import re
sourceStr = literal_str(theInput_Str)
theList = re.sub(r' +', u' ', sourceStr)
theList = re.sub(r' +', str(""" """), sourceStr)
return theList


Expand Down
81 changes: 80 additions & 1 deletion tests/test_enc.py
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,86 @@ def test_z_case_clearify_main_lazy(self):
print(str(""))
err = None
del err
theResult = False
assert theResult

def test_case_clearify_main_a(self):
"""Tests the helper function main pack of keyring.clearify"""
theResult = True
try:
from piaplib.keyring import clearify as clearify
if clearify.__name__ is None:
raise ImportError("Failed to import clearify")
test_out = clearify.main([
str("--pack"),
str("--msg=\"This is a test Message\""),
str("-K=testkeyneedstobelong")
])
self.assertIsNotNone(test_out)
try:
if isinstance(test_out, bytes):
test_out = test_out.decode('utf8')
except UnicodeDecodeError:
test_out = str(repr(bytes(test_out)))
if (str("U2FsdGVkX") in str(test_out)):
theResult = True
else:
if sys.platform.startswith("linux"):
theResult = False
else:
raise unittest.SkipTest("BETA. Experemental feature not ready yet.")
except Exception as err:
print(str(""))
print(str(type(err)))
print(str(err))
print(str((err.args)))
print(str(""))
err = None
del err
if sys.platform.startswith("linux"):
theResult = False
else:
raise unittest.SkipTest("BETA. Experemental feature not ready yet.")
assert theResult

def test_case_clearify_main_b(self):
"""Tests the helper function main unpack of keyring.clearify"""
theResult = True
try:
temp_msg = str("""U2FsdGVkX1+dD6bFlND+Xa0bzNttrZfB5zYCp0mSEYfhMTpaM7U=""")
from piaplib.keyring import clearify as clearify
if clearify.__name__ is None:
raise ImportError("Failed to import clearify")
test_out = clearify.main([
str("--unpack"),
str("--msg={}").format(temp_msg),
str("-K=testkeyneedstobelong")
])
try:
if isinstance(test_out, bytes):
test_out = test_out.decode('utf8')
except UnicodeDecodeError:
test_out = str(repr(bytes(test_out)))
self.assertIsNotNone(test_out)
if (str("This is a test Message") in str(test_out)):
theResult = True
else:
if sys.platform.startswith("linux"):
print(str(repr(bytes(test_out))))
theResult = False
else:
raise unittest.SkipTest("BETA. Experemental feature not ready yet.")
except Exception as err:
print(str(""))
print(str(type(err)))
print(str(err))
print(str((err.args)))
print(str(""))
err = None
del err
if sys.platform.startswith("linux"):
theResult = False
else:
raise unittest.SkipTest("BETA. Experemental feature not ready yet.")
assert theResult


Expand Down
Loading