Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

Importing Freds changes.

  • Loading branch information...
commit 86668d98b31c5e72d4fbf5562f2ece266cc76b64 1 parent 03cb29b
@ygjb ygjb authored
View
240 Garmr/corechecks.py
@@ -1,7 +1,6 @@
-from urlparse import urlparse
+from urlparse import urlparse, urljoin
import requests
-from scanner import ActiveTest, PassiveTest, Scanner, get_url
-
+from scanner import ActiveTest, PassiveTest, HtmlTest, Scanner
class HttpOnlyAttributePresent(PassiveTest):
description = "Inspect the Set-Cookie: header and determine if the HttpOnly attribute is present."
@@ -12,11 +11,11 @@ def analyze(self, response):
if "httponly" in response.headers[cookieheader].lower():
result = self.result("Pass", "HttpOnly is set", response.headers[cookieheader])
else:
- result = self.result("Fail", "HttpOnly is not set", response.headers[cookieheader])
+ result = self.result("Fail", "HttpOnly is not set", response.headers[cookieheader])
else:
result = self.result("Skip", "No cookie is set by this response.", None)
return result
-
+
class SecureAttributePresent(PassiveTest):
description = "Inspect the Set-Cookie: header and determine if the Secure attribute is present."
def analyze(self, response):
@@ -24,20 +23,20 @@ def analyze(self, response):
cookieheader = "Set-Cookie"
has_cookie = cookieheader in response.headers
if has_cookie:
- if "httponly" in response.headers[cookieheader].lower():
+ if "secure" in response.headers[cookieheader].lower():
if url.scheme == "https":
- result = self.result("Pass", "HttpOnly is set", response.headers[cookieheader])
+ result = self.result("Pass", "Secure cookie attribute is set", response.headers[cookieheader])
else:
- result = self.result("Fail", "HttpOnly should only be set for cookies sent over SSL.", response.headers[cookieheader])
+ result = self.result("Fail", "Secure cookie attribute should only be set for cookies sent over SSL.", response.headers[cookieheader])
else:
if url.scheme == "https":
- result = self.result("Fail", "HttpOnly is not set", response.headers[cookieheader])
+ result = self.result("Fail", "Secure cookie attribute is not set", response.headers[cookieheader])
else:
- result = self.result("Pass", "The secure attribute is not set (expected for HTTP)", response.headers[cookieheader])
+ result = self.result("Pass", "The secure attribute is not set (expected for HTTP)", response.headers[cookieheader])
else:
result = self.result("Skip", "No cookie is set by this response.", None)
return result
-
+
class StrictTransportSecurityPresent(PassiveTest):
secure_only = True
@@ -66,7 +65,8 @@ class Http200Check(ActiveTest):
run_passives = True
description = "Make a GET request to the specified URL, reporting success only on a 200 response without following redirects"
def do_test(self, url):
- response = get_url(url, False)
+ sess = self.sessions[self.url]
+ response = sess.get(url, allow_redirects=False)
if response.status_code == 200:
result = self.result("Pass", "The request returned an HTTP 200 response.", None)
else:
@@ -77,57 +77,211 @@ class WebTouch(ActiveTest):
run_passives = True
description = "Make a GET request to the specified URL, and check for a 200 response after resolving redirects."
def do_test(self, url):
- response = requests.get(url)
+ sess = self.sessions[self.url]
+ response = sess.get(url)
if response.status_code == 200:
result = self.result("Pass", "The request returned an HTTP 200 response.", None)
else:
result = self.result("Fail", "The response code was %s" % response.status_code, None)
return (result, response)
-
-class StsUpgradeCheck(ActiveTest):
+
+
+class StsPresentCheck(ActiveTest):
+ insecure_only = False
+ run_passives = True
+ description = "Inspect the second response in the Strict-Transport-Security redirect process according to http://tools.ietf.org/html/draft-hodges-strict-transport-sec"
+ events = {}
+ def do_test(self, url):
+ stsheader = "Strict-Transport-Security"
+ #XXX hack: we should take response isntead
+ url = url.replace('http:', 'https:')
+ #XXX end of hack
+ sess = self.sessions[self.url]
+ response = sess.get(url, allow_redirects=False)
+ if stsheader in response.headers:
+ result = self.result('Pass', 'Subsequential HTTPS Response for STS contained corresponding STS header', None)
+ else:
+ result = self.result('Fail', 'Subsequential HTTPS Response did not contain STS header', None)
+ return (result, response)
+
+class StsRedirectCheck(ActiveTest):
insecure_only = True
- run_passives = False
- description = "Inspect the Strict-Transport-Security redirect process according to http://tools.ietf.org/html/draft-hodges-strict-transport-sec"
-
+ run_passives = True
+ description = "Inspect the first response in the Strict-Transport-Security redirect process according to http://tools.ietf.org/html/draft-hodges-strict-transport-sec"
+ events = { "Pass": StsPresentCheck,
+ "Error": None,
+ "Fail": None }
+
def do_test(self, url):
stsheader = "Strict-Transport-Security"
u = urlparse(url)
if u.scheme == "http":
- correct_header = False
+ sess = self.sessions[self.url]
+ response = sess.get(url, allow_redirects=False)
+ invalid_header = stsheader in response.headers
+ is_redirect = response.status_code == 301
bad_redirect = False
- response1 = get_url(url, False)
- invalid_header = stsheader in response1.headers
- is_redirect = response1.status_code == 301
if is_redirect == True:
- redirect = response1.headers["location"]
- r = urlparse(redirect)
- if r.scheme == "https":
- response2 = get_url(redirect, False)
- correct_header = stsheader in response2.headers
+ redirect = response.headers['location']
+ r = urlparse(redirect) #XXX do we need to check for same-domain? see sts draft!
+ if r.scheme != 'https':
+ pass
else:
bad_redirect = True
-
- success = invalid_header == False and is_redirect == True and correct_header == True
- if success == True:
+
+ #continue w/ Pass to see if next location contains stsheader?
+ next_test = (invalid_header == False) and (is_redirect == True) and (bad_redirect == False)
+ if next_test == True:
message = "The STS upgrade occurs properly (no STS header on HTTP, a 301 redirect, and an STS header in the subsequent request."
else:
- message = "%s%s%s%s" % (
+ message = "%s%s%s" % (
"The initial HTTP response included an STS header (RFC violation)." if invalid_header else "",
"" if is_redirect else "The initial HTTP response should be a 301 redirect (RFC violation see ).",
- "" if correct_header else "The followup to the 301 redirect must include the STS header.",
"The 301 location must use the https scheme." if bad_redirect else ""
)
- result = self.result("Pass" if success else "Fail", message, None)
- return (result, response1)
-
-
+ result = self.result('Pass' if next_test else 'Fail', message, None)
+ return (result, response)
+
+ else:
+ #XXX maybe just /change/ the scheme to enforce checking?
+ result = self.result('Skip', 'Not checking for STS-Upgrade on already-secure connection', None)
+ return result, None
+
+
+
+class CSPPolicyCheck(ActiveTest):
+ insecure_only = False
+ run_passives = True
+ description = 'checks if the policy is present'
+ def do_test(self, url, pred):
+ cspheader = "X-Content-Security-Policy"
+ csproheader = 'X-Content-Security-Policy-Report-Only'
+ response = pred['response']
+ if cspheader in response.headers or csproheader in response.headers:
+ if cspheader in response.headers:
+ h = response.headers[cspheader]
+ elif csproheader in response.headers:
+ h = response.headers[csproheader]
+ harr = h.split(' ')
+ if harr[0].lower() == 'policy-uri':
+ url = urljoin(response.url, harr[1]) # join previous URL with the one coming from the header
+ sess = self.sessions[self.url]
+ resp = sess.get(url) # allow_redirects=False?
+ if resp.status_code == 200:
+ result = self.result('Pass', 'Policy file present', resp)
+ else:
+ result = self.result('Fail', 'Policy file not found', resp)
+ return result, resp
+
+
+
+
+class CSPHeaderCheck(ActiveTest):
+ # please revise after another readthrough of https://wiki.mozilla.org/Security/CSP/Specification#Sample_Policy_Definitions necessary
+ insecure_only = False
+ run_passives = True
+ description = "Checks if the CSP Header is present and links to a policy. If it does, we will forward to another test to check if it present"
+ events = {'Pass': CSPPolicyCheck}
+ def do_test(self, url):
+ cspheader = "X-Content-Security-Policy"
+ csproheader = 'X-Content-Security-Policy-Report-Only'
+ #x-content-security-policy-report-only: policy-uri /services/csp/policy?build=1919
+
+ sess = self.sessions[self.url]
+ response = sess.get(url, allow_redirects=False)
+ if cspheader in response.headers or csproheader in response.headers:
+ if cspheader in response.headers:
+ h = response.headers[cspheader]
+ elif csproheader in response.headers:
+ h = response.headers[csproheader]
+ harr = h.split(' ')
+ if harr[0].lower() == 'policy-uri':
+ result = self.result('Pass', 'CSP Header present and points to a policy file', None)
+ #XXX is this line correct here??
+ else:
+ result = self.result('Fail', 'No %s or %s in headers' % (cspheader, csproheader), None)
+ return (result, response)
+
+
+class HttpsLoginForm(HtmlTest):
+ description = "Check that html forms with password-type inputs point to https"
+ def analyze_html(self, response, soup):
+ url = urlparse(response.url)
+ forms = soup.findAll('form')
+ # look only at those form elements that have password type input elements as children
+ forms = filter(lambda x: x.findChildren("input", type="password") ,forms)
+ if len(forms) == 0:
+ result = self.result("Skip", "There are no login forms on this page", None)
+ return result
+ failforms = []
+ for form in forms:
+ if url.scheme == "https":
+ if form['action'].startswith('http:'):
+ failforms.append(form)
+ else:
+ if not form['action'].startswith('https'):
+ failforms.append(form)
+ if len(failforms) == 0:
+ result = self.result("Pass", "All login forms point to secure resources", forms)
+ else:
+ result = self.result("Fail", "There are login forms pointing to insecure locations", failforms)
+ return result
+
+
+class HttpsResourceOnHttpsLink(HtmlTest):
+ # also called 'mixed content'
+ description = "Check if all external resources are pointing to https links, when on https page"
+ secure_only = True
+ def analyze_html(self, response, soup):
+ ''' there is a list on stackoverflow[1] which claims to contain
+ all possible attributes hat may carry a URL. is
+ there a way to confirm this list is exhaustive?
+ I have removed attributes which are just links/pointers,
+ we only want those attributes to resources, the browser
+ downloads automatically
+ [1] http://stackoverflow.com/questions/2725156/complete-list-of-html-tag-attributes-which-have-a-url-value/2725168#2725168
+ '''
+ attrlist = ['codebase', 'background', 'src', 'usemap', 'data', 'icon', 'manifest', 'poster', 'archive']
+ failtags = []
+ for tag in soup.findAll(True):
+ for attr in attrlist:
+ if tag.has_key(attr):
+ val = tag[attr]
+ if val.startswith('http:'):
+ failtags.append(tag)
+ if len(failtags) == 0:
+ result = self.result("Pass", "All external resources are https", None)
+ else:
+ result = self.result("Fail", "There are links to insecure locations", failtags)
+ return result
+
+class InlineJS(HtmlTest):
+ description = "complain about inline JS to improve migration to CSP"
+ def analyze_html(self, response, soup):
+ url = urlparse(response.url)
+ scripts = soup.findAll('script')
+ if len(scripts) == 0:
+ result = self.result ("Skip", "There are no script tags.", None)
+ return result
+ inlinescripts = filter(lambda x: len(x.text) > 0, scripts)
+ if len(inlinescripts) == 0:
+ result = self.result("Pass", "No inline JavaScript found", None)
+ else:
+ result = self.result("Fail", "Inline JavaScript found", inlinescripts)
+ return result
+
+
def configure(scanner):
if isinstance(scanner, Scanner) == False:
raise Exception("Cannot configure a non-scanner object!")
- scanner.register_check(Http200Check())
- scanner.register_check(WebTouch())
- scanner.register_check(StrictTransportSecurityPresent())
- scanner.register_check(XFrameOptionsPresent())
- scanner.register_check(StsUpgradeCheck())
- scanner.register_check(HttpOnlyAttributePresent())
- scanner.register_check(SecureAttributePresent())
+ scanner.register_check(Http200Check)
+ scanner.register_check(WebTouch)
+ scanner.register_check(StrictTransportSecurityPresent)
+ scanner.register_check(XFrameOptionsPresent)
+ scanner.register_check(StsRedirectCheck)
+ scanner.register_check(HttpOnlyAttributePresent)
+ scanner.register_check(SecureAttributePresent)
+ scanner.register_check(HttpsLoginForm)
+ scanner.register_check(HttpsResourceOnHttpsLink)
+ scanner.register_check(InlineJS)
+ scanner.register_check(CSPHeaderCheck)
View
38 Garmr/garmr.py
@@ -12,6 +12,7 @@ def main():
parser = argparse.ArgumentParser("Runs a set of tests against the set of provided URLs")
parser.add_argument("-u", "--url", action="append", dest="targets", help="Add a target to test")
parser.add_argument("-f", "--target-file", action="append", dest="target_files", help="File with URLs to test")
+ parser.add_argument("-S", "--new-sessions", action="store_true", default=False, dest="new_sessions", help="Create new Session for each test")
parser.add_argument("-m", "--module", action="append", default = [], dest="modules", help="Load an extension module")
parser.add_argument("-D", "--disable-core", action="store_true", default = False, dest="disable_core", help="Disable corechecks")
@@ -22,19 +23,28 @@ def main():
parser.add_argument("-c", "--check", action="append", dest="opts", help="Set a parameter for a check (check:opt=value)" )
parser.add_argument("-e", "--exclude", action="append", dest="exclusions", help="Prevent a check from being run/processed")
parser.add_argument("--save", action="store", dest="dump_path", help="Write out a configuration file based on parameters (won't run scan)")
-
+
args = parser.parse_args()
scanner = Scanner()
-
+
scanner.force_passives = args.force_passives
scanner.resolve_target = args.resolve_target
scanner.output = args.output
-
+
+
+
+ # Configure new-ssion item
+ if args.new_sessions:
+ Scanner.logger.info('Enforcing new sessions for each test')
+ # for each test, not for
+ ActiveTest.new_session = True
+
+
# Start building target list.
if args.targets != None:
for target in args.targets:
scanner.register_target(target)
-
+
# Add targets from files to the list.
if args.target_files != None:
for targets in args.target_files:
@@ -46,13 +56,13 @@ def main():
scanner.register_target(t)
except:
Scanner.logger.error("Unable to process the target list in: %s", targets)
-
+
# Load built-in modules if required.
if args.disable_core == False:
corechecks.configure(scanner)
-
+
# Configure modules.
- # TODO: change the module loading to scan the list of classes in a module and automagically
+ # TODO: change the module loading to scan the list of classes in a module and automagically
# detect any tests defined.
if args.modules != None:
for module in args.modules:
@@ -63,7 +73,7 @@ def main():
except Exception, e:
Scanner.logger.fatal("Unable to load the requested module [%s]: %s", module, e)
quit()
-
+
# Set up the reporter (allow it to load from modules that are configured)
try:
reporter = args.report.split('.')
@@ -77,12 +87,12 @@ def main():
except Exception, e:
Scanner.logger.fatal("Unable to use the reporter class [%s]: %s", args.report, e)
quit()
-
+
# Disable excluded checks.
if args.exclusions != None:
for exclude in args.exclusions:
scanner.disable_check(exclude)
-
+
# Configure checks
if args.opts != None:
for opt in args.opts:
@@ -92,13 +102,13 @@ def main():
scanner.configure_check(check, key, value)
except Exception, e:
Scanner.logger.fatal("Invalid check option: %s (%s)", opt, e)
-
+
if args.dump_path != None:
scanner.save_configuration(args.dump_path)
return
-
+
scanner.run_scan()
-
-
+
+
if __name__ == "__main__":
main()
View
59 Garmr/reporter.py
@@ -4,57 +4,57 @@ class Reporter():
reporters = {}
def start_report(self):
return None
-
+
def start_targets(self):
return None
-
+
def write_target(self, target):
return None
-
+
def start_actives(self):
return None
-
+
def write_active(self, test):
return None
-
+
def start_passives(self):
return None
-
+
def write_passive(self, target):
return None
-
+
def end_passives(self):
return None
-
+
def end_actives(self):
return None
-
+
def end_targets(self):
return None
-
+
def end_report(self):
return "This reporter is unimplemented!"
-
+
class DetailReporter(Reporter):
# TODO Implement detailed reporter
def end_report(self):
return "This reporter should emit an XML report that includes all of the the details for each test, including captured data"
-
+
class AntXmlReporter(Reporter):
-
+
def __init__(self):
self.report = ""
self.errtypes = { 'Error' : "error", 'Fail' : "failure", 'Skip' : "skipped"}
def start_report(self):
self.report = '<?xml version="1.0" encoding="utf-8"?>\n'
-
+
return None
-
+
def start_targets(self):
self.report += "<testsuites>\n"
return None
-
+
def write_target(self, target):
self.states = {}
self.states["Skip"] = 0
@@ -65,25 +65,24 @@ def write_target(self, target):
self.current_target = target
self.lines = ""
return None
-
+
def start_actives(self):
return None
-
+
def write_active(self, test, result):
self.states[result["state"]] += 1
self.checks += 1
- print test
module, check = ("%s" % test ).split('.')[-2:]
self.lines += '\t\t<testcase classname="%s" name="%s" time="%s"' % (module, check, result["duration"])
if result["state"] == "Pass":
self.lines += " />\n"
- else:
- self.lines += '>\n\t\t\t<{errtype}>{message}</{errtype}>\n\t\t</testcase>\n'.format(errtype=self.errtypes[result["state"]], message=result["message"])
+ else:
+ self.lines += '>\n\t\t\t<{errtype}>{message}</{errtype}>\n\t\t</testcase>\n'.format(errtype=self.errtypes[result["state"]], message=result["message"])
return None
-
+
def start_passives(self):
return None
-
+
def write_passive(self, test, result):
self.states[result["state"]] += 1
self.checks += 1
@@ -91,25 +90,25 @@ def write_passive(self, test, result):
self.lines += '\t\t<testcase classname="%s" name="%s" time="%s"' % (module, check, result["duration"])
if result["state"] == "Pass":
self.lines += " />\n"
- else:
+ else:
self.lines += '>\n\t\t\t<{errtype}>{message}</{errtype}>\n\t\t</testcase>\n'.format(errtype=self.errtypes[result["state"]], message=result["message"])
return None
-
+
def end_passives(self):
return None
-
+
def end_actives(self):
self.report+= '\t<testsuite name="{target}" errors="{errors}" failures="{failures}" skips="{skips}" tests="{checks}" time="{duration}">\n{lines}\t</testsuite>\n'.format(
target = self.current_target, errors=self.states["Error"], failures = self.states["Fail"],
skips = self.states["Skip"], checks = self.checks, duration=100, lines=self.lines)
return None
-
+
def end_targets(self):
self.report += "</testsuites>\n"
return None
-
+
def end_report(self):
return self.report
-
+
Reporter.reporters['xml'] = AntXmlReporter()
-
+
View
287 Garmr/scanner.py
@@ -1,78 +1,129 @@
from datetime import datetime
from reporter import Reporter
from urlparse import urlparse
+from BeautifulSoup import BeautifulSoup
import ConfigParser
import logging
import requests
import socket
import traceback
+from inspect import getargspec
+import subprocess
+import json
-def clean_headers(self, response_headers):
- headers = {}
- for head in response_headers:
- lst = head.strip(" \r\n").split(":")
- headers[lst[0]] = lst[1].strip()
- return headers
-def get_url(url, status = True):
- r = requests.get(url, allow_redirects = False)
- if status:
- r.raise_for_status()
- return r
+
+def exec_helper(cmd, args=None):
+ '''Use this function to call helpers, not to perform checks!
+ The example use-case would a Testcase that required to get an
+ authentication tokenout of a mailbox to complete a login procedure.
+ The email-fetching would be done as a helper
+ '''
+ if not args:
+ params = cmd
+ else:
+ params = [cmd] + args # becomes [cmd, arg1, arg2]
+ try:
+ output = subprocess.Popen(params, stdout=subprocess.PIPE).communicate()[0] # use Popen instead of subprocess.get_output for Python2.6 compatibility
+ try:
+ res = json.loads(output)
+ except ValueError:
+ return {"result":"Fail", "message":"Invalid JSON data", "data":""}
+ if 'result' in res and 'message' in res and 'data' in res:
+ return res
+ else:
+ return {"result":"Fail", "message":"Incomplete JSON data. Your helper should return a dict with the keys result, message and data.", "data":""}
+ except subprocess.CalledProcessError as e: # raised when? for Popen?
+ return {"result":"Fail", "message":"The helper script returned with a non-zero returnvalue", "data": e.output}
class PassiveTest():
secure_only = False
insecure_only = False
-
+
def analyze(self, response, results):
return None
-
+
def result(self, state, message, data):
return {'state' : state, 'message' : message, 'data' : data }
-
-class ActiveTest():
-
+
+class ActiveTest():
+ new_session = False # enable (e.g. from cli) to enforce new session generation
secure_only = False
insecure_only = False
run_passives = True
description = "The base class for an Active Test."
-
+ sessions = {}
+
def __init__(self):
if hasattr(self, "setup"):
self.setup()
-
- def execute(self, url):
- try:
- result = self.do_test(url)
+
+ #def get_url(self, url, status = True):
+ # try:
+ # sess = self.sessions[self.url]
+ # except KeyError:
+ # sess = requests.session()
+ # #print "Issue request towards %s using %s" % (url, sess.cookies)
+ # r = sess.get(url, allow_redirects = False)
+ # print url, r.status_code, status
+ # if status:
+ # r.raise_for_status()
+ # return r
+
+ def execute(self, url, predecessor=None):
+ self.url = url
+ if self.url not in self.sessions or self.new_session:
+ self.sessions[url] = requests.session() # Create per-target session
+ try:
+ if "pred" in getargspec(self.do_test).args:
+ resulttuple = self.do_test(url, predecessor)
+ else:
+ resulttuple = self.do_test(url)
except Exception, e:
tb = traceback.format_exc()
- result = (ActiveTest().result("Error", e, tb), None)
-
- return result
-
+ resulttuple = (ActiveTest().result("Error", e, tb), None)
+
+ return resulttuple
+
def result(self, state, message, data):
return { 'state' : state, 'message' : message, 'data' : data, 'passive' : {}}
-
+
+class HtmlTest(PassiveTest):
+ description = 'allow easy analysis of html source code'
+ def analyze(self, response):
+ if 'text/html' in response.headers['content-type']:
+ soup = BeautifulSoup(response.content)
+ return self.analyze_html(response, soup)
+ else:
+ result = self.result("Skip", "Content-type is not html "+ response.headers['content-type'], None)
+ return result
+
+ def analyze_html(self, response, soup):
+ """ implement this method in subclass"""
+ pass
+
class Scanner():
logging.basicConfig(format='%(asctime)s %(levelname)-8s %(message)s')
logger = logging.getLogger("Garmr-Scanner")
logger.setLevel(logging.DEBUG)
-
+
def __init__(self):
self.resolve_target = True
self.force_passives = False
- self._passive_tests_ = {}
- self._active_tests_ = {}
+ self._disabled_tests_ = []
+ self._passive_tests_ = []
+ self._active_tests_ = []
+ self._finished_active_tests_ = []
self._targets_ = []
self._protos_ = ["http", "https"]
Scanner.logger.debug("Scanner initialized.")
self.reporter = Reporter()
self.modules = []
-
- def do_passive_scan(self, passive, is_ssl, response):
- if passive.secure_only and not is_ssl:
- Scanner.logger.debug("\t\t[%s] Skip Test invalid for http scheme" % passive.__class__)
+
+ def do_passive_scan(self, passiveclass, is_ssl, response):
+ if passiveclass.secure_only and not is_ssl:
+ Scanner.logger.debug("\t\t[%s] Skip Test invalid for http scheme" % passiveclass)
passive_result = PassiveTest().result("Skip", "This check is only applicable to SSL requests.", None)
start = datetime.now()
passive_result['start'] = start
@@ -80,65 +131,91 @@ def do_passive_scan(self, passive, is_ssl, response):
passive_result["duration"] = 0
else:
start = datetime.now()
+ passive = passiveclass()
passive_result = passive.analyze(response)
end = datetime.now()
td = end - start
passive_result['start'] = start
passive_result['end'] = end
passive_result['duration'] = float((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)) / 10**6
- Scanner.logger.info("\t\t[%s] %s %s" % (passive.__class__, passive_result['state'], passive_result['message']))
+ Scanner.logger.info("\t\t[%s] %s %s" % (passiveclass, passive_result['state'], passive_result['message']))
return passive_result
-
- def do_active_scan(self, test, is_ssl, target):
- if (test.secure_only and not is_ssl):
- Scanner.logger.info("\t[Skip] [%s] (reason: secure_only)" % test.__class__)
+
+ def do_active_scan(self, testclass, is_ssl, target):
+ ''' instantiate the class and run it against the specified target, if applicable '''
+ if (testclass.secure_only and not is_ssl):
+ Scanner.logger.info("\t[Skip] [%s] (reason: secure_only)" % testclass)
result = ActiveTest().result("Skip", "This check is only applicable to SSL requests", None)
result['start'] = datetime.now()
result['end'] = result['start']
result['duration'] = 0
return result
- elif (test.insecure_only and is_ssl):
- Scanner.logger.info("\t[Skip] [%s] (reason: insecure_only)" % test.__class__)
+ elif (testclass.insecure_only and is_ssl):
+ Scanner.logger.info("\t[Skip] [%s] (reason: insecure_only)" % testclass)
result = ActiveTest().result("Skip", "This check is only applicable to SSL requests", None)
result['start'] = datetime.now()
result['end'] = result['start']
result['duration'] = 0
return result
+ elif str(testclass).split('.')[-1] in self._disabled_tests_:
+ Scanner.logger.info("\t[Skip] [%s] (reason: disabled)" % testclass)
+ result = ActiveTest().result("Skip", "This check was marked as disabled.", None)
+ result['start'] = datetime.now()
+ result['end'] = result['start']
+ result['duration'] = 0
+ return result
start = datetime.now()
- result, response = test.execute(target)
+ test = testclass() # from now on we have an instance of the class
+ if "pred" in getargspec(test.do_test).args:
+ # Check if class accepts this parameter. avoids rewriting.
+ predecessor_results = self.results[self._finished_active_tests_[-1]]
+ result, response = test.execute(target, predecessor=predecessor_results)
+ else:
+ result, response = test.execute(target)
end = datetime.now()
td = end - start
+ result['response'] = response
result['start'] = start
result['end'] = end
result['duration'] = float((td.microseconds + (td.seconds + td.days * 24 * 3600) * 10**6)) / 10**6
- Scanner.logger.info("\t[%s] %s %s" % (test.__class__, result['state'], result['message']))
- self.reporter.write_active(test.__class__, result)
+ Scanner.logger.info("\t[%s] %s %s" % (testclass, result['state'], result['message']))
+ self.reporter.write_active(testclass, result)
if (result['state'] == "Error"):
Scanner.logger.error(result['data'])
if response != None and test.run_passives:
result['passive'] = {}
self.reporter.start_passives()
- for passive_key in self._passive_tests_.keys():
- passive = self._passive_tests_[passive_key]["test"]
- result["passive"][passive.__class__] = self.do_passive_scan(passive, is_ssl, response)
- self.reporter.write_passive(passive.__class__, result["passive"][passive.__class__])
+ for passive_testclass in self._passive_tests_:
+ result["passive"][passive_testclass] = self.do_passive_scan(passive_testclass, is_ssl, response)
+ self.reporter.write_passive(passive_testclass, result["passive"][passive_testclass])
self.reporter.end_passives()
return result
-
+
def scan_target(self, target):
+ ''' iterate over registered tests and deligate for scan '''
self.reporter.write_target(target)
Scanner.logger.info("[%s] scanning:" % target)
url = urlparse(target)
is_ssl = url.scheme == "https"
- results = {}
+ self.results = {}
self.reporter.start_actives()
- for key in self._active_tests_.keys():
- test = self._active_tests_[key]["test"]
- results[test.__class__] = self.do_active_scan(test, is_ssl, target)
+ self.active_tests_stack = self._active_tests_
+ while len(self.active_tests_stack) > 0:
+ testclass = self.active_tests_stack[0]
+ self.active_tests_stack = self.active_tests_stack[1:]
+ self.results[testclass] = self.do_active_scan(testclass, is_ssl, target)
+ if hasattr(testclass, 'events'): #TODO enforce every test to have event dict present?
+ events_lower = dict([(k.lower(),v) for k,v in testclass.events.items()])
+ if self.results[testclass]['state'].lower() in events_lower and events_lower[self.results[testclass]['state'].lower()] != None:
+ nexttest = events_lower[self.results[testclass]['state'].lower()]
+ Scanner.logger.info("\t[%s] Instantiated because %s declares it as its successor (the event was '%s')" % (nexttest, testclass, self.results[testclass]['state']))
+ self.active_tests_stack.append(nexttest) # we have to hand over the response!!1, # important: we hand over an instance, not the class
+ self._finished_active_tests_.append(testclass)
self.reporter.end_actives()
- return results
-
+ return self.results
+
def run_scan(self):
+ ''' iterate over target and deligate to list of tests '''
results = {}
self.reporter.start_report()
self.reporter.start_targets()
@@ -155,12 +232,13 @@ def run_scan(self):
file.write(self.reporter.end_report())
file.close()
-
+
def register_target(self, url):
+ ''' add target to the scanning engine '''
u = urlparse(url)
valid = u.netloc != "" and u.scheme in self._protos_
reason = "%s%s" % ("[bad netloc]" if u.netloc == "" else "", "" if u.scheme in self._protos_ else "[bad scheme]")
-
+
# todo - support ipv6 urls
host = u.netloc.split(':')[0]
if (self.resolve_target):
@@ -176,12 +254,14 @@ def register_target(self, url):
Scanner.logger.debug("[target]: %s" % url)
return
Scanner.logger.error("%s is not a valid target (reason: %s)" % (url, reason))
-
+
def configure_check(self, check_name, key, value):
- if self._active_tests_.has_key(check_name):
- check = self._active_tests_[check_name]["test"]
- elif self._passive_tests_.has_key(check_name):
- check = self._passive_tests_[check_name]["test"]
+ if check_name in map(lambda x: str(x), self._active_tests_):
+ index = map(lambda x: str(x), self._active_tests_).index(check_name)
+ check = self._active_tests_[index]
+ if check_name in map(lambda x: str(x), self._passive_tests_):
+ index = map(lambda x: str(x), self._active_tests_).index(check_name)
+ check = self._active_tests_[index]
else:
raise Exception("The requested check is not available (%s)" % check_name)
if hasattr(check, "config") == False:
@@ -190,34 +270,36 @@ def configure_check(self, check_name, key, value):
raise Exception("%s is not a valid configuration for %s", key, check_name)
check.config[key] = value
Scanner.logger.debug("\t%s.%s=%s" % (check_name, key, value))
-
+
def disable_check(self, check_name):
- if self._active_tests_.has_key(check_name):
- self._active_tests_[check_name]["enabled"] = False
- elif self._passive_tests_.has_key(check_name):
- self._passive_tests_[check_name]["enabled"] = False
+ ''' add a previously added test to a blacklist of test that are to be skipped '''
+ if check_name in map(lambda x: str(x).split('.')[-1], self._active_tests_) or check_name in map(lambda x: str(x).split('.')[-1], self._passive_tests_):
+ self._disabled_tests_.append(check_name)
+ Scanner.logger.debug("\t%s disabled.", check_name)
else:
- raise Exception("The requested check is not available (%s)" % check_name)
- Scanner.logger.debug("\t%s disabled.", check_name)
-
+ print "The requested check is not available (%s)" % check_name
+ print "The list of available checks is %s for actives and %s for passives" % (map(lambda x: str(x).split('.')[-1], self._active_tests_), map(lambda x: str(x).split('.')[-1], self._passive_tests_))
+ Scanner.logger.debug("\t%s NOT disabled, because it could not be found.", check_name)
+
def register_check(self, test):
- module = test.__class__.__module__
-
+ ''' add a test to the scanner '''
+ module = test.__module__
+
if module not in self.modules:
self.modules.append(module)
-
- key = "%s" % test.__class__
- if isinstance(test, ActiveTest):
- self._active_tests_[key]= { "test" : test , "enabled" : True}
- Scanner.logger.debug("Added %s to active tests." % test.__class__)
+
+ if hasattr(test, "execute"):
+ self._active_tests_.append( test)
+ Scanner.logger.debug("Added %s to active tests." % test)
return len(self._active_tests_)
- if isinstance(test, PassiveTest):
- self._passive_tests_[key]= { "test" : test, "enabled" : True}
- Scanner.logger.debug("Added %s to passive tests." % test.__class__)
+ if hasattr(test, "analyze"):
+ self._passive_tests_.append( test)
+ Scanner.logger.debug("Added %s to passive tests." % test)
return len(self._passive_tests_)
raise Exception('test is not a valid test type')
-
+
def save_configuration(self, path):
+ pass #XXX defunct
# write out a configuration file.
config = ConfigParser.RawConfigParser()
config.add_section("Garmr")
@@ -226,29 +308,36 @@ def save_configuration(self, path):
config.set("Garmr", "reporter", self.reporter.__class__)
config.set("Garmr", "output", self.output)
config.set("Garmr", "dns", self.resolve_target)
-
+
if len(self._targets_) > 0:
config.add_section("Targets")
- i = 0
- for target in self._targets_:
+ for i,target in enumerate(self._targets_):
config.set("Targets", "%s"%i, target)
-
- for check in self._active_tests_.keys():
- config.add_section(check)
- config.set(check, "enabled", self._active_tests_[check]["enabled"])
- if hasattr(self._active_tests_[check]["test"], "config"):
- for key in self._active_tests_[check]["test"].config.keys():
- config.set(check, key, self._active_tests_[check]["test"].config[key])
-
- for check in self._passive_tests_.keys():
+
+ for index, check in enumerate(self._active_tests_):
+ check = str(check)
config.add_section(check)
- config.set(check, "enabled", self._passive_tests_[check]["enabled"])
- if hasattr(self._passive_tests_[check]["test"], "config"):
- for key in self._passive_tests_[check]["test"].config.keys():
- config.set(check, key, self._passive_tests_[check]["test"].config[key])
+ if check not in self._disabled_tests_:
+ config.set(check, "enabled", True)
+ else:
+ config.set(check, "enabled", False)
+ if hasattr(self._active_tests_[index], "config"):
+ for key in self._active_tests_[index].config.keys():
+ config.set(check, key, self._active_tests_[index].config[key])
+
+ for index, check in enumerate(self._passive_tests_):
+ check = str(check)
+ config.add_section(str(check))
+ if check not in self._disabled_tests_:
+ config.set(check, "enabled", True)
+ else:
+ config.set(check, "enabled", False)
+ if hasattr(self._passive_tests_[index], "config"):
+ for key in self._passive_tests_[index].config.keys():
+ config.set(check, key, self._passive_tests_[index].config[key])
+
-
with open(path, 'w') as configfile:
config.write(configfile)
-
-
+
+
View
105 README.md
@@ -3,67 +3,64 @@
Garmr is a tool to inspect the responses from websites for basic security requirements.
Garmr includes a set of core test cases implemented in corechecks that are derived from
-the Secure Coding Guidelines that can be found at [https://wiki.mozilla.org/WebAppSec/Secure_Coding_Guidelines]
+the [Mozilla Secure Coding Guidelines](https://wiki.mozilla.org/WebAppSec/Secure_Coding_Guidelines)
## Installation
-This version of Garmr requires Requests > 0.6.1
+This version of Garmr requires Requests > 0.8.3
-git clone https://github.com/ygjb/Garmr.git
-cd Garmr
-sudo python setup.py install
-garmr -u http://my.target.app
+ git clone https://github.com/freddyb/Garmr.git
+ cd Garmr
+ sudo python setup.py install
+ garmr -u http://my.target.app
## Usage
-
-usage: Runs a set of tests against the set of provided URLs
- [-h] [-u TARGETS] [-f TARGET_FILES] [-m MODULES] [-D] [-p] [-d]
+ usage: Runs a set of tests against the set of provided URLs
+ [-h] [-u TARGETS] [-f TARGET_FILES] [-S] [-m MODULES] [-D] [-p] [-d]
[-r REPORT] [-o OUTPUT] [-c OPTS] [-e EXCLUSIONS] [--save DUMP_PATH]
-optional arguments:
- -h, --help show this help message and exit
- -u TARGETS, --url TARGETS
- Add a target to test
- -f TARGET_FILES, --target-file TARGET_FILES
- File with URLs to test
- -m MODULES, --module MODULES
- Load an extension module
- -D, --disable-core Disable corechecks
- -p, --force-passive Force passives to be run for each active test
- -d, --dns Skip DNS resolution when registering a target
- -r REPORT, --report REPORT
- Load a reporter e.g. -r reporter.AntXmlReporter
- -o OUTPUT, --output OUTPUT
- Default output is garmr-results.xml
- -c OPTS, --check OPTS
- Set a parameter for a check (check:opt=value)
- -e EXCLUSIONS, --exclude EXCLUSIONS
- Prevent a check from being run/processed
- --save DUMP_PATH Write out a configuration file based on parameters
- (won't run scan)
-
-A TARGET is an http or https scheme url to execute tests against.
- e.g. garmr -u http://localhost
-
-A MODULE is the name of a module; resolving this path needs to be improved
- e.g. garmr -m djangochecks
-
-An OPTS field contains the path and name of the option to set
- e.g. garmr -m webchecks -c webchecks.RobotsTest:save_contents=True
-
-A REPORT is the namespace qualified name of a reporter object or a valid alias (xml is the only current valid alias, and the default)
- e.g. garmr -r xml
-
-An EXCLUSION prevents a check from being executed
- e.g. garmr -e Garmr.corechecks.WebTouch
-
-Disable core checks will prevent all of the checks in corechecks from being loaded; this is useful to limit the scope of testing.
+ optional arguments:
+ -h, --help show this help message and exit
+ -u TARGETS, --url TARGETS
+ Add a target to test
+ -f TARGET_FILES, --target-file TARGET_FILES
+ File with URLs to test
+ -S, --new-sessions Create new Session for each test
+ -m MODULES, --module MODULES
+ Load an extension module
+ -D, --disable-core Disable corechecks
+ -p, --force-passive Force passives to be run for each active test
+ -d, --dns Skip DNS resolution when registering a target
+ -r REPORT, --report REPORT
+ Load a reporter e.g. -r reporter.AntXmlReporter
+ -o OUTPUT, --output OUTPUT
+ Default output is garmr-results.xml
+ -c OPTS, --check OPTS
+ Set a parameter for a check (check:opt=value)
+ -e EXCLUSIONS, --exclude EXCLUSIONS
+ Prevent a check from being run/processed
+ --save DUMP_PATH Write out a configuration file based on parameters
+ (won't run scan)
+
+
+ A TARGET is an http or https scheme url to execute tests against.
+ e.g. garmr -u http://localhost
+
+ A MODULE is the name of a module; resolving this path needs to be improved
+ e.g. garmr -m djangochecks (Experimental)
+
+ An OPTS field contains the path and name of the option to set
+ e.g. garmr -m webchecks -c webchecks.RobotsTest:save_contents=True
+
+ A REPORT is the namespace qualified name of a reporter object or a valid alias (xml is the only current valid alias, and the default)
+ e.g. garmr -r xml
+
+ An EXCLUSION prevents a check from being executed
+ e.g. garmr -e WebTouch
+
+ Disable core checks will prevent all of the checks in corechecks from being loaded; this is useful to limit the scope of testing.
## Tasks
- * less noisy CLI
- * proxy support (already supported in requests)
- * sessions (controlled; sequence for active tests, with a cookie jar that is propagated through the session)
- * detailed reporting, including the ability to record all HTTP requests and responses generated
- * the ability to filter which passive checks are run by check name or by check type (i.e. cookies, headers, content-type, etc)
- * support for additional protocols (websockets, spdy)
- * Implement instances of each test case for each target scanned to allow them to retain state as a set of tests progresses.
+See [Issues on Github](https://github.com/freddyb/garmr/issues)
+
+
View
58 authchecks.py
@@ -0,0 +1,58 @@
+from urlparse import urlparse
+import requests
+from Garmr.scanner import ActiveTest, PassiveTest, Scanner
+
+
+class SessionTest(ActiveTest):
+ pass
+
+class CaptchaTest(ActiveTest):
+ pass
+
+
+class LoginTest(ActiveTest):
+ '''04:43:14 PM) Yvan Boily: so here is an example; this provides a basic configurable authentication check; the username and password fields are configurable, as are the username and password.
+ the post data is assembled using a built in format that will work with many authentication forms, and the test assumes (naively) that a 200 response is a successful login
+(04:43:27 PM) Yvan Boily: (the description needs to be updated :P)
+(04:44:22 PM) Yvan Boily: the do_test method on ActiveTest could be simply extended to accept a second and third paramter:
+(04:44:43 PM) Yvan Boily: def do_test(self, url, state, preserve):
+(04:45:25 PM) Yvan Boily: the state object would be passed in, and the preserve parameter indicates that the test should not modify the state object if it is set to true
+(04:45:30 PM) freddy: we could just change it to do_test(self, url, *args) and be more precise in the subclass
+(04:46:14 PM) Yvan Boily: it is possible to do that, but I am not a fan of that style. I don't have a better argument than that, so if you want to go that route, feel free :D'''
+
+ run_passives = True
+ description = "check if login works"
+ config = {
+ "uid_field" : "username",
+ "pwd_field" : "password",
+ "username" : "admin",
+ "password" : "admin",
+ "format" : "%s=%s&%s=%s"
+ }
+
+ # eventing needs to be implemented
+ events = { "Pass": SessionTest,
+ "Error": CaptchaTest,
+ "Fail": CaptchaTest }
+
+ def do_test(self, url):
+ u = urlparse(url)
+ post_data = config['format'] % (config["uid_field"] , config["username"], config["pwd_field"], config["password"])
+ response = requests.post(url, post_data)
+ if "Login successful" in response.content:
+ # scrape response for indicators of a successful login
+ result = self.result("Pass", "Authentication was successful", None)
+ else:
+ result = self.result("Fail", "Authentication failed", None)
+ return (result, response)
+
+
+
+
+
+def configure(scanner):
+ #if isinstance(scanner, Scanner) == False:
+ # raise Exception("Cannot configure a non-scanner object!")
+ scanner.register_check(LoginTest())
+
+
View
32 djangochecks.py
@@ -1,16 +1,17 @@
from urlparse import urlparse
import requests
-from Garmr.scanner import ActiveTest, PassiveTest, Scanner, get_url
+from Garmr.scanner import ActiveTest, PassiveTest, Scanner, HtmlTest
class AdminAvailable(ActiveTest):
run_passives = True
config = {"path" : "admin"}
-
+
def do_test(self, url):
u = urlparse(url)
adminurl="%s://%s/%s" % (u.scheme, u.netloc, self.config["path"])
- response = requests.get(adminurl)
+ sess = self.sessions[self.url]
+ response = sess.get(adminurl)
if response.status_code == 200:
result = self.result("Pass", "Django admin page is present at %s." % adminurl, response.content)
else:
@@ -18,8 +19,31 @@ def do_test(self, url):
return (result, response);
+class ProvokeError404(ActiveTest):
+ run_passives = True # we need IsDebugModeReallyEnabled
+ def do_test(self, url):
+ sess = self.sessions(url)
+ url += '76976cd1a3cbadaf77533a' #random garbage
+ response = sess.get(url)
+ result = self.result('Skip', 'This test cannot Pass or Fail, because it relies on the subsequent passive IsDebugModeReallyEnabled test', response)
+ return result, response
+
+class IsDebugModeReallyEnabled(HtmlTest):
+ description = ''
+ secure_only = False
+ def analyze_html(self, response, soup):
+ # we dont really analye the soup, but that's ok;p
+ error_str = "You're seeing this error because you have" #from django source django/views/debug.py - maybe subject to change
+ if error_str in response.content:
+ result = self.result('Fail', 'Typical string of echnical 404/500 error page found', None)
+ else:
+ result = self.result('Pass', 'Debug strings not found', response)
+ return result
+
+
def configure(scanner):
if isinstance(scanner, Scanner) == False:
raise Exception("Cannot configure a non-scanner object!")
+ raise Exception("Cannot configure a non-scanner object!")
scanner.register_check(AdminAvailable())
-
+
View
7 webchecks.py
@@ -1,6 +1,6 @@
from urlparse import urlparse
import requests
-from Garmr.scanner import ActiveTest, PassiveTest, Scanner, get_url
+from Garmr.scanner import ActiveTest, PassiveTest, Scanner
class RobotsTest(ActiveTest):
@@ -10,9 +10,10 @@ class RobotsTest(ActiveTest):
def do_test(self, url):
u = urlparse(url)
roboturl="%s://%s/robots.txt" % (u.scheme, u.netloc)
- response = requests.get(roboturl)
+ sess = self.sessions[self.url]
+ response = sess.get(roboturl)
if response.status_code == 200:
- result = self.result("Pass", "A robots.txt file is present on the server",
+ result = self.result("Pass", "A robots.txt file is present on the server",
response.content if self.config["save_contents"].lower() == "true" else None)
else:
result = self.result("Fail", "No robots.txt file was found.", None)
Please sign in to comment.
Something went wrong with that request. Please try again.