-
Notifications
You must be signed in to change notification settings - Fork 7
/
sw360_api.py
160 lines (132 loc) · 5.15 KB
/
sw360_api.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
# -------------------------------------------------------------------------------
# Copyright (c) 2019-2024 Siemens
# All Rights Reserved.
# Authors: thomas.graf@siemens.com, gernot.hillier@siemens.com
#
# Licensed under the MIT license.
# SPDX-License-Identifier: MIT
# -------------------------------------------------------------------------------
"""Python interface to the Siemens SW360 platform"""
from typing import Any, Dict, Optional
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from .attachments import AttachmentsMixin
from .clearing import ClearingMixin
from .components import ComponentsMixin
from .license import LicenseMixin
from .project import ProjectMixin
from .releases import ReleasesMixin
from .sw360error import SW360Error
from .vendor import VendorMixin
from .vulnerabilities import VulnerabilitiesMixin
# Retry mechanism for rate limiting
adapter = HTTPAdapter(max_retries=Retry(
total=5,
status_forcelist=[429, 500, 502, 503, 504],
respect_retry_after_header=True,
backoff_factor=30,
allowed_methods=["HEAD", "GET", "OPTIONS", "POST", "PUT", "PATCH"]
))
session_default = requests.Session()
session_default.mount("http://", adapter)
session_default.mount("https://", adapter)
class SW360(
AttachmentsMixin,
ClearingMixin,
ComponentsMixin,
LicenseMixin,
ProjectMixin,
ReleasesMixin,
VendorMixin,
VulnerabilitiesMixin
):
"""Python interface to the Siemens SW360 platform
Authentication against a running SW360 instance is performed using an API token.
The token will be sent as HTTP header using the format
`Authorization: <token_type> <token>`. Check your SW360 REST API
documentation for details on needed type and how to get the token.
token_type is "Bearer" for an OAuth workflow and "Token" for tokens
generated via the SW360 UI.
:param url: URL of the SW360 instance
:param token: The SW360 REST API token (the cryptic string without
"Authorization:" and `token_type`).
:param oauth2: flag indicating whether this is an OAuth2 token
:type url: string
:type token: string
:type oauth2: boolean
"""
def __init__(
self,
url: str,
token: str,
oauth2: bool = False,
session: Optional[requests.Session] = session_default
) -> None:
"""Constructor"""
if url[-1] != "/":
url += "/"
self.url: str = url
self.session: Optional[requests.Session] = session
if oauth2:
self.api_headers = {"Authorization": "Bearer " + token}
else:
self.api_headers = {"Authorization": "Token " + token}
self.force_no_session = False
def login_api(self, token: str = "") -> bool:
"""Login to SW360 REST API. This used to have a `token` parameter
due to historic reasons which is ignored.
You need to call this before any other method accessing SW360.
:raises SW360Error: if the login fails
"""
if not self.force_no_session:
self.session.headers = self.api_headers.copy() # type: ignore
url = self.url + "resource/api/"
try:
if self.force_no_session:
resp = requests.get(url, headers=self.api_headers)
else:
if self.session:
resp = self.session.get(url)
except Exception as ex:
raise SW360Error(None, url, message="Unable to login: " + repr(ex))
if resp.ok:
return True
else:
raise SW360Error(resp, url, message="Unable to login")
def close_api(self) -> None:
"""A keep-alive HTTP session is used to access the SW360 REST API.
This method allows to explicitly close the connection at a defined
time. Normally, you don't need to call it - session is cleaned up
automatically when needed."""
if self.session:
self.session.close()
self.session = None
def api_get_raw(self, url: str = "") -> str:
"""Request `url` from REST API and return raw result.
:param url: the url to be requested
:type url: string
:return: the HTTP response
:rtype: string
:raises SW360Error: if there is a negative HTTP response
"""
if (not self.force_no_session) and self.session is None:
raise SW360Error(message="login_api needs to be called first")
if self.force_no_session:
response = requests.get(url, headers=self.api_headers)
else:
if self.session:
response = self.session.get(url)
if response.ok:
return response.text
raise SW360Error(response, url)
# ----- Health -------------------------------------------------------
def get_health_status(self) -> Optional[Dict[str, Any]]:
"""Get information about the service's health.
API endpoint: GET /health
:return: service health status
:rtype: JSON health status object
:raises SW360Error: if there is a negative HTTP response
"""
resp = self.api_get(self.url + "resource/health/")
return resp