diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml new file mode 100644 index 0000000..901bd69 --- /dev/null +++ b/.github/workflows/ci.yml @@ -0,0 +1,60 @@ +name: CI + +on: [ push, pull_request ] + +jobs: + test: + strategy: + fail-fast: false + matrix: + python-version: [ '3.10', '3.11' ] + os: [ ubuntu-latest ] + runs-on: ${{ matrix.os }} + steps: + - uses: actions/checkout@v3 + - name: Setup Python ${{ matrix.python-version }} + uses: actions/setup-python@v4 + with: + python-version: ${{ matrix.python-version }} + - name: Install dependencies + run: | + python -m pip install --upgrade hatch + - name: Run tests + run: | + hatch run +py=${{ matrix.py || matrix.python-version }} test:with-coverage + + lint: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Setup Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + - name: Install Python dependencies + run: | + python -m pip install --upgrade hatch + - name: Check with black + isort + if: always() + run: hatch run style:format && git diff --exit-code + - name: Check with ruff + if: always() + run: hatch run style:lint + - name: Check with mypy + if: always() + run: hatch run types:check + + package: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Setup Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + - name: Install Hatch + run: | + python -m pip install -U hatch + - name: Build package + run: | + hatch build \ No newline at end of file diff --git a/.github/workflows/deploy-release.yml b/.github/workflows/deploy-release.yml new file mode 100644 index 0000000..a460dda --- /dev/null +++ b/.github/workflows/deploy-release.yml @@ -0,0 +1,28 @@ +name: deploy-release + +on: + push: + tags: + - '*' + +jobs: + pypi: + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v3 + - name: Setup Python + uses: actions/setup-python@v4 + with: + python-version: '3.10' + - name: Install Hatch + run: | + python -m pip install -U hatch + - name: Build package + run: | + hatch build + - name: Publish + run: | + hatch publish + env: + HATCH_INDEX_USER: __token__ + HATCH_INDEX_AUTH: ${{ secrets.PYPI_PASSWORD }} \ No newline at end of file diff --git a/README.md b/README.md index 6e581cd..30daaea 100644 --- a/README.md +++ b/README.md @@ -1,5 +1,8 @@ # python-vaultwarden +[![PyPI Version][pypi-v-image]][pypi-v-link] +[![Build Status][GHAction-image]][GHAction-link] + A python library for vaultwarden ## Clients @@ -17,6 +20,23 @@ target user. The cryptographic part is handled by the [bitwardentools library](https://github.com/corpusops/bitwardentools). + + +[pypi-v-image]: https://img.shields.io/pypi/v/python-vaultwarden.svg + +[pypi-v-link]: https://pypi.org/project/python-vaultwarden/ + +[GHAction-image]: https://github.com/numberly/python-vaultwarden/workflows/CI/badge.svg?branch=main&event=push + +[GHAction-link]: https://github.com/numberly/python-vaultwarden/actions?query=event%3Apush+branch%3Amain + + +[Issue]: https://github.com/numberly/python-vaultwarden/issues + +[Discussions]: https://github.com/numberly/python-vaultwarden/discussions + +[PyPA Code of Conduct]: https://www.pypa.io/en/latest/code-of-conduct/ + ## License Python-vaultwarden is distributed under the terms of the [Apache](https://spdx.org/licenses/Apache-2.0.html) license. diff --git a/pyproject.toml b/pyproject.toml index 1335ddd..7166463 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,28 +1,148 @@ +[build-system] +requires = ["hatchling"] +build-backend = "hatchling.build" + [project] name = "python-vaultwarden" version = "0.7.0" -description = "The Vautlwarden Python Client" -requires-python = ">=3.11" +description = "Admin Vaultwarden and Simple Bitwarden Python Client" authors = [ { name = "Lyonel Martinez", email = "lyonel.martinez@numberly.com" }, { name = "Mathis Ribet", email = "mathis.ribet@numberly.com" }, ] +license = "Apache-2.0" +readme = "README.md" +repository = "https://github.com/numberly/python-vaultwarden" +documentation = "https://numberly.github.io/python-vaultwarden/" +packages = [ + { include = "vaultwarden", from = "src" }, +] classifiers = [ "Development Status :: 4 - Beta", - "Environment :: Web Environment", "Intended Audience :: Developers", + "Environment :: Web Environment", + "License :: OSI Approved :: Apache Software License", "Operating System :: OS Independent", + "Intended Audience :: Developers", + "Programming Language :: Python", + "Programming Language :: Python :: 3", + "Programming Language :: Python :: 3 :: Only", + "Programming Language :: Python :: 3.10", "Programming Language :: Python :: 3.11", + "Typing :: Typed", "Topic :: Internet :: WWW/HTTP", + # Include this classifier to prevent accidently publishing private code to PyPI. + # https://pypi.org/classifiers/ + "Private :: Do Not Upload", +] +requires-python = ">=3.10" +dependencies = [ + "bitwardentools >=1.0.55", + "httpx >=0.24.1", +] + +[tool.hatch.version] +path = "src/vaultwarden/__version__.py" + +[tool.hatch.build] +packages = [ + "src/vaultwarden", +] +include = [ + "/tests", +] + +[tool.hatch.build.targets.sdist] +include = ["/src/vaultwarden/**/*.py"] +[tool.hatch.build.targets.wheel] +packages = [ + "src/vaultwarden", +] + +[tool.hatch.envs.test] +dependencies = [ + "coverage", ] + +[tool.hatch.envs.test.scripts] +test = "coverage run --source=src/vaultwarden -m unittest discover -p 'test_*.py' tests --top-level-directory ." +_coverage = ["test", "coverage xml", "coverage report --show-missing"] +with-coverage = "test" +[[tool.hatch.envs.test.matrix]] +python = ["3.10", "3.11"] +type = ["default"] +[tool.hatch.envs.test.overrides] +matrix.type.scripts = [ + { key = "with-coverage", value = "_coverage", if = ["default"] }, +] + +[tool.hatch.envs.types] +dependencies = [ + "mypy", + "types-PyYAML", + "types-setuptools", + "typing-extensions", +] +[tool.hatch.envs.types.scripts] +check = "mypy src/vaultwarden" + +[tool.hatch.envs.style] +detached = true dependencies = [ - "bitwardentools==1.0.55", - "httpx==0.24.1", + "black", + "isort", + "ruff", +] +[tool.hatch.envs.style.scripts] +lint = [ + "ruff check --fix src/vaultwarden", +] +check = [ + "isort --check-only --diff src/vaultwarden", + "black -q --check --diff src/vaultwarden", + "ruff check src/vaultwarden", ] +format = [ + "isort -q src/vaultwarden", + "black -q src/vaultwarden", + "lint" +] + +[tool.ruff] +# Add "Q" to the list of enabled codes. +select = ["B", "E", "F", "I", "N", "Q", "RUF", "SIM", "TCH"] +fixable = ["ALL"] +src = ["src/vaultwarden", "tests"] +target-version = "py310" +line-length = 79 + +[tool.ruff.flake8-quotes] +docstring-quotes = "double" + +[tool.ruff.flake8-bugbear] +extend-immutable-calls = ["typer.Argument"] + +[tool.ruff.isort] +force-sort-within-sections = true + +[tool.black] +line-length = 79 +target-version = ["py310", "py311"] + +[tool.isort] +profile = "black" +line_length = 80 + +[tool.mypy] +ignore_missing_imports = true +warn_unreachable = true +no_implicit_optional = true +show_error_codes = true [tool.commitizen] version = "0.7.0" +tag_format = "$version" +update_changelog_on_bump = true version_files = [ - "pyproject.toml:version", - "vaultwarden/__version__.py", + "src/vaultwarden/__version__.py", ] diff --git a/vaultwarden/__init__.py b/src/vaultwarden/__init__.py similarity index 100% rename from vaultwarden/__init__.py rename to src/vaultwarden/__init__.py diff --git a/vaultwarden/__version__.py b/src/vaultwarden/__version__.py similarity index 100% rename from vaultwarden/__version__.py rename to src/vaultwarden/__version__.py diff --git a/vaultwarden/models/__init__.py b/src/vaultwarden/clients/__init__.py similarity index 100% rename from vaultwarden/models/__init__.py rename to src/vaultwarden/clients/__init__.py diff --git a/vaultwarden/clients/bitwarden.py b/src/vaultwarden/clients/bitwarden.py similarity index 77% rename from vaultwarden/clients/bitwarden.py rename to src/vaultwarden/clients/bitwarden.py index d14b856..ed7159a 100644 --- a/vaultwarden/clients/bitwarden.py +++ b/src/vaultwarden/clients/bitwarden.py @@ -1,18 +1,18 @@ # Class Bitwarden client with a httpx client -from typing import Optional, Literal, Any, List +from typing import Any, List, Literal, Optional from uuid import UUID from bitwardentools import caseinsentive_key_search -from bitwardentools.crypto import make_master_key, decrypt, encrypt -from httpx import Client, Response, HTTPError +from bitwardentools.crypto import decrypt, encrypt, make_master_key +from httpx import Client, HTTPError, Response -from ..models.api_models import ApiToken -from ..models.exception_models import BitwardenException -from ..utils.logger import logger -from ..utils.tools import ( - log_raise_for_status, +from vaultwarden.models.api_models import ApiToken +from vaultwarden.models.exception_models import BitwardenError +from vaultwarden.utils.logger import logger +from vaultwarden.utils.tools import ( get_collection_id_from_ditcs, get_matching_ids_from_ditcs, + log_raise_for_status, ) @@ -27,8 +27,10 @@ def __init__( device_id: UUID | str, ): # if one of the parameters is None, raise an exception - if not all([url, email, password, client_id, client_secret, device_id]): - raise BitwardenException("All parameters are required") + if not all( + [url, email, password, client_id, client_secret, device_id] + ): + raise BitwardenError("All parameters are required") self.email = email self.password = password self.client_id = client_id @@ -36,13 +38,20 @@ def __init__( self.device_id = device_id self.url = url.strip("/") self._http_client = Client( - base_url=f"{self.url}/", event_hooks={"response": [log_raise_for_status]} + base_url=f"{self.url}/", + event_hooks={"response": [log_raise_for_status]}, ) - self.api_token: Optional[ApiToken] = None + self._api_token: Optional[ApiToken] = None self.sync = None - def _get_api_token(self) -> Optional[ApiToken]: - return self.api_token + @property + def api_token(self) -> ApiToken: + assert self._api_token is not None + return self._api_token + + @api_token.setter + def api_token(self, value: ApiToken): + self._api_token = value # refresh api token if expired def _refresh_api_token(self) -> None: @@ -62,11 +71,10 @@ def _refresh_api_token(self) -> None: # login to api def _api_login(self) -> None: - token = self._get_api_token() - if token and not token.is_expired(): + if self._api_token and not self.api_token.is_expired(): return - if token and token.is_expired(): + if self._api_token and self.api_token.is_expired(): self._refresh_api_token() return @@ -92,10 +100,15 @@ def _api_login(self) -> None: salt=self.email, iterations=caseinsentive_key_search(json_resp, "KdfIterations"), ) - self.api_token = ApiToken(json_resp, master_key, json_resp["expires_in"]) + self._api_token = ApiToken( + json_resp, master_key, json_resp["expires_in"] + ) def _api_request( - self, method: Literal["GET", "POST", "DELETE", "PUT"], path: str, **kwargs: Any + self, + method: Literal["GET", "POST", "DELETE", "PUT"], + path: str, + **kwargs: Any, ) -> Response: self._api_login() headers = { @@ -103,7 +116,9 @@ def _api_request( "content-type": "application/json; charset=utf-8", "Accept": "*/*", } - return self._http_client.request(method, path, headers=headers, **kwargs) + return self._http_client.request( + method, path, headers=headers, **kwargs + ) def get_sync(self): if self.sync is None: @@ -112,15 +127,17 @@ def get_sync(self): return self.sync # Organization Management - def get_organization_user_details(self, organization_id: str, user_org_id: str): + def get_organization_user_details( + self, organization_id: str, user_org_id: str + ): return self._api_request( "GET", f"api/organizations/{organization_id}/users/{user_org_id}", params={"includeCollections": True, "includeGroups": True}, ).json() - # Get all users in an organization. Returns a dict with email as key and user details as value. - # If raw is True, returns the raw json response + # Get all users in an organization. Returns a dict with email as key and + # user details as value. If raw is True, returns the raw json response def get_organization_users(self, organization_id: str, raw=False): users = ( self._api_request( @@ -139,10 +156,10 @@ def get_org_key(self, org_id): sync = self.get_sync() profile = sync.get("Profile", None) if profile is None: - raise BitwardenException("No profile in Sync") + raise BitwardenError("No profile in Sync") orgs = profile.get("Organizations", None) if orgs is None: - raise BitwardenException("No Organizations in Sync[Profile]") + raise BitwardenError("No Organizations in Sync[Profile]") raw_key = None for org in orgs: if org.get("Id") == org_id: @@ -150,18 +167,19 @@ def get_org_key(self, org_id): break if raw_key is not None: return decrypt(raw_key, self.api_token.get("orgs_key")) - raise BitwardenException(f"No Organizations `{org_id}` found") + raise BitwardenError(f"No Organizations `{org_id}` found") def deduplicate_collection(self, organization_id): """ - Deduplicate collections with the same name in a given org, by moving users - and secrets into the bigger (by user count) collection + Deduplicate collections with the same name in a given org, by moving + users and secrets into the bigger (by user count) collection """ collections = self.get_organization_collections(organization_id) seen_names = {} duplicated = {} - # List duplicated collections, indexed on the name, referencing a list of collections id + # List duplicated collections, indexed on the name, referencing a + # list of collections id for name, collection in collections: if seen_names.get(name) is None: seen_names[name] = collection["Id"] @@ -175,8 +193,8 @@ def deduplicate_collection(self, organization_id): base_id = None nb_users = 0 users = {} - # Building user list by merging all accesses and choose which collection will not be deleted - # (based on most users) + # Building user list by merging all accesses and choose which + # collection will not be deleted (based on most users) for collection_id in id_list: coll_users = self.get_users_of_collection( organization_id, collection_id @@ -188,12 +206,14 @@ def deduplicate_collection(self, organization_id): users_list = list(users.values()) - # Removing the chosen collection from the list of collection to delete + # Removing the chosen collection from the list of collection to + # delete id_list.remove(base_id) self.set_users_of_collection(organization_id, base_id, users_list) for collection_id in id_list: - # List items inside the current collection and move these items to the chosen collection + # List items inside the current collection and move these + # items to the chosen collection ciphers = self.get_organizations_collection_items( organization_id, collection_id ) @@ -201,14 +221,20 @@ def deduplicate_collection(self, organization_id): cipher_collections = set(cipher.get("CollectionIds")) cipher_collections.remove(collection_id) cipher_collections.add(base_id) - self.change_collections_item(cipher["Id"], list(cipher_collections)) + self.change_collections_item( + cipher["Id"], list(cipher_collections) + ) # delete the current collection self.delete_collection(organization_id, collection_id) # Collections users Management def add_collection_to_user( - self, organization_id: str, user_org_id: str, accesses: dict, coll_id: str + self, + organization_id: str, + user_org_id: str, + accesses: dict, + coll_id: str, ): return self.add_collections_to_user( organization_id, user_org_id, accesses, [coll_id] @@ -228,7 +254,11 @@ def add_collections_to_user( ] logger.debug("User info | %s", accesses) known_collection = next( - (item for item in coll_list if item["Id"] not in accesses["Collections"]), + ( + item + for item in coll_list + if item["Id"] not in accesses["Collections"] + ), False, ) if known_collection is False: @@ -245,7 +275,11 @@ def add_collections_to_user( ) def remove_collection_to_user( - self, organization_id: str, user_org_id: str, accesses: dict, coll_id: str + self, + organization_id: str, + user_org_id: str, + accesses: dict, + coll_id: str, ): return self.remove_collections_to_user( organization_id, user_org_id, accesses, [coll_id] @@ -260,7 +294,12 @@ def remove_collections_to_user( ): data = {} known_collection = next( - (item for item in accesses["Collections"] if item["Id"] in coll_ids), False + ( + item + for item in accesses["Collections"] + if item["Id"] in coll_ids + ), + False, ) if known_collection is False: return @@ -288,10 +327,15 @@ def get_organization_items(self, organization_id, deleted=False): .get("Data") ) - def get_organizations_collection_items(self, organization_id, collections_id): + def get_organizations_collection_items( + self, organization_id, collections_id + ): ciphers = self.get_organization_items(organization_id) return list( - filter(lambda cipher: (collections_id in cipher["CollectionIds"]), ciphers) + filter( + lambda cipher: (collections_id in cipher["CollectionIds"]), + ciphers, + ) ) def change_collections_item(self, item_id, collection_ids): @@ -303,7 +347,11 @@ def change_collections_item(self, item_id, collection_ids): # Collections Management def create_collection( - self, org_id, collection_name, collections_names=None, collections_ids=None + self, + org_id, + collection_name, + collections_names=None, + collections_ids=None, ): if ( collections_names is not None @@ -326,9 +374,12 @@ def create_collection( collections_ids[data["Id"]] = data return data - # Return 2 dicts: one indexed by name, one indexed by id, both containing the collections details + # Return 2 dicts: one indexed by name, one indexed by id, + # both containing the collections details def get_organization_collections_dicts(self, org_id): - resp = self._api_request("GET", f"api/organizations/{org_id}/collections") + resp = self._api_request( + "GET", f"api/organizations/{org_id}/collections" + ) colls = resp.json().get("Data") res_by_name = {} res_by_id = {} @@ -344,7 +395,9 @@ def get_organization_collections_dicts(self, org_id): # Return a list of tuple (collection_name, collection_details) def get_organization_collections(self, org_id): - resp = self._api_request("GET", f"api/organizations/{org_id}/collections") + resp = self._api_request( + "GET", f"api/organizations/{org_id}/collections" + ) colls = resp.json().get("Data") res = [] org_key = self.get_org_key(org_id) @@ -354,7 +407,11 @@ def get_organization_collections(self, org_id): return res def get_collection_id_or_create( - self, org_id, collection_name, collections_names=None, collections_ids=None + self, + org_id, + collection_name, + collections_names=None, + collections_ids=None, ): if collections_names is None or collections_ids is None: ( @@ -372,11 +429,16 @@ def get_collection_id_or_create( def delete_collection(self, organization_id, collection_id): return self._api_request( - "DELETE", f"api/organizations/{organization_id}/collections/{collection_id}" + "DELETE", + f"api/organizations/{organization_id}/collections/{collection_id}", ) def get_matching_collections_ids_or_create( - self, org_id, collection_name, collections_names=None, collections_ids=None + self, + org_id, + collection_name, + collections_names=None, + collections_ids=None, ): if collections_names is None or collections_ids is None: ( @@ -393,7 +455,9 @@ def get_matching_collections_ids_or_create( return res def get_users_of_collection(self, organization_id, collection_id): - users = self.get_users_of_collection_raw(organization_id, collection_id) + users = self.get_users_of_collection_raw( + organization_id, collection_id + ) return {u["Id"]: u for u in users} def get_users_of_collection_raw(self, organization_id, collection_id): @@ -435,7 +499,11 @@ def invite_organisation_collection( "emails": [email], "Groups": [], "Collections": [ - {"hidePasswords": False, "id": collection_id, "readOnly": False} + { + "hidePasswords": False, + "id": collection_id, + "readOnly": False, + } ], "accessAll": False, "type": access_type, @@ -451,7 +519,11 @@ def invite_organisation_collections( "emails": [email], "Groups": [], "Collections": [ - {"hidePasswords": False, "id": collection_id, "readOnly": False} + { + "hidePasswords": False, + "id": collection_id, + "readOnly": False, + } for collection_id in collections_id ], "accessAll": False, @@ -473,7 +545,8 @@ def invite_organisation(self, org_id, user_accesses, email): "POST", f"api/organizations/{org_id}/users/invite", json=data ) - # Re-invite a user with the same accesses. Return True if at least one organization has been re-invited + # Re-invite a user with the same accesses. Return True if at least one + # organization has been re-invited def invite_with_accesses(self, organizations, email): logger.info("Re-invite with accesses") for org_id, accesses in organizations.items(): diff --git a/vaultwarden/clients/vaultwarden.py b/src/vaultwarden/clients/vaultwarden.py similarity index 73% rename from vaultwarden/clients/vaultwarden.py rename to src/vaultwarden/clients/vaultwarden.py index 2cdf4cf..25e6dc6 100644 --- a/vaultwarden/clients/vaultwarden.py +++ b/src/vaultwarden/clients/vaultwarden.py @@ -1,35 +1,37 @@ import http from http.cookiejar import Cookie -from typing import Optional, Literal, Any +from typing import Any, Literal, Optional -from httpx import Response, Client, HTTPStatusError +from httpx import Client, HTTPStatusError, Response -from .bitwarden import BitwardenClient -from ..models.api_models import VaultWardenUser -from ..models.exception_models import VaultwardenAdminException -from ..utils.logger import logger -from ..utils.tools import log_raise_for_status +from vaultwarden.clients.bitwarden import BitwardenClient +from vaultwarden.models.api_models import VaultWardenUser +from vaultwarden.models.exception_models import VaultwardenAdminError +from vaultwarden.utils.logger import logger +from vaultwarden.utils.tools import log_raise_for_status class VaultwardenAdminClient: def __init__(self, url: str, admin_secret_token: str, preload_users: bool): # If url or admin_secret_token is None, raise an exception if not url or not admin_secret_token: - raise VaultwardenAdminException("Missing url or admin_secret_token") + raise VaultwardenAdminError("Missing url or admin_secret_token") self.admin_secret_token = admin_secret_token self.url = url.strip("/") self._http_client = Client( base_url=f"{self.url}/admin/", event_hooks={"response": [log_raise_for_status]}, ) - self._id_mail_pool: Optional[dict[str, str]] = None + self._id_mail_pool: dict[str, str] = {} # Preload all users infos if preload_users: _ = self.get_all_users() def _get_admin_cookie(self) -> Optional[Cookie]: """Get the session cookie, required to authenticate requests""" - bw_cookies = (c for c in self._http_client.cookies.jar if c.name == "VW_ADMIN") + bw_cookies = ( + c for c in self._http_client.cookies.jar if c.name == "VW_ADMIN" + ) return next(bw_cookies, None) def _admin_login(self) -> None: @@ -46,12 +48,13 @@ def _admin_request( self, method: Literal["GET", "POST"], path: str, **kwargs: Any ) -> Response: self._admin_login() - return self._http_client.request(method, path, **kwargs) # type: ignore + return self._http_client.request(method, path, **kwargs) def _fill_id_mail_pool(self, users: list[VaultWardenUser]) -> None: """Cache the email->GUID mapping for the given users - Necessary since Vaultwarden does not offer a search or query-by-email endpoint + Necessary since Vaultwarden does not offer a search or + query-by-email endpoint """ self._id_mail_pool |= {u["Email"]: u["Id"] for u in users} @@ -82,23 +85,25 @@ def set_user_enabled(self, identifier: str, enabled: bool) -> None: def remove_2fa(self, email: str) -> None: user = self.get_user(email) if user is None: - raise VaultwardenAdminException(f"User {email} not found") - self._admin_request("POST", f"users/{email}/remove-2fa").raise_for_status() + raise VaultwardenAdminError(f"User {email} not found") + self._admin_request( + "POST", f"users/{email}/remove-2fa" + ).raise_for_status() - def get_user(self, search: str) -> Any | None: + def get_user(self, search: str) -> VaultWardenUser: """Search term is either an email in cache or a UUID. For textual search, use get_all_resources (expensive)""" assert isinstance(search, str) - if self._id_mail_pool is None: + if not self._id_mail_pool: self.get_all_users() if search in self._id_mail_pool: search = self._id_mail_pool[search] elif "@" in search: - # search is not a UUID (probably an email) but wasn't found in cache - return None + # search is not UUID (probably an email) but wasn't found in cache + raise VaultwardenAdminError(f"User {search} not found") # else assume it's a UUID resp = self._admin_request("GET", f"users/{search}") resp.raise_for_status() @@ -106,7 +111,9 @@ def get_user(self, search: str) -> Any | None: return resp.json() def get_all_users(self) -> list[VaultWardenUser]: - users: list[VaultWardenUser] = self._admin_request("GET", "users").json() + users: list[VaultWardenUser] = self._admin_request( + "GET", "users" + ).json() self._fill_id_mail_pool(users) return users @@ -117,26 +124,32 @@ def reset_account(self, email: str, bitwarden_client: BitwardenClient): ) if warning: check = input( - "WARNING: A organisation where you where present is not maintain by SOC account\n" + "WARNING: A organisation where you where present is not " + "maintain by SOC account\n" "Press 'yes' if you still want to reset the account" ) if check != "yes": logger.warning("Cancelling the reset") return logger.warning( - f"Doing reset on {email} despite having not complete information on its accesses" + f"Doing reset on {email} despite having not complete " + f"information on its accesses" ) - self.delete(user.get("Id")) + self.delete(user["Id"]) bitwarden_client.invite_with_accesses(accesses, user.get("Email")) return None def transfer_account_rights( - self, previous_email: str, new_email: str, bitwarden_client: BitwardenClient + self, + previous_email: str, + new_email: str, + bitwarden_client: BitwardenClient, ): res = True user: VaultWardenUser = self.get_user(previous_email) accesses, warning = bitwarden_client.get_user_org_accesses( - user_email=previous_email, user_organization_ids=user.get("Organizations") + user_email=previous_email, + user_organization_ids=user.get("Organizations"), ) if warning: logger.warning( @@ -148,5 +161,5 @@ def transfer_account_rights( else: res = bitwarden_client.invite_with_accesses(accesses, new_email) if res: - self.set_user_enabled(user.get("Id"), enabled=False) + self.set_user_enabled(user["Id"], enabled=False) return res diff --git a/vaultwarden/utils/__init__.py b/src/vaultwarden/models/__init__.py similarity index 100% rename from vaultwarden/utils/__init__.py rename to src/vaultwarden/models/__init__.py diff --git a/vaultwarden/models/api_models.py b/src/vaultwarden/models/api_models.py similarity index 89% rename from vaultwarden/models/api_models.py rename to src/vaultwarden/models/api_models.py index d5fa538..191a178 100644 --- a/vaultwarden/models/api_models.py +++ b/src/vaultwarden/models/api_models.py @@ -1,5 +1,5 @@ import time -from typing import TypedDict, Literal +from typing import Literal, TypedDict from bitwardentools.crypto import decrypt @@ -12,7 +12,9 @@ def __init__(self, token, master_key, expires_in: int): self.token = token self.expires = time.time() + expires_in self.token["user_key"] = decrypt(token["Key"], master_key) - self.token["orgs_key"] = decrypt(token["PrivateKey"], self.token["user_key"]) + self.token["orgs_key"] = decrypt( + token["PrivateKey"], self.token["user_key"] + ) def is_expired(self, now=None): if now is None: @@ -56,4 +58,3 @@ class VaultWardenUser(TypedDict): Organizations: list[VaultWardenOrg] UserEnabled: bool TwoFactorEnabled: bool - CreatedAt: str diff --git a/vaultwarden/models/exception_models.py b/src/vaultwarden/models/exception_models.py similarity index 58% rename from vaultwarden/models/exception_models.py rename to src/vaultwarden/models/exception_models.py index 0cfd9b4..eb39bb8 100644 --- a/vaultwarden/models/exception_models.py +++ b/src/vaultwarden/models/exception_models.py @@ -1,10 +1,10 @@ -class BitwardenException(Exception): +class BitwardenError(Exception): """BitwardenClient Exception class""" pass -class VaultwardenAdminException(Exception): +class VaultwardenAdminError(Exception): """VaultwardenAdminClient Exception class""" pass diff --git a/src/vaultwarden/utils/__init__.py b/src/vaultwarden/utils/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/vaultwarden/utils/logger.py b/src/vaultwarden/utils/logger.py similarity index 100% rename from vaultwarden/utils/logger.py rename to src/vaultwarden/utils/logger.py diff --git a/vaultwarden/utils/tools.py b/src/vaultwarden/utils/tools.py similarity index 79% rename from vaultwarden/utils/tools.py rename to src/vaultwarden/utils/tools.py index 3ace545..21d58c7 100644 --- a/vaultwarden/utils/tools.py +++ b/src/vaultwarden/utils/tools.py @@ -1,5 +1,3 @@ -from httpx import Response - from .logger import logger @@ -17,9 +15,11 @@ def get_matching_ids_from_ditcs(collections_names, collection_name): return res -def log_raise_for_status(response: Response) -> None: +def log_raise_for_status(response) -> None: if response.status_code == 403: - logger.error("Error: 403 Forbidden. Given Account has not access the data.") + logger.error( + "Error: 403 Forbidden. Given Account has not access the data." + ) if response.status_code >= 400: logger.error(f"Error: {response.status_code}") response.raise_for_status() diff --git a/tests/__init__.py b/tests/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/tests/utils/__init__.py b/tests/utils/__init__.py new file mode 100644 index 0000000..e69de29