-
Notifications
You must be signed in to change notification settings - Fork 48
/
search.py
75 lines (61 loc) · 2.36 KB
/
search.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
from gato.github import Api
import time
import logging
logger = logging.getLogger(__name__)
class Search():
    """Search utility for the GitHub API, used to find public repositories
    that may have security issues.
    """

    def __init__(self, api_accessor: 'Api'):
        """Initialize the class used to call GitHub search methods.

        The search calls made by this class are rate limited, so
        enumeration sleeps between requests and can take a long time.

        Args:
            api_accessor (Api): API accessor to use when making GitHub
            API requests.
        """
        self.api_accessor = api_accessor

    def search_enumeration(self, organization: str,
                           page_delay: float = 60,
                           rate_limit_delay: float = 180) -> set:
        """Search for self-hosted in yml files within a given organization.

        Pages through the code search results and collects every non-fork
        repository that has a match under a ``.github/workflows`` path.

        Args:
            organization (str): Name of the github organization.
            page_delay (float): Seconds to sleep before each request for a
            follow-up page. Defaults to 60.
            rate_limit_delay (float): Additional seconds to sleep after a
            rate-limit response (403/429) before retrying the same page.
            Defaults to 180.

        Returns:
            set: Set containing repositories that are of interest. Empty if
            the very first request does not return HTTP 200.
        """
        query = {
            'q': f'self-hosted org:{organization} language:yaml',
            'sort': 'indexed',
            'per_page': '100',
            'page': 1
        }

        result = self.api_accessor.call_get('/search/code', params=query)
        if result.status_code != 200:
            print('[-] Secondary rate limit hit!')
            # TODO: Check for auth issues here too!
            return set()

        candidates = set()
        items = result.json().get('items') or []

        # An empty page means the results are exhausted.
        while items:
            # Only return non-forks
            candidates.update(
                entry['repository']['full_name']
                for entry in items
                if '.github/workflows' in entry['path']
                and not entry['repository']['fork']
            )
            query['page'] += 1
            items = self._fetch_page(query, page_delay, rate_limit_delay)

        return candidates

    def _fetch_page(self, query: dict, page_delay: float,
                    rate_limit_delay: float) -> list:
        """Fetch one follow-up page of code search results.

        Rate-limit responses are retried for the same page; any other
        non-200 status ends pagination instead of retrying forever.

        Args:
            query (dict): Search parameters, including the page to request.
            page_delay (float): Seconds to sleep before every request.
            rate_limit_delay (float): Extra seconds to sleep after a
            rate-limit response.

        Returns:
            list: The page's result items, or an empty list when pagination
            should stop.
        """
        while True:
            time.sleep(page_delay)
            result = self.api_accessor.call_get(
                '/search/code',
                params=query
            )
            status = result.status_code

            if status == 200:
                return result.json().get('items') or []

            if status in (403, 429):
                # Message text is kept for output compatibility; the actual
                # pause is rate_limit_delay seconds.
                print(
                    '[-] Secondary rate limit hit! Sleeping 3 minutes!')
                time.sleep(rate_limit_delay)
                continue

            if status == 422:
                print('[-] Reached search cap!')
            else:
                # Previously this case re-requested the same page forever.
                print(
                    f'[-] Unexpected status code {status} while searching,'
                    ' stopping enumeration!')
            return []