Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion docs/changelog.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# Changelog

## 2.0.4.X - 2025-03-09
## 2.0.4.X - 2025
- Initial Beta release series to shakedown public release pipelines and
initial integrations.

Expand Down
8 changes: 4 additions & 4 deletions docs/index.md
Original file line number Diff line number Diff line change
@@ -1,15 +1,15 @@
# Planet Auth Utility Library

The Planet Auth Library provides generic authentication utilities for clients
and for services. For clients, it provides means to obtain access tokens that
and services. For clients, it provides the means to obtain access tokens that
can be used to access network services. For services, it provides tools to
validate the same access tokens.

The architecture of the code was driven by OAuth2, but is intended to be easily
extensible to new authentication protocols in the future. Since both clients
extensible to new authentication protocols in the future. Since clients
and resource servers are both themselves clients to authorization servers in
an OAuth2 deployment, this combining of resource client and resource server
concerns in a single library was seen as natural.
an OAuth2 deployment, this combining of client and server concerns in a single
library was seen as natural.

Currently, this library supports OAuth2, Planet's legacy proprietary
authentication protocols, and static API keys.
Expand Down
1 change: 1 addition & 0 deletions mkdocs.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ site_name: Planet Auth Library
site_description: Planet Auth Library
site_url: https://planet.com/
strict: true
dev_addr: 127.0.0.1:8001

