Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,9 @@
# Changelog

## 2.3.1 - 2025-12-10
- Fix a bug where sops protected files would be rewritten without preserving
their sops protection.

## 2.3.0 - 2025-10-20
- Improve the user experience around old stale sessions that appear to be
initialized, but are actually expired. This is done by providing the new
Expand Down
54 changes: 47 additions & 7 deletions src/planet_auth/storage_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import subprocess
import time
from abc import ABC, abstractmethod
from enum import Enum
from typing import Optional, Dict, Any

from planet_auth.auth_exception import AuthException
Expand Down Expand Up @@ -96,9 +97,15 @@ def _default_storage_provider():
class _SOPSAwareFilesystemObjectStorageProvider(ObjectStorageProvider):
"""
Storage provider geared around backing a single object in a single file
with paths take from the root of the local file system.
with paths taken from the root of the local file system.
"""

_STORAGE_TYPE_KEY = "___SOPSAwareFilesystemObjectStorageProvider__storage_type"

class _StorageType(Enum):
PLAINTEXT = "plaintext"
SOPS = "sops"

def __init__(self, root: Optional[pathlib.Path] = None):
if root:
self._storage_root = root
Expand All @@ -116,13 +123,18 @@ def _obj_filepath(self, obj_key):
return obj_path

@staticmethod
def _is_sops_path(file_path):
def _is_sops_path(file_path: pathlib.Path) -> bool:
# TODO: Could be ".json.sops", or ".sops.json", depending on file
# level or field level encryption, respectively. We currently
# only look for and support field level encryption in json
# files with a ".sops.json" suffix.
return bool(file_path.suffixes == [".sops", ".json"])

@staticmethod
def _filter_write_object(data: dict) -> dict:
final_data = {k: v for k, v in data.items() if not (isinstance(k, str) and k.startswith("__"))}
return final_data

@staticmethod
def _read_json(file_path: pathlib.Path):
auth_logger.debug(msg="Loading JSON data from file {}".format(file_path))
Expand All @@ -137,7 +149,7 @@ def _read_json_sops(file_path: pathlib.Path):

@staticmethod
def _write_json(file_path: pathlib.Path, data: dict):
auth_logger.debug(msg="Writing JSON data to file {}".format(file_path))
auth_logger.debug(msg="Writing JSON data to cleartext file {}".format(file_path))
with open(file_path, mode="w", encoding="UTF-8") as file_w:
os.chmod(file_path, stat.S_IREAD | stat.S_IWRITE)
_no_none_data = {key: value for key, value in data.items() if value is not None}
Expand All @@ -155,26 +167,54 @@ def _write_json_sops(file_path: pathlib.Path, data: dict):
# ['sops', '-e', '--input-type', 'json', '--output-type',
# 'json', '--output', file_path, '/dev/stdin'],
# stdin=data_f)
auth_logger.debug(msg="Writing JSON data to SOPS encrypted file {}".format(file_path))
_SOPSAwareFilesystemObjectStorageProvider._write_json(file_path, data)
auth_logger.debug(msg="Writing JSON data to SOPS encrypted file {}".format(file_path))
subprocess.check_call(["sops", "-e", "--input-type", "json", "--output-type", "json", "-i", file_path])

@staticmethod
def _load_file(file_path: pathlib.Path) -> dict:
if _SOPSAwareFilesystemObjectStorageProvider._is_sops_path(file_path):
new_data = _SOPSAwareFilesystemObjectStorageProvider._read_json_sops(file_path)
new_data[_SOPSAwareFilesystemObjectStorageProvider._STORAGE_TYPE_KEY] = (
_SOPSAwareFilesystemObjectStorageProvider._StorageType.SOPS.value
)
else:
new_data = _SOPSAwareFilesystemObjectStorageProvider._read_json(file_path)
new_data[_SOPSAwareFilesystemObjectStorageProvider._STORAGE_TYPE_KEY] = (
_SOPSAwareFilesystemObjectStorageProvider._StorageType.PLAINTEXT.value
)

return new_data

@staticmethod
def _do_sops(file_path: pathlib.Path, data: dict) -> bool:
if _SOPSAwareFilesystemObjectStorageProvider._is_sops_path(file_path):
return True
if (
data
and data.get(_SOPSAwareFilesystemObjectStorageProvider._STORAGE_TYPE_KEY)
== _SOPSAwareFilesystemObjectStorageProvider._StorageType.SOPS.value
):
auth_logger.warning(msg=f"Data sourced from SOPS being written cleartext to the file {file_path}.")
# Upgrading to SOPS would be great, but also problematic.
# The problem is that if we are writing to SOPS we should use a
# SOPS file name so that we know we should read it as a SOPS file
# later. We can't change the name here because the caller would
# not know what we did, and may not be able to find the object
# later.

