Skip to content

Commit

Permalink
Add cloudant detector (Yelp#191)
Browse files Browse the repository at this point in the history
* 1st pass cloudant tests and detector

* cleaning debugs

* whitelisting secret false positive

* correcting lint errors

* correct line break errors

* more lint

* more lint

* more lint

* more lint

* typo

* more lint

* more lint

* PR responses
  • Loading branch information
edwarj2 authored and Xianjun Zhu committed Oct 30, 2019
1 parent 9939cbb commit 96f621d
Show file tree
Hide file tree
Showing 2 changed files with 267 additions and 0 deletions.
125 changes: 125 additions & 0 deletions cloudant.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
from __future__ import absolute_import

import re

import requests

from .base import RegexBasedDetector
from detect_secrets.core.constants import VerifiedResult


class CloudantDetector(RegexBasedDetector):

secret_type = 'Cloudant Credentials'

# opt means optional
opt_quote = r'(?:"|\'|)'
opt_dashes = r'(?:--|)'
opt_dot = r'(?:\.|)'
dot = r'\.'
cl_account = r'[0-9a-z\-\_]*'
cl = r'(cloudant|cl|clou)'
opt_dash_undrscr = r'(?:_|-|)'
opt_api = r'(?:api|)'
cl_key_or_pass = cl + opt_dash_undrscr + r'(?:key|pwd|pw|password|pass|token)'
opt_space = r'(?: |)'
assignment = r'(?:=|:|:=|=>)'
cl_secret = r'[0-9a-f]{64}'
colon = r'\:'
at = r'\@'
http = r'(?:http\:\/\/|https\:\/\/)'
cloudant_api_url = r'cloudant\.com'
denylist = [
re.compile(
r'{cl_key_or_pass}{opt_space}{assignment}{opt_space}{opt_quote}{cl_secret}'.format(
cl_key_or_pass=cl_key_or_pass,
opt_quote=opt_quote,
cl_account=cl_account,
opt_dash_undrscr=opt_dash_undrscr,
opt_api=opt_api,
opt_space=opt_space,
assignment=assignment,
cl_secret=cl_secret,
), flags=re.IGNORECASE,
),
re.compile(
r'{http}{cl_account}{colon}{cl_secret}{at}{cl_account}{dot}{cloudant_api_url}'.format(
http=http,
colon=colon,
cl_account=cl_account,
cl_secret=cl_secret,
at=at,
dot=dot,
cloudant_api_url=cloudant_api_url,
),
flags=re.IGNORECASE,
),
]

def verify(self, token, content, potential_secret=None):

hosts = get_host(content)
if not hosts:
return VerifiedResult.UNVERIFIED

for host in hosts:
return verify_cloudant_key(host, token, potential_secret)

return VerifiedResult.VERIFIED_FALSE


def get_host(content):

# opt means optional
opt_quote = r'(?:"|\'|)'
opt_cl = r'(?:cloudant|cl|)'
opt_dash_undrscr = r'(?:_|-|)'
opt_hostname_keyword = r'(?:hostname|host|username|id|user|userid|user-id|user-name|' \
'name|user_id|user_name|uname)'
opt_space = r'(?: |)'
assignment = r'(?:\=|:|:=|=>)+'
hostname = r'(\w(?:\w|_|-)+)'
regex = re.compile(
r'{opt_quote}{opt_cl}{opt_dash_undrscr}{opt_hostname_keyword}{opt_space}{opt_quote}'
'{assignment}{opt_space}{opt_quote}{hostname}{opt_quote}'.format(
opt_quote=opt_quote,
opt_cl=opt_cl,
opt_dash_undrscr=opt_dash_undrscr,
opt_hostname_keyword=opt_hostname_keyword,
opt_space=opt_space,
hostname=hostname,
assignment=assignment,
), flags=re.IGNORECASE,
)

return [
match
for line in content.splitlines()
for match in regex.findall(line)
]


def verify_cloudant_key(hostname, token, potential_secret=None):
try:
headers = {'Content-type': 'application/json'}
request_url = 'https://{hostname}:' \
'{token}' \
'@{hostname}.' \
'cloudant.com/_api/v2'.format(
hostname=hostname,
token=token,
)

response = requests.get(
request_url,
headers=headers,
)

if response.status_code == 200:
if potential_secret:
potential_secret.other_factors['hostname'] = hostname
return VerifiedResult.VERIFIED_TRUE
else:
return VerifiedResult.VERIFIED_FALSE
except Exception:
return VerifiedResult.UNVERIFIED
142 changes: 142 additions & 0 deletions cloudant_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,142 @@
from __future__ import absolute_import

import textwrap

import pytest
import responses

from detect_secrets.core.constants import VerifiedResult
from detect_secrets.core.potential_secret import PotentialSecret
from detect_secrets.plugins.cloudant import CloudantDetector
from detect_secrets.plugins.cloudant import get_host

CL_HOST = 'testy_test' # also called user
# only detecting 64 hex
CL_TOKEN = 'abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234abcd1234'


class TestCloudantDetector(object):

@pytest.mark.parametrize(
'payload, should_flag',
[
(
'https://{cl_host}:{cl_token}@{cl_host}.cloudant.com"'.format(
cl_host=CL_HOST, cl_token=CL_TOKEN,
), True,
),
(
'https://{cl_host}:{cl_token}@{cl_host}.cloudant.com/_api/v2/'.format(
cl_host=CL_HOST, cl_token=CL_TOKEN,
), True,
),
(
'https://{cl_host}:{cl_token}.cloudant.com'.format(
cl_host=CL_HOST, cl_token=CL_TOKEN,
), False,
),
('cloudant_password=\'{cl_token}\''.format(cl_token=CL_TOKEN), True),
('cloudant_pw=\'{cl_token}\''.format(cl_token=CL_TOKEN), True),
('cloudant_pw="{cl_token}"'.format(cl_token=CL_TOKEN), True),
('clou_pw = "{cl_token}"'.format(cl_token=CL_TOKEN), True),
('cloudant_password = "a-fake-tooshort-key"', False),
],
)
def test_analyze_string(self, payload, should_flag):
logic = CloudantDetector()
output = logic.analyze_string(payload, 1, 'mock_filename')

assert len(output) == (1 if should_flag else 0)

@responses.activate
def test_verify_invalid_secret(self):
cl_api_url = 'https://{cl_host}:{cl_token}@{cl_host}.cloudant.com/_api/v2'.format(
cl_host=CL_HOST, cl_token=CL_TOKEN,
)
responses.add(
responses.GET, cl_api_url,
json={'error': 'Access denied. '}, status=401,
)

assert CloudantDetector().verify(
CL_TOKEN,
'cloudant_host={}'.format(CL_HOST),
) == VerifiedResult.VERIFIED_FALSE

@responses.activate
def test_verify_valid_secret(self):
cl_api_url = 'https://{cl_host}:{cl_token}@{cl_host}.cloudant.com/_api/v2'.format(
cl_host=CL_HOST, cl_token=CL_TOKEN,
)
responses.add(
responses.GET, cl_api_url,
json={'id': 1}, status=200,
)
potential_secret = PotentialSecret('test cloudant', 'test filename', CL_TOKEN)
assert CloudantDetector().verify(
CL_TOKEN,
'cloudant_host={}'.format(CL_HOST),
potential_secret,
) == VerifiedResult.VERIFIED_TRUE
assert potential_secret.other_factors['hostname'] == CL_HOST

@responses.activate
def test_verify_unverified_secret(self):
assert CloudantDetector().verify(
CL_TOKEN,
'cloudant_host={}'.format(CL_HOST),
) == VerifiedResult.UNVERIFIED

def test_verify_no_secret(self):
assert CloudantDetector().verify(
CL_TOKEN,
'no_un={}'.format(CL_HOST),
) == VerifiedResult.UNVERIFIED


@pytest.mark.parametrize(
'content, expected_output',
(
(
textwrap.dedent("""
--cloudant-hostname = {}
""")[1:-1].format(
CL_HOST,
),
[CL_HOST],
),
# With quotes
(
textwrap.dedent("""
cl_host = "{}"
""")[1:-1].format(
CL_HOST,
),
[CL_HOST],
),
# multiple candidates
(
textwrap.dedent("""
cloudant_id = '{}'
cl-user = '{}'
CLOUDANT_USERID = '{}'
cloudant-uname: {}
""")[1:-1].format(
CL_HOST,
'test2_testy_test',
'test3-testy-testy',
'notanemail',
),
[
CL_HOST,
'test2_testy_test',
'test3-testy-testy',
'notanemail',
],
),
),
)
def test_get_host(content, expected_output):
assert get_host(content) == expected_output

0 comments on commit 96f621d

Please sign in to comment.