Skip to content

Commit 256a3f5

Browse files
committed
feat: direct resource access - replace HTTP calls with Blob/Queue/Cosmos
- Add ContentProcessService for direct Azure resource access - Fix queue message race condition: cancel renew task before delete - Fix Cosmos insert ordering: create record before enqueuing - Fix datetime serialization in gap_executor (default=str) - Add interim status propagation via on_status_change callback - Enable parallel document submission with asyncio.to_thread - Add upsert_lock to prevent concurrent Cosmos write races - Fix credential leak: replace config print with safe logger - Fix queue client resource leak in close() - ContentProcessor: write step status before execution for real-time UI - Add unit tests for content_process_models and content_process_service - Code quality: remove banner comments, dead code, debug prints
1 parent 4ca73ed commit 256a3f5

13 files changed

Lines changed: 1033 additions & 368 deletions

File tree

src/ContentProcessor/src/libs/pipeline/queue_handler_base.py

Lines changed: 23 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
from abc import ABC, abstractmethod
1616

1717
from azure.storage.queue import QueueClient
18-
1918
from libs.application.application_context import AppContext
2019
from libs.base.application_models import AppModelBase
2120
from libs.models.content_process import ContentProcess, Step_Outputs
@@ -123,6 +122,29 @@ async def _connect_async(
123122

124123
self._current_message_context.data_pipeline.pipeline_status.active_step = self.handler_name
125124

125+
# Update status to the currently running step BEFORE execution
126+
# so the UI reflects real-time progress.
127+
ContentProcess(
128+
process_id=self._current_message_context.data_pipeline.pipeline_status.process_id,
129+
processed_file_name=self._current_message_context.data_pipeline.files[
130+
0
131+
].name,
132+
processed_file_mime_type=self._current_message_context.data_pipeline.files[
133+
0
134+
].mime_type,
135+
status=step_name,
136+
imported_time=datetime.datetime.strptime(
137+
self._current_message_context.data_pipeline.pipeline_status.creation_time,
138+
"%Y-%m-%dT%H:%M:%S.%fZ",
139+
),
140+
last_modified_time=datetime.datetime.now(datetime.UTC),
141+
last_modified_by=step_name,
142+
).update_process_status_to_cosmos(
143+
connection_string=self.application_context.configuration.app_cosmos_connstr,
144+
database_name=self.application_context.configuration.app_cosmos_database,
145+
collection_name=self.application_context.configuration.app_cosmos_container_process,
146+
)
147+
126148
print(
127149
f"Start Processing : {self.handler_name}"
128150
) if show_information else None

src/ContentProcessorWorkflow/src/libs/application/application_configuration.py

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,15 @@ class Configuration(_configuration_base):
153153
)
154154
app_cps_processes: str = Field(default="cps-processes", alias="APP_CPS_PROCESSES")
155155

156+
app_cosmos_container_process: str = Field(
157+
default="Processes", alias="APP_COSMOS_CONTAINER_PROCESS"
158+
)
159+
app_storage_blob_url: str = Field(default="", alias="APP_STORAGE_BLOB_URL")
160+
app_storage_queue_url: str = Field(default="", alias="APP_STORAGE_QUEUE_URL")
161+
app_message_queue_extract: str = Field(
162+
default="content-pipeline-extract-queue", alias="APP_MESSAGE_QUEUE_EXTRACT"
163+
)
164+
156165
app_cps_content_process_endpoint: str = Field(
157166
default="http://localhost:8000/", alias="APP_CPS_CONTENT_PROCESS_ENDPOINT"
158167
)

src/ContentProcessorWorkflow/src/main.py

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
"""
1010

1111
import asyncio
12+
import logging
1213
import os
1314

1415
from sas.storage.blob.async_helper import AsyncStorageBlobHelper
@@ -22,8 +23,11 @@
2223
)
2324
from libs.base.application_base import ApplicationBase
2425
from repositories.claim_processes import Claim_Processes
26+
from services.content_process_service import ContentProcessService
2527
from steps.claim_processor import ClaimProcessor
2628

29+
logger = logging.getLogger(__name__)
30+
2731

2832
class Application(ApplicationBase):
2933
"""Local-development application that runs a single claim workflow.
@@ -40,10 +44,7 @@ def __init__(self):
4044

4145
def initialize(self):
4246
"""Bootstrap the application context and register services."""
43-
print(
44-
"Application initialized with configuration:",
45-
self.application_context.configuration,
46-
)
47+
logger.info("Application initialized with configuration (secrets redacted)")
4748

4849
self.register_services()
4950

@@ -57,8 +58,9 @@ def register_services(self):
5758
)
5859

5960
(
60-
self.application_context
61-
.add_singleton(DebuggingMiddleware, DebuggingMiddleware)
61+
self.application_context.add_singleton(
62+
DebuggingMiddleware, DebuggingMiddleware
63+
)
6264
.add_singleton(LoggingFunctionMiddleware, LoggingFunctionMiddleware)
6365
.add_singleton(InputObserverMiddleware, InputObserverMiddleware)
6466
.add_singleton(Mem0AsyncMemoryManager, Mem0AsyncMemoryManager)
@@ -81,6 +83,13 @@ def register_services(self):
8183
container_name=self.application_context.configuration.app_cosmos_container_batch_process,
8284
),
8385
)
86+
.add_singleton(
87+
ContentProcessService,
88+
lambda: ContentProcessService(
89+
config=self.application_context.configuration,
90+
credential=self.application_context.credential,
91+
),
92+
)
8493
)
8594

