Skip to content

Commit

Permalink
simplified IQ service and handled authentication errors with the comm…
Browse files Browse the repository at this point in the history
…and line
  • Loading branch information
ajurgenson55 committed May 12, 2020
1 parent 6dd4454 commit d1331e5
Show file tree
Hide file tree
Showing 2 changed files with 16 additions and 19 deletions.
9 changes: 6 additions & 3 deletions jake/__main__.py
Expand Up @@ -310,9 +310,12 @@ def __setup_logger(verbose: bool):
def __iq_control_flow(args: dict, bom_str: bytes):
with yaspin(text="Loading", color="magenta") as spinner:
spinner.text = "Submitting to Sonatype IQ..."
iq_requests = IQ(args)
_id = iq_requests.get_internal_id()
status_url = iq_requests.submit_sbom(bom_str, _id)
try:
iq_requests = IQ(args)
except (ValueError) as e:
print(e)
_exit(1)
status_url = iq_requests.submit_sbom(bom_str)
spinner.ok("🐍 ")

with yaspin(text="Loading", color="magenta") as spinner:
Expand Down
26 changes: 10 additions & 16 deletions jake/iq/iq.py
Expand Up @@ -59,9 +59,7 @@ def __init__(self, args):
if self._iq_url is None:
self._iq_url = results['Server']

def get_url(self) -> (str):
"""gets url to use for IQ Server request"""
return self._iq_url
self._internal_id = self.get_internal_id()

def get_policy_action(self):
"""gets policy action from IQ Server result"""
Expand All @@ -71,10 +69,6 @@ def get_report_url(self):
"""gets report url from IQ Server result"""
return self._report_url

def get_headers(self) -> (dict):
"""gets headers to use for IQ Server request"""
return self._headers

def get_public_application_id(self) -> (str):
"""gets public application id to use for IQ Server request"""
return self._public_application_id
Expand All @@ -84,31 +78,31 @@ def get_internal_id(self) -> (str):
application id"""
response = requests.get(
'{0}/api/v2/applications?publicId={1}'.format(
self.get_url(),
self.get_public_application_id()),
self.get_headers(),
self._iq_url,
self._public_application_id),
self._headers,
auth=(self._user, self._password))
if response.ok:
res = json.loads(response.text)
if not res['applications']:
raise ValueError(
"The public application id \'"
"\nThe public application id \'"
+ self._public_application_id
+ "\' does not exist or is not accessible by the user.")
LOG.debug(res['applications'][0]['id'])
return res['applications'][0]['id']
raise ValueError(response.text)
raise ValueError('\n' + response.text + '\nSet your config with \'jake config iq\'')

def submit_sbom(self, sbom: str, internal_id: str) -> (str):
def submit_sbom(self, sbom: str) -> (str):
"""submits sbom (in str form) to IQ server, valid sbom should get
202 response. On valid response, sets status url for later polling"""
LOG.debug(sbom)
headers = self.get_headers()
headers = self._headers
headers['Content-Type'] = 'application/xml'
response = requests.post(
'{0}/api/v2/scan/applications/{1}/sources/jake?stageId={2}'.format(
self.get_url(),
internal_id,
self._iq_url,
self._internal_id,
self._stage),
data=sbom,
headers=headers,
Expand Down

0 comments on commit d1331e5

Please sign in to comment.