return False

@staticmethod
def _save_file(file_path: pathlib.Path, data: dict):
file_path.parent.mkdir(parents=True, exist_ok=True)
if _SOPSAwareFilesystemObjectStorageProvider._is_sops_path(file_path):
_SOPSAwareFilesystemObjectStorageProvider._write_json_sops(file_path, data)
do_sops = _SOPSAwareFilesystemObjectStorageProvider._do_sops(file_path, data)
write_data = _SOPSAwareFilesystemObjectStorageProvider._filter_write_object(data)

if do_sops:
_SOPSAwareFilesystemObjectStorageProvider._write_json_sops(file_path, write_data)
else:
_SOPSAwareFilesystemObjectStorageProvider._write_json(file_path, data)
_SOPSAwareFilesystemObjectStorageProvider._write_json(file_path, write_data)

def load_obj(self, key: ObjectStorageProvider_KeyType) -> dict:
obj_filepath = self._obj_filepath(key)
Expand Down
2 changes: 1 addition & 1 deletion src/planet_auth_utils/commands/cli/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ def cmd_plauth_login(
)
print("Login succeeded.") # Errors should throw.

post_login_cmd_helper(override_auth_context=override_auth_context, use_sops=sops, prompt_pre_selection=yes)
post_login_cmd_helper(override_auth_context=override_auth_context, use_sops_opt=sops, prompt_pre_selection=yes)


cmd_plauth.add_command(cmd_oauth)
Expand Down
2 changes: 1 addition & 1 deletion src/planet_auth_utils/commands/cli/oauth_cmd.py
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ def cmd_oauth_login(
extra=login_extra,
)
print("Login succeeded.") # Errors should throw.
post_login_cmd_helper(override_auth_context=current_auth_context, use_sops=sops, prompt_pre_selection=yes)
post_login_cmd_helper(override_auth_context=current_auth_context, use_sops_opt=sops, prompt_pre_selection=yes)


@cmd_oauth.command("refresh")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ def cmd_pllegacy_login(ctx, username, password, sops, yes):
password=password,
)
print("Login succeeded.") # Errors should throw.
post_login_cmd_helper(override_auth_context=current_auth_context, use_sops=sops, prompt_pre_selection=yes)
post_login_cmd_helper(override_auth_context=current_auth_context, use_sops_opt=sops, prompt_pre_selection=yes)


@cmd_pllegacy.command("print-api-key")
Expand Down
22 changes: 21 additions & 1 deletion src/planet_auth_utils/commands/cli/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,18 @@
import json
from typing import List, Optional

import planet_auth.logging.auth_logger
import planet_auth
from planet_auth.constants import AUTH_CONFIG_FILE_SOPS, AUTH_CONFIG_FILE_PLAIN
from planet_auth.storage_utils import _SOPSAwareFilesystemObjectStorageProvider
from planet_auth.util import custom_json_class_dumper

from planet_auth_utils.builtins import Builtins
from planet_auth_utils.profile import Profile
from .prompts import prompt_and_change_user_default_profile_if_different

auth_logger = planet_auth.logging.auth_logger.getAuthLogger()


