-
Notifications
You must be signed in to change notification settings - Fork 14
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Separate IIB classes to the new files [CLOUDDST-3064] (#25)
* Separate IIB classes to the new files * Separate test and modify docs
- Loading branch information
Showing
13 changed files
with
1,013 additions
and
889 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,106 @@ | ||
import os | ||
import kerberos | ||
import subprocess | ||
import tempfile | ||
|
||
|
||
# pylint: disable=bad-option-value,useless-object-inheritance | ||
class IIBAuth(object): | ||
def __init__(self): | ||
raise NotImplementedError | ||
|
||
def make_auth(self, iib_session): # pragma: no cover | ||
raise NotImplementedError | ||
|
||
|
||
class IIBBasicAuth(IIBAuth): | ||
"""Basic Auth provider to IIBClient.""" | ||
|
||
# pylint: disable=super-init-not-called | ||
def __init__(self, user, password): | ||
""" | ||
Args: | ||
user (str) | ||
Basic auth user name | ||
password (str) | ||
Basic auth password | ||
""" | ||
self.user = user | ||
self.password = password | ||
|
||
def make_auth(self, iib_session): | ||
"""Setup IIBSession with basic auth. | ||
Args: | ||
iib_session (IIBSession) | ||
IIBSession instance | ||
""" | ||
|
||
iib_session.session.headers["auth"] = (self.user, self.password) | ||
|
||
|
||
class IIBKrbAuth(IIBAuth): | ||
"""Kerberos authentication support for IIBClient""" | ||
|
||
# pylint: disable=super-init-not-called | ||
def __init__(self, krb_princ, service, ktfile=None): | ||
""" | ||
Args: | ||
krb_princ (str) | ||
Kerberos principal for obtaining ticket | ||
ktfile (str) | ||
Kerberos client keytab file | ||
gssapi_name_type (str) | ||
GSSAPI name type for creating credentials | ||
""" | ||
self.krb_princ = krb_princ | ||
self.ktfile = ktfile | ||
self.service = service | ||
|
||
def _krb_auth_header(self): | ||
retcode = subprocess.Popen( | ||
["klist"], stdout=subprocess.PIPE, stderr=subprocess.PIPE | ||
).wait() | ||
krb5ccname = None | ||
if retcode or self.ktfile: | ||
# can I define old_krb5ccname on the higher level out of if? | ||
old_krb5ccname = os.environ.get("KRB5CCNAME", "") | ||
_, krb5ccname = tempfile.mkstemp(prefix="krb5cc") | ||
if self.ktfile: | ||
retcode = subprocess.Popen( | ||
[ | ||
"kinit", | ||
self.krb_princ, | ||
"-k", | ||
"-t", | ||
self.ktfile, | ||
"-c", | ||
krb5ccname, | ||
], | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE, | ||
).wait() | ||
else: | ||
# If keytab path wasn't provided, default location will be attempted | ||
retcode = subprocess.Popen( | ||
["kinit", self.krb_princ, "-k", "-c", krb5ccname], | ||
stdout=subprocess.PIPE, | ||
stderr=subprocess.PIPE, | ||
).wait() | ||
try: | ||
if krb5ccname: | ||
os.environ["KRB5CCNAME"] = krb5ccname | ||
__, krb_context = kerberos.authGSSClientInit("HTTP@%s" % self.service) | ||
kerberos.authGSSClientStep(krb_context, "") | ||
self._krb_context = krb_context | ||
auth_header = "Negotiate " + kerberos.authGSSClientResponse(krb_context) | ||
finally: | ||
if krb5ccname: | ||
os.environ["KRB5CCNAME"] = old_krb5ccname | ||
os.unlink(krb5ccname) | ||
|
||
return auth_header | ||
|
||
def make_auth(self, iib_session): | ||
"""Setup IIBSession with kerberos authentication""" | ||
iib_session.session.headers["Authorization"] = self._krb_auth_header() |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,135 @@ | ||
class IIBBuildDetailsModel(object): | ||
"""Model class handling data about index build task""" | ||
|
||
def __init__( | ||
self, | ||
_id, | ||
state, | ||
reason, | ||
state_history, | ||
from_index, | ||
from_index_resolved, | ||
bundles, | ||
removed_operators, | ||
organization, | ||
binary_image, | ||
binary_image_resolved, | ||
index_image, | ||
request_type, | ||
arches, | ||
bundle_mapping, | ||
omps_operator_version, | ||
): | ||
""" | ||
Args: | ||
_id (int) | ||
Id of build | ||
state (str) | ||
State of build | ||
state (str) | ||
Reason for state change | ||
from_index (str) | ||
Reference of index image used as source for rebuild | ||
from_index_resolved (str) | ||
Reference of new index image | ||
bundles (list) | ||
List of bundles to be added to index image | ||
removed_operators (list) | ||
List of operators to be removed from index image | ||
organization (str) | ||
Name of organization to push to in the legacy app registry | ||
binary_image (str) | ||
Reference of binary image used for rebuilding | ||
binary_image_resolved (str) | ||
Checksum reference of binary image that was used for rebuilding | ||
index_image (str) | ||
Reference of index image to rebuild | ||
request_type (str) | ||
Type of iib build task (add or remove) | ||
arches (list) | ||
List of architectures supported in new index image | ||
bundle_mapping (dict) | ||
Operator names in "bundles" map to: list of "bundles" which | ||
map to the operator key | ||
omps_operator_version (dict) | ||
Operator version returned from OMPS API call used for Add request | ||
""" | ||
self.id = _id | ||
self.state = state | ||
self.reason = reason | ||
self.state_history = state_history | ||
self.from_index = from_index | ||
self.from_index_resolved = from_index_resolved | ||
self.bundles = bundles | ||
self.removed_operators = removed_operators | ||
self.organization = organization | ||
self.binary_image = binary_image | ||
self.binary_image_resolved = binary_image_resolved | ||
self.index_image = index_image | ||
self.request_type = request_type | ||
self.arches = arches | ||
self.bundle_mapping = bundle_mapping | ||
self.omps_operator_version = omps_operator_version | ||
|
||
@classmethod | ||
def from_dict(cls, data): | ||
return cls( | ||
data["id"], | ||
data["state"], | ||
data["state_reason"], | ||
data.get("state_history", []), | ||
data["from_index"], | ||
data["from_index_resolved"], | ||
data.get("bundles", []), | ||
data.get("removed_operators", []), | ||
data.get("organization"), | ||
data["binary_image"], | ||
data["binary_image_resolved"], | ||
data["index_image"], | ||
data["request_type"], | ||
data["arches"], | ||
data["bundle_mapping"], | ||
data.get("omps_operator_version", {}), | ||
) | ||
|
||
def to_dict(self): | ||
return { | ||
"id": self.id, | ||
"state": self.state, | ||
"state_reason": self.reason, | ||
"state_history": self.state_history, | ||
"from_index": self.from_index, | ||
"from_index_resolved": self.from_index, | ||
"bundles": self.bundles, | ||
"removed_operators": self.removed_operators, | ||
"organization": self.organization, | ||
"binary_image": self.binary_image, | ||
"binary_image_resolved": self.binary_image_resolved, | ||
"index_image": self.index_image, | ||
"request_type": self.request_type, | ||
"arches": self.arches, | ||
"bundle_mapping": self.bundle_mapping, | ||
"omps_operator_version": self.omps_operator_version, | ||
} | ||
|
||
def __eq__(self, other): | ||
if ( | ||
self.id == other.id | ||
and self.state == other.state | ||
and self.reason == other.reason | ||
and self.state_history == other.state_history | ||
and self.from_index == other.from_index | ||
and self.from_index_resolved == other.from_index_resolved | ||
and self.bundles == other.bundles | ||
and self.removed_operators == other.removed_operators | ||
and self.organization == other.organization | ||
and self.binary_image == other.binary_image | ||
and self.binary_image_resolved == other.binary_image_resolved | ||
and self.index_image == other.index_image | ||
and self.request_type == other.request_type | ||
and self.arches == other.arches | ||
and self.bundle_mapping == other.bundle_mapping | ||
and self.omps_operator_version == other.omps_operator_version | ||
): | ||
return True | ||
return False |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,54 @@ | ||
from .iib_build_details_model import IIBBuildDetailsModel | ||
|
||
|
||
class IIBBuildDetailsPager(object): | ||
def __init__(self, iibclient, page): | ||
""" | ||
Args: | ||
iibclient (IIBClient) | ||
IIBClient instance | ||
page (int) | ||
page where start listing items | ||
""" | ||
self.page = page | ||
self.iibclient = iibclient | ||
self._items = [] | ||
self.meta = {} | ||
|
||
def reload_page(self): | ||
"""Reload items for current page""" | ||
|
||
ret = self.iibclient.get_builds(self.page, raw=True) | ||
self.meta = ret["meta"] | ||
self._items = [IIBBuildDetailsModel.from_dict(x) for x in ret["items"]] | ||
|
||
def next(self): | ||
"""Load items for next page and set it as current""" | ||
|
||
self.page += 1 | ||
self.reload_page() | ||
|
||
def prev(self): | ||
"""Load items for previous page and set it as current""" | ||
|
||
if self.page > 1: | ||
self.page -= 1 | ||
self.reload_page() | ||
|
||
def items(self): | ||
"""Return items for current page""" | ||
return self._items | ||
|
||
@classmethod | ||
def from_dict(cls, iibclient, _dict): | ||
ret = cls(iibclient, _dict["meta"]["page"]) | ||
ret.meta = _dict["meta"] | ||
ret._items = [IIBBuildDetailsModel.from_dict(x) for x in _dict["items"]] | ||
return ret | ||
|
||
def __eq__(self, other): | ||
return ( | ||
self._items == other._items # can I rather use function self.items? | ||
and self.iibclient == other.iibclient | ||
and self.meta == other.meta | ||
) |
Oops, something went wrong.