8695
async def run(self):

src/ContentProcessorWorkflow/src/main_service.py

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,7 @@
2727
)
2828
from libs.base.application_base import ApplicationBase
2929
from repositories.claim_processes import Claim_Processes
30+
from services.content_process_service import ContentProcessService
3031
from services.queue_service import (
3132
ClaimProcessingQueueService,
3233
QueueServiceConfig,
@@ -100,10 +101,7 @@ def initialize(self):
100101
Populates the DI container with agent-framework helpers, middlewares,
101102
repository services, and the queue-processing service.
102103
"""
103-
print(
104-
"Application initialized with configuration:",
105-
self.application_context.configuration,
106-
)
104+
logger.info("Application initialized with configuration (secrets redacted)")
107105
self.register_services()
108106

109107
def register_services(self):
@@ -116,8 +114,9 @@ def register_services(self):
116114
)
117115

118116
(
119-
self.application_context
120-
.add_singleton(DebuggingMiddleware, DebuggingMiddleware)
117+
self.application_context.add_singleton(
118+
DebuggingMiddleware, DebuggingMiddleware
119+
)
121120
.add_singleton(LoggingFunctionMiddleware, LoggingFunctionMiddleware)
122121
.add_singleton(InputObserverMiddleware, InputObserverMiddleware)
123122
.add_singleton(Mem0AsyncMemoryManager, Mem0AsyncMemoryManager)
@@ -140,6 +139,13 @@ def register_services(self):
140139
container_name=self.application_context.configuration.app_cosmos_container_batch_process,
141140
),
142141
)
142+
.add_singleton(
143+
ContentProcessService,
144+
lambda: ContentProcessService(
145+
config=self.application_context.configuration,
146+
credential=get_azure_credential(),
147+
),
148+
)
143149
)
144150

145151
config = self._build_service_config(self.config_override)
Lines changed: 86 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,86 @@
1+
# Copyright (c) Microsoft Corporation.
2+
# Licensed under the MIT License.
3+
4+
"""Pydantic models for content-processing queue messages.
5+
6+
These models mirror the queue message schema expected by the
7+
ContentProcessor worker (``src/ContentProcessor``). They must stay
8+
in sync with ``ContentProcessorAPI/app/routers/models/contentprocessor/model.py``.
9+
"""
10+
11+
from datetime import datetime
12+
from enum import Enum
13+
from typing import Any, Optional
14+
15+
from pydantic import BaseModel, ConfigDict, Field
16+
from sas.cosmosdb.mongo.model import RootEntityBase
17+
18+
19+
class ArtifactType(str, Enum):
20+
Undefined = "undefined"
21+
ConvertedContent = "converted_content"
22+
ExtractedContent = "extracted_content"
23+
SchemaMappedData = "schema_mapped_data"
24+
ScoreMergedData = "score_merged_data"
25+
SourceContent = "source_content"
26+
SavedContent = "saved_content"
27+
28+
29+
class PipelineStep(str, Enum):
30+
Transform = "transform"
31+
Extract = "extract"
32+
Mapping = "map"
33+
Evaluating = "evaluate"
34+
Save = "save"
35+
36+
37+
class ProcessFile(BaseModel):
38+
process_id: str
39+
id: str
40+
name: str
41+
size: int
42+
mime_type: str
43+
artifact_type: ArtifactType
44+
processed_by: str
45+
46+
47+
class PipelineStatus(BaseModel):
48+
process_id: str
49+
schema_id: str
50+
metadata_id: str
51+
completed: Optional[bool] = Field(default=False)
52+
creation_time: datetime
53+
last_updated_time: Optional[datetime] = Field(default=None)
54+
steps: list[str] = Field(default_factory=list)
55+
remaining_steps: Optional[list[str]] = Field(default_factory=list)
56+
completed_steps: Optional[list[str]] = Field(default_factory=list)
57+
58+
59+
class ContentProcessMessage(BaseModel):
60+
"""Queue message payload for a content-processing job."""
61+
62+
process_id: str
63+
files: list[ProcessFile] = Field(default_factory=list)
64+
pipeline_status: PipelineStatus = Field(default_factory=PipelineStatus)
65+
66+
67+
class ContentProcessRecord(RootEntityBase):
68+
"""Cosmos DB entity for the Processes collection.
69+
70+
Maps the document structure written by ContentProcessor/API.
71+
Only the fields we read are declared; extra fields are allowed
72+
so downstream writes (by the ContentProcessor worker) are preserved.
73+
"""
74+
75+
model_config = ConfigDict(extra="allow", arbitrary_types_allowed=True)
76+
77+
process_id: str = ""
78+
processed_file_name: Optional[str] = None
79+
processed_file_mime_type: Optional[str] = None
80+
processed_time: Optional[str] = None
81+
imported_time: Optional[datetime] = None
82+
status: Optional[str] = None
83+
entity_score: Optional[float] = 0.0
84+
schema_score: Optional[float] = 0.0
85+
result: Optional[Any] = None
86+
confidence: Optional[Any] = None

0 commit comments

Comments
 (0)