diff --git a/s3scanner.py b/s3scanner.py index 7802335..6cb0f9b 100644 --- a/s3scanner.py +++ b/s3scanner.py @@ -18,52 +18,6 @@ currentVersion = '1.0.0' -def checkBucket(inBucket): - # Determine what kind of input we're given. Options: - # bucket name i.e. mybucket - # domain name i.e. flaws.cloud - # full S3 url i.e. flaws.cloud.s3-us-west-2.amazonaws.com - # bucket:region i.e. flaws.cloud:us-west-2 - if ".amazonaws.com" in inBucket: # We were given a full s3 url - bucket = inBucket[:inBucket.rfind(".s3")] - elif ":" in inBucket: # We were given a bucket in 'bucket:region' format - bucket = inBucket.split(":")[0] - else: # We were either given a bucket name or domain name - bucket = inBucket - - valid = s3.checkBucketName(bucket) - - if not valid: - message = "{0:>11} : {1}".format("[invalid]", bucket) - slog.error(message) - # continue - return - - if s3.awsCredsConfigured: - b = s3.checkAcl(bucket) - else: - a = s3.checkBucketWithoutCreds(bucket) - b = {"found": a, "acls": "unknown - no aws creds"} - - if b["found"]: - - size = s3.getBucketSize(bucket) # Try to get the size of the bucket - - message = "{0:>11} : {1}".format("[found]", bucket + " | " + size + " | ACLs: " + str(b["acls"])) - slog.info(message) - flog.debug(bucket) - - if args.dump: - if size not in ["AccessDenied", "AllAccessDisabled"]: - slog.info("{0:>11} : {1} - {2}".format("[found]", bucket, "Attempting to dump...this may take a while.")) - s3.dumpBucket(bucket) - if args.list: - if str(b["acls"]) not in ["AccessDenied", "AllAccessDisabled"]: - s3.listBucket(bucket) - else: - message = "{0:>11} : {1}".format("[not found]", bucket) - slog.error(message) - # We want to use both formatter classes, so a custom class it is class CustomFormatter(argparse.RawTextHelpFormatter, argparse.RawDescriptionHelpFormatter): @@ -149,7 +103,7 @@ class CustomFormatter(argparse.RawTextHelpFormatter, argparse.RawDescriptionHelp with open(args.buckets, 'r') as f: for line in f: line = line.rstrip() # Remove any extra whitespace - checkBucket(line) + s3.checkBucket(line, slog, flog, args.dump, args.list) else: # It's a single bucket - checkBucket(args.buckets) \ No newline at end of file + s3.checkBucket(args.buckets, slog, flog, args.dump, args.list) \ No newline at end of file diff --git a/s3utils.py b/s3utils.py index 876de03..321c754 100644 --- a/s3utils.py +++ b/s3utils.py @@ -68,6 +68,54 @@ def checkAwsCreds(): return True +def checkBucket(inBucket, slog, flog, argsDump, argsList): + # Determine what kind of input we're given. Options: + # bucket name i.e. mybucket + # domain name i.e. flaws.cloud + # full S3 url i.e. flaws.cloud.s3-us-west-2.amazonaws.com + # bucket:region i.e. flaws.cloud:us-west-2 + + if ".amazonaws.com" in inBucket: # We were given a full s3 url + bucket = inBucket[:inBucket.rfind(".s3")] + elif ":" in inBucket: # We were given a bucket in 'bucket:region' format + bucket = inBucket.split(":")[0] + else: # We were either given a bucket name or domain name + bucket = inBucket + + valid = checkBucketName(bucket) + + if not valid: + message = "{0:>11} : {1}".format("[invalid]", bucket) + slog.error(message) + # continue + return + + if awsCredsConfigured: + b = checkAcl(bucket) + else: + a = checkBucketWithoutCreds(bucket) + b = {"found": a, "acls": "unknown - no aws creds"} + + if b["found"]: + + size = getBucketSize(bucket) # Try to get the size of the bucket + + message = "{0:>11} : {1}".format("[found]", bucket + " | " + size + " | ACLs: " + str(b["acls"])) + slog.info(message) + flog.debug(bucket) + + if argsDump: + if size not in ["AccessDenied", "AllAccessDisabled"]: + slog.info("{0:>11} : {1} - {2}".format("[found]", bucket, "Attempting to dump...this may take a while.")) + dumpBucket(bucket) + if argsList: + if str(b["acls"]) not in ["AccessDenied", "AllAccessDisabled"]: + listBucket(bucket) + else: + message = "{0:>11} : {1}".format("[not found]", bucket) + slog.error(message) + + def checkBucketName(bucketName): """ Checks to make sure bucket names input are valid according to S3 naming conventions :param bucketName: Name of bucket to check diff --git a/test_scanner.py b/test_scanner.py index 69223ae..4c12e7f 100644 --- a/test_scanner.py +++ b/test_scanner.py @@ -4,6 +4,7 @@ import sys import shutil import time +import logging pyVersion = sys.version_info @@ -173,6 +174,55 @@ def test_checkAwsCreds(): # checkAwsCreds.1 assert s3.checkAwsCreds() == credsActuallyConfigured +def test_checkBucket(): + """ + checkBucket.1 - Bucket name + checkBucket.2 - Domain name + checkBucket.3 - Full s3 url + checkBucket.4 - bucket:region + """ + + test_setup() + + testFile = './test/test_checkBucket.txt' + + # Create file logger + flog = logging.getLogger('s3scanner-file') + flog.setLevel(logging.DEBUG) # Set log level for logger object + + # Create file handler which logs even debug messages + fh = logging.FileHandler(testFile) + fh.setLevel(logging.DEBUG) + + # Add the handler to logger + flog.addHandler(fh) + + # Create secondary logger for logging to screen + slog = logging.getLogger('s3scanner-screen') + slog.setLevel(logging.CRITICAL) + + try: + # checkBucket.1 + s3.checkBucket("flaws.cloud", slog, flog, False, False) + + # checkBucket.2 + s3.checkBucket("flaws.cloud.s3-us-west-2.amazonaws.com", slog, flog, False, False) + + # checkBucket.3 + s3.checkBucket("flaws.cloud:us-west-2", slog, flog, False, False) + + # Read in test loggin file and assert + f = open(testFile, 'r') + results = f.readlines() + f.close() + + assert results[0].rstrip() == "flaws.cloud" + assert results[1].rstrip() == "flaws.cloud" + assert results[2].rstrip() == "flaws.cloud" + + finally: + # Delete test file + os.remove(testFile) def test_checkBucketName(): """ @@ -211,7 +261,7 @@ def test_checkBucketName(): def test_checkBucketWithoutCreds(): """ - Scenario checkBucketwc.1 - Non-existent bucket + Scenario checkBucketwc.1 - Non-existent bucket Scenario checkBucketwc.2 - Good bucket Scenario checkBucketwc.3 - No public read perm """