# Google Workflows Integration (process + rawDocument)

This notebook shows how to call **Document AI `:process`** from **Google Workflows** with a **rawDocument (base64)** payload
and read `document.text` from the response.


Flow:
1) Download the PDF from GCS → base64-encode it  
2) Generate the workflow YAML (args.pdf_b64 + args.mime_type)  
3) Deploy/Update  
4) Execute → `document.text` preview


## 0) Setup & Prerequisites

- Workflows API and Document AI API enabled
- Permissions:
  - `roles/workflows.admin` (deploy)
  - `roles/workflows.invoker` (run)
  - `roles/documentai.apiUser` (needed by the service account the workflow runs as, since the workflow makes the Document AI call)
- Auth: `gcloud auth application-default login`
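
As a rough pre-flight check for the auth step, the sketch below looks for a credentials file in the places Application Default Credentials normally reads from: the `GOOGLE_APPLICATION_CREDENTIALS` environment variable first, then the gcloud config directory. The helper name `find_adc_file` is hypothetical and not part of any Google library. On managed environments such as Compute Engine or Vertex AI Workbench, credentials may come from the metadata server instead, in which case this returns `None` even though authentication works.

```python
import os
from pathlib import Path
from typing import Mapping, Optional


def find_adc_file(env: Mapping[str, str] = os.environ) -> Optional[Path]:
    """Return the local ADC credentials file if one can be found, else None."""
    # 1) An explicitly configured credentials file takes precedence.
    explicit = env.get("GOOGLE_APPLICATION_CREDENTIALS")
    if explicit:
        path = Path(explicit)
        return path if path.is_file() else None

    # 2) Otherwise look for the file written by
    #    `gcloud auth application-default login`.
    if env.get("APPDATA"):  # Windows
        config_dir = Path(env["APPDATA"]) / "gcloud"
    else:  # Linux / macOS
        config_dir = Path(env.get("HOME", str(Path.home()))) / ".config" / "gcloud"
    path = config_dir / "application_default_credentials.json"
    return path if path.is_file() else None


print("ADC file:", find_adc_file())
```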


In [None]:
%pip -q install google-cloud-storage google-cloud-workflows google-auth

# A kernel restart may be needed after installation.


## 1) Configuration

In [None]:
import json, base64
from google.cloud import storage
from google.cloud import workflows_v1
from google.cloud.workflows import executions_v1
from google.api_core.exceptions import AlreadyExists

project_id = "vertextraining-486212"

workflow_region = "europe-west2"
workflow_name = "docai-process-rawdocument"

docai_region = "eu"      # must match the processor's region
processor_id = "dda63aa0d93c03aa" # e.g. f0bd8dcffc752533
mime_type = "application/pdf"

gcs_uri = "gs://my-vertex-training-bucket/ornek_fatura.pdf"

gemini_region = "europe-west2"
gemini_model = "gemini-2.5-flash"

print("Config loaded.")


## 2) Download the PDF from GCS and convert it to base64

In [None]:
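def download_gcs_bytes(gcs_uri: str) -> bytes:
    """Download the object at a gs://bucket/path URI and return its bytes."""
    # Raise instead of assert: assertions are skipped when Python runs with -O.
    if not gcs_uri.startswith("gs://"):
        raise ValueError(f"Expected a gs:// URI, got: {gcs_uri!r}")
    # Split "bucket/path/to/object" into the bucket name and the object name.
    bucket_name, blob_name = gcs_uri[len("gs://"):].split("/", 1)

    client = storage.Client(project=project_id)
    bucket = client.bucket(bucket_name)
    blob = bucket.blob(blob_name)
    return blob.download_as_bytes()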

pdf_bytes = download_gcs_bytes(gcs_uri)
pdf_b64 = base64.b64encode(pdf_bytes).decode("utf-8")

print("✅ Downloaded bytes:", len(pdf_bytes))
print("✅ Base64 length:", len(pdf_b64))
print("Base64 preview:", pdf_b64[:80] + "...")
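
Base64 turns every 3 input bytes into 4 output characters, so the encoded string is roughly a third larger than the PDF. That matters here because the encoded string is passed as an execution argument, and Workflows enforces size limits on arguments (check the current quotas for the exact values). The sketch below computes the encoded length up front and confirms the round trip; the helper name `b64_encoded_length` and the sample bytes are illustrative only.

```python
import base64


def b64_encoded_length(num_bytes: int) -> int:
    """Length of standard (padded) base64 output for num_bytes of input."""
    # Every group of up to 3 bytes becomes exactly 4 characters.
    return 4 * ((num_bytes + 2) // 3)


sample = b"%PDF-1.7 stand-in bytes, not a real PDF"
encoded = base64.b64encode(sample).decode("utf-8")

# The formula matches the real encoder, and decoding restores the input.
assert len(encoded) == b64_encoded_length(len(sample))
assert base64.b64decode(encoded) == sample

print(len(sample), "bytes ->", len(encoded), "base64 characters")
```

With the notebook's variables, `b64_encoded_length(len(pdf_bytes))` should equal `len(pdf_b64)`.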


## 3) Workflow YAML (Document AI :process + rawDocument)

Endpoint:
`https://{region}-documentai.googleapis.com/v1/projects/{project}/locations/{region}/processors/{processor}:process`
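
To make the template concrete, here is a minimal sketch that fills it in from Python. The function name `docai_process_url` and the placeholder argument values are assumptions for illustration; the workflow itself builds the same string at run time with its own expression.

```python
def docai_process_url(project: str, region: str, processor: str) -> str:
    """Build the regional Document AI :process endpoint from the template."""
    # The region appears twice: in the hostname and in the resource path.
    return (
        f"https://{region}-documentai.googleapis.com/v1/"
        f"projects/{project}/locations/{region}/processors/{processor}:process"
    )


url = docai_process_url("my-project", "eu", "my-processor-id")
print(url)
```

Because the region is used in both places, a mismatch between the configured region and the processor's actual region produces a URL that does not point at the processor.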


In [None]:
workflow_yaml = f"""main:
  params: [args]
  steps:
    - init:
        assign:
          - project: "{project_id}"
          - docai_region: "{docai_region}"
          - processor: "{processor_id}"
          - pdf_b64: ${{args.pdf_b64}}
          - mime_type: ${{default(map.get(args, "mime_type"), "{mime_type}")}}
          - gemini_region: "{gemini_region}"
          - gemini_model: "{gemini_model}"

    - docai_process:
        call: http.post
        args:
          url: ${{"https://" + docai_region + "-documentai.googleapis.com/v1/projects/" + project + "/locations/" + docai_region + "/processors/" + processor + ":process"}}
          auth:
            type: OAuth2
          headers:
            Content-Type: application/json
          body:
            rawDocument:
              content: ${{pdf_b64}}
              mimeType: ${{mime_type}}
        result: docai_resp

    - extract_text:
        assign:
          - doc_text: ${{default(map.get(docai_resp.body.document, "text"), "")}}

    - done:
        return:
          text_preview: ${{text.substring(doc_text, 0, 800)}}
"""

print(workflow_yaml[:1000] + "\n...")
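
The YAML template mixes two kinds of placeholders: single braces that Python substitutes when the string is built, and doubled braces that survive as `${...}` expressions for Workflows to evaluate at run time. The sketch below shows both side by side. The `wf_expr` helper and the `demo_project` value are hypothetical, included only to show an alternative that avoids brace escaping.

```python
def wf_expr(expression: str) -> str:
    """Wrap text in ${...} so no brace escaping is needed at the call site."""
    return "${" + expression + "}"


demo_project = "my-project"  # placeholder, not the notebook's project_id

literal_line = f'- project: "{demo_project}"'   # substituted by Python now
runtime_line = f"- pdf_b64: ${{args.pdf_b64}}"  # left for Workflows to evaluate
helper_line = f"- pdf_b64: {wf_expr('args.pdf_b64')}"

print(literal_line)
print(runtime_line)

# Both spellings produce the same Workflows expression.
assert runtime_line == helper_line
```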


## 4) Deploy / Update Workflow

In [None]:
wf_client = workflows_v1.WorkflowsClient()

parent = f"projects/{project_id}/locations/{workflow_region}"
wf_path = wf_client.workflow_path(project_id, workflow_region, workflow_name)

workflow = workflows_v1.Workflow(
    name=wf_path,
    description="Document AI process (rawDocument base64) -> return document.text preview",
    source_contents=workflow_yaml,
)

try:
    # First deployment: create the workflow and wait for the operation to finish.
    op = wf_client.create_workflow(parent=parent, workflow=workflow, workflow_id=workflow_name)
    created = op.result()
    print("✅ Created:", created.name)
except AlreadyExists:
    # The workflow already exists: update only its source and description.
    op = wf_client.update_workflow(
        workflow=workflow,
        update_mask={"paths": ["source_contents", "description"]},
    )
    updated = op.result()
    print("✅ Updated:", updated.name)


## 5) Execute (run the workflow)

In [None]:
import time

exec_client = executions_v1.ExecutionsClient()
wf_full = f"projects/{project_id}/locations/{workflow_region}/workflows/{workflow_name}"

input_args = {
    "pdf_b64": pdf_b64,
    "mime_type": mime_type,
}

# create_execution returns an Execution resource, not a long-running operation.
execution = exec_client.create_execution(
    parent=wf_full,
    execution=executions_v1.Execution(argument=json.dumps(input_args)),
)
print("✅ Execution started:", execution.name)

# Poll every 2 seconds until the execution reaches a terminal state.
while True:
    ex = exec_client.get_execution(name=execution.name)
    state = ex.state.name
    if state in ("SUCCEEDED", "FAILED", "CANCELLED"):
        print("State:", state)
        print("Result:", ex.result[:2000] if ex.result else None)
        print("Error:", ex.error)
        break
    time.sleep(2)
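
The polling loop above runs until the execution finishes, however long that takes. A bounded variant is sketched below: it polls with exponential backoff and gives up after a timeout. The function name `wait_for_terminal_state`, its parameters, and the default values are assumptions for illustration. The state source is passed in as a callable, so the demo uses a fake sequence of states instead of a real client.

```python
import time
from typing import Callable, Iterable


def wait_for_terminal_state(
    get_state: Callable[[], str],
    terminal_states: Iterable[str] = ("SUCCEEDED", "FAILED", "CANCELLED"),
    timeout_s: float = 300.0,
    initial_delay_s: float = 1.0,
    max_delay_s: float = 15.0,
) -> str:
    """Poll get_state() with exponential backoff until a terminal state or timeout."""
    terminal = set(terminal_states)
    deadline = time.monotonic() + timeout_s
    delay = initial_delay_s
    while True:
        state = get_state()
        if state in terminal:
            return state
        # Stop if the next sleep would run past the deadline.
        if time.monotonic() + delay > deadline:
            raise TimeoutError(f"Still {state!r} after {timeout_s} seconds")
        time.sleep(delay)
        delay = min(delay * 2, max_delay_s)


# Stand-in for reading the state of a real execution.
fake_states = iter(["ACTIVE", "ACTIVE", "SUCCEEDED"])
print(wait_for_terminal_state(lambda: next(fake_states), initial_delay_s=0.01))
```

In the notebook, `get_state` could be a lambda that calls `exec_client.get_execution(...)` with the execution's name and returns `.state.name`.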
