Skip to content

Commit

Permalink
feat: enhance cloudant (Yelp#219)
Browse files Browse the repository at this point in the history
* feat: enhance cloudant

* address comment

* fix broken buid
  • Loading branch information
XIANJUN ZHU committed Oct 30, 2019
1 parent 84a33ff commit 60547d8
Show file tree
Hide file tree
Showing 2 changed files with 70 additions and 47 deletions.
39 changes: 24 additions & 15 deletions cloudant.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,11 @@ class CloudantDetector(RegexBasedDetector):
secret_type = 'Cloudant Credentials'

# opt means optional
opt_quote = r'(?:"|\'|)'
opt_dashes = r'(?:--|)'
opt_dot = r'(?:\.|)'
dot = r'\.'
cl_account = r'[0-9a-z\-\_]+'
cl_account = r'[\w\-]+'
cl = r'(?:cloudant|cl|clou)'
opt_dash_undrscr = r'(?:_|-|)'
opt_api = r'(?:api|)'
cl_key_or_pass = opt_api + r'(?:key|pwd|pw|password|pass|token)'
opt_space = r'(?: |)'
assignment = r'(?:=|:|:=|=>)'
cl_pw = r'([0-9a-f]{64})'
cl_api_key = r'([a-z]{24})'
colon = r'\:'
Expand Down Expand Up @@ -69,7 +63,7 @@ class CloudantDetector(RegexBasedDetector):

def verify(self, token, content, potential_secret=None):

hosts = find_host(content)
hosts = find_account(content)
if not hosts:
return VerifiedResult.UNVERIFIED

Expand All @@ -79,20 +73,35 @@ def verify(self, token, content, potential_secret=None):
return VerifiedResult.VERIFIED_FALSE


def find_host(content):
def find_account(content):
opt_hostname_keyword = r'(?:hostname|host|username|id|user|userid|user-id|user-name|' \
'name|user_id|user_name|uname)'
hostname = r'(\w(?:\w|_|-)+)'
'name|user_id|user_name|uname|account)'
account = r'(\w[\w\-]*)'
opt_basic_auth = r'(?:[\w\-:%]*\@)?'

regex = RegexBasedDetector.assign_regex_generator(
prefix_regex=CloudantDetector.cl,
password_keyword_regex=opt_hostname_keyword,
password_regex=hostname,
regexes = (
RegexBasedDetector.assign_regex_generator(
prefix_regex=CloudantDetector.cl,
password_keyword_regex=opt_hostname_keyword,
password_regex=account,
),
re.compile(
r'{http}{opt_basic_auth}{cl_account}{dot}{cloudant_api_url}'.format(
http=CloudantDetector.http,
opt_basic_auth=opt_basic_auth,
cl_account=account,
cl_api_key=CloudantDetector.cl_api_key,
dot=CloudantDetector.dot,
cloudant_api_url=CloudantDetector.cloudant_api_url,
),
flags=re.IGNORECASE,
),
)

return [
match
for line in content.splitlines()
for regex in regexes
for match in regex.findall(line)
]

Expand Down
78 changes: 46 additions & 32 deletions cloudant_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,9 @@
from detect_secrets.core.constants import VerifiedResult
from detect_secrets.core.potential_secret import PotentialSecret
from detect_secrets.plugins.cloudant import CloudantDetector
from detect_secrets.plugins.cloudant import find_host
from detect_secrets.plugins.cloudant import find_account

CL_HOST = 'testy_test' # also called user
CL_ACCOUNT = 'testy_-test' # also called user
# only detecting 64 hex CL generated password
CL_PW = 'abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234'

Expand All @@ -24,33 +24,33 @@ class TestCloudantDetector(object):
'payload, should_flag',
[
(
'https://{cl_host}:{cl_pw}@{cl_host}.cloudant.com"'.format(
cl_host=CL_HOST, cl_pw=CL_PW,
'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com"'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), True,
),
(
'https://{cl_host}:{cl_pw}@{cl_host}.cloudant.com/_api/v2/'.format(
cl_host=CL_HOST, cl_pw=CL_PW,
'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com/_api/v2/'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), True,
),
(
'https://{cl_host}:{cl_pw}@{cl_host}.cloudant.com/_api/v2/'.format(
cl_host=CL_HOST, cl_pw=CL_PW,
'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com/_api/v2/'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), True,
),
(
'https://{cl_host}:{cl_pw}@{cl_host}.cloudant.com'.format(
cl_host=CL_HOST, cl_pw=CL_PW,
'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), True,
),
(
'https://{cl_host}:{cl_api_key}@{cl_host}.cloudant.com'.format(
cl_host=CL_HOST, cl_api_key=CL_API_KEY,
'https://{cl_account}:{cl_api_key}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_api_key=CL_API_KEY,
), True,
),
(
'https://{cl_host}:{cl_pw}.cloudant.com'.format(
cl_host=CL_HOST, cl_pw=CL_PW,
'https://{cl_account}:{cl_pw}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
), False,
),
('cloudant_password=\'{cl_pw}\''.format(cl_pw=CL_PW), True),
Expand All @@ -70,8 +70,8 @@ def test_analyze_string(self, payload, should_flag):

@responses.activate
def test_verify_invalid_secret(self):
cl_api_url = 'https://{cl_host}:{cl_pw}@{cl_host}.cloudant.com'.format(
cl_host=CL_HOST, cl_pw=CL_PW,
cl_api_url = 'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
)
responses.add(
responses.GET, cl_api_url,
Expand All @@ -80,13 +80,13 @@ def test_verify_invalid_secret(self):

assert CloudantDetector().verify(
CL_PW,
'cloudant_host={}'.format(CL_HOST),
'cloudant_host={}'.format(CL_ACCOUNT),
) == VerifiedResult.VERIFIED_FALSE

@responses.activate
def test_verify_valid_secret(self):
cl_api_url = 'https://{cl_host}:{cl_pw}@{cl_host}.cloudant.com'.format(
cl_host=CL_HOST, cl_pw=CL_PW,
cl_api_url = 'https://{cl_account}:{cl_pw}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_pw=CL_PW,
)
responses.add(
responses.GET, cl_api_url,
Expand All @@ -95,22 +95,22 @@ def test_verify_valid_secret(self):
potential_secret = PotentialSecret('test cloudant', 'test filename', CL_PW)
assert CloudantDetector().verify(
CL_PW,
'cloudant_host={}'.format(CL_HOST),
'cloudant_host={}'.format(CL_ACCOUNT),
potential_secret,
) == VerifiedResult.VERIFIED_TRUE
assert potential_secret.other_factors['hostname'] == CL_HOST
assert potential_secret.other_factors['hostname'] == CL_ACCOUNT

@responses.activate
def test_verify_unverified_secret(self):
assert CloudantDetector().verify(
CL_PW,
'cloudant_host={}'.format(CL_HOST),
'cloudant_host={}'.format(CL_ACCOUNT),
) == VerifiedResult.UNVERIFIED

def test_verify_no_secret(self):
assert CloudantDetector().verify(
CL_PW,
'no_un={}'.format(CL_HOST),
'no_un={}'.format(CL_ACCOUNT),
) == VerifiedResult.UNVERIFIED

@pytest.mark.parametrize(
Expand All @@ -120,19 +120,19 @@ def test_verify_no_secret(self):
textwrap.dedent("""
--cloudant-hostname = {}
""")[1:-1].format(
CL_HOST,
CL_ACCOUNT,
),
[CL_HOST],
[CL_ACCOUNT],
),
# With quotes
(
textwrap.dedent("""
cl_host = "{}"
cl_account = "{}"
""")[1:-1].format(
CL_HOST,
CL_ACCOUNT,
),
[CL_HOST],
[CL_ACCOUNT],
),
# multiple candidates
Expand All @@ -143,19 +143,33 @@ def test_verify_no_secret(self):
CLOUDANT_USERID = '{}'
cloudant-uname: {}
""")[1:-1].format(
CL_HOST,
CL_ACCOUNT,
'test2_testy_test',
'test3-testy-testy',
'notanemail',
),
[
CL_HOST,
CL_ACCOUNT,
'test2_testy_test',
'test3-testy-testy',
'notanemail',
],
),
# In URL
(
'https://{cl_account}:{cl_api_key}@{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT, cl_api_key=CL_API_KEY,
),
[CL_ACCOUNT],
),
(
'https://{cl_account}.cloudant.com'.format(
cl_account=CL_ACCOUNT,
),
[CL_ACCOUNT],
),
),
)
def test_find_host(self, content, expected_output):
assert find_host(content) == expected_output
def test_find_account(self, content, expected_output):
assert find_account(content) == expected_output

0 comments on commit 60547d8

Please sign in to comment.