def monkeypatch_hide_click_cmd_options(cmd, hide_options: List[str]):
"""
Expand Down Expand Up @@ -68,7 +72,7 @@ def print_obj(obj):


def post_login_cmd_helper(
override_auth_context: planet_auth.Auth, use_sops, prompt_pre_selection: Optional[bool] = None
override_auth_context: planet_auth.Auth, use_sops_opt, prompt_pre_selection: Optional[bool] = None
):
override_profile_name = override_auth_context.profile_name()
if not override_profile_name:
Expand All @@ -88,6 +92,22 @@ def post_login_cmd_helper(
# helping CLI commands, we can be more opinionated about what is to
# be done.

# If the profile was read from sops, selected sops even if wasn't an
# explict command line option.
use_sops = use_sops_opt
if not use_sops:
# Here in the CLI we can assume that the default storage provider
# _SOPSAwareFilesystemObjectStorageProvider is being used. Unlike general
# library use, we do not support caller provided custom storage providers
# in the CLI tools at this time.
_config_data = override_auth_context.auth_client().config().data() or {}
if (
_config_data.get(_SOPSAwareFilesystemObjectStorageProvider._STORAGE_TYPE_KEY)
== _SOPSAwareFilesystemObjectStorageProvider._StorageType.SOPS.value
):
auth_logger.debug(msg="Implicitly selecting SOPS based on data originating from SOPS protected storage.")
use_sops = True

# Don't clobber built-in profiles.
if not Builtins.is_builtin_profile(override_profile_name):
if use_sops:
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,6 +73,13 @@ def setUp(self):
self.tmp_dir = tempfile.TemporaryDirectory()
self.tmp_dir_path = pathlib.Path(self.tmp_dir.name)

@staticmethod
def _filter_spdata(data: dict) -> dict:
# Storage providers are known to add "__" keys for their own use.
# These should not be considered part of the general data.
filtered_data = {k: v for k, v in data.items() if not (isinstance(k, str) and k.startswith("__"))}
return filtered_data

def under_test_happy_path(self):
credential_path = self.tmp_dir_path / "refreshing_oidc_authenticator_test_token__with_refresh.json"
test_credential = self.mock_auth_login_and_command_initialize(
Expand Down Expand Up @@ -485,7 +492,7 @@ def test_side_band_update_credential_data(self):
# It should not be necessary to simulate an API call for everything to be set
current_credential_data = under_test._credential.data()
self.assertNotEqual(current_credential_data, initial_credential_data)
self.assertEqual(current_credential_data, sideband_credential.data())
self.assertEqual(self._filter_spdata(current_credential_data), sideband_credential.data())
self.assertEqual(under_test._token_body, sideband_credential.access_token())
self.assertNotEqual(0, under_test._refresh_at)
self.assertNotEqual(0, under_test._credential._load_time)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,13 @@ def setUp(self):
# This seems to help.
os.environ["TZ"] = "UTC"

@staticmethod
def _filter_spdata(data: dict) -> dict:
# Storage providers are known to add "__" keys for their own use.
# These should not be considered part of the general data.
filtered_data = {k: v for k, v in data.items() if not (isinstance(k, str) and k.startswith("__"))}
return filtered_data

def test_set_data_asserts_valid(self):
under_test = Credential(data=None, file_path=None)
with self.assertRaises(FileBackedJsonObjectException):
Expand Down Expand Up @@ -129,14 +136,14 @@ def test_load_and_failed_reload(self):
# Load works when we have a valid file
under_test.set_path(tdata_resource_file_path("keys/base_test_credential.json"))
under_test.load()
self.assertEqual({"test_key": "test_value"}, under_test.data())
self.assertEqual({"test_key": "test_value"}, self._filter_spdata(under_test.data()))

# A subsequent failed load should throw, but leave the data unchanged.
under_test.set_path(tdata_resource_file_path("keys/FILE_DOES_NOT_EXIST.json"))
with self.assertRaises(FileNotFoundError):
under_test.load()

self.assertEqual({"test_key": "test_value"}, under_test.data())
self.assertEqual({"test_key": "test_value"}, self._filter_spdata(under_test.data()))

def test_load_file_not_found(self):
under_test = Credential(data=None, file_path=tdata_resource_file_path("keys/FILE_DOES_NOT_EXIST.json"))
Expand Down Expand Up @@ -166,7 +173,7 @@ def test_lazy_load(self):
under_test = Credential(data=None, file_path=tdata_resource_file_path("keys/base_test_credential.json"))
self.assertIsNone(under_test.data())
under_test.lazy_load()
self.assertEqual({"test_key": "test_value"}, under_test.data())
self.assertEqual({"test_key": "test_value"}, self._filter_spdata(under_test.data()))

# if the path is invalid, it should error.
under_test = Credential(data=None, file_path=tdata_resource_file_path("keys/FILE_DOES_NOT_EXIST.json"))
Expand Down Expand Up @@ -204,7 +211,7 @@ def test_lazy_reload_initial_load_behavior(self):
under_test = Credential(data=None, file_path=tdata_resource_file_path("keys/base_test_credential.json"))
self.assertIsNone(under_test.data())
under_test.lazy_reload()
self.assertEqual({"test_key": "test_value"}, under_test.data())
self.assertEqual({"test_key": "test_value"}, self._filter_spdata(under_test.data()))

# if the path is invalid, it should error.
under_test = Credential(data=None, file_path=tdata_resource_file_path("keys/FILE_DOES_NOT_EXIST.json"))
Expand Down Expand Up @@ -299,7 +306,7 @@ def test_save(self):
under_test.save()
test_reader = Credential(data=None, file_path=test_path)
test_reader.load()
self.assertEqual(test_data, test_reader.data())
self.assertEqual(test_data, self._filter_spdata(test_reader.data()))

def test_getters_setters(self):
test_path = pathlib.Path("/test/test_credential.json")
Expand Down
2 changes: 1 addition & 1 deletion version.txt
Original file line number Diff line number Diff line change
@@ -1 +1 @@
2.3.0
2.3.1