Skip to content

Commit

Permalink
refactor: clean, lint
Browse files Browse the repository at this point in the history
  • Loading branch information
jjjermiah committed Feb 11, 2024
1 parent fee4ee0 commit 688a1a6
Show file tree
Hide file tree
Showing 10 changed files with 509 additions and 130 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -149,4 +149,6 @@ data/*
sandbox

docs/data/*
docs/autoapi
docs/autoapi
NBIA-Download
src/nbiatoolkit/.DS_Store
9 changes: 8 additions & 1 deletion src/nbiatoolkit/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,4 +15,11 @@
from .utils.nbia_endpoints import NBIA_ENDPOINTS

# define the __all__ variable
__all__ = ["NBIAClient", "OAuth2", "setup_logger", "NBIA_ENDPOINTS", "version", "__version__"]
__all__ = [
"NBIAClient",
"OAuth2",
"setup_logger",
"NBIA_ENDPOINTS",
"version",
"__version__",
]
106 changes: 65 additions & 41 deletions src/nbiatoolkit/auth.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,46 @@
from cryptography.fernet import Fernet


def encrypt_credentials(key: bytes, username: str, password: str) -> Tuple[str, str]:
cipher_suite = Fernet(key=key)
encrypted_password = cipher_suite.encrypt(password.encode()).decode()
encrypted_username = cipher_suite.encrypt(username.encode()).decode()
# return the encrypted client_id and username
return encrypted_username, encrypted_password

def decrypt_credentials(key: bytes, encrypted_username: str, encrypted_password: str) -> tuple[str, str]:
cipher_suite = Fernet(key=key)
decrypted_username = cipher_suite.decrypt(encrypted_username.encode()).decode()
decrypted_password = cipher_suite.decrypt(encrypted_password.encode()).decode()
# return the decrypted client_id and username
return decrypted_username, decrypted_password
def encrypt_credentials(
key: bytes, username: str, password: str
) -> Tuple[str, str]:
"""
Encrypts the given username and password using the provided key.
Args:
key (bytes): The encryption key.
username (str): The username to be encrypted.
password (str): The password to be encrypted.
Returns:
Tuple[str, str]: A tuple containing the encrypted username and password.
"""
cipher_suite = Fernet(key=key)
encrypted_password = cipher_suite.encrypt(password.encode()).decode()
encrypted_username = cipher_suite.encrypt(username.encode()).decode()
return encrypted_username, encrypted_password


def decrypt_credentials(
key: bytes, encrypted_username: str, encrypted_password: str
) -> tuple[str, str]:
"""
Decrypts the encrypted username and password using the provided key.
Args:
key (bytes): The encryption key used to decrypt the credentials.
encrypted_username (str): The encrypted username.
encrypted_password (str): The encrypted password.
Returns:
tuple[str, str]: A tuple containing the decrypted username and password.
"""
cipher_suite = Fernet(key=key)
decrypted_username = cipher_suite.decrypt(encrypted_username.encode()).decode()
decrypted_password = cipher_suite.decrypt(encrypted_password.encode()).decode()
# return the decrypted client_id and username
return decrypted_username, decrypted_password


class OAuth2:
"""
Expand Down Expand Up @@ -89,7 +116,7 @@ def __init__(
password: str = "",
client_id: str = "NBIA",
base_url: Union[str, NBIA_ENDPOINTS] = NBIA_ENDPOINTS.BASE_URL,
):
) -> None:
"""
Initialize the OAuth2 class.
Expand All @@ -111,7 +138,9 @@ def __init__(
self._fernet_key: bytes = Fernet.generate_key()
self.username: str
self.password: str
self.username, self.password = encrypt_credentials(key=self.fernet_key, username=username, password=password)
self.username, self.password = encrypt_credentials(
key=self.fernet_key, username=username, password=password
)

if isinstance(base_url, NBIA_ENDPOINTS):
self.base_url = base_url.value
Expand All @@ -124,13 +153,18 @@ def __init__(
self.refresh_token = "" # Fix: Assign an empty string instead of None
self.scope = None


@property
def fernet_key(self) -> bytes:
return self._fernet_key

def is_logged_out(self) -> bool:
return (self._access_token is "" and self.username == "" and self.password == "" and self.client_id == "" and self.base_url == "")
return (
self._access_token == ""
and self.username == ""
and self.password == ""
and self.client_id == ""
and self.base_url == ""
)

@property
def access_token(self) -> str | None:
Expand Down Expand Up @@ -175,29 +209,25 @@ def _refresh_access_token(self) -> None:
token_data = response.json()
self.set_token_data(token_data)



def request_new_access_token(self):
# Implement logic to request a new access token using client credentials
# Set the new access token and update the expiration time
# Example:
# new_access_token, expires_in = your_token_request_logic()
# self.access_token = new_access_token
# self.token_expiration_time = time.time() + expires_in

# # Prepare the request data


data: dict[str, str] = {
"username": decrypt_credentials(key=self.fernet_key, encrypted_username=self.username, encrypted_password=self.password)[0],
"password": decrypt_credentials(key=self.fernet_key, encrypted_username=self.username, encrypted_password=self.password)[1],
"username": decrypt_credentials(
key=self.fernet_key,
encrypted_username=self.username,
encrypted_password=self.password,
)[0],
"password": decrypt_credentials(
key=self.fernet_key,
encrypted_username=self.username,
encrypted_password=self.password,
)[1],
"client_id": self.client_id,
"grant_type": "password",
}

token_url: str = self.base_url + "oauth/token"

response : requests.models.Response
response: requests.models.Response
response = requests.post(token_url, data=data)

try:
Expand All @@ -209,8 +239,6 @@ def request_new_access_token(self):
token_data = response.json()
self.set_token_data(token_data)



def set_token_data(self, token_data: dict):
self._access_token = token_data["access_token"]
self.expiry_time = time.time() + int(token_data.get("expires_in") or 0)
Expand All @@ -222,8 +250,8 @@ def set_token_data(self, token_data: dict):
def api_headers(self) -> dict[str, str]:
return {
"Authorization": f"Bearer {self.access_token}",
"Content-Type": "application/json",}

"Content-Type": "application/json",
}

@property
def token_expiration_time(self):
Expand All @@ -249,7 +277,6 @@ def __str__(self):
else:
return ""


def logout(self) -> None:
"""
Logs out the user and revokes the access token.
Expand Down Expand Up @@ -279,9 +306,6 @@ def logout(self) -> None:
self.refresh_expiry = None
self.refresh_token = ""
self.scope = None
self._fernet_key = b''
self._fernet_key = b""
self = None
return None



0 comments on commit 688a1a6

Please sign in to comment.