Skip to content

Commit

Permalink
Move meat of processing to utils and add tests
Browse files Browse the repository at this point in the history
  • Loading branch information
sa7mon committed Jun 5, 2018
1 parent 120791e commit 4956054
Show file tree
Hide file tree
Showing 3 changed files with 101 additions and 49 deletions.
50 changes: 2 additions & 48 deletions s3scanner.py
Expand Up @@ -18,52 +18,6 @@
currentVersion = '1.0.0'


def checkBucket(inBucket):
# Determine what kind of input we're given. Options:
# bucket name i.e. mybucket
# domain name i.e. flaws.cloud
# full S3 url i.e. flaws.cloud.s3-us-west-2.amazonaws.com
# bucket:region i.e. flaws.cloud:us-west-2
if ".amazonaws.com" in inBucket: # We were given a full s3 url
bucket = inBucket[:inBucket.rfind(".s3")]
elif ":" in inBucket: # We were given a bucket in 'bucket:region' format
bucket = inBucket.split(":")[0]
else: # We were either given a bucket name or domain name
bucket = inBucket

valid = s3.checkBucketName(bucket)

if not valid:
message = "{0:>11} : {1}".format("[invalid]", bucket)
slog.error(message)
# continue
return

if s3.awsCredsConfigured:
b = s3.checkAcl(bucket)
else:
a = s3.checkBucketWithoutCreds(bucket)
b = {"found": a, "acls": "unknown - no aws creds"}

if b["found"]:

size = s3.getBucketSize(bucket) # Try to get the size of the bucket

message = "{0:>11} : {1}".format("[found]", bucket + " | " + size + " | ACLs: " + str(b["acls"]))
slog.info(message)
flog.debug(bucket)

if args.dump:
if size not in ["AccessDenied", "AllAccessDisabled"]:
slog.info("{0:>11} : {1} - {2}".format("[found]", bucket, "Attempting to dump...this may take a while."))
s3.dumpBucket(bucket)
if args.list:
if str(b["acls"]) not in ["AccessDenied", "AllAccessDisabled"]:
s3.listBucket(bucket)
else:
message = "{0:>11} : {1}".format("[not found]", bucket)
slog.error(message)


# We want to use both formatter classes, so a custom class it is
class CustomFormatter(argparse.RawTextHelpFormatter, argparse.RawDescriptionHelpFormatter):
Expand Down Expand Up @@ -149,7 +103,7 @@ class CustomFormatter(argparse.RawTextHelpFormatter, argparse.RawDescriptionHelp
with open(args.buckets, 'r') as f:
for line in f:
line = line.rstrip() # Remove any extra whitespace
checkBucket(line)
s3.checkBucket(line, slog, flog, args.dump, args.list)
else:
# It's a single bucket
checkBucket(args.buckets)
s3.checkBucket(args.buckets, slog, flog, args.dump, args.list)
48 changes: 48 additions & 0 deletions s3utils.py
Expand Up @@ -68,6 +68,54 @@ def checkAwsCreds():
return True


def checkBucket(inBucket, slog, flog, argsDump, argsList):
# Determine what kind of input we're given. Options:
# bucket name i.e. mybucket
# domain name i.e. flaws.cloud
# full S3 url i.e. flaws.cloud.s3-us-west-2.amazonaws.com
# bucket:region i.e. flaws.cloud:us-west-2

if ".amazonaws.com" in inBucket: # We were given a full s3 url
bucket = inBucket[:inBucket.rfind(".s3")]
elif ":" in inBucket: # We were given a bucket in 'bucket:region' format
bucket = inBucket.split(":")[0]
else: # We were either given a bucket name or domain name
bucket = inBucket

valid = checkBucketName(bucket)

if not valid:
message = "{0:>11} : {1}".format("[invalid]", bucket)
slog.error(message)
# continue
return

if awsCredsConfigured:
b = checkAcl(bucket)
else:
a = checkBucketWithoutCreds(bucket)
b = {"found": a, "acls": "unknown - no aws creds"}

if b["found"]:

size = getBucketSize(bucket) # Try to get the size of the bucket

message = "{0:>11} : {1}".format("[found]", bucket + " | " + size + " | ACLs: " + str(b["acls"]))
slog.info(message)
flog.debug(bucket)

if argsDump:
if size not in ["AccessDenied", "AllAccessDisabled"]:
slog.info("{0:>11} : {1} - {2}".format("[found]", bucket, "Attempting to dump...this may take a while."))
dumpBucket(bucket)
if argsList:
if str(b["acls"]) not in ["AccessDenied", "AllAccessDisabled"]:
listBucket(bucket)
else:
message = "{0:>11} : {1}".format("[not found]", bucket)
slog.error(message)


def checkBucketName(bucketName):
""" Checks to make sure bucket names input are valid according to S3 naming conventions
:param bucketName: Name of bucket to check
Expand Down
52 changes: 51 additions & 1 deletion test_scanner.py
Expand Up @@ -4,6 +4,7 @@
import sys
import shutil
import time
import logging


pyVersion = sys.version_info
Expand Down Expand Up @@ -173,6 +174,55 @@ def test_checkAwsCreds():
# checkAwsCreds.1
assert s3.checkAwsCreds() == credsActuallyConfigured

def test_checkBucket():
"""
checkBucket.1 - Bucket name
checkBucket.2 - Domain name
checkBucket.3 - Full s3 url
checkBucket.4 - bucket:region
"""

test_setup()

testFile = './test/test_checkBucket.txt'

# Create file logger
flog = logging.getLogger('s3scanner-file')
flog.setLevel(logging.DEBUG) # Set log level for logger object

# Create file handler which logs even debug messages
fh = logging.FileHandler(testFile)
fh.setLevel(logging.DEBUG)

# Add the handler to logger
flog.addHandler(fh)

# Create secondary logger for logging to screen
slog = logging.getLogger('s3scanner-screen')
slog.setLevel(logging.CRITICAL)

try:
# checkBucket.1
s3.checkBucket("flaws.cloud", slog, flog, False, False)

# checkBucket.2
s3.checkBucket("flaws.cloud.s3-us-west-2.amazonaws.com", slog, flog, False, False)

# checkBucket.3
s3.checkBucket("flaws.cloud:us-west-2", slog, flog, False, False)

# Read in test loggin file and assert
f = open(testFile, 'r')
results = f.readlines()
f.close()

assert results[0].rstrip() == "flaws.cloud"
assert results[1].rstrip() == "flaws.cloud"
assert results[2].rstrip() == "flaws.cloud"

finally:
# Delete test file
os.remove(testFile)

def test_checkBucketName():
"""
Expand Down Expand Up @@ -211,7 +261,7 @@ def test_checkBucketName():

def test_checkBucketWithoutCreds():
"""
Scenario checkBucketwc.1 - Non-existent bucket
Scenario checkBucketwc.1 - Non-existent bucket
Scenario checkBucketwc.2 - Good bucket
Scenario checkBucketwc.3 - No public read perm
"""
Expand Down

0 comments on commit 4956054

Please sign in to comment.