#watch:
# - src
Expand Down
2 changes: 1 addition & 1 deletion src/planet_auth/oidc/token_validator.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ def validate_token(
return validated_claims

@staticmethod
def unverified_decode(token_str):
def hazmat_unverified_decode(token_str):
# WARNING: Treat unverified token claims like toxic waste.
# Nothing can be trusted until the token is verified.
unverified_complete = jwt.decode_complete(token_str, options={"verify_signature": False}) # nosemgrep
Expand Down
14 changes: 14 additions & 0 deletions src/planet_auth_utils/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -51,12 +51,18 @@
cmd_profile_set,
cmd_profile_show,
)
from .commands.cli.jwt_cmd import (
cmd_jwt,
cmd_jwt_decode,
cmd_jwt_validate_oauth,
)
from .commands.cli.options import (
opt_api_key,
opt_audience,
opt_client_id,
opt_client_secret,
opt_human_readable,
opt_issuer,
opt_loglevel,
opt_long,
opt_open_browser,
Expand All @@ -68,8 +74,10 @@
opt_scope,
opt_show_qr_code,
opt_sops,
opt_token,
opt_token_file,
opt_username,
opt_yes_no,
)
from .commands.cli.util import recast_exceptions_to_click
from planet_auth_utils.constants import EnvironmentVariables
Expand All @@ -81,6 +89,9 @@
__all__ = [
"cmd_plauth_embedded",
"cmd_plauth_login",
"cmd_jwt",
"cmd_jwt_decode",
"cmd_jwt_validate_oauth",
"cmd_oauth",
"cmd_oauth_login",
"cmd_oauth_refresh",
Expand Down Expand Up @@ -110,6 +121,7 @@
"opt_client_id",
"opt_client_secret",
"opt_human_readable",
"opt_issuer",
"opt_loglevel",
"opt_long",
"opt_open_browser",
Expand All @@ -121,8 +133,10 @@
"opt_scope",
"opt_show_qr_code",
"opt_sops",
"opt_token",
"opt_token_file",
"opt_username",
"opt_yes_no",
"recast_exceptions_to_click",
#
"Builtins",
Expand Down
253 changes: 253 additions & 0 deletions src/planet_auth_utils/commands/cli/jwt_cmd.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
# Copyright 2025 Planet Labs PBC.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import click
import json
import pathlib
import sys
import textwrap
import time
import typing

from planet_auth import (
AuthException,
TokenValidator,
OidcMultiIssuerValidator,
)
from planet_auth.util import custom_json_class_dumper

from .options import (
opt_audience,
opt_issuer,
opt_token,
opt_token_file,
opt_human_readable,
)
from .util import recast_exceptions_to_click


class _jwt_human_dumps:
"""
Wrapper object for controlling the json.dumps behavior of JWTs so that
we can display a version different from what is stored in memory.

For pretty printing JWTs, we convert timestamps into
human-readable strings.
"""

def __init__(self, data):
self._data = data

def __json_pretty_dumps__(self):
def _human_timestamp_iso(d):
for key, value in list(d.items()):
if key in ["iat", "exp", "nbf"] and isinstance(value, int):
fmt_time = time.strftime("%Y-%m-%dT%H:%M:%S%z", time.localtime(value))
if (key == "exp") and (d[key] < time.time()):
fmt_time += " (Expired)"
d[key] = fmt_time
elif isinstance(value, dict):
_human_timestamp_iso(value)
return d

json_dumps = self._data.copy()
_human_timestamp_iso(json_dumps)
return json_dumps


def json_dumps_for_jwt_dict(data: dict, human_readable: bool, indent: int = 2):
if human_readable:
return json.dumps(_jwt_human_dumps(data), indent=indent, sort_keys=True, default=custom_json_class_dumper)
else:
return json.dumps(data, indent=2, sort_keys=True)


def print_jwt_parts(raw, header, body, signature, human_readable):
if raw:
print(f"RAW:\n {raw}\n")

if header:
print(
f'HEADER:\n{textwrap.indent(json_dumps_for_jwt_dict(data=header, human_readable=human_readable), prefix=" ")}\n'
)

if body:
print(
f'BODY:\n{textwrap.indent(json_dumps_for_jwt_dict(body, human_readable=human_readable), prefix=" ")}\n'
)

if signature:
pretty_hex_signature = ""
i = 0
for c in signature:
if i == 0:
pass
elif (i % 16) != 0:
pretty_hex_signature += ":"
else:
pretty_hex_signature += "\n"

pretty_hex_signature += "{:02x}".format(c)
i += 1

print(f'SIGNATURE:\n{textwrap.indent(pretty_hex_signature, prefix=" ")}\n')


def hazmat_print_jwt(token_str, human_readable):
print("UNTRUSTED JWT Decoding\n")
if token_str:
(hazmat_header, hazmat_body, hazmat_signature) = TokenValidator.hazmat_unverified_decode(token_str)
print_jwt_parts(
raw=token_str,
header=hazmat_header,
body=hazmat_body,
signature=hazmat_signature,
human_readable=human_readable,
)


@click.group("jwt", invoke_without_command=True)
@click.pass_context
def cmd_jwt(ctx):
"""
JWT utility for working with tokens. These functions are primarily targeted
towards debugging usage. Many of the functions do not perform token validation.
THE CONTENTS OF UNVALIDATED TOKENS MUST BE TREATED AS UNTRUSTED AND POTENTIALLY
MALICIOUS.
"""
if ctx.invoked_subcommand is None:
click.echo(ctx.get_help())
sys.exit(0)


def _get_token_or_fail(token_opt: typing.Optional[str], token_file_opt: typing.Optional[pathlib.Path]):
if token_opt:
token = token_opt
elif token_file_opt:
with open(token_file_opt, mode="r", encoding="UTF-8") as file_r:
token = file_r.read()
else:
# click.echo(ctx.get_help())
# click.echo()
raise click.UsageError("A token must be provided.")
return token


@cmd_jwt.command("decode")
@click.pass_context
@opt_human_readable
@opt_token
@opt_token_file
@recast_exceptions_to_click(AuthException, FileNotFoundError)
def cmd_jwt_decode(ctx, token: str, token_file: pathlib.Path, human_readable):
"""
Decode a JWT token WITHOUT PERFORMING ANY VALIDATION.
"""
token_to_print = _get_token_or_fail(token_opt=token, token_file_opt=token_file)
hazmat_print_jwt(token_str=token_to_print, human_readable=human_readable)


@cmd_jwt.command("validate-oauth")
@click.pass_context
@opt_human_readable
@opt_token
@opt_token_file
@opt_audience()
@opt_issuer()
@recast_exceptions_to_click(AuthException, FileNotFoundError)
def cmd_jwt_validate_oauth(ctx, token, token_file, audience, issuer, human_readable):
"""
Perform signature validation on an RFC 9068 compliant JWT token.
The `iss` and `aud` claims will be used to look up signing keys
using OAuth2/OIDC discovery protocols and perform basic validation
checks.

This command performs only basic signature verification and token validity
checks. For checks against auth server token revocation lists, see the `oauth`
command. For deeper checks specific to the claims and structure of
Identity or Access tokens, see the `oauth` command.

WARNING:\n
THIS TOOL IS ABSOLUTELY INAPPROPRIATE FOR PRODUCTION TRUST USAGE. This is a
development and debugging utility. The default behavior to inspect the token
for issuer and audience information used to validate the token is wholly
incorrect for a production use case. The decision of which issuers to
trust with which audiences MUST be controlled by the service operator.
"""
token_to_validate = _get_token_or_fail(token_opt=token, token_file_opt=token_file)
(hazmat_header, hazmat_body, hazmat_signature) = TokenValidator.hazmat_unverified_decode(token_to_validate)

if issuer:
validation_iss = issuer
else:
if not hazmat_body.get("iss"):
raise click.BadParameter(
"The provided token does not contain an `iss` claim. Is the provided JWT RFC 9068 compliant?"
)
validation_iss = hazmat_body.get("iss")

if audience:
validation_aud = audience
else:
if not hazmat_body.get("aud"):
raise click.BadParameter(
"The provided token does not contain an `aud` claim. Is the provided JWT RFC 9068 compliant?"
)
hazmat_aud = hazmat_body.get("aud")
if isinstance(hazmat_aud, list):
validation_aud = hazmat_aud[0]
else:
validation_aud = hazmat_aud

validator = OidcMultiIssuerValidator.from_auth_server_urls(
trusted_auth_server_urls=[validation_iss], audience=validation_aud, log_result=False
)
validated_body, _ = validator.validate_access_token(token_to_validate, do_remote_revocation_check=False)
# Validation throws on error
click.echo("TOKEN OK")
print_jwt_parts(
raw=token_to_validate,
header=hazmat_header,
body=validated_body,
signature=hazmat_signature,
human_readable=human_readable,
)


@cmd_jwt.command("validate-rs256")
@click.pass_context
@opt_human_readable
@opt_token
@opt_token_file
@recast_exceptions_to_click(AuthException, FileNotFoundError, NotImplementedError)
def cmd_jwt_validate_rs256(ctx, token, token_file, human_readable):
"""
Validate a JWT signed with a RS256 signature
"""
# token_to_validate = _get_token_or_fail(token_opt=token, token_file_opt=token_file)
raise NotImplementedError("Command not implemented")


@cmd_jwt.command("validate-hs512")
@click.pass_context
@opt_human_readable
@opt_token
@opt_token_file
@recast_exceptions_to_click(AuthException, FileNotFoundError, NotImplementedError)
def cmd_jwt_validate_hs512(ctx, token, token_file, human_readable):
"""
Validate a JWT signed with a HS512 signature
"""
# token_to_validate = _get_token_or_fail(token_opt=token, token_file_opt=token_file)
raise NotImplementedError("Command not implemented")
Loading