diff --git a/.circleci/config.yml b/.circleci/config.yml
new file mode 100644
index 0000000..efa6f16
--- /dev/null
+++ b/.circleci/config.yml
@@ -0,0 +1,21 @@
+version: 2
+jobs:
+ build:
+ working_directory: ~/PiAP-python-tools
+ docker:
+ - image: circleci/python:3.6.1
+ environment:
+ CI: cicleci
+ POSTGRES_DB: circle_test
+ steps:
+ - checkout
+ - run:
+ command: |
+ make clean
+ - run:
+ command: |
+ make test
+ - run:
+ command: |
+ make clean
+destination: tr1
diff --git a/.coveragerc b/.coveragerc
index 7423488..9b946b3 100644
--- a/.coveragerc
+++ b/.coveragerc
@@ -9,11 +9,12 @@ exclude_lines =
pragma: no cover
except Exception
+ except BaseException:
# Don't complain if tests don't hit defensive assertion code:
raise AssertionError
raise NotImplementedError
raise ImportError
- except unittest.SkipTest as skiperr
+ except unittest.SkipTest
except IOError
except OSError
diff --git a/.stickler.yml b/.stickler.yml
new file mode 100644
index 0000000..11bff83
--- /dev/null
+++ b/.stickler.yml
@@ -0,0 +1,10 @@
+linters:
+ shellcheck:
+ shell: bash
+ flake8:
+ max-line-length: 100
+ max-complexity: 10
+ ignore: 'W191,W391'
+ exclude: ['.git', '__pycache__', 'docs', '.tox', 'build']
+files:
+ ignore: ['*/*.pyc', '*.pyc', '*~', '.git', '__pycache__']
diff --git a/Makefile b/Makefile
index f12ad18..7eaadb6 100644
--- a/Makefile
+++ b/Makefile
@@ -102,7 +102,7 @@ test-tox: cleanup
$(QUIET)$(ECHO) "$@: Done."
test-style: cleanup
- $(QUIET)flake8 --ignore=W191,W391 --max-line-length=100 --verbose --count
+ $(QUIET)flake8 --ignore=W191,W391 --max-line-length=100 --verbose --count --config=.flake8.ini
$(QUIET)$(ECHO) "$@: Done."
cleanup:
@@ -133,6 +133,9 @@ cleanup:
$(QUIET)rm -f ./the_test_file*.json 2>/dev/null || true
$(QUIET)rm -f ./the_test_file*.yml 2>/dev/null || true
$(QUIET)rm -f ./the_test_file*.yaml 2>/dev/null || true
+ $(QUIET)rm -f ./the_test_file*.enc 2>/dev/null || true
+ $(QUIET)rm -f ./.weak_test_key_* || true
+ $(QUIET)rm -f ./test.secret || true
$(QUIET)rm -f ./the_test_url_file*.txt 2>/dev/null || true
$(QUIET)rm -f /tmp/.beta_PiAP_weak_key 2>/dev/null || true
diff --git a/docs/conf.py b/docs/conf.py
index 86fb0e2..8bdf9ff 100644
--- a/docs/conf.py
+++ b/docs/conf.py
@@ -41,7 +41,7 @@
master_doc = 'index'
# General information about the project.
-project = u'restart_service_handler'
+project = u'Pocket_PiAP_Python_tools'
copyright = u'2017, reactive-firewall'
# The version info for the project you're documenting, acts as replacement for
@@ -51,7 +51,7 @@
# The short X.Y version.
version = 'v0.2'
# The full version, including alpha/beta/rc tags.
-release = 'v0.2.4'
+release = 'v0.2.7'
# The language for content autogenerated by Sphinx. Refer to documentation
# for a list of supported languages.
diff --git a/piaplib/__init__.py b/piaplib/__init__.py
index acb94f2..1ac0c9a 100644
--- a/piaplib/__init__.py
+++ b/piaplib/__init__.py
@@ -16,7 +16,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-__version__ = """0.2.5"""
+__version__ = """0.2.7"""
try:
import sys
diff --git a/piaplib/keyring/clearify.py b/piaplib/keyring/clearify.py
index 22101e0..06e21a8 100644
--- a/piaplib/keyring/clearify.py
+++ b/piaplib/keyring/clearify.py
@@ -52,6 +52,13 @@
except Exception:
import pku.remediation as remediation
+try:
+ from remediation import PiAPError as PiAPError
+except Exception:
+ try:
+ from piaplib.pku.remediation import PiAPError as PiAPError
+ except Exception:
+ raise ImportError("Error Importing PiAPError")
try:
from ..pku import utils as utils
@@ -242,6 +249,46 @@ def unpackFromRest(ciphertext=None, keyStore=None):
raise NotImplementedError("No Implemented Backend - BUG")
+@remediation.error_handling
+def unpackFromFile(somefile, keyStore=None):
+ """Reads the raw encrypted file and decrypts it."""
+ read_data = None
+ try:
+ someFilePath = utils.addExtension(somefile, str('enc'))
+ with utils.open_func(someFilePath, mode=u'r', encoding=u'utf-8') as enc_data_file:
+ read_enc_data = enc_data_file.read()
+ read_data = unpackFromRest(read_enc_data, keyStore)
+ except Exception as clearerr:
+ read_data = None
+ baton = PiAPError(clearerr, str("Failed to load or deycrypt file."))
+ clearerr = None
+ del clearerr
+ raise baton
+ return read_data
+
+
+@remediation.error_handling
+def packToFile(somefile, data, keyStore=None):
+ """Writes the raw encrypted file."""
+ if data is None:
+ return False
+ if somefile is None:
+ return False
+ did_write = False
+ try:
+ someFilePath = utils.literal_code(utils.addExtension(str(somefile), str("enc")))
+ if someFilePath is not None:
+ encData = packForRest(data, keyStore)
+ with utils.open_func(file=someFilePath, mode=u'wb+') as enc_data_file:
+ utils.write_func(enc_data_file, utils.literal_str(encData).encode("utf-8"))
+ del(encData)
+ did_write = True
+ except Exception as clearerr:
+ raise remediation.PiAPError(clearerr, str("Failed to write or encrypt file."))
+ did_write = False
+ return did_write
+
+
WEAK_ACTIONS = {u'pack': packForRest, u'unpack': unpackFromRest}
""" The Pocket bag Unit actions.
pack - save/pack/pickle functions.
diff --git a/piaplib/lint/clients_check_status.py b/piaplib/lint/clients_check_status.py
index 1b35076..2886eb3 100755
--- a/piaplib/lint/clients_check_status.py
+++ b/piaplib/lint/clients_check_status.py
@@ -350,8 +350,8 @@ def get_client_list(lan_interface=None):
return theResult
-@remediation.error_handling
-def get_client_status(client=None, use_html=False, lan_interface=None):
+@remediation.error_handling # noqa C901
+def get_client_status(client=None, use_html=False, lan_interface=None): # noqa C901
"""Generate the status"""
theResult = None
try:
@@ -456,6 +456,23 @@ def get_client_ip(client=None, use_html=False, lan_interface=None):
return theResult
+@remediation.error_handling
+def showAllClients(verbose_mode, output_html, client_interface):
+ """Used by main to show all. Not intended to be called directly"""
+ if output_html:
+ print(
+ "
" +
+ "| Client | MAC | IP | Status | "
+ )
+ client_list = get_client_list(client_interface)
+ if client_list is None:
+ client_list = []
+ for client_name in client_list:
+ print(show_client(str(client_name), verbose_mode, output_html, client_interface))
+ if output_html:
+ print("
")
+
+
@remediation.bug_handling
def main(argv=None):
"""The main function."""
@@ -470,30 +487,17 @@ def main(argv=None):
if args.interface is not None:
client_interface = args.interface
if args.show_all is True:
- if output_html:
- print(str(
- u'' +
- u'| Client | MAC | IP | Status | '
- ))
+ showAllClients(verbose, output_html, client_interface)
+ elif args.list is True:
client_list = get_client_list(client_interface)
if client_list is None:
client_list = []
for client_name in client_list:
- print(show_client(str(client_name), verbose, output_html, client_interface))
- if output_html:
- print("
")
+ print(str(client_name))
else:
- if args.list is True:
- client_list = get_client_list(client_interface)
- if client_list is None:
- client_list = []
- for client_name in client_list:
- print(str(client_name))
- else:
- ip = args.ip
- print(show_client(ip, verbose, output_html, client_interface))
- return 0
- return 0
+ ip = args.ip
+ print(show_client(ip, verbose, output_html, client_interface))
+ return 0
except Exception as main_err:
print(str("client_check_status: REALLY BAD ERROR: ACTION will not be compleated! ABORT!"))
print(str(main_err))
diff --git a/piaplib/lint/html_generator.py b/piaplib/lint/html_generator.py
index bedd661..80b381e 100644
--- a/piaplib/lint/html_generator.py
+++ b/piaplib/lint/html_generator.py
@@ -153,16 +153,16 @@ def gen_html_ul(somelist=None, id=None, name=None):
items = [gen_html_li(x) for x in somelist]
theresult = None
if id is not None and has_special_html_chars(id) is not True:
- if name is not None and has_special_html_chars(name) is not True:
- theresult = str(u'').format(str(name), str(id))
- for item in items:
- theresult = str(theresult + item)
- else:
- theresult = str(u'').format(str(id))
- for item in items:
- theresult = str(theresult + item)
+ if name is None or has_special_html_chars(name) is True:
+ name = utils.literal_str(id)
+ theresult = str(u'').format(
+ utils.literal_str(name),
+ utils.literal_str(id)
+ )
+ for item in items:
+ theresult = str(theresult + item)
elif name is not None and has_special_html_chars(name) is not True:
- theresult = str(u'').format(str(name))
+ theresult = str(u'').format(utils.literal_str(name))
for item in items:
theresult = str(theresult + item)
else:
@@ -184,13 +184,17 @@ def gen_html_li(item=None, id=None, name=None):
"""
if id is not None and has_special_html_chars(id) is not True:
if name is not None and has_special_html_chars(name) is not True:
- return str(u'- {}
').format(str(name), str(id), str(item))
+ return str(u'- {}
').format(
+ utils.literal_str(name),
+ utils.literal_str(id),
+ utils.literal_str(item)
+ )
else:
- return str(u'- {}
').format(id, str(item))
+ return str(u'- {}
').format(id, utils.literal_str(item))
elif name is not None and has_special_html_chars(name) is not True:
- return str(u'- {}
').format(id, str(item))
+ return str(u'- {}
').format(id, utils.literal_str(item))
else:
- return str(u'- {}
').format(str(item))
+ return str(u'- {}
').format(utils.literal_str(item))
def gen_html_label(content=None, role=HTML_LABEL_ROLES[0], id=None, name=None):
diff --git a/piaplib/lint/iface_check_status.py b/piaplib/lint/iface_check_status.py
index 0912403..32227b4 100755
--- a/piaplib/lint/iface_check_status.py
+++ b/piaplib/lint/iface_check_status.py
@@ -86,7 +86,7 @@ def parseargs(arguments=None):
)
parser.add_argument(
'-i', '--interface',
- default=interfaces.INTERFACE_CHOICES[1], choices=interfaces.INTERFACE_CHOICES,
+ default=interfaces.INTERFACE_CHOICES[0], choices=interfaces.INTERFACE_CHOICES,
help='The interface to show.'
)
parser.add_argument(
@@ -213,7 +213,7 @@ def get_iface_list():
if x in interfaces.INTERFACE_CHOICES:
theResult.append(x)
theResult = utils.compactList([x for x in theResult])
- """while '[aehltw]{3}[n]?[0-9]+' would probably work well here, best to whitelist. """
+ """while regex would probably work well here, best to whitelist. """
return theResult
@@ -305,34 +305,39 @@ def get_iface_ip_list(iface=u'lo', use_html=False):
return theResult
+@remediation.error_handling
+def showAlliFace(verbose_mode, output_html):
+ """Used by main to show all. Not intended to be called directly"""
+ if output_html:
+ print(
+ "" +
+ "| Interface | MAC | IP | State | "
+ )
+ for iface_name in get_iface_list():
+ print(show_iface(iface_name, verbose_mode, output_html))
+ if output_html:
+ print("
")
+
+
def main(argv=None):
"""The main function."""
args = parseargs(argv)
try:
verbose = False
+ output_html = False
if args.verbose_mode is not None:
verbose = args.verbose_mode
if args.output_html is not None:
output_html = args.output_html
if args.show_all is True:
- if output_html:
- print(
- "" +
- "| Interface | MAC | IP | State | "
- )
+ showAlliFace(verbose, output_html)
+ elif args.list is True:
for iface_name in get_iface_list():
- print(show_iface(iface_name, verbose, output_html))
- if output_html:
- print("
")
+ print(str(iface_name))
else:
- if args.list is True:
- for iface_name in get_iface_list():
- print(str(iface_name))
- else:
- interface = args.interface
- print(show_iface(interface, verbose, output_html))
- return 0
- return 0
+ interface = args.interface
+ print(show_iface(interface, verbose, output_html))
+ return 0
except Exception as main_err:
logs.log(
str("iface_check_status: REALLY BAD ERROR: ACTION will not be compleated! ABORT!"),
diff --git a/piaplib/lint/users_check_status.py b/piaplib/lint/users_check_status.py
index 76983ed..a7a0fda 100644
--- a/piaplib/lint/users_check_status.py
+++ b/piaplib/lint/users_check_status.py
@@ -370,7 +370,7 @@ def get_user_list():
return theResult
-def get_user_status(user_name=None, use_html=False):
+def get_user_status(user_name=None, use_html=False): # noqa C901
"""Generate the status"""
theResult = None
try:
@@ -560,7 +560,7 @@ def get_user_ip(user=None, use_html=False):
return theResult
-def main(argv=None):
+def main(argv=None): # noqa C901
"""The main function."""
args = parseargs(argv)
try:
diff --git a/piaplib/pku/config.py b/piaplib/pku/config.py
index 4cf9ccb..397464c 100755
--- a/piaplib/pku/config.py
+++ b/piaplib/pku/config.py
@@ -33,7 +33,7 @@
try:
import utils as utils
except Exception:
- raise ImportError("Error Importing config")
+ raise ImportError("Error Importing utils for config")
try:
@@ -45,27 +45,11 @@
raise ImportError("Error Importing remediation")
-def addExtension(somefile, extension):
- """Ensures the given extension is used."""
- if (somefile is None):
- return None
- if (extension is None):
- return somefile
- if (len(str(somefile)) > len(extension)):
- offset = (-1 * len(extension))
- if (extension in str(somefile)[offset:]) and (str(".") in str(somefile)):
- return somefile
- else:
- return str("{}.{}").format(somefile, extension)
- else:
- return str("{}.{}").format(somefile, extension)
-
-
def hasJsonSupport():
support_json = False
try:
support_json = (json.__name__ is not None)
- except:
+ except BaseException:
support_json = False
return support_json
@@ -75,7 +59,7 @@ def readJsonFile(somefile):
"""Reads the raw json file."""
read_data = None
try:
- someFilePath = addExtension(somefile, str('json'))
+ someFilePath = utils.addExtension(somefile, str('json'))
with utils.open_func(someFilePath, mode=u'r', encoding=u'utf-8') as json_data_file:
read_data = json.load(fp=json_data_file, encoding=u'utf-8')
except Exception as jsonerr:
@@ -96,7 +80,7 @@ def writeJsonFile(somefile, data):
return False
did_write = False
try:
- someFilePath = addExtension(somefile, str('json'))
+ someFilePath = utils.addExtension(somefile, str('json'))
with utils.open_func(someFilePath, mode=u'w+', encoding=u'utf-8') as outfile:
jsonData = json.dumps(
obj=dict(data),
@@ -133,7 +117,7 @@ def hasYamlSupport():
support_yaml = False
try:
support_yaml = (yaml.__name__ is not None)
- except Exception:
+ except BaseException:
support_yaml = False
return support_yaml
@@ -144,7 +128,7 @@ def readYamlFile(somefile):
return None
read_data = None
try:
- someFilePath = addExtension(somefile, str('yaml'))
+ someFilePath = utils.addExtension(somefile, str('yaml'))
with utils.open_func(file=someFilePath, mode=u'r', encoding=u'utf-8') as ymalfile:
if yaml.version_info < (0, 15):
read_data = yaml.safe_load(ymalfile)
@@ -168,7 +152,7 @@ def writeYamlFile(somefile, data):
return False
did_write = False
try:
- someFilePath = addExtension(somefile, str('yaml'))
+ someFilePath = utils.addExtension(somefile, str('yaml'))
did_write = utils.writeFile(someFilePath, yaml.dump(data))
except Exception as yamlerr:
print("")
@@ -182,6 +166,144 @@ def writeYamlFile(somefile, data):
return did_write
+try:
+ try:
+ import configparser as configparser
+ except Exception:
+ try:
+ import ConfigParser as configparser
+ except Exception:
+ raise ImportError("Error Importing ConfigParser utils for config")
+except Exception:
+ pass
+
+
+@remediation.error_handling
+def getDefaultMainConfigFile():
+ import os
+ # logging['timefmt'] = str("""%a %b %d %H:%M:%S %Z %Y""")
+ default_config = dict({
+ 'PiAP-logging': dict({
+ 'mode': str("stdout"),
+ 'dir': str("/var/log"),
+ 'keyfile': repr(None),
+ 'encryptlogs': repr(False)
+ }),
+ 'PiAP-logging-outputs': dict({
+ 'splunk': repr(False),
+ 'syslog': repr(False),
+ 'file': repr(False),
+ 'stdout': repr(True)
+ }),
+ 'PiAP-rand': dict({
+ 'keyfile': repr(None),
+ 'entropy_function': repr(os.urandom),
+ 'char_upper': repr(99),
+ 'char_lower': repr(1),
+ 'char_range': repr(tuple(('${PiAP-rand:char_lower}', '${PiAP-rand:char_upper}'))),
+ 'passphrase_length': repr(16),
+ 'passphrase_encoding': str("utf-8"),
+ 'SSID_length': repr(20)
+ }),
+ 'PiAP-network-lan': dict({
+ 'keyfile': repr(None),
+ 'network_ipv4_on': repr(True),
+ 'network_ipv4': repr(tuple(("10.0.40", 0))),
+ 'ipv4_dhcp_reserved': repr({}),
+ 'network_ipv6_on': repr(False),
+ 'network_ipv6': repr(None),
+ 'ipv6_dhcp_reserved': repr(None)
+ })
+ })
+ return default_config
+
+
+@remediation.error_handling
+def writeDefaultMainConfigFile(confFile=str('/var/opt/PiAP/PiAP.conf')):
+ theResult = False
+ if writeMainConfigFile(confFile, getDefaultMainConfigFile()):
+ theResult = True
+ return theResult
+
+
+@remediation.error_passing
+def mergeConfigParser(theConfig=None, config_data=None, overwrite=False):
+ """
+ Merges the Configuration Dictionary into a configparser.
+ param theConfig - configparser.ConfigParser the ConfigParser.
+ param config_data - dict the configuration to merge.
+ param overwrite - boolean determining if the dict is record of truth or if theConfig is.
+ """
+ if theConfig is None:
+ theConfig = configparser.ConfigParser(allow_no_value=True)
+ if config_data is not None:
+ for someSection in config_data.keys():
+ if not theConfig.has_section(someSection):
+ theConfig.add_section(someSection)
+ for someOption in config_data[someSection].keys():
+ if not theConfig.has_option(someSection, someOption) or (overwrite is True):
+ theConfig.set(someSection, someOption, config_data[someSection][someOption])
+ return theConfig
+
+
+@remediation.error_handling
+def parseConfigParser(config_data=None, theConfig=None, overwrite=True):
+ """
+ Merges the Configuration Dictionary into a configparser.
+ param config_data - dict the configuration to merge.
+ param theConfig - configparser.ConfigParser the ConfigParser.
+ param overwrite - boolean determining if the dict is record of truth or if theConfig is.
+ """
+ if config_data is None:
+ config_data = dict({})
+ if theConfig is not None:
+ for someSection in theConfig.sections():
+ if str(someSection) not in config_data.keys():
+ config_data[someSection] = dict({})
+ for someOption in theConfig.options(someSection):
+ if str(someOption) not in config_data[someSection].keys() or (overwrite is True):
+ config_data[someSection][someOption] = theConfig.get(someSection, someOption)
+ return config_data
+
+
+@remediation.error_handling
+def writeMainConfigFile(confFile=str('/var/opt/PiAP/PiAP.conf'), config_data=None):
+ """Generates the Main Configuration file for PiAPlib"""
+ try:
+ mainConfig = configparser.ConfigParser(allow_no_value=True)
+ default_config = loadMainConfigFile(confFile)
+ mainConfig = mergeConfigParser(mainConfig, config_data, True)
+ mainConfig = mergeConfigParser(mainConfig, default_config, False)
+ try:
+ with utils.open_func(file=confFile, mode='w+') as configfile:
+ mainConfig.write(configfile)
+ except Exception:
+ with open(confFile, 'wb') as configfile:
+ mainConfig.write(configfile)
+ except Exception as err:
+ print(str(err))
+ print(str(type(err)))
+ print(str((err.args)))
+ return False
+ return True
+
+
+@remediation.error_handling
+def loadMainConfigFile(confFile='/var/opt/PiAP/PiAP.conf'):
+ try:
+ mainConfig = configparser.ConfigParser(allow_no_value=True)
+ result_config = getDefaultMainConfigFile()
+ with utils.open_func(file=confFile, mode=u'r', encoding=u'utf-8') as configfile:
+ mainConfig.read(configfile)
+ result_config = parseConfigParser(result_config, mainConfig, True)
+ except Exception as err:
+ print(str(err))
+ print(str(type(err)))
+ print(str((err.args)))
+ return getDefaultMainConfigFile()
+ return result_config
+
+
if __name__ in u'__main__':
raise NotImplementedError("ERROR: Can not run config as main. Yet?")
diff --git a/piaplib/pku/interfaces.py b/piaplib/pku/interfaces.py
index e48cf0c..b4ec4e6 100644
--- a/piaplib/pku/interfaces.py
+++ b/piaplib/pku/interfaces.py
@@ -40,7 +40,7 @@
"""The name of this PiAPLib tool is Pocket Knife Interfaces Unit"""
-IFACE_PREFIXES = [str("wlan"), str("eth"), str("usb"), str("br"), str("mon")]
+IFACE_PREFIXES = [str("lan"), str("wlan"), str("eth"), str("usb"), str("br"), str("mon")]
"""whitelist of valid iface prefixes"""
@@ -63,7 +63,7 @@ def parseargs(arguments=None):
parser.add_argument(
'-i',
'--interface',
- default=INTERFACE_CHOICES[1],
+ default=INTERFACE_CHOICES[0],
choices=INTERFACE_CHOICES,
help='The interface to use.'
)
diff --git a/piaplib/pku/remediation.py b/piaplib/pku/remediation.py
index 7a338c9..c680257 100644
--- a/piaplib/pku/remediation.py
+++ b/piaplib/pku/remediation.py
@@ -105,6 +105,21 @@ def helper_func(*args, **kwargs):
return helper_func
+def error_breakpoint(error, context=None):
+ """Just logs the error and returns None"""
+ timestamp = getTimeStamp()
+ logs.log(str("An error occured at {}").format(timestamp), "Error")
+ logs.log(str(context), "Debug")
+ logs.log(str(type(error)), "Error")
+ logs.log(str(error), "Error")
+ logs.log(str((error.args)), "Error")
+ if isinstance(error, PiAPError):
+ logs.log(str(error.cause), "Error")
+ logs.log(str(type(error.cause)), "Error")
+ logs.log(str((error.args)), "Error")
+ return None
+
+
def error_handling(func):
"""Runs a function in try-except"""
import functools
@@ -116,21 +131,10 @@ def safety_func(*args, **kwargs):
try:
theOutput = func(*args, **kwargs)
except Exception as err:
- timestamp = getTimeStamp()
- logs.log(str("An error occured at {}").format(timestamp), "Error")
- logs.log(str(func), "Debug")
- logs.log(str(type(err)), "Error")
- logs.log(str(err), "Error")
- logs.log(str((err.args)), "Error")
- if isinstance(err, PiAPError):
- logs.log(str((err.cause)), "Error")
- logs.log(str(type(err.cause)), "Error")
- logs.log(str(type(err.args)), "Error")
- logs.log(str(""), "Error")
+ theOutput = error_breakpoint(error=err, context=func)
# sys.exc_clear()
err = None
del err
- theOutput = None
return theOutput
return safety_func
diff --git a/piaplib/pku/utils.py b/piaplib/pku/utils.py
index 57e30e1..a69b84b 100644
--- a/piaplib/pku/utils.py
+++ b/piaplib/pku/utils.py
@@ -295,9 +295,42 @@ def xstr(some_str=None):
""" I/O and Files """
+@remediation.error_passing
+def addExtension(somefile, extension):
+ """Ensures the given extension is used."""
+ if (somefile is None):
+ return None
+ if (extension is None):
+ return somefile
+ if (len(str(somefile)) > len(extension)):
+ offset = (-1 * len(extension))
+ if (extension in str(somefile)[offset:]) and (str(".") in str(somefile)):
+ return somefile
+ else:
+ return str("{}.{}").format(somefile, extension)
+ else:
+ return str("{}.{}").format(somefile, extension)
+
+
+@remediation.error_handling
+def xisfile(somefile):
+ """Ensures the given file is available for reading."""
+ if (somefile is None):
+ return False
+ import os.path
+ if os.path.isabs(somefile) and os.path.isfile(somefile):
+ return True
+ else:
+ return os.path.isfile(os.path.abspath(somefile))
+
+
@remediation.error_passing
def open_func(file, mode='r', buffering=-1, encoding=None):
""" cross-python open function """
+ if xstr("r") in xstr(mode):
+ if not xisfile(file):
+ logs.log(str("[CWE-73] File expected, but not found. Redacted filename."), "Info")
+ file = None
try:
import six
if six.PY2:
@@ -313,7 +346,7 @@ def open_func(file, mode='r', buffering=-1, encoding=None):
@remediation.error_passing
def write_func(someFile, the_data=None):
- """ cross-python open function """
+ """ cross-python write function """
try:
import six
if six.PY2:
diff --git a/setup.cfg b/setup.cfg
index 4aa2152..8ea9ed6 100644
--- a/setup.cfg
+++ b/setup.cfg
@@ -1,6 +1,6 @@
[metadata]
name = piaplib
-version = 0.2.5
+version = 0.2.7
author = Mr. Walls
author-email = reactive-firewall@users.noreply.github.com
summary = Beta python tools for PiAP
diff --git a/setup.py b/setup.py
index be4fe06..5d92a4f 100755
--- a/setup.py
+++ b/setup.py
@@ -58,7 +58,7 @@
setup(
name="""piaplib""",
- version="""0.2.5""",
+ version="""0.2.7""",
description="""Beta for PiAP python tools""",
long_description=readme,
install_requires=requirements,
diff --git a/standards b/standards
index 9e35a1f..7bb6d14 100644
--- a/standards
+++ b/standards
@@ -12,6 +12,9 @@ Standards checklist:
PEP324 - subproccess
RFC 4880 - pgpy (bsd compatable python pgp)
+ RFC3530 - NFSv4
+
+
encryption:
aes-256-ctr
- ??
diff --git a/tests/test_basic.py b/tests/test_basic.py
index f9fc3d2..6715194 100644
--- a/tests/test_basic.py
+++ b/tests/test_basic.py
@@ -49,29 +49,16 @@ def test_actual_depends(self):
theResult = True
try:
import sys
- if sys.__name__ is None:
- theResult = False
import os
- if os.__name__ is None:
- theResult = False
import argparse
- if argparse.__name__ is None:
- theResult = False
import subprocess
- if subprocess.__name__ is None:
- theResult = False
import time
- if time.__name__ is None:
- theResult = False
import re
- if re.__name__ is None:
- theResult = False
import hashlib
- if hashlib.__name__ is None:
- theResult = False
import hmac
- if hmac.__name__ is None:
- theResult = False
+ for depends in [sys, os, argparse, subprocess, time, re, hashlib, hmac]:
+ if depends.__name__ is None:
+ theResult = False
except Exception as impErr:
print(str(""))
print(str(type(impErr)))
diff --git a/tests/test_config.py b/tests/test_config.py
index 2f44b08..7f1766a 100644
--- a/tests/test_config.py
+++ b/tests/test_config.py
@@ -83,135 +83,6 @@ def test_syntax(self):
theResult = False
assert theResult
- def test_case_config_none_ext(self):
- """Tests the addExtension when input is None"""
- theResult = True
- try:
- from .context import piaplib as piaplib
- if piaplib.__name__ is None:
- raise ImportError("Failed to import pku")
- from piaplib import pocket as pocket
- if pocket.__name__ is None:
- raise ImportError("Failed to import utils")
- from piaplib import pku as pku
- if pku.__name__ is None:
- raise ImportError("Failed to import pku")
- from pku import config as config
- if config.__name__ is None:
- raise ImportError("Failed to import config")
- self.assertIsNone(config.addExtension(None, None))
- except Exception as err:
- print(str(""))
- print(str(type(err)))
- print(str(err))
- print(str((err.args)))
- print(str(""))
- err = None
- del err
- theResult = False
- assert theResult
-
- def test_case_config_file_with_none_ext(self):
- """Tests the addExtension when input is (test, None)"""
- theResult = True
- try:
- from .context import piaplib as piaplib
- if piaplib.__name__ is None:
- raise ImportError("Failed to import pku")
- from piaplib import pocket as pocket
- if pocket.__name__ is None:
- raise ImportError("Failed to import utils")
- from piaplib import pku as pku
- if pku.__name__ is None:
- raise ImportError("Failed to import pku")
- from pku import config as config
- if config.__name__ is None:
- raise ImportError("Failed to import config")
- test = None
- test_name = str("test_no_dot")
- self.assertIsNone(test)
- test = config.addExtension(test_name, None)
- self.assertIsNotNone(test)
- self.assertEqual(test, test_name)
- except Exception as err:
- print(str(""))
- print(str(type(err)))
- print(str(err))
- print(str((err.args)))
- print(str(""))
- err = None
- del err
- theResult = False
- assert theResult
-
- def test_case_config_file_with_short_ext(self):
- """Tests the addExtension when input is (test, txt)"""
- theResult = True
- try:
- from .context import piaplib as piaplib
- if piaplib.__name__ is None:
- raise ImportError("Failed to import pku")
- from piaplib import pocket as pocket
- if pocket.__name__ is None:
- raise ImportError("Failed to import utils")
- from piaplib import pku as pku
- if pku.__name__ is None:
- raise ImportError("Failed to import pku")
- from pku import config as config
- if config.__name__ is None:
- raise ImportError("Failed to import config")
- test = None
- test_name = str("test_no_dot")
- test_ext = str("txt")
- self.assertIsNone(test)
- test = config.addExtension(test_name, test_ext)
- self.assertIsNotNone(test)
- self.assertNotEqual(test, test_name)
- except Exception as err:
- print(str(""))
- print(str(type(err)))
- print(str(err))
- print(str((err.args)))
- print(str(""))
- err = None
- del err
- theResult = False
- assert theResult
-
- def test_case_config_file_with_long_ext(self):
- """Tests the addExtension when input is (test, much_longer_extension)"""
- theResult = True
- try:
- from .context import piaplib as piaplib
- if piaplib.__name__ is None:
- raise ImportError("Failed to import pku")
- from piaplib import pocket as pocket
- if pocket.__name__ is None:
- raise ImportError("Failed to import utils")
- from piaplib import pku as pku
- if pku.__name__ is None:
- raise ImportError("Failed to import pku")
- from pku import config as config
- if config.__name__ is None:
- raise ImportError("Failed to import config")
- test = None
- test_name = str("test")
- test_ext = str("much_longer_extension")
- self.assertIsNone(test)
- test = config.addExtension(test_name, test_ext)
- self.assertIsNotNone(test)
- self.assertNotEqual(test, test_name)
- except Exception as err:
- print(str(""))
- print(str(type(err)))
- print(str(err))
- print(str((err.args)))
- print(str(""))
- err = None
- del err
- theResult = False
- assert theResult
-
def test_case_config_supports_json(self):
"""Tests the config.hasJsonSupport() function"""
theResult = True
@@ -299,6 +170,56 @@ def test_case_json_read_write_file(self):
theResult = False
assert theResult
+ def test_case_default_config(self):
+ """Tests the default configuration functions"""
+ theResult = False
+ try:
+ from piaplib import pku as pku
+ if pku.__name__ is None:
+ raise ImportError("Failed to import pku")
+ from pku import config as config
+ if config.__name__ is None:
+ raise ImportError("Failed to import config")
+ self.assertIsNotNone(config.getDefaultMainConfigFile())
+ theResult = True
+ except Exception as err:
+ print(str(""))
+ print(str("Error in test of default config"))
+ print(str(type(err)))
+ print(str(err))
+ print(str((err.args)))
+ print(str(""))
+ err = None
+ del err
+ theResult = False
+ assert theResult
+
+ def test_case_write_default_config(self):
+ """Tests the write default configuration functions"""
+ theResult = False
+ try:
+ from piaplib import pku as pku
+ if pku.__name__ is None:
+ raise ImportError("Failed to import pku")
+ from pku import config as config
+ if config.__name__ is None:
+ raise ImportError("Failed to import config")
+ test_path = str("/tmp/test_config.cnf")
+ self.assertTrue(config.writeMainConfigFile(test_path))
+ self.assertIsNotNone(config.loadMainConfigFile(test_path))
+ theResult = True
+ except Exception as err:
+ print(str(""))
+ print(str("Error in test of default config"))
+ print(str(type(err)))
+ print(str(err))
+ print(str((err.args)))
+ print(str(""))
+ err = None
+ del err
+ theResult = False
+ assert theResult
+
def test_case_yaml_read_write_file(self):
"""Tests the YAML read and write functions"""
theResult = False
diff --git a/tests/test_enc.py b/tests/test_enc.py
index a61192b..651e6f1 100644
--- a/tests/test_enc.py
+++ b/tests/test_enc.py
@@ -38,7 +38,7 @@
raise ImportError("Failed to import test context")
-class BookTestSuite(unittest.TestCase):
+class CryptoTestSuite(unittest.TestCase):
"""Special Pocket keyring crypto test cases."""
def test_absolute_truth_and_meaning(self):
@@ -112,7 +112,7 @@ def test_z_case_clearify_setKeyFile(self):
raise ImportError("Failed to import clearify")
self.assertIsNotNone(clearify.makeKeystoreFile(
str("This is not a real key"),
- str("../test.secret")
+ str("./test.secret")
))
except Exception as err:
print(str(""))
@@ -156,7 +156,7 @@ def test_z_case_clearify_setKeyFile_no_key(self):
raise ImportError("Failed to import clearify")
self.assertIsNotNone(clearify.makeKeystoreFile(
None,
- str("../test.secret")
+ str("./test.secret")
))
except Exception as err:
print(str(""))
@@ -319,6 +319,58 @@ def test_case_clearify_main_b(self):
raise unittest.SkipTest("BETA. Experemental feature not ready yet.")
assert theResult
+ def test_case_clearify_read_write(self): # noqa C901
+ """Tests the helper function pack to file of keyring.clearify and then unpack"""
+ theResult = False
+ try:
+ from piaplib.keyring import clearify as clearify
+ if clearify.__name__ is None:
+ raise ImportError("Failed to import clearify")
+ from piaplib.keyring import rand as rand
+ if rand.__name__ is None:
+ raise ImportError("Failed to import rand")
+ sometestfile = str("./the_test_file.enc")
+ theteststore = clearify.makeKeystoreFile(
+ str("testkeyneedstobelong"),
+ str("./.weak_test_key_{}").format(rand.randInt(1, 10, 99))
+ )
+ self.assertIsNotNone(theteststore)
+ test_write = clearify.packToFile(
+ sometestfile,
+ str("This is a test Message"),
+ theteststore
+ )
+ self.assertTrue(test_write)
+ if (test_write is True):
+ test_read = clearify.unpackFromFile(sometestfile, theteststore)
+ try:
+ if isinstance(test_read, bytes):
+ test_read = test_read.decode('utf8')
+ except UnicodeDecodeError:
+ test_read = str(repr(bytes(test_read)))
+ self.assertIsNotNone(test_read)
+ if (str("This is a test Message") in str(test_read)):
+ theResult = True
+ else:
+ if sys.platform.startswith("linux"):
+ print(str(repr(bytes(test_read))))
+ theResult = False
+ else:
+ raise unittest.SkipTest("BETA. Experemental feature not ready yet.")
+ except Exception as err:
+ print(str(""))
+ print(str(type(err)))
+ print(str(err))
+ print(str((err.args)))
+ print(str(""))
+ err = None
+ del err
+ if sys.platform.startswith("linux"):
+ theResult = False
+ else:
+ raise unittest.SkipTest("BETA. Experemental feature not ready yet.")
+ assert theResult
+
if __name__ == u'__main__':
unittest.main()
diff --git a/tests/test_salt.py b/tests/test_salt.py
index 5dfdcde..4d0777a 100644
--- a/tests/test_salt.py
+++ b/tests/test_salt.py
@@ -146,17 +146,12 @@ def test_keyring_salt_test_entropy(self):
theResult = True
try:
from .context import piaplib
- if piaplib.__name__ is None:
- theResult = False
from piaplib import keyring as keyring
- if keyring.__name__ is None:
- theResult = False
from keyring import saltify as saltify
- if saltify.__name__ is None:
- theResult = False
from keyring import rand as rand
- if rand.__name__ is None:
- theResult = False
+ for depends in [piaplib, keyring, saltify, rand]:
+ if depends.__name__ is None:
+ theResult = False
randomSalt = str(rand.randStr(10))
space = str(""" """)
randomSalt_shift1 = randomSalt + space
diff --git a/tests/test_strings.py b/tests/test_strings.py
index 43928ac..b5367d8 100644
--- a/tests/test_strings.py
+++ b/tests/test_strings.py
@@ -238,14 +238,12 @@ def test_case_utils_base_literal_str(self):
]
for testcase in the_test_cases:
if theResult is True:
- if testcase[0] is None:
+ if testcase[0] is None or testcase[1] is None:
continue
- if testcase[1] is not None:
+ else:
theResult = (
testcase[0] in testcase[1]
)
- else:
- continue
if utils.literal_str(testcase[0]) is None:
continue
if utils.literal_str(testcase[1]) is not None:
@@ -406,14 +404,11 @@ def _test_case_utils_super_fuzz_literal_str(self):
theResult = True
try:
from piaplib import pku as pku
- if pku.__name__ is None:
- raise ImportError("Failed to import pku")
from pku import utils as utils
- if utils.__name__ is None:
- raise ImportError("Failed to import utils")
import os
- if os.__name__ is None:
- raise ImportError("Failed to import os. Are we alive?")
+ for depends in [os, utils, pku]:
+ if depends.__name__ is None:
+ raise ImportError("Failed to import dependancy")
for testrun in range(1000):
randomTest = os.urandom(10)
testcase = [str(randomTest), utils.literal_str(randomTest)]
diff --git a/tests/test_usage.py b/tests/test_usage.py
index e7fbf1e..7c63c7c 100644
--- a/tests/test_usage.py
+++ b/tests/test_usage.py
@@ -168,28 +168,16 @@ def test_actual_depends(self):
theResult = True
try:
import sys
- if sys.__name__ is None:
- theResult = False
import os
- if os.__name__ is None:
- theResult = False
import argparse
- if argparse.__name__ is None:
- theResult = False
- if subprocess.__name__ is None:
- theResult = False
+ import subprocess
import time
- if time.__name__ is None:
- theResult = False
import re
- if re.__name__ is None:
- theResult = False
import hashlib
- if hashlib.__name__ is None:
- theResult = False
import hmac
- if hmac.__name__ is None:
- theResult = False
+ for depends in [sys, os, argparse, subprocess, time, re, hashlib, hmac]:
+ if depends.__name__ is None:
+ theResult = False
except Exception as impErr:
print(str(""))
print(str(type(impErr)))
diff --git a/tests/test_utils.py b/tests/test_utils.py
index 461cb0a..13df45e 100644
--- a/tests/test_utils.py
+++ b/tests/test_utils.py
@@ -304,6 +304,135 @@ def test_case_utils_regex_tty_clients_output(self):
theResult = False
assert theResult
+ def test_case_utils_none_ext(self):
+ """Tests the addExtension when input is None"""
+ theResult = True
+ try:
+ from .context import piaplib as piaplib
+ if piaplib.__name__ is None:
+ raise ImportError("Failed to import pku")
+ from piaplib import pocket as pocket
+ if pocket.__name__ is None:
+ raise ImportError("Failed to import utils")
+ from piaplib import pku as pku
+ if pku.__name__ is None:
+ raise ImportError("Failed to import pku")
+ from pku import utils as utils
+ if utils.__name__ is None:
+ raise ImportError("Failed to import utils")
+ self.assertIsNone(utils.addExtension(None, None))
+ except Exception as err:
+ print(str(""))
+ print(str(type(err)))
+ print(str(err))
+ print(str((err.args)))
+ print(str(""))
+ err = None
+ del err
+ theResult = False
+ assert theResult
+
+ def test_case_utils_file_with_none_ext(self):
+ """Tests the addExtension when input is (test, None)"""
+ theResult = True
+ try:
+ from .context import piaplib as piaplib
+ if piaplib.__name__ is None:
+ raise ImportError("Failed to import pku")
+ from piaplib import pocket as pocket
+ if pocket.__name__ is None:
+ raise ImportError("Failed to import utils")
+ from piaplib import pku as pku
+ if pku.__name__ is None:
+ raise ImportError("Failed to import pku")
+ from pku import utils as utils
+ if utils.__name__ is None:
+ raise ImportError("Failed to import utils")
+ test = None
+ test_name = str("test_no_dot")
+ self.assertIsNone(test)
+ test = utils.addExtension(test_name, None)
+ self.assertIsNotNone(test)
+ self.assertEqual(test, test_name)
+ except Exception as err:
+ print(str(""))
+ print(str(type(err)))
+ print(str(err))
+ print(str((err.args)))
+ print(str(""))
+ err = None
+ del err
+ theResult = False
+ assert theResult
+
+ def test_case_utils_file_with_short_ext(self):
+ """Tests the addExtension when input is (test, txt)"""
+ theResult = True
+ try:
+ from .context import piaplib as piaplib
+ if piaplib.__name__ is None:
+ raise ImportError("Failed to import pku")
+ from piaplib import pocket as pocket
+ if pocket.__name__ is None:
+ raise ImportError("Failed to import utils")
+ from piaplib import pku as pku
+ if pku.__name__ is None:
+ raise ImportError("Failed to import pku")
+ from pku import utils as utils
+ if utils.__name__ is None:
+ raise ImportError("Failed to import utils")
+ test = None
+ test_name = str("test_no_dot")
+ test_ext = str("txt")
+ self.assertIsNone(test)
+ test = utils.addExtension(test_name, test_ext)
+ self.assertIsNotNone(test)
+ self.assertNotEqual(test, test_name)
+ except Exception as err:
+ print(str(""))
+ print(str(type(err)))
+ print(str(err))
+ print(str((err.args)))
+ print(str(""))
+ err = None
+ del err
+ theResult = False
+ assert theResult
+
+ def test_case_utils_file_with_long_ext(self):
+ """Tests the addExtension when input is (test, much_longer_extension)"""
+ theResult = True
+ try:
+ from .context import piaplib as piaplib
+ if piaplib.__name__ is None:
+ raise ImportError("Failed to import pku")
+ from piaplib import pocket as pocket
+ if pocket.__name__ is None:
+ raise ImportError("Failed to import utils")
+ from piaplib import pku as pku
+ if pku.__name__ is None:
+ raise ImportError("Failed to import pku")
+ from pku import utils as utils
+ if utils.__name__ is None:
+ raise ImportError("Failed to import utils")
+ test = None
+ test_name = str("test")
+ test_ext = str("much_longer_extension")
+ self.assertIsNone(test)
+ test = utils.addExtension(test_name, test_ext)
+ self.assertIsNotNone(test)
+ self.assertNotEqual(test, test_name)
+ except Exception as err:
+ print(str(""))
+ print(str(type(err)))
+ print(str(err))
+ print(str((err.args)))
+ print(str(""))
+ err = None
+ del err
+ theResult = False
+ assert theResult
+
def test_case_utils_read_write_file(self):
"""Tests the read and write functions"""
theResult = False
diff --git a/tox.ini b/tox.ini
index 9c2bfd6..d124789 100644
--- a/tox.ini
+++ b/tox.ini
@@ -180,6 +180,7 @@ basepython =
flake: python3
deps =
flake: flake8>=2.5.4
+ mccabe>=0.6.1
pyflakes>=1.1.0
pep8>=1.7.0
coverage>=4.4