Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
19 changes: 17 additions & 2 deletions .devcontainer/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,20 @@ FROM windpioneers/gdal-python:rational-swordtail-gdal-3.10.0-python-3.13-dev
# Use system environment for UV instead of .venv (we're already in a devcontainer)
ENV UV_PROJECT_ENVIRONMENT="/usr/local/"

# Install localtunnel
RUN npm install -g localtunnel
ENV PROMPT_COMMAND='history -a'
ENV HISTFILE=/command-history/.zsh_history

# Install terraform-docs
RUN curl -sSLo ./terraform-docs.tar.gz https://terraform-docs.io/dl/v0.17.0/terraform-docs-v0.17.0-$(uname)-amd64.tar.gz && \
tar -xzf terraform-docs.tar.gz && \
chmod +x terraform-docs && \
mv terraform-docs /usr/bin/terraform-docs

# Install tflint
RUN curl -s https://raw.githubusercontent.com/terraform-linters/tflint/master/install_linux.sh | bash

# Install trivy
RUN curl -sfL https://raw.githubusercontent.com/aquasecurity/trivy/main/contrib/install.sh | sudo sh -s -- -b /usr/local/bin v0.67.2


USER vscode
1 change: 1 addition & 0 deletions .devcontainer/devcontainer.json
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
// IN ALPHABETIC ORDER: Add the IDs of extensions you want installed when the container is created.
"extensions": [
"4ops.terraform",
"anthropic.claude-code",
"charliermarsh.ruff",
"erikphansen.vscode-toggle-column-selection",
"esbenp.prettier-vscode",
Expand Down
17 changes: 17 additions & 0 deletions .devcontainer/docker-compose.developer-example.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
services:
web:
volumes:
# Map the workspace
- ..:/workspace:cached
# Map your local directories to persist credentials and command history...
# I keep everything in a .config directory so it looks something like this...
- $HOME/.config/claude/django-gcp:/claude/config
- $HOME/.config/gcloud/django-gcp:/gcp/config
- $HOME/.config/history/django-gcp:/command-history
- $HOME/.config/terraform/django-gcp:/home/vscode/.terraform.d

environment:
# Define where tools should find the things you've mapped in
- CLAUDE_CONFIG_DIR=/claude/config
- CLOUDSDK_CONFIG=/gcp/config
- GOOGLE_APPLICATION_CREDENTIALS=/gcp/config/your-service-account-key-file.json
19 changes: 9 additions & 10 deletions .devcontainer/postattach.sh
Original file line number Diff line number Diff line change
Expand Up @@ -12,18 +12,17 @@ git config --global --add safe.directory /workspace
# Install precommit hooks
pre-commit install && pre-commit install -t commit-msg

# Install claude code
# NOTE: You may have to redo this, sometimes it doesn't
# take after container rebuilds.
npm install -g @anthropic-ai/claude-code

# Install localtunnel
npm install -g localtunnel

# Set zsh history location
# This is done in postAttach so it's not overridden by the oh-my-zsh devcontainer feature
#
# We leave you to decide, but if you put this into a folder that's been mapped
# into the container, then history will persist over container rebuilds :)
#
# !!!IMPORTANT!!!
# Make sure your .zsh_history file is NOT committed into your repository or docker builds,
# as it can contain sensitive information. So in this case, you should add
# .devcontainer/.zsh_history
# to your .gitignore and .dockerignore files.
export HISTFILE="/workspace/.devcontainer/.zsh_history"
export HISTFILE="/command-history/.zsh_history"

# Add aliases to zshrc file
echo '# Aliases to avoid typing "python manage.py" repeatedly' >> ~/.zshrc
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ jobs:
python-version: 3.13

- name: Run precommit
run: SKIP=build-docs,check-branch-name uv run pre-commit run --all-files
run: SKIP=build-docs,check-branch-name,terraform_fmt,terraform_tflint,terraform_validate uv run pre-commit run --all-files

publish-test:
runs-on: ubuntu-latest
Expand Down
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,4 @@ notebooks/
# Tools caches
.ruff_cache
.poetry_cache
.claude
10 changes: 10 additions & 0 deletions .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,16 @@ repos:
- 'prettier@2.2.1'
- 'prettier-plugin-tailwindcss@0.2.6'

- repo: https://github.com/antonbabenko/pre-commit-terraform
rev: 'v1.103.0'
hooks:
- id: terraform_fmt
- id: terraform_tflint
- id: terraform_validate
# Trivy has interesting output but lots of failures on things
# we've deliberately (and correctly) done...
# - id: terraform_trivy

- repo: https://github.com/octue/conventional-commits
rev: 0.9.0
hooks:
Expand Down
1 change: 1 addition & 0 deletions CLAUDE.md
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
- The terraform folder for this repo contains infrastructure definitions used for live testing of the module. It contains definitions for example queues, buckets and so on.
20 changes: 20 additions & 0 deletions django_gcp/workflows/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
"""GCP Cloud Workflows integration for Django."""

from .exceptions import (
InvalidWorkflowArgumentsError,
WorkflowConfigurationError,
WorkflowError,
WorkflowExecutionError,
WorkflowNotFoundError,
)
from .workflows import Workflow, WorkflowExecution

# Public API of the workflows subpackage; these names are re-exported from
# .workflows and .exceptions so callers can import them from django_gcp.workflows.
__all__ = [
    "Workflow",
    "WorkflowExecution",
    "WorkflowError",
    "WorkflowNotFoundError",
    "WorkflowExecutionError",
    "InvalidWorkflowArgumentsError",
    "WorkflowConfigurationError",
]
31 changes: 31 additions & 0 deletions django_gcp/workflows/exceptions.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
"""Custom exceptions for GCP Cloud Workflows."""


class WorkflowError(Exception):
    """Base exception for workflow-related errors.

    All exceptions raised by this package derive from this class, so callers
    can catch it to handle any workflow failure uniformly.
    """


class WorkflowNotFoundError(WorkflowError):
    """Raised when a workflow cannot be found in GCP.

    Typically indicates the workflow has not been deployed to the configured
    project/location.
    """


class WorkflowExecutionError(WorkflowError):
    """Raised when workflow execution fails.

    Covers both failures to create an execution and failures to fetch its
    status.
    """


class InvalidWorkflowArgumentsError(WorkflowError):
    """Raised when workflow arguments are invalid or cannot be serialized."""


class WorkflowConfigurationError(WorkflowError):
    """Raised when workflow configuration is invalid or incomplete.

    For example, when a Workflow subclass omits required class attributes or
    the GCP project ID cannot be determined from credentials.
    """
199 changes: 199 additions & 0 deletions django_gcp/workflows/workflows.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
"""GCP Cloud Workflows integration for Django."""

from dataclasses import dataclass
from datetime import datetime
from typing import Optional

import google.auth
from google.cloud.workflows import executions_v1
from google.cloud.workflows.executions_v1.types import Execution

from django_gcp.tasks.serializers import serialize

from .exceptions import (
InvalidWorkflowArgumentsError,
WorkflowConfigurationError,
WorkflowExecutionError,
WorkflowNotFoundError,
)


@dataclass
class WorkflowExecution:
    """Wrapper for GCP Workflow execution response.

    A plain, client-independent snapshot of an Execution proto, suitable for
    storing or passing around without holding GCP client objects.

    Attributes:
        id: The execution ID (last part of the execution name)
        name: Full execution resource name
        workflow: The workflow *revision ID* reported by GCP (see
            from_execution) — NOTE(review): despite the attribute name, this
            is not the full workflow resource name; confirm which callers expect
        state: Execution state (ACTIVE, SUCCEEDED, FAILED, CANCELLED)
        start_time: When execution started
        end_time: When execution ended (if completed)
        argument: The JSON argument passed to the workflow
        result: The workflow result (if completed successfully)
    """

    id: str
    name: str
    workflow: str
    state: str
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None
    argument: Optional[str] = None
    result: Optional[str] = None

    @classmethod
    def from_execution(cls, execution: Execution) -> "WorkflowExecution":
        """Create WorkflowExecution from GCP Execution proto.

        Args:
            execution: The Execution message returned by the Executions client.

        Returns:
            WorkflowExecution: A dataclass snapshot of the execution's fields.
        """
        # Extract execution ID from name (format: projects/.../locations/.../workflows/.../executions/{id})
        execution_id = execution.name.split("/")[-1]

        return cls(
            id=execution_id,
            name=execution.name,
            # NOTE(review): populates `workflow` with the revision id, not the
            # workflow resource name — see the attribute note in the class docstring
            workflow=execution.workflow_revision_id or "",
            state=execution.state.name,
            start_time=execution.start_time,
            # Unset proto fields are falsy; normalise them to None so the
            # dataclass distinguishes "not set" from an empty value
            end_time=execution.end_time if execution.end_time else None,
            argument=execution.argument if execution.argument else None,
            result=execution.result if execution.result else None,
        )


class Workflow:
    """Base class for triggering GCP Cloud Workflows.

    Subclasses must define:
    - workflow_name: The name of the workflow deployed in GCP
    - location: The GCP region where the workflow is deployed

    Example:
        class ProcessUserData(Workflow):
            workflow_name = "process-user-data"
            location = "europe-west1"

        # Invoke the workflow
        execution = ProcessUserData().invoke(user_id=123, action="process")

        # Store execution ID for tracking
        execution_id = execution.id

        # Later: check status
        status = ProcessUserData().get_execution_status(execution_id)

        # Get console URL
        url = ProcessUserData().get_console_url(execution_id)
    """

    # Optional because the base class deliberately leaves these unset;
    # __init__ enforces that concrete subclasses provide real values.
    # (Previously annotated as plain `str` with a None default, which is
    # inconsistent with the actual sentinel value.)
    workflow_name: Optional[str] = None
    location: Optional[str] = None

    def __init__(self):
        """Initialize the workflow client and get project information from credentials.

        Raises:
            WorkflowConfigurationError: If the subclass does not define the
                required class attributes, or if the project ID cannot be
                determined from the default credentials.
        """
        if not self.workflow_name:
            raise WorkflowConfigurationError(f"{self.__class__.__name__} must define 'workflow_name' class attribute")
        if not self.location:
            raise WorkflowConfigurationError(f"{self.__class__.__name__} must define 'location' class attribute")

        # Get default credentials and project ID (Application Default Credentials)
        self._credentials, self._project_id = google.auth.default()

        if not self._project_id:
            raise WorkflowConfigurationError(
                "Could not determine project ID from credentials. Ensure GOOGLE_APPLICATION_CREDENTIALS is set."
            )

        # Initialize the Executions client with the same credentials
        self._client = executions_v1.ExecutionsClient(credentials=self._credentials)

    @property
    def project_id(self) -> str:
        """Get the GCP project ID determined from the default credentials."""
        return self._project_id

    @property
    def workflow_path(self) -> str:
        """Get the full workflow resource path (projects/{p}/locations/{l}/workflows/{w})."""
        return f"projects/{self.project_id}/locations/{self.location}/workflows/{self.workflow_name}"

    def invoke(self, **kwargs) -> WorkflowExecution:
        """Invoke the workflow with the given arguments.

        Args:
            **kwargs: Keyword arguments to pass to the workflow as JSON.
                These will be serialized using Django's JSON encoder.

        Returns:
            WorkflowExecution: Execution object with ID and initial state.

        Raises:
            InvalidWorkflowArgumentsError: If arguments cannot be serialized.
            WorkflowNotFoundError: If the workflow doesn't exist.
            WorkflowExecutionError: If execution creation fails.
        """
        # Serialize arguments to JSON
        try:
            argument_json = serialize(kwargs) if kwargs else None
        except (TypeError, ValueError) as e:
            raise InvalidWorkflowArgumentsError(f"Failed to serialize workflow arguments: {e}") from e

        # Create execution request; only set the argument when there is one
        execution = Execution()
        if argument_json:
            execution.argument = argument_json

        # Invoke the workflow
        try:
            response = self._client.create_execution(
                request={
                    "parent": self.workflow_path,
                    "execution": execution,
                }
            )
        except Exception as e:
            # Classify by message text to avoid importing google.api_core
            # exception types directly.
            # NOTE(review): consider catching google.api_core.exceptions.NotFound
            # explicitly — string matching on the message is fragile.
            error_msg = str(e)
            if "NOT_FOUND" in error_msg or "404" in error_msg:
                raise WorkflowNotFoundError(
                    f"Workflow '{self.workflow_name}' not found in project '{self.project_id}', "
                    f"location '{self.location}'. Ensure the workflow is deployed."
                ) from e
            raise WorkflowExecutionError(f"Failed to invoke workflow '{self.workflow_name}': {error_msg}") from e

        return WorkflowExecution.from_execution(response)

    def get_execution_status(self, execution_id: str) -> WorkflowExecution:
        """Get the current status of a workflow execution.

        Args:
            execution_id: The execution ID returned from invoke().

        Returns:
            WorkflowExecution: Current execution state and details.

        Raises:
            WorkflowExecutionError: If fetching execution status fails.
        """
        execution_name = f"{self.workflow_path}/executions/{execution_id}"

        try:
            execution = self._client.get_execution(name=execution_name)
        except Exception as e:
            raise WorkflowExecutionError(f"Failed to get execution status for '{execution_id}': {e}") from e

        return WorkflowExecution.from_execution(execution)

    def get_console_url(self, execution_id: str) -> str:
        """Get the GCP Console URL for monitoring a workflow execution.

        Args:
            execution_id: The execution ID returned from invoke().

        Returns:
            str: URL to view the execution in GCP Console.
        """
        # Format: https://console.cloud.google.com/workflows/workflow/{location}/{workflow}/execution/{execution_id}?project={project}
        return (
            f"https://console.cloud.google.com/workflows/workflow/"
            f"{self.location}/{self.workflow_name}/execution/{execution_id}"
            f"?project={self.project_id}"
        )
8 changes: 5 additions & 3 deletions docs/source/index.rst
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,9 @@ Helpers are provided for:
* `Events (PubSub and EventArc) <https://cloud.google.com/pubsub>`_,
* `Structured Cloud Logging <https://cloud.google.com/logging>`_ and `Error Reporting <https://cloud.google.com/error-reporting>`_
* `Cloud Run (Metadata) <https://cloud.google.com/run/docs/container-contract#metadata-server>`_
* `Cloud Tasks <https://cloud.google.com/tasks>`_ and
* `Cloud Scheduler <https://cloud.google.com/scheduler>`_.
* `Cloud Tasks <https://cloud.google.com/tasks>`_,
* `Cloud Scheduler <https://cloud.google.com/scheduler>`_, and
* `Cloud Workflows <https://cloud.google.com/workflows>`_.


.. _aims:
Expand All @@ -31,7 +32,7 @@ The ultimate goals are to:

- **Enable event-based integration** between django and various GCP services.

- **Simplify the use of GCP resources in django** including Storage, Logging, Erorr Reporting, Run, PubSub, Tasks and Scheduler.
- **Simplify the use of GCP resources in django** including Storage, Logging, Error Reporting, Run, PubSub, Tasks, Scheduler, and Workflows.

.. TIP::
For example, if we have *both* a Store *and* a PubSub subscription to events on that store, we can do smart things in django when files or their metadata change.
Expand Down Expand Up @@ -70,6 +71,7 @@ Contents
logs
storage
tasks
workflows
projects
api
license
Expand Down
Loading
Loading