diff --git a/Dockerfile b/Dockerfile index 61e4d03..7ad6823 100644 --- a/Dockerfile +++ b/Dockerfile @@ -6,7 +6,7 @@ WORKDIR /app # Copy the necessary files COPY src/ /app/src/ -COPY requirements.txt phase_kubernetes_operator.py /app/ +COPY requirements.txt /app/ # Install required Python packages RUN pip install --no-cache-dir -r requirements.txt @@ -19,4 +19,4 @@ RUN adduser -D operator-usr USER operator-usr # Run the operator script using Kopf -CMD ["kopf", "run", "/app/phase_kubernetes_operator.py"] +CMD ["kopf", "run", "/app/main.py"] diff --git a/README.md b/README.md index adb4f2f..a902d6f 100644 --- a/README.md +++ b/README.md @@ -1,6 +1,7 @@ # Phase Kubernetes Operator -### Securely manage and sync environment variables with Phase in your Kubernetes cluster. +### Securely sync secrets and environment variables with Phase in your Kubernetes cluster. + ``` ⠀⠀⠀⠀⠀⠀⠀⠀⠀⢠⠔⠋⣳⣖⠚⣲⢖⠙⠳⡄⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ ⠀⠀⠀⠀⠀⠀⠀⠀⡴⠉⢀⡼⠃⢘⣞⠁⠙⡆⠀⠘⡆⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀⠀ @@ -21,12 +22,168 @@ ``` ## Features -// To DO -## Installation -// To DO +- Automatically sync secrets to your Kubernetes cluster +- End-to-End encryption +- Automatically redeploy deployments when a secret is updated +- Sync secrets based on environment (dev, staging, prod) and tags +- Transform secrets via secret processors + +```yaml +metadata: + annotations: + secrets.phase.dev/redeploy: "true" +``` + +## Installation: + +### 1. Install the Operator via Helm + +Add the Phase Helm repository and update it: + +```fish +helm repo add phase https://helm.phase.dev && helm repo update +``` + +Install the Phase Secrets Operator: + +```fish +helm install phase-secrets-operator phase/phase-kubernetes-operator --set image.tag=v0.1.0 +``` + + It's best practice to specify the version in production environments to avoid + unintended upgrades. Find available versions on our [GitHub + releases](https://github.com/phasehq/kubernetes-secrets-operator/releases). + +### 2. Create a Service Token Secret in Kubernetes + +Securely create a Service Token Secret using `read` (recommended for more security as it avoids writing the token to disk or shell history) + +Run this command, paste the Phase Service Token and hit enter: + +```fish +read -s TOKEN +kubectl create secret generic phase-service-token \ + --from-literal=token=$TOKEN \ + --type=Opaque \ + --namespace=default +unset TOKEN +``` + +Alternatively, create it directly using `kubectl`: + +```fish +kubectl create secret generic phase-service-token \ + --from-literal=token= \ + --type=Opaque \ + --namespace=default +``` + +### 3. Deploy the Phase Secrets Operator CR (Custom Resource) + +Create a custom resource file: `phase-secrets-operator-cr.yaml` + +```yaml +apiVersion: secrets.phase.dev/v1alpha1 +kind: PhaseSecret +metadata: + name: example-phase-secret + namespace: default +spec: + phaseApp: "the-name-of-your-phase-app" # The name of your Phase application + phaseAppEnv: "prod" # OPTIONAL - The Phase application environment to fetch secrets from + phaseHost: "https://console.phase.dev" # OPTIONAL - URL of the Phase Console instance + authentication: + serviceToken: + serviceTokenSecretReference: + secretName: "phase-service-token" # Name of the service token with access to your Phase application + secretNamespace: "default" + managedSecretReferences: + - secretName: "my-application-secret" # Name of the Kubernetes managed secret that Phase will sync + secretNamespace: "default" +``` + +Deploy the custom resource: + +```fish +kubectl apply -f phase-secrets-operator-cr.yaml +``` + +Watch for `my-application-secret` managed secret being created: + +```fish +watch kubectl get secrets +``` + +View the secrets: + +```fish +kubectl get secret my-application-secret -o yaml +``` + + The operator automatically synchronizes secrets every 60 seconds. -## Usage -// To DO +[Phase Kubernetes Operator - Docs](https://docs.phase.dev/integrations/platforms/kubernetes) -Development: +## Development: + +1. Install python dependencies + +``` +pip3 install -r requirements.txt +``` + +2. Create a local kind cluster (skip if you have one already setup) + +```fish +kind create cluster +``` + +3. Export kindconfig + +``` +kind get kubeconfig --name "kind" > ~/.kube/config +``` + +Verify that the cluster is up: + +``` +kubectl get nodes +``` + +4. Create a copy of the CR (Custom Resource) and CRD (Custom Resource Definition): + +``` +cp cr-template.yaml dev-cr.yaml +``` + +``` +cp crd-template.yaml dev-crd.yaml +``` + +Feel free to make changes + +5. Create a secret in kubernetes containing the Phase Service Token + +```fish +kubectl create secret generic phase-service-token \ + --from-literal=token= \ + --type=Opaque \ + --namespace=default +``` + +6. Apply the CRD and CR respectively + +```fish +kubectl apply -f dev-crd.yaml +``` + +```fish +kubectl apply -f dev-cr.yaml +``` + +7. Start the operator via Kopf + +```fish +kopf run src/main.py +``` diff --git a/cr-template.yaml b/cr-template.yaml index fe31d38..bf4ef4e 100644 --- a/cr-template.yaml +++ b/cr-template.yaml @@ -4,7 +4,9 @@ metadata: name: example-phase-secret namespace: default spec: - phaseAppEnv: "prod" # OPTIONAL - The Phase application environment to fetch secrets from + phaseApp: "your-phase-application" # The name of your Phase application + phaseAppEnv: "production" # OPTIONAL The Phase application environment to fetch secrets from + phaseAppEnvTag: "certs" # OPTIONAL Tag for filtering secrets in the specified Phase app environment. phaseHost: "https://console.phase.dev" # OPTIONAL - URL of a Phase Console instance authentication: serviceToken: @@ -12,5 +14,5 @@ spec: secretName: "phase-service-token" # Name of the Phase service token with access to your application secretNamespace: "default" managedSecretReferences: # Managed secrets in Kubernetes that Phase will sync secrets with - - secretName: "my-application-secret" # Name of the managed secret in Kubernetes that will be consumed by your application + - secretName: "my-application-secret" # Name of the managed secret in Kubernetes that will be consumed by your application secretNamespace: "default" diff --git a/crd-template.yaml b/crd-template.yaml index c8f23ec..3a654af 100644 --- a/crd-template.yaml +++ b/crd-template.yaml @@ -18,25 +18,40 @@ spec: description: PhaseSecret is the Schema for the phasesecrets API properties: apiVersion: - description: APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. + description: APIVersion defines the versioned schema of this representation of an object. type: string kind: - description: Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. + description: Kind is a string value representing the REST resource this object represents. type: string metadata: type: object spec: + type: object description: PhaseSecretSpec defines the desired state of PhaseSecret properties: + phaseApp: + description: The Phase application to fetch secrets from. + type: string phaseAppEnv: description: The environment variable representing the app environment in Phase. type: string - default: "development" + default: "production" + phaseAppEnvTag: + description: Tag for filtering secrets in the specified Phase app environment. + type: string authentication: + type: object properties: serviceToken: + type: object + required: + - serviceTokenSecretReference properties: serviceTokenSecretReference: + type: object + required: + - secretName + - secretNamespace properties: secretName: description: The name of the Kubernetes Secret. @@ -44,14 +59,6 @@ spec: secretNamespace: description: The namespace where the Kubernetes Secret is located. type: string - required: - - secretName - - secretNamespace - type: object - required: - - serviceTokenSecretReference - type: object - type: object phaseHost: description: Phase host to pull secrets from. type: string @@ -60,6 +67,11 @@ spec: description: References to multiple managed Kubernetes Secrets. type: array items: + type: object + required: + - secretName + - secretNamespace + - secretType properties: secretName: description: The name of the Kubernetes Secret. @@ -67,68 +79,100 @@ spec: secretNamespace: description: The namespace where the Kubernetes Secret is located. type: string - required: - - secretName - - secretNamespace - type: object + secretType: + description: The type of the Kubernetes Secret. + type: string + default: "Opaque" + enum: + - "Opaque" + - "kubernetes.io/tls" + - "kubernetes.io/service-account-token" + - "kubernetes.io/dockercfg" + - "kubernetes.io/dockerconfigjson" + - "kubernetes.io/basic-auth" + - "kubernetes.io/ssh-auth" + - "bootstrap.kubernetes.io/token" + processors: + description: Processors to transform the data during ingestion. + type: object + additionalProperties: + type: object + properties: + asName: + description: The mapped name of the field in the managed secret. + type: string + nameTransformer: + description: The format for transforming the secret key name. + type: string + enum: + - "camel" + - "upper-camel" + - "lower-snake" + - "tf-var" + - "dotnet-env" + - "lower-kebab" + type: + description: The type of process to be performed. + type: string + enum: + - "plain" + - "base64" + default: "plain" pollingInterval: description: Interval at which to poll for secret updates. type: integer default: 60 - minimum: 15 required: - - phaseAppEnv - - managedSecretReferences - - phaseHost - - pollingInterval - type: object + - phaseApp + - managedSecretReferences + - phaseHost status: description: PhaseSecretStatus defines the observed state of PhaseSecret + type: object properties: conditions: + description: Conditions representing the current state of the resource. + type: array items: - description: Condition contains details for one aspect of the current state of this API Resource. + type: object + required: + - lastTransitionTime + - message + - reason + - status + - type properties: lastTransitionTime: - description: lastTransitionTime is the last time the condition transitioned from one status to another. - format: date-time + description: Last time the condition transitioned from one status to another. type: string + format: date-time message: - description: message is a human readable message indicating details about the transition. - maxLength: 32768 + description: Human-readable message indicating details about the transition. type: string + maxLength: 32768 observedGeneration: - description: observedGeneration represents the .metadata.generation that the condition was set based upon. + description: Generation that the condition was set based upon. + type: integer format: int64 minimum: 0 - type: integer reason: - description: reason contains a programmatic identifier indicating the reason for the conditions last transition. + description: Programmatic identifier for the reason of the condition's last transition. + type: string maxLength: 1024 minLength: 1 - type: string status: - description: status of the condition, one of True, False, Unknown. - enum: - - "True" - - "False" - - "Unknown" + description: Status of the condition. type: string + enum: + - "True" + - "False" + - "Unknown" type: - description: type of condition in CamelCase or in foo.example.com/CamelCase. - maxLength: 316 + description: Type of condition in CamelCase. type: string - required: - - lastTransitionTime - - message - - reason - - status - - type - type: object - type: array + maxLength: 316 required: - - conditions - type: object + - conditions served: true storage: true subresources: diff --git a/phase-kubernetes-operator/Chart.yaml b/phase-kubernetes-operator/Chart.yaml index 0d53cb3..d17cf6c 100644 --- a/phase-kubernetes-operator/Chart.yaml +++ b/phase-kubernetes-operator/Chart.yaml @@ -4,10 +4,10 @@ description: A Helm chart for deploying the Phase Kubernetes Operator type: application # Version of the chart -version: 0.1.0 +version: 1.0.0 # Version of the application (operator) that is being deployed -appVersion: "0.1.0" +appVersion: "1.0.0" # Keywords, maintainers, and source URLs can also be added here keywords: diff --git a/phase-kubernetes-operator/crds/crd-template.yaml b/phase-kubernetes-operator/crds/crd-template.yaml index c8f23ec..3a654af 100644 --- a/phase-kubernetes-operator/crds/crd-template.yaml +++ b/phase-kubernetes-operator/crds/crd-template.yaml @@ -18,25 +18,40 @@ spec: description: PhaseSecret is the Schema for the phasesecrets API properties: apiVersion: - description: APIVersion defines the versioned schema of this representation of an object. Servers should convert recognized schemas to the latest internal value, and may reject unrecognized values. + description: APIVersion defines the versioned schema of this representation of an object. type: string kind: - description: Kind is a string value representing the REST resource this object represents. Servers may infer this from the endpoint the client submits requests to. Cannot be updated. In CamelCase. + description: Kind is a string value representing the REST resource this object represents. type: string metadata: type: object spec: + type: object description: PhaseSecretSpec defines the desired state of PhaseSecret properties: + phaseApp: + description: The Phase application to fetch secrets from. + type: string phaseAppEnv: description: The environment variable representing the app environment in Phase. type: string - default: "development" + default: "production" + phaseAppEnvTag: + description: Tag for filtering secrets in the specified Phase app environment. + type: string authentication: + type: object properties: serviceToken: + type: object + required: + - serviceTokenSecretReference properties: serviceTokenSecretReference: + type: object + required: + - secretName + - secretNamespace properties: secretName: description: The name of the Kubernetes Secret. @@ -44,14 +59,6 @@ spec: secretNamespace: description: The namespace where the Kubernetes Secret is located. type: string - required: - - secretName - - secretNamespace - type: object - required: - - serviceTokenSecretReference - type: object - type: object phaseHost: description: Phase host to pull secrets from. type: string @@ -60,6 +67,11 @@ spec: description: References to multiple managed Kubernetes Secrets. type: array items: + type: object + required: + - secretName + - secretNamespace + - secretType properties: secretName: description: The name of the Kubernetes Secret. @@ -67,68 +79,100 @@ spec: secretNamespace: description: The namespace where the Kubernetes Secret is located. type: string - required: - - secretName - - secretNamespace - type: object + secretType: + description: The type of the Kubernetes Secret. + type: string + default: "Opaque" + enum: + - "Opaque" + - "kubernetes.io/tls" + - "kubernetes.io/service-account-token" + - "kubernetes.io/dockercfg" + - "kubernetes.io/dockerconfigjson" + - "kubernetes.io/basic-auth" + - "kubernetes.io/ssh-auth" + - "bootstrap.kubernetes.io/token" + processors: + description: Processors to transform the data during ingestion. + type: object + additionalProperties: + type: object + properties: + asName: + description: The mapped name of the field in the managed secret. + type: string + nameTransformer: + description: The format for transforming the secret key name. + type: string + enum: + - "camel" + - "upper-camel" + - "lower-snake" + - "tf-var" + - "dotnet-env" + - "lower-kebab" + type: + description: The type of process to be performed. + type: string + enum: + - "plain" + - "base64" + default: "plain" pollingInterval: description: Interval at which to poll for secret updates. type: integer default: 60 - minimum: 15 required: - - phaseAppEnv - - managedSecretReferences - - phaseHost - - pollingInterval - type: object + - phaseApp + - managedSecretReferences + - phaseHost status: description: PhaseSecretStatus defines the observed state of PhaseSecret + type: object properties: conditions: + description: Conditions representing the current state of the resource. + type: array items: - description: Condition contains details for one aspect of the current state of this API Resource. + type: object + required: + - lastTransitionTime + - message + - reason + - status + - type properties: lastTransitionTime: - description: lastTransitionTime is the last time the condition transitioned from one status to another. - format: date-time + description: Last time the condition transitioned from one status to another. type: string + format: date-time message: - description: message is a human readable message indicating details about the transition. - maxLength: 32768 + description: Human-readable message indicating details about the transition. type: string + maxLength: 32768 observedGeneration: - description: observedGeneration represents the .metadata.generation that the condition was set based upon. + description: Generation that the condition was set based upon. + type: integer format: int64 minimum: 0 - type: integer reason: - description: reason contains a programmatic identifier indicating the reason for the conditions last transition. + description: Programmatic identifier for the reason of the condition's last transition. + type: string maxLength: 1024 minLength: 1 - type: string status: - description: status of the condition, one of True, False, Unknown. - enum: - - "True" - - "False" - - "Unknown" + description: Status of the condition. type: string + enum: + - "True" + - "False" + - "Unknown" type: - description: type of condition in CamelCase or in foo.example.com/CamelCase. - maxLength: 316 + description: Type of condition in CamelCase. type: string - required: - - lastTransitionTime - - message - - reason - - status - - type - type: object - type: array + maxLength: 316 required: - - conditions - type: object + - conditions served: true storage: true subresources: diff --git a/phase_kubernetes_operator.py b/phase_kubernetes_operator.py deleted file mode 100644 index 25fac81..0000000 --- a/phase_kubernetes_operator.py +++ /dev/null @@ -1,70 +0,0 @@ -import kopf -import base64 -import kubernetes.client -from kubernetes.client.rest import ApiException -from src.cmd.secrets.export import phase_secrets_env_export - -@kopf.timer('secrets.phase.dev', 'v1alpha1', 'phasesecrets', interval=60) -def sync_secrets(spec, name, namespace, logger, **kwargs): - try: - # Extract information from the spec - managed_secret_references = spec.get('managedSecretReferences', []) - phase_host = spec.get('phaseHost', 'https://console.phase.dev') - phase_app_env = spec.get('phaseAppEnv', 'production') - - # Initialize Kubernetes client - api_instance = kubernetes.client.CoreV1Api() - - # Fetch and process the Phase service token from the Kubernetes managed secret - service_token_secret_name = spec.get('authentication', {}).get('serviceToken', {}).get('serviceTokenSecretReference', {}).get('secretName', 'phase-service-token') - api_response = api_instance.read_namespaced_secret(service_token_secret_name, namespace) - token = api_response.data['token'] - service_token = base64.b64decode(token).decode('utf-8') - - # Fetch secrets from the Phase application - fetched_secrets_dict = phase_secrets_env_export( - phase_service_token=service_token, - phase_service_host=phase_host, - env_name=phase_app_env, - export_type='k8' - ) - - # Update the Kubernetes managed secrets -- update if: available, create if: unavailable. - for secret_reference in managed_secret_references: - secret_name = secret_reference['secretName'] - secret_namespace = secret_reference.get('secretNamespace', namespace) - - try: - # Check if the secret exists in Kubernetes - api_instance.read_namespaced_secret(name=secret_name, namespace=secret_namespace) - - # Update the secret with the new data - api_instance.replace_namespaced_secret( - name=secret_name, - namespace=secret_namespace, - body=kubernetes.client.V1Secret( - metadata=kubernetes.client.V1ObjectMeta(name=secret_name), - data=fetched_secrets_dict - ) - ) - logger.info(f"Updated secret {secret_name} in namespace {secret_namespace}") - except ApiException as e: - if e.status == 404: - # Secret does not exist in kubernetes, create it - api_instance.create_namespaced_secret( - namespace=secret_namespace, - body=kubernetes.client.V1Secret( - metadata=kubernetes.client.V1ObjectMeta(name=secret_name), - data=fetched_secrets_dict - ) - ) - logger.info(f"Created secret {secret_name} in namespace {secret_namespace}") - else: - logger.error(f"Failed to update secret {secret_name} in namespace {secret_namespace}: {e}") - - logger.info(f"Secrets for PhaseSecret {name} have been successfully updated in namespace {namespace}") - - except ApiException as e: - logger.error(f"Failed to fetch secrets for PhaseSecret {name} in namespace {namespace}: {e}") - except Exception as e: - logger.error(f"Unexpected error when handling PhaseSecret {name} in namespace {namespace}: {e}") diff --git a/requirements.txt b/requirements.txt index 7b27e3e..8e80979 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,7 +1,7 @@ -keyring==24.2.0 -questionary==2.0.0 cffi==1.15.1 requests==2.31.0 PyNaCl==1.5.0 kopf==1.36.2 -kubernetes==28.1.0 \ No newline at end of file +kubernetes==28.1.0 +pytest==7.3.1 +pyyaml==6.0.1 \ No newline at end of file diff --git a/src/cmd/init.py b/src/cmd/init.py deleted file mode 100644 index 6206b85..0000000 --- a/src/cmd/init.py +++ /dev/null @@ -1,78 +0,0 @@ -import os -import sys -import json -import questionary -from src.utils.phase_io import Phase - -from src.utils.const import PHASE_ENV_CONFIG - -# Initializes a .phase.json in the root of the dir of where the command is run -def phase_init(): - """ - Initializes the Phase application by linking the user's project to a Phase app. - """ - # Initialize the Phase class - phase = Phase() - - try: - data = phase.init() - except ValueError as err: - print(err) - return - - try: - # Present a list of apps to the user and let them choose one - app_choices = [app['name'] for app in data['apps']] - app_choices.append('Exit') # Add Exit option at the end - - selected_app_name = questionary.select( - 'Select an App:', - choices=app_choices - ).ask() - - # Check if the user selected the "Exit" option - if selected_app_name == 'Exit': - sys.exit(0) - - # Find the selected app's details - selected_app_details = next( - (app for app in data['apps'] if app['name'] == selected_app_name), - None - ) - - # Check if selected_app_details is None (no matching app found) - if selected_app_details is None: - sys.exit(1) - - # Extract the default environment ID for the environment named "Development" - default_env = next( - (env_key for env_key in selected_app_details['environment_keys'] if env_key['environment']['name'] == 'Development'), - None - ) - - if not default_env: - raise ValueError("No 'Development' environment found.") - - # Save the selected app’s environment details to the .phase.json file - phase_env = { - "version": "1", - "phaseApp": selected_app_name, - "appId": selected_app_details['id'], # Save the app id - "defaultEnv": default_env['environment']['name'], - } - - # Create .phase.json - with open(PHASE_ENV_CONFIG, 'w') as f: - json.dump(phase_env, f, indent=2) - os.chmod(PHASE_ENV_CONFIG, 0o600) - - print("✅ Initialization completed successfully.") - - except KeyboardInterrupt: - # Handle the Ctrl+C event quietly - sys.exit(0) - except Exception as e: - # Handle other exceptions if needed - print(e) - sys.exit(1) - diff --git a/src/cmd/secrets/create.py b/src/cmd/secrets/create.py deleted file mode 100644 index b69d3ee..0000000 --- a/src/cmd/secrets/create.py +++ /dev/null @@ -1,43 +0,0 @@ -import sys -import getpass -from src.utils.phase_io import Phase -from src.cmd.secrets.list import phase_list_secrets - -def phase_secrets_create(key=None, env_name=None, phase_app=None): - """ - Creates a new secret, encrypts it, and saves it in PHASE_SECRETS_DIR. - - Args: - key (str, optional): The key of the new secret. Defaults to None. - env_name (str, optional): The name of the environment where the secret will be created. Defaults to None. - phase_app (str, optional): The name of the Phase application. Defaults to None. - """ - - # Initialize the Phase class - phase = Phase() - - # If the key is not passed as an argument, prompt user for input - if key is None: - key = input("🗝️ Please enter the key: ") - key = key.upper() - - # Check if input is being piped - if sys.stdin.isatty(): - value = getpass.getpass("✨ Please enter the value (hidden): ") - else: - value = sys.stdin.read().strip() - - try: - # Encrypt and send secret to the backend using the `create` method - response = phase.create(key_value_pairs=[(key, value)], env_name=env_name, app_name=phase_app) - - # Check the response status code - if response.status_code == 200: - # Call the phase_list_secrets function to list the secrets - phase_list_secrets(show=False, env_name=env_name) - else: - # Print an error message if the response status code indicates an error - print(f"Error: Failed to create secret. HTTP Status Code: {response.status_code}") - - except ValueError as e: - print(e) diff --git a/src/cmd/secrets/delete.py b/src/cmd/secrets/delete.py deleted file mode 100644 index 49ee53d..0000000 --- a/src/cmd/secrets/delete.py +++ /dev/null @@ -1,38 +0,0 @@ -from src.utils.phase_io import Phase -from src.cmd.secrets.list import phase_list_secrets - -# Deletes encrypted secrets based on key value pairs -def phase_secrets_delete(keys_to_delete=[], env_name=None, phase_app=None): - """ - Deletes encrypted secrets based on key values. - - Args: - keys_to_delete (list, optional): List of keys to delete. Defaults to empty list. - env_name (str, optional): The name of the environment from which secrets will be deleted. Defaults to None. - phase_app (str, optional): The name of the Phase application. Defaults to None. - """ - # Initialize the Phase class - phase = Phase() - - # If keys_to_delete is empty, request user input - if not keys_to_delete: - keys_to_delete_input = input("Please enter the keys to delete (separate multiple keys with a space): ") - keys_to_delete = keys_to_delete_input.split() - - # Convert each key to uppercase - keys_to_delete = [key.upper() for key in keys_to_delete] - - try: - # Delete keys and get the list of keys not found - keys_not_found = phase.delete(env_name=env_name, keys_to_delete=keys_to_delete, app_name=phase_app) - - if keys_not_found: - print(f"⚠️ Warning: The following keys were not found: {', '.join(keys_not_found)}") - else: - print("Successfully deleted the secrets.") - - # List remaining secrets (censored by default) - phase_list_secrets(show=False, env_name=env_name) - - except ValueError as e: - print(e) \ No newline at end of file diff --git a/src/cmd/secrets/export.py b/src/cmd/secrets/export.py deleted file mode 100644 index e8e8633..0000000 --- a/src/cmd/secrets/export.py +++ /dev/null @@ -1,62 +0,0 @@ -import sys -import re -import base64 -from src.utils.phase_io import Phase -from src.utils.const import cross_env_pattern, local_ref_pattern - -def phase_secrets_env_export(phase_service_token=None, phase_service_host=None, env_name=None, phase_app=None, keys=None, export_type='plain'): - """ - Decrypts and exports secrets to a specified format based on the provided environment and keys. - The function also resolves any references to other secrets, whether they are within the same environment - (local references) or from a different environment (cross-environment references). - - Args: - phase_service_token (str): The service token for authentication. - phase_service_host (str): The Phase service host URL. - env_name (str, optional): The name of the environment from which secrets are fetched. - phase_app (str, optional): The name of the Phase application. - keys (list, optional): List of keys for which to fetch the secrets. - export_type (str, optional): The export type, either 'plain' for .env format or 'k8' for Kubernetes format. - """ - - phase = Phase(init=False, pss=phase_service_token, host=phase_service_host) - - try: - secrets = phase.get(env_name=env_name, keys=keys, app_name=phase_app) - except ValueError as e: - print(f"Failed to fetch secrets: The environment '{env_name}' either does not exist or you do not have access to it.") - sys.exit(1) - - secrets_dict = {secret["key"]: secret["value"] for secret in secrets} - - for key, value in secrets_dict.items(): - cross_env_matches = re.findall(cross_env_pattern, value) - for ref_env, ref_key in cross_env_matches: - try: - ref_secret = phase.get(env_name=ref_env, keys=[ref_key], app_name=phase_app)[0] - resolved_value = ref_secret['value'] - if export_type == 'k8': - resolved_value = base64.b64encode(resolved_value.encode()).decode() - value = value.replace(f"${{{ref_env}.{ref_key}}}", resolved_value) - except ValueError as e: - print(f"# Warning: The environment '{ref_env}' for key '{key}' either does not exist or you do not have access to it.") - - local_ref_matches = re.findall(local_ref_pattern, value) - for ref_key in local_ref_matches: - resolved_value = secrets_dict.get(ref_key, "") - if export_type == 'k8': - resolved_value = base64.b64encode(resolved_value.encode()).decode() - value = value.replace(f"${{{ref_key}}}", resolved_value) - - # Encode values if Kubernetes format is selected - if export_type == 'k8': - secrets_dict[key] = base64.b64encode(value.encode()).decode() - else: - secrets_dict[key] = value - - # Return the dictionary for Kubernetes, or print for .env - if export_type == 'k8': - return secrets_dict - else: - for key, value in secrets_dict.items(): - print(f'{key}="{value}"') \ No newline at end of file diff --git a/src/cmd/secrets/fetch.py b/src/cmd/secrets/fetch.py new file mode 100644 index 0000000..a6b18a4 --- /dev/null +++ b/src/cmd/secrets/fetch.py @@ -0,0 +1,18 @@ +import sys +from utils.phase_io import Phase + +def phase_secrets_fetch(phase_service_token=None, phase_service_host=None, env_name=None, phase_app=None, keys=None, tags=None): + """ + Fetch and return secrets based on the provided environment, keys, and tags. + """ + + phase = Phase(init=False, pss=phase_service_token, host=phase_service_host) + + try: + secrets = phase.get(env_name=env_name, keys=keys, tag=tags, app_name=phase_app) + except ValueError as e: + print(f"Failed to fetch secrets: {e}") + sys.exit(1) + + # Return secrets as a dictionary + return {secret["key"]: secret["value"] for secret in secrets} diff --git a/src/cmd/secrets/get.py b/src/cmd/secrets/get.py deleted file mode 100644 index 50edb9c..0000000 --- a/src/cmd/secrets/get.py +++ /dev/null @@ -1,34 +0,0 @@ -from src.utils.phase_io import Phase -from src.utils.misc import render_table - -def phase_secrets_get(key, env_name=None, phase_app=None): - """ - Fetch and print a single secret based on a given key. - - :param key: The key associated with the secret to fetch. - :param env_name: The name of the environment, if any. Defaults to None. - """ - - # Initialize the Phase class - phase = Phase() - - try: - key = key.upper() - # Here we wrap the key in a list since the get method now expects a list of keys - secrets_data = phase.get(env_name=env_name, keys=[key], app_name=phase_app) - - # Find the specific secret for the given key - secret_data = next((secret for secret in secrets_data if secret["key"] == key), None) - - # Check that secret_data was found and is a dictionary - if not secret_data: - print("🔍 Secret not found...") - return - if not isinstance(secret_data, dict): - raise ValueError("Unexpected format: secret data is not a dictionary") - - # Print the secret data in a table-like format - render_table([secret_data], show=True) - - except ValueError as e: - print(e) \ No newline at end of file diff --git a/src/cmd/secrets/import_env.py b/src/cmd/secrets/import_env.py deleted file mode 100644 index 89054cc..0000000 --- a/src/cmd/secrets/import_env.py +++ /dev/null @@ -1,52 +0,0 @@ -import sys -from src.utils.phase_io import Phase -from src.utils.misc import render_table, get_default_user_id, sanitize_value - -def phase_secrets_env_import(env_file, env_name=None, phase_app=None): - """ - Imports existing environment variables and secrets from a user's .env file. - - Args: - env_file (str): Path to the .env file. - env_name (str, optional): The name of the environment to which secrets should be saved. Defaults to None. - phase_app (str, optional): The name of the Phase application. Defaults to None. - - Raises: - FileNotFoundError: If the provided .env file is not found. - """ - # Initialize the Phase class - phase = Phase() - - # Parse the .env file - secrets = [] - try: - with open(env_file) as f: - for line in f: - # Ignore lines that start with a '#' or don't contain an '=' - line = line.strip() - if line.startswith('#') or '=' not in line: - continue - key, _, value = line.partition('=') - secrets.append((key.strip().upper(), sanitize_value(value.strip()))) - - except FileNotFoundError: - print(f"Error: The file {env_file} was not found.") - sys.exit(1) - - try: - # Encrypt and send secrets to the backend using the `create` method - response = phase.create(key_value_pairs=secrets, env_name=env_name, app_name=phase_app) - - # Check the response status code - if response.status_code == 200: - print(f"Successfully imported and encrypted {len(secrets)} secrets.") - if env_name == None: - print("To view them please run: phase secrets list") - else: - print(f"To view them please run: phase secrets list --env {env_name}") - else: - # Print an error message if the response status code indicates an error - print(f"Error: Failed to import secrets. HTTP Status Code: {response.status_code}") - - except ValueError as e: - print(e) \ No newline at end of file diff --git a/src/cmd/secrets/list.py b/src/cmd/secrets/list.py deleted file mode 100644 index 7d4fc4b..0000000 --- a/src/cmd/secrets/list.py +++ /dev/null @@ -1,33 +0,0 @@ -from src.utils.phase_io import Phase -from src.utils.misc import render_table - -def phase_list_secrets(show=False, env_name=None, phase_app=None): - """ - Lists the secrets fetched from Phase for the specified environment. - - Args: - show (bool, optional): Whether to show the decrypted secrets. Defaults to False. - env_name (str, optional): The name of the environment from which secrets are fetched. Defaults to None. - phase_app (str, optional): The name of the Phase application. Defaults to None. - - Raises: - ValueError: If the returned secrets data from Phase is not in the expected list format. - """ - # Initialize the Phase class - phase = Phase() - - try: - secrets_data = phase.get(env_name=env_name, app_name=phase_app) - - # Check that secrets_data is a list of dictionaries - if not isinstance(secrets_data, list): - raise ValueError("Unexpected format: secrets data is not a list") - - # Render the table - render_table(secrets_data, show=show) - - if not show: - print("\n🥽 To uncover the secrets, use: phase secrets list --show") - - except ValueError as e: - print(e) diff --git a/src/cmd/secrets/update.py b/src/cmd/secrets/update.py deleted file mode 100644 index 5260432..0000000 --- a/src/cmd/secrets/update.py +++ /dev/null @@ -1,56 +0,0 @@ -import sys -import getpass -from src.utils.phase_io import Phase -from src.cmd.secrets.list import phase_list_secrets - -def phase_secrets_update(key, env_name=None, phase_app=None): - """ - Updates a secret with a new value. - - Args: - key (str): The key of the secret to update. - env_name (str, optional): The name of the environment in which the secret is located. Defaults to None. - phase_app (str, optional): The name of the Phase application. Defaults to None. - """ - # Initialize the Phase class - phase = Phase() - - # Convert the key to uppercase - key = key.upper() - - try: - # Pass the key within a list to the get method - secrets_data = phase.get(env_name=env_name, keys=[key], app_name=phase_app) - - # Find the specific secret for the given key - secret_data = next((secret for secret in secrets_data if secret["key"] == key), None) - - # If no matching secret found, raise an error - if not secret_data: - print(f"No secret found for key: {key}") - return - - except ValueError as e: - print(e) - - # Check if input is being piped - if sys.stdin.isatty(): - new_value = getpass.getpass(f"Please enter the new value for {key} (hidden): ") - else: - new_value = sys.stdin.read().strip() - - try: - # Call the update method of the Phase class - response = phase.update(env_name=env_name, key=key, value=new_value, app_name=phase_app) - - # Check the response status code (assuming the update method returns a response with a status code) - if response == "Success": - print("Successfully updated the secret. ") - else: - print(f"Error: Failed to update secret. HTTP Status Code: {response.status_code}") - - # List remaining secrets (censored by default) - phase_list_secrets(show=False, env_name=env_name) - - except ValueError: - print(f"⚠️ Error occurred while updating the secret.") diff --git a/src/cmd/users/__init__.py b/src/cmd/users/__init__.py deleted file mode 100644 index e69de29..0000000 diff --git a/src/cmd/users/keyring.py b/src/cmd/users/keyring.py deleted file mode 100644 index 17bb71f..0000000 --- a/src/cmd/users/keyring.py +++ /dev/null @@ -1,9 +0,0 @@ -import keyring - -def show_keyring_info(): - kr = keyring.get_keyring() - print(f"Current keyring backend: {kr.__class__.__name__}") - print("Supported keyring backends:") - for backend in keyring.backend.get_all_keyring(): - print(f"- {backend.__class__.__name__}") - \ No newline at end of file diff --git a/src/cmd/users/logout.py b/src/cmd/users/logout.py deleted file mode 100644 index 5e5098a..0000000 --- a/src/cmd/users/logout.py +++ /dev/null @@ -1,28 +0,0 @@ -import os -import sys -import shutil -import keyring -from src.utils.const import PHASE_SECRETS_DIR -from src.utils.misc import get_default_user_id - -def phase_cli_logout(purge=False): - if purge: - all_user_ids = get_default_user_id(all_ids=True) - for user_id in all_user_ids: - keyring.delete_password(f"phase-cli-user-{user_id}", "pss") - - # Delete PHASE_SECRETS_DIR if it exists - if os.path.exists(PHASE_SECRETS_DIR): - shutil.rmtree(PHASE_SECRETS_DIR) - print("Logged out and purged all local data.") - else: - print("No local data found to purge.") - - else: - # For the default user - pss = keyring.get_password("phase", "pss") - if not pss: - print("No configuration found. Please run 'phase auth' to set up your configuration.") - sys.exit(1) - keyring.delete_password("phase", "pss") - print("Logged out successfully.") \ No newline at end of file diff --git a/src/cmd/users/whoami.py b/src/cmd/users/whoami.py deleted file mode 100644 index f6c70c6..0000000 --- a/src/cmd/users/whoami.py +++ /dev/null @@ -1,36 +0,0 @@ -import json -from src.utils.const import CONFIG_FILE - - -def phase_users_whoami(): - """ - Print details of the default user. - """ - try: - # Load the config file - with open(CONFIG_FILE, 'r') as f: - config_data = json.load(f) - - # Extract the default user ID - default_user_id = config_data.get("default-user") - - if not default_user_id: - print("No default user set.") - return - - # Find the user details matching the default user ID - default_user = next((user for user in config_data["phase-users"] if user["id"] == default_user_id), None) - - if not default_user: - print("Default user not found in the users list.") - return - - # Print the default user details - print(f"✉️` Email: {default_user['email']}") - print(f"🙋 User ID: {default_user['id']}") - print(f"☁️` Host: {default_user['host']}") - - except FileNotFoundError: - print(f"Config file not found at {CONFIG_FILE}.") - except json.JSONDecodeError: - print("Error reading the config file. The file may be corrupted or not in the expected format.") diff --git a/src/main.py b/src/main.py old mode 100755 new mode 100644 index f676baa..62d6f9a --- a/src/main.py +++ b/src/main.py @@ -1,227 +1,150 @@ -#!/bin/python -import os -import sys -import traceback -import argparse -from argparse import RawTextHelpFormatter -from src.cmd.open_console import phase_open_web -from src.cmd.update import phase_cli_update -from src.cmd.users.whoami import phase_users_whoami -from src.cmd.users.keyring import show_keyring_info -from src.cmd.users.logout import phase_cli_logout -from src.cmd.run import phase_run_inject -from src.cmd.init import phase_init -from src.cmd.auth import phase_auth -from src.cmd.secrets.list import phase_list_secrets -from src.cmd.secrets.get import phase_secrets_get -from src.cmd.secrets.export import phase_secrets_env_export -from src.cmd.secrets.import_env import phase_secrets_env_import -from src.cmd.secrets.delete import phase_secrets_delete -from src.cmd.secrets.create import phase_secrets_create -from src.cmd.secrets.update import phase_secrets_update - -from src.utils.const import __version__ -from src.utils.const import phaseASCii, description - - -def print_phase_cli_version(): - print(f"Version: {__version__}") - -def print_phase_cli_version_only(): - print(f"{__version__}") - -PHASE_DEBUG = os.environ.get('PHASE_DEBUG', 'False').lower() == 'true' - -class CustomHelpFormatter(argparse.HelpFormatter): - def __init__(self, prog): - super().__init__(prog, max_help_position=15, width=sys.maxsize) # set the alignment and wrapping width - - def add_usage(self, usage, actions, groups, prefix=None): - # Override to prevent the default behavior - return - - def _format_action(self, action): - # If the action type is subparsers, skip its formatting - if isinstance(action, argparse._SubParsersAction): - # Filter out the metavar option - action.metavar = None - parts = super(CustomHelpFormatter, self)._format_action(action) - # remove the unnecessary line - if "{auth,init,run,secrets,users,console,update}" in parts: - parts = parts.replace("{auth,init,run,secrets,users,console,update}", "") - return parts - -class HelpfulParser(argparse.ArgumentParser): - def __init__(self, *args, **kwargs): - kwargs['formatter_class'] = CustomHelpFormatter - super().__init__(*args, **kwargs) - - def error(self, message): - print (description) - print(phaseASCii) - self.print_help() - sys.exit(2) - - def add_subparsers(self, **kwargs): - kwargs['title'] = 'Commands' - return super(HelpfulParser, self).add_subparsers(**kwargs) - -def main (): - env_help = "Environment name eg. dev, staging, production" - +import kopf +import base64 +import datetime +import kubernetes.client +from kubernetes.client.rest import ApiException +from kubernetes.client import AppsV1Api +from kubernetes.client import CoreV1Api +from cmd.secrets.fetch import phase_secrets_fetch +from utils.const import REDEPLOY_ANNOTATION +from utils.misc import transform_name + + +@kopf.timer('secrets.phase.dev', 'v1alpha1', 'phasesecrets', interval=60) +def sync_secrets(spec, name, namespace, logger, **kwargs): try: - parser = HelpfulParser(prog='phase-cli', formatter_class=RawTextHelpFormatter) - parser.add_argument('--version', '-v', action='version', version=__version__) - - # Create subparsers with title 'Available Commands:' - subparsers = parser.add_subparsers(dest='command', required=True) - - # Auth command - auth_parser = subparsers.add_parser('auth', help='💻 Authenticate with Phase') - auth_parser.add_argument('--mode', choices=['token', 'webauth'], default='webauth', help='Mode of authentication. Default: webauth') - - # Init command - init_parser = subparsers.add_parser('init', help='🔗 Link your project with your Phase app') - - # Run command - run_parser = subparsers.add_parser('run', help='🚀 Run and inject secrets to your app') - run_parser.add_argument('command_to_run', nargs=argparse.REMAINDER, help='Command to be run. Ex. phase run yarn dev') - run_parser.add_argument('--env', type=str, help=env_help) - - # Secrets command - secrets_parser = subparsers.add_parser('secrets', help='🗝️` Manage your secrets') - secrets_subparsers = secrets_parser.add_subparsers(dest='secrets_command', required=True) - - # Secrets list command - secrets_list_parser = secrets_subparsers.add_parser('list', help='📇 List all the secrets') - secrets_list_parser.add_argument('--show', action='store_true', help='Return secrets uncensored') - secrets_list_parser.add_argument('--env', type=str, help=env_help) - secrets_list_parser.epilog = ( - "🔗 : Indicates that the secret value references another secret within the same environment.\n" - "⛓️` : Indicates a cross-environment reference, where a secret in the current environment references a secret from another environment." - ) - - # Secrets get command - secrets_get_parser = secrets_subparsers.add_parser('get', help='🔍 Get a specific secret by key') - secrets_get_parser.add_argument('key', type=str, help='The key associated with the secret to fetch') - secrets_get_parser.add_argument('--env', type=str, help=env_help) - - # Secrets create command - secrets_create_parser = secrets_subparsers.add_parser( - 'create', - description='💳 Create a new secret. Optionally, you can provide the secret value via stdin.\n\nExample:\n cat ~/.ssh/id_rsa | phase secrets create SSH_PRIVATE_KEY', - help='💳 Create a new secret' - ) - secrets_create_parser.add_argument( - 'key', - type=str, - nargs='?', - help='The key for the secret to be created. (Will be converted to uppercase.) If the value is not provided as an argument, it will be read from stdin.' + api_instance = CoreV1Api() + managed_secret_references = spec.get('managedSecretReferences', []) + phase_host = spec.get('phaseHost', 'https://console.phase.dev') + phase_app = spec.get('phaseApp') + phase_app_env = spec.get('phaseAppEnv', 'production') + phase_app_env_tag = spec.get('phaseAppEnvTag') + service_token_secret_name = spec.get('authentication', {}).get('serviceToken', {}).get('serviceTokenSecretReference', {}).get('secretName', 'phase-service-token') + + api_response = api_instance.read_namespaced_secret(service_token_secret_name, namespace) + service_token = base64.b64decode(api_response.data['token']).decode('utf-8') + + phase_secrets_dict = phase_secrets_fetch( + phase_service_token=service_token, + phase_service_host=phase_host, + phase_app=phase_app, + env_name=phase_app_env, + tags=phase_app_env_tag ) - secrets_create_parser.add_argument('--env', type=str, help=env_help) - # Secrets update command - secrets_update_parser = secrets_subparsers.add_parser( - 'update', - description='📝 Update an existing secret. Optionally, you can provide the new secret value via stdin.\n\nExample:\n cat ~/.ssh/id_ed25519 | phase secrets update SSH_PRIVATE_KEY', - help='📝 Update an existing secret' - ) - secrets_update_parser.add_argument( - 'key', - type=str, - help='The key associated with the secret to update. If the new value is not provided as an argument, it will be read from stdin.' - ) - secrets_update_parser.add_argument('--env', type=str, help=env_help) - - # Secrets delete command - secrets_delete_parser = secrets_subparsers.add_parser('delete', help='🗑️` Delete a secret') - secrets_delete_parser.add_argument('keys', nargs='*', help='Keys to be deleted') - secrets_delete_parser.add_argument('--env', type=str, help=env_help) - - # Secrets import command - secrets_import_parser = secrets_subparsers.add_parser('import', help='📩 Import secrets from a .env file') - secrets_import_parser.add_argument('env_file', type=str, help='The .env file to import') - secrets_import_parser.add_argument('--env', type=str, help=env_help) - - # Secrets export command - secrets_export_parser = secrets_subparsers.add_parser('export', help='🥡 Export secrets in a dotenv format') - secrets_export_parser.add_argument('keys', nargs='*', help='List of keys separated by space', default=None) - secrets_export_parser.add_argument('--env', type=str, help=env_help) - - # Users command - users_parser = subparsers.add_parser('users', help='👥 Manage users and accounts') - users_subparsers = users_parser.add_subparsers(dest='users_command', required=True) - - # Users whoami command - whoami_parser = users_subparsers.add_parser('whoami', help='🙋 See details of the current user') - - # Users logout command - logout_parser = users_subparsers.add_parser('logout', help='🏃 Logout from phase-cli') - logout_parser.add_argument('--purge', action='store_true', help='Purge all local data') - - # Users keyring command - keyring_parser = users_subparsers.add_parser('keyring', help='🔐 Display information about the Phase keyring') - - # Web command - web_parser = subparsers.add_parser('console', help='🖥️` Open the Phase Console in your browser') - - # Check if the operating system is Linux before adding the update command - if sys.platform == "linux": - update_parser = subparsers.add_parser('update', help='🆙 Update the Phase CLI to the latest version') - - args = parser.parse_args() - - if args.command == 'auth': - phase_auth(args.mode) - sys.exit(0) - elif args.command == 'init': - phase_init() - elif args.command == 'run': - command = ' '.join(args.command_to_run) - phase_run_inject(command, env_name=args.env) - elif args.command == 'console': - phase_open_web() - elif args.command == 'update': - phase_cli_update() - sys.exit(0) - elif args.command == 'users': - if args.users_command == 'whoami': - phase_users_whoami() - elif args.users_command == 'logout': - phase_cli_logout(args.purge) - elif args.users_command == 'keyring': - show_keyring_info() - else: - print("Unknown users sub-command: " + args.users_command) - parser.print_help() - sys.exit(1) - elif args.command == 'secrets': - if args.secrets_command == 'list': - phase_list_secrets(args.show, env_name=args.env) - elif args.secrets_command == 'get': - phase_secrets_get(args.key, env_name=args.env) - elif args.secrets_command == 'create': - phase_secrets_create(args.key, env_name=args.env) - elif args.secrets_command == 'delete': - phase_secrets_delete(args.keys, env_name=args.env) - elif args.secrets_command == 'import': - phase_secrets_env_import(args.env_file, env_name=args.env) - elif args.secrets_command == 'export': - phase_secrets_env_export(env_name=args.env, keys=args.keys) - elif args.secrets_command == 'update': - phase_secrets_update(args.key, env_name=args.env) - else: - print("Unknown secrets sub-command: " + args.secrets_command) - parser.print_help() - sys.exit(1) - except KeyboardInterrupt: - print("\nStopping Phase.") - sys.exit(0) - + secret_changed = False + for secret_reference in managed_secret_references: + secret_name = secret_reference['secretName'] + secret_namespace = secret_reference.get('secretNamespace', namespace) + secret_type = secret_reference.get('secretType', 'Opaque') + name_transformer = secret_reference.get('nameTransformer', 'upper_snake') + processors = secret_reference.get('processors', {}) + + processed_secrets = process_secrets(phase_secrets_dict, processors, secret_type, name_transformer) + + try: + existing_secret = api_instance.read_namespaced_secret(name=secret_name, namespace=secret_namespace) + if existing_secret.type != secret_type or existing_secret.data != processed_secrets: + api_instance.delete_namespaced_secret(name=secret_name, namespace=secret_namespace) + create_secret(api_instance, secret_name, secret_namespace, secret_type, processed_secrets, logger) + secret_changed = True + except ApiException as e: + if e.status == 404: + create_secret(api_instance, secret_name, secret_namespace, secret_type, processed_secrets, logger) + secret_changed = True + else: + logger.error(f"Failed to update secret {secret_name} in namespace {secret_namespace}: {e}") + + if secret_changed: + affected_secrets = [ref['secretName'] for ref in managed_secret_references] + redeploy_affected_deployments(namespace, affected_secrets, logger, api_instance) + + logger.info(f"Secrets for PhaseSecret {name} have been successfully updated in namespace {namespace}") + + except ApiException as e: + logger.error(f"Failed to fetch secrets for PhaseSecret {name} in namespace {namespace}: {e}") except Exception as e: - # Always print the full traceback, regardless of PHASE_DEBUG - traceback.print_exc() - sys.exit(1) + logger.error(f"Unexpected error when handling PhaseSecret {name} in namespace {namespace}: {e}") -if __name__ == '__main__': - main() +def redeploy_affected_deployments(namespace, affected_secrets, logger, api_instance): + try: + apps_v1_api = AppsV1Api(api_instance.api_client) + deployments = apps_v1_api.list_namespaced_deployment(namespace) + for deployment in deployments.items: + if should_redeploy(deployment, affected_secrets): + patch_deployment_for_redeploy(deployment, namespace, apps_v1_api, logger) + except ApiException as e: + logger.error(f"Error listing deployments in namespace {namespace}: {e}") + +def should_redeploy(deployment, affected_secrets): + if not (deployment.metadata.annotations and REDEPLOY_ANNOTATION in deployment.metadata.annotations): + return False + + deployment_secrets = extract_deployment_secrets(deployment) + return any(secret in affected_secrets for secret in deployment_secrets) + +def extract_deployment_secrets(deployment): + secrets = [] + for container in deployment.spec.template.spec.containers: + if container.env_from: + for env_from in container.env_from: + if env_from.secret_ref: + secrets.append(env_from.secret_ref.name) + return secrets + +def patch_deployment_for_redeploy(deployment, namespace, apps_v1_api, logger): + try: + timestamp = datetime.datetime.utcnow().isoformat() + patch_body = { + "spec": { + "template": { + "metadata": { + "annotations": { + "phase.autoredeploy.timestamp": timestamp + } + } + } + } + } + apps_v1_api.patch_namespaced_deployment(name=deployment.metadata.name, namespace=namespace, body=patch_body) + logger.info(f"Triggered redeployment of {deployment.metadata.name} in namespace {namespace}") + except ApiException as e: + logger.error(f"Failed to patch deployment {deployment.metadata.name} in namespace {namespace}: {e}") + +def process_secrets(fetched_secrets, processors, secret_type, name_transformer): + processed_secrets = {} + for key, value in fetched_secrets.items(): + processor_info = processors.get(key, {}) + processor_type = processor_info.get('type', 'plain') + as_name = processor_info.get('asName', key) # Use asName for mapping + + # Check and process the value based on the processor type + if processor_type == 'base64': + # Assume value is already base64 encoded; do not re-encode + processed_value = value + elif processor_type == 'plain': + # Base64 encode the value + processed_value = base64.b64encode(value.encode()).decode() + else: + # Default to plain processing if processor type is not recognized + processed_value = base64.b64encode(value.encode()).decode() + + # Map the processed value to the asName + processed_secrets[as_name] = processed_value + + return processed_secrets + +def create_secret(api_instance, secret_name, secret_namespace, secret_type, secret_data, logger): + try: + response = api_instance.create_namespaced_secret( + namespace=secret_namespace, + body=kubernetes.client.V1Secret( + metadata=kubernetes.client.V1ObjectMeta(name=secret_name), + type=secret_type, + data=secret_data + ) + ) + if response: + logger.info(f"Created secret {secret_name} in namespace {secret_namespace}") + except ApiException as e: + logger.error(f"Failed to create secret {secret_name} in namespace {secret_namespace}: {e}") diff --git a/src/utils/const.py b/src/utils/const.py index 375aa47..31d1d18 100644 --- a/src/utils/const.py +++ b/src/utils/const.py @@ -1,6 +1,6 @@ import os import re -__version__ = "0.2" +__version__ = "1.0.0" __ph_version__ = "v1" description = "Securely manage and sync environment variables with Phase." @@ -26,6 +26,9 @@ # Define paths to Phase configs PHASE_ENV_CONFIG = '.phase.json' # Holds project and environment contexts in users repo, unique to each application. +# Kubernetes annotation to that deployments can use redeploy deployments after a secret has been changed. +REDEPLOY_ANNOTATION = "secrets.phase.dev/redeploy" + PHASE_SECRETS_DIR = os.path.expanduser('~/.phase/secrets') # Holds local encrypted caches of secrets and environment variables, common to all applications. (only if offline mode is enabled) CONFIG_FILE = os.path.join(PHASE_SECRETS_DIR, 'config.json') # Holds local user account configurations diff --git a/src/utils/crypto.py b/src/utils/crypto.py index 94c91fa..d6490de 100644 --- a/src/utils/crypto.py +++ b/src/utils/crypto.py @@ -19,7 +19,7 @@ from nacl.hash import blake2b from nacl.utils import random from base64 import b64encode, b64decode -from src.utils.const import __ph_version__ +from utils.const import __ph_version__ class CryptoUtils: diff --git a/src/utils/keyring.py b/src/utils/keyring.py index 911a59c..f7e0c72 100644 --- a/src/utils/keyring.py +++ b/src/utils/keyring.py @@ -2,7 +2,7 @@ import sys import getpass import keyring -from src.utils.misc import get_default_user_id +from utils.misc import get_default_user_id def get_credentials(): # Use environment variables if available diff --git a/src/utils/misc.py b/src/utils/misc.py index 4b714d9..d8d6e9a 100644 --- a/src/utils/misc.py +++ b/src/utils/misc.py @@ -1,136 +1,11 @@ import os +import base64 import platform import subprocess import json from urllib.parse import urlparse from typing import Union, List -from src.utils.const import __version__, PHASE_ENV_CONFIG, PHASE_CLOUD_API_HOST, PHASE_SECRETS_DIR, cross_env_pattern, local_ref_pattern - -def get_terminal_width(): - """ - Get the width of the terminal window. - If an OSError occurs (e.g., when running under 'watch' or piping output to a file), - a default width of 80 is assumed. - """ - try: - return os.get_terminal_size().columns - except OSError: - return 80 - - -def print_phase_links(): - """ - Print a beautiful welcome message inviting users to Phase's community and GitHub repository. - """ - # Calculate the dynamic width of the terminal for the separator line - separator_line = "─" * get_terminal_width() - - print("\033[1;34m" + separator_line + "\033[0m") - print("🙋 Need help?: \033[4mhttps://slack.phase.dev\033[0m") - print("💻 Bug reports / feature requests: \033[4mhttps://github.com/phasehq/cli\033[0m") - print("\033[1;34m" + separator_line + "\033[0m") - - -def sanitize_value(value): - """Sanitize values by removing surrounding single or double quotes.""" - if value.startswith("'") and value.endswith("'"): - return value[1:-1] - elif value.startswith('"') and value.endswith('"'): - return value[1:-1] - return value - - -def censor_secret(secret, max_length): - """ - Censor a secret to not exceed a certain length. - Also includes legacy censoring logic. - - :param secret: The secret to be censored - :param max_length: The maximum allowable length of the censored secret - :return: The censored secret - """ - # Legacy censoring logic - if len(secret) <= 6: - censored = '*' * len(secret) - else: - censored = secret[:3] + '*' * (len(secret) - 6) + secret[-3:] - - # Check for column width and truncate if necessary - if len(censored) > max_length: - return censored[:max_length - 3] - - return censored - - -def render_table(data, show=False, min_key_width=20): - """ - Render a table of key-value pairs. - - :param data: List of dictionaries containing keys and values - :param show: Whether to show the values or censor them - :param min_key_width: Minimum width for the "Key" column - """ - - # Find the length of the longest key - longest_key_length = max([len(item.get("key", "")) for item in data], default=0) - - # Set min_key_width to be the length of the longest key plus a buffer of 3, - # but not less than the provided minimum - min_key_width = max(longest_key_length + 3, min_key_width) - - terminal_width = get_terminal_width() - value_width = terminal_width - min_key_width - 4 # Deducting for spaces and pipe - - # Print the headers - print(f'{"KEY 🗝️":<{min_key_width}} | {"VALUE ✨":<{value_width}}') - print('-' * (min_key_width + value_width + 3)) # +3 accounts for spaces and pipe - - # If data is empty, just return after printing headers - if not data: - return - - # Tokenizing function - def tokenize(value): - delimiters = [':', '@', '/', ' '] - tokens = [value] - for delimiter in delimiters: - new_tokens = [] - for token in tokens: - new_tokens.extend(token.split(delimiter)) - tokens = new_tokens - return tokens - - # Print the rows - for item in data: - key = item.get("key") - value = item.get("value") - icon = '' - - # Tokenize value and check each token for references - tokens = tokenize(value) - cross_env_detected = any(cross_env_pattern.match(token) for token in tokens) - local_ref_detected = any(local_ref_pattern.match(token) for token in tokens if not cross_env_pattern.match(token)) - - # Set icon based on detected references - if cross_env_detected: - icon += '⛓️` ' - if local_ref_detected: - icon += '🔗 ' - - censored_value = censor_secret(value, value_width-len(icon)) - - # Include the icon before the value or censored value - displayed_value = icon + (value if show else censored_value) - - print(f'{key:<{min_key_width}} | {displayed_value:<{value_width}}') - - -def validate_url(url): - parsed_url = urlparse(url) - return all([ - parsed_url.scheme, # Scheme should be present (e.g., "https") - parsed_url.netloc, # Network location (e.g., "example.com") should be present - ]) +from utils.const import __version__, PHASE_ENV_CONFIG, PHASE_CLOUD_API_HOST, PHASE_SECRETS_DIR, cross_env_pattern, local_ref_pattern def get_default_user_host() -> str: @@ -215,7 +90,7 @@ def phase_get_context(user_data, app_name=None, env_name=None): Parameters: - user_data (dict): The user data from the API response. - - app_name (str, optional): The name of the desired application. + - app_name (str, optional): The name (or partial name) of the desired application. - env_name (str, optional): The name (or partial name) of the desired environment. Returns: @@ -232,45 +107,70 @@ def phase_get_context(user_data, app_name=None, env_name=None): default_env_name = config_data.get("defaultEnv") app_id = config_data.get("appId") except FileNotFoundError: - default_env_name = "Development" # Set the default environment to "Development" + default_env_name = "Development" app_id = None # 2. If env_name isn't explicitly provided, use the default env_name = env_name or default_env_name - # 3. Get the application using app_id or app_name - if app_name: # Override app_id if app_name is provided - application = next((app for app in user_data["apps"] if app["name"] == app_name), None) - elif app_id: - application = next((app for app in user_data["apps"] if app["id"] == app_id), None) - else: - application = user_data["apps"][0] + # 3. Match the application using app_id or find the best match for partial app_name + try: + if app_name: + matching_apps = [app for app in user_data["apps"] if app_name.lower() in app["name"].lower()] + if not matching_apps: + raise ValueError(f"🔍 No application found with the name '{app_name}'.") + # Sort matching applications by the length of their names, shorter names are likely to be more specific matches + matching_apps.sort(key=lambda app: len(app["name"])) + application = matching_apps[0] + elif app_id: + application = next((app for app in user_data["apps"] if app["id"] == app_id), None) + if not application: + raise ValueError(f"No application found with the ID '{app_id}'.") + else: + raise ValueError("🤔 No application context provided. Please run 'phase init' or pass the '--app' flag followed by your application name.") + + # 4. Attempt to match environment with the exact name or a name that contains the env_name string + environment = next((env for env in application["environment_keys"] if env_name.lower() in env["environment"]["name"].lower()), None) + + if not environment: + raise ValueError(f"⚠️ Warning: The environment '{env_name}' either does not exist or you do not have access to it.") + + return application["id"], environment["environment"]["id"], environment["identity_key"] + + except StopIteration: + raise ValueError("🔍 Application or environment not found.") + - if not application: - raise ValueError(f"No application matched using ID '{app_id}' or name '{app_name}'.") +def normalize_tag(tag): + """ + Normalize a tag by replacing underscores with spaces. - # 4. Attempt to match environment with the exact name or a name that contains the env_name string - environment = next((env for env in application["environment_keys"] if env_name.lower() in env["environment"]["name"].lower()), None) + Args: + tag (str): The tag to normalize. - if not environment: - raise ValueError(f"⚠️ Warning: The environment '{env_name}' either does not exist or you do not have access to it.") + Returns: + str: The normalized tag. + """ + return tag.replace('_', ' ').lower() - return application["id"], environment["environment"]["id"], environment["identity_key"] +def tag_matches(secret_tags, user_tag): + """ + Check if the user-provided tag partially matches any of the secret tags. -def open_browser(url): - """Open a URL in the default browser without any console output.""" - # Determine the right command based on the OS - if platform.system() == "Windows": - cmd = ['start', url] - elif platform.system() == "Darwin": - cmd = ['open', url] - else: # Assume Linux - cmd = ['xdg-open', url] + Args: + secret_tags (list): The list of tags associated with a secret. + user_tag (str): The user-provided tag to match. - # Suppress output by redirecting to devnull - with open(os.devnull, 'w') as fnull: - subprocess.run(cmd, stdout=fnull, stderr=fnull, check=True) + Returns: + bool: True if there's a partial match, False otherwise. + """ + normalized_user_tag = normalize_tag(user_tag) + for tag in secret_tags: + normalized_secret_tag = normalize_tag(tag) + if normalized_user_tag in normalized_secret_tag: + return True + return False def get_user_agent(): @@ -316,4 +216,33 @@ def get_user_agent(): pass user_agent_str = ' '.join(details) - return user_agent_str \ No newline at end of file + return user_agent_str + + +def transform_name(secret_key, format): + """ + Transforms a secret key from UPPER_SNAKE_CASE to the specified format. + + Args: + secret_key (str): The secret key to transform. + format (str): The target format ('camel', 'upper-camel', 'lower-snake', 'tf-var', 'dotnet-env', 'lower-kebab'). + + Returns: + str: The transformed secret key. + """ + words = secret_key.lower().split('_') + + if format == 'camel': + return words[0] + ''.join(word.capitalize() for word in words[1:]) + elif format == 'upper-camel': + return ''.join(word.capitalize() for word in words) + elif format == 'lower-snake': + return '_'.join(words) + elif format == 'tf-var': + return 'TF_VAR_' + '_'.join(words) + elif format == 'dotnet-env': + return '__'.join(word.capitalize() for word in words) + elif format == 'lower-kebab': + return '-'.join(words) + else: + return secret_key # Default: return the key as is if format is unknown diff --git a/src/utils/network.py b/src/utils/network.py index dc5b806..2502338 100644 --- a/src/utils/network.py +++ b/src/utils/network.py @@ -1,6 +1,6 @@ import os import requests -from src.utils.misc import get_user_agent +from utils.misc import get_user_agent from typing import List from typing import Dict @@ -28,8 +28,9 @@ def handle_request_errors(response: requests.Response) -> None: return if response.status_code != 200: - # Always include the response text in the error message - error_message = f"🗿 Request failed with status code {response.status_code}: {response.text}" + error_message = f"🗿 Request failed with status code {response.status_code}" + if PHASE_DEBUG: + error_message += f": {response.text}" raise Exception(error_message) @@ -40,8 +41,9 @@ def handle_connection_error(e: Exception) -> None: Args: e (Exception): The exception to handle. """ - # Always include exception details in the error message - error_message = f"🗿 Network error: Please check your internet connection. Detail: {str(e)}" + error_message = "🗿 Network error: Please check your internet connection." + if PHASE_DEBUG: + error_message += f" Detail: {str(e)}" raise Exception(error_message) @@ -52,8 +54,9 @@ def handle_ssl_error(e: Exception) -> None: Args: e (Exception): The exception to handle. """ - # Always include exception details in the error message - error_message = f"🗿 SSL error: The Phase Console is using an invalid/expired or a self-signed certificate. Detail: {str(e)}" + error_message = "🗿 SSL error: The Phase Console is using an invalid/expired or a self-signed certificate." + if PHASE_DEBUG: + error_message += f" Detail: {str(e)}" raise Exception(error_message) diff --git a/src/utils/phase_io.py b/src/utils/phase_io.py index a31aeea..181a991 100644 --- a/src/utils/phase_io.py +++ b/src/utils/phase_io.py @@ -1,9 +1,8 @@ -from typing import Optional -import requests, json +import requests from typing import Tuple from typing import List from dataclasses import dataclass -from src.utils.network import ( +from utils.network import ( fetch_phase_user, fetch_app_key, fetch_wrapped_key_share, @@ -13,19 +12,12 @@ delete_phase_secrets ) from nacl.bindings import ( - crypto_kx_keypair, - crypto_aead_xchacha20poly1305_ietf_encrypt, - crypto_aead_xchacha20poly1305_ietf_decrypt, - randombytes, - crypto_secretbox_NONCEBYTES, crypto_kx_server_session_keys, - crypto_kx_client_session_keys, - crypto_kx_seed_keypair, ) -from src.utils.crypto import CryptoUtils -from src.utils.const import __ph_version__, PHASE_ENV_CONFIG, pss_user_pattern, pss_service_pattern -from src.utils.misc import phase_get_context, get_default_user_host -from src.utils.keyring import get_credentials +from utils.crypto import CryptoUtils +from utils.const import __ph_version__, pss_user_pattern, pss_service_pattern +from utils.misc import phase_get_context, get_default_user_host, normalize_tag, tag_matches +from utils.keyring import get_credentials @dataclass class AppSecret: @@ -58,7 +50,7 @@ def __init__(self, init=True, pss=None, host=None): self._api_host = get_default_user_host() else: if not pss or not host: - raise ValueError("Invalid PHASE_HOST or PHASE_SERVICE_TOKEN") + raise ValueError("Both pss and host must be provided when init is set to False.") app_secret = pss self._api_host = host @@ -152,18 +144,18 @@ def create(self, key_value_pairs: List[Tuple[str, str]], env_name: str, app_name return create_phase_secrets(self._token_type, self._app_secret.app_token, env_id, secrets, self._api_host) - def get(self, env_name: str, keys: List[str] = None, app_name: str = None): + def get(self, env_name: str, keys: List[str] = None, app_name: str = None, tag: str = None) -> List[dict]: """ - Get secrets from Phase KMS based on key and environment. - + Get secrets from Phase KMS based on key and environment, with support for personal overrides, optional tag matching, and decrypting comments. + Args: - key (str, optional): The key for which to retrieve the secret value. env_name (str): The name (or partial name) of the desired environment. + keys (List[str], optional): The keys for which to retrieve the secret values. app_name (str, optional): The name of the desired application. - + tag (str, optional): The tag to match against the secrets. + Returns: - dict or list: A dictionary containing the decrypted key and value if key is provided, - otherwise a list of dictionaries for all secrets in the environment. + List[dict]: A list of dictionaries for all secrets in the environment that match the criteria. """ user_response = fetch_phase_user(self._token_type, self._app_secret.app_token, self._api_host) @@ -187,11 +179,30 @@ def get(self, env_name: str, keys: List[str] = None, app_name: str = None): results = [] for secret in secrets_data: - decrypted_key = CryptoUtils.decrypt_asymmetric(secret["key"], env_private_key, public_key) - if keys and decrypted_key not in keys: + if tag and not tag_matches(secret.get("tags", []), tag): continue - decrypted_value = CryptoUtils.decrypt_asymmetric(secret["value"], env_private_key, public_key) - results.append({"key": decrypted_key, "value": decrypted_value}) + + secret_id = secret["id"] + override = secret.get("override") + use_override = override and override.get("secret") == secret_id and override.get("is_active") + + key_to_decrypt = secret["key"] + value_to_decrypt = override["value"] if use_override else secret["value"] + comment_to_decrypt = secret["comment"] + + decrypted_key = CryptoUtils.decrypt_asymmetric(key_to_decrypt, env_private_key, public_key) + decrypted_value = CryptoUtils.decrypt_asymmetric(value_to_decrypt, env_private_key, public_key) + decrypted_comment = CryptoUtils.decrypt_asymmetric(comment_to_decrypt, env_private_key, public_key) if comment_to_decrypt else None + + if not keys or decrypted_key in keys: + result = { + "key": decrypted_key, + "value": decrypted_value, + "overridden": use_override, + "tags": secret.get("tags", []), + "comment": decrypted_comment + } + results.append(result) return results @@ -312,34 +323,9 @@ def delete(self, env_name: str, keys_to_delete: List[str], app_name: str = None) delete_phase_secrets(self._token_type, self._app_secret.app_token, env_id, secret_ids_to_delete, self._api_host) return keys_not_found + - - # TODO: Remove - def encrypt(self, plaintext, tag="") -> Optional[str]: - """ - Encrypts a plaintext string. - - Args: - plaintext (str): The plaintext to encrypt. - tag (str, optional): A tag to include in the encrypted message. The tag will not be encrypted. - - Returns: - str: The encrypted message, formatted as a string that includes the public key used for the one-time keypair, - the ciphertext, and the tag. Returns `None` if an error occurs. - """ - try: - one_time_keypair = random_key_pair() - symmetric_keys = crypto_kx_client_session_keys( - one_time_keypair[0], one_time_keypair[1], bytes.fromhex(self._app_secret.pss_user_public_key)) - ciphertext = CryptoUtils.encrypt_b64(plaintext, symmetric_keys[1]) - pub_key = one_time_keypair[0].hex() - - return f"ph:{__ph_version__}:{pub_key}:{ciphertext}:{tag}" - except ValueError as err: - raise ValueError(f"Something went wrong: {err}") - - - def decrypt(self, phase_ciphertext) -> Optional[str]: + def decrypt(self, phase_ciphertext) -> str | None: """ Decrypts a Phase ciphertext string. diff --git a/tests/crypto.py b/tests/crypto.py new file mode 100644 index 0000000..dcfecf9 --- /dev/null +++ b/tests/crypto.py @@ -0,0 +1,87 @@ +import pytest +import base64 +import re +import os +from utils.crypto import CryptoUtils +from nacl.secret import SecretBox + +class TestCryptoUtils: + def test_random_key_pair(self): + # Testing if the generated key pair (public and private keys) are of the correct length (32 bytes each) + public_key, private_key = CryptoUtils.random_key_pair() + assert len(public_key) == 32 + assert len(private_key) == 32 + + def test_client_session_keys(self): + # Testing client session keys generation by ensuring the keys are of the correct length + client_keypair = CryptoUtils.random_key_pair() + server_keypair = CryptoUtils.random_key_pair() + client_keys = CryptoUtils.client_session_keys(client_keypair, server_keypair[0]) + assert len(client_keys[0]) == 32 and len(client_keys[1]) == 32 + + def test_server_session_keys(self): + # Testing server session keys generation similar to client session keys + client_keypair = CryptoUtils.random_key_pair() + server_keypair = CryptoUtils.random_key_pair() + server_keys = CryptoUtils.server_session_keys(server_keypair, client_keypair[0]) + assert len(server_keys[0]) == 32 and len(server_keys[1]) == 32 + +def test_encrypt_and_decrypt_asymmetric(): + # Testing asymmetric encryption and decryption to ensure the decrypted text matches the original plaintext + test_plaintext = "Saigon, I'm still only in Saigon. Every time I think I'm gonna wake up back in the jungle.." + public_key, private_key = CryptoUtils.random_key_pair() + encrypted_data = CryptoUtils.encrypt_asymmetric(test_plaintext, public_key.hex()) + decrypted_data = CryptoUtils.decrypt_asymmetric(encrypted_data, private_key.hex(), public_key.hex()) + pattern = rf"ph:v{CryptoUtils.VERSION}:[0-9a-fA-F]{{64}}:.+" + assert re.match(pattern, encrypted_data) is not None + assert decrypted_data == test_plaintext + +class TestBlake2bDigest: + def test_blake2b_digest_length(self): + # Testing the length of the BLAKE2b hash to ensure it's 64 characters (32 bytes hex encoded) + input_str = "test string" + salt = "salt" + result = CryptoUtils.blake2b_digest(input_str, salt) + assert len(result) == 64 + + def test_blake2b_digest_consistency(self): + # Testing hash consistency for the same input and salt + input_str = "consistent input" + salt = "consistent salt" + hash1 = CryptoUtils.blake2b_digest(input_str, salt) + hash2 = CryptoUtils.blake2b_digest(input_str, salt) + assert hash1 == hash2 + + def test_blake2b_digest_unique_with_different_inputs(self): + # Ensuring different inputs with the same salt produce different hashes + salt = "salt" + hash1 = CryptoUtils.blake2b_digest("input1", salt) + hash2 = CryptoUtils.blake2b_digest("input2", salt) + assert hash1 != hash2 + + def test_blake2b_digest_unique_with_different_salts(self): + # Ensuring the same input with different salts produces different hashes + input_str = "input" + hash1 = CryptoUtils.blake2b_digest(input_str, "salt1") + hash2 = CryptoUtils.blake2b_digest(input_str, "salt2") + assert hash1 != hash2 + + @pytest.mark.parametrize("input_str, salt, expected_hash", [ + # Testing known hash values for specific inputs and salts + ("hello", "world", "38010cfe3a8e684cb17e6d049525e71d4e9dc3be173fc05bf5c5ca1c7e7c25e7"), + ("another test", "another salt", "5afad949edcfb22bd24baeed4e75b0aeca41731b8dff78f989a5a4c0564f211f") + ]) + def test_blake2b_digest_known_values(self, input_str, salt, expected_hash): + # Testing that the calculated hash matches the expected known hash + result = CryptoUtils.blake2b_digest(input_str, salt) + assert result == expected_hash + +class TestSecretSplitting: + def test_xor_secret_splitting_and_reconstruction(self): + # Testing XOR-based secret splitting and reconstruction + original_secret_hex = "6eed8a70ac9e75ab1894b06d4a5e21d1072649529753f3244316c6d9e4c9c951" + original_secret_bytes = bytes.fromhex(original_secret_hex) + random_share = os.urandom(len(original_secret_bytes)) + second_share = CryptoUtils.xor_bytes(original_secret_bytes, random_share) + reconstructed_secret_hex = CryptoUtils.reconstruct_secret([random_share.hex(), second_share.hex()]) + assert reconstructed_secret_hex == original_secret_hex diff --git a/tests/tags.py b/tests/tags.py new file mode 100644 index 0000000..e772756 --- /dev/null +++ b/tests/tags.py @@ -0,0 +1,28 @@ +import pytest +from utils.misc import tag_matches, normalize_tag + + +full_tag_names = { + "prod": ["Production", "ProdData", "NonProd_Environment"], + "config": ["ConfigData", "Configuration", "config_file"], + "test": ["Test_Tag", "testEnvironment", "Testing_Data"], + "dev": ["DevEnv", "DevelopmentData", "dev_tools"], + "prod_data": ["prod_data"], + "DEV_ENV": [] # No matching tags under the current logic +} + + +def test_normalize_tag(): + for tag in full_tag_names: + normalized_tag = normalize_tag(tag) + assert normalized_tag == tag.replace('_', ' ').lower(), f"Normalization failed for tag: {tag}" + + +def test_tag_matches(): + for tag, secret_tags in full_tag_names.items(): + normalized_tag = normalize_tag(tag) + + # Test for matching scenarios + for secret_tag in secret_tags: + normalized_secret_tag = normalize_tag(secret_tag) + assert normalized_tag in normalized_secret_tag, f"Tag '{tag}' should match with secret tag '{secret_tag}'"