39 changes: 27 additions & 12 deletions src/codegate/pipeline/extract_snippets/extract_snippets.py
@@ -8,7 +8,14 @@
 from codegate.pipeline.base import CodeSnippet, PipelineContext, PipelineResult, PipelineStep

 CODE_BLOCK_PATTERN = re.compile(
-    r"```(?:(?P<language>\w+)\s+)?(?P<filename>[^\s\(]+)?(?:\s*\((?P<lineinfo>[^)]+)\))?\n(?P<content>(?:.|\n)*?)```"
+    r"```"  # Opening backticks, no whitespace after backticks and before language
+    r"(?:(?P<language>[a-zA-Z0-9_+-]+)\s+)?"  # Language must be followed by whitespace if present
+    r"(?:(?P<filename>[^\s\(\n]+))?"  # Optional filename (cannot contain spaces or parentheses)
+    r"(?:\s+\([0-9]+-[0-9]+\))?"  # Optional line numbers in parentheses
+    r"\s*\n"  # Required newline after metadata
+    r"(?P<content>.*?)"  # Content (non-greedy match)
+    r"```",  # Closing backticks
+    re.DOTALL,
 )

 logger = structlog.get_logger("codegate")
@@ -78,18 +85,26 @@ def extract_snippets(message: str) -> List[CodeSnippet]:

     # Find all code block matches
     for match in CODE_BLOCK_PATTERN.finditer(message):
-        filename = match.group("filename")
+        matched_language = match.group("language") if match.group("language") else None
+        filename = match.group("filename") if match.group("filename") else None
         content = match.group("content")
-        matched_language = match.group("language")

-        # Determine language
-        lang = None
-        if matched_language:
-            lang = ecosystem_from_message(matched_language.strip())
-        if lang is None and filename:
-            filename = filename.strip()
-            # Determine language from the filename
-            lang = ecosystem_from_filepath(filename)
+        # If we have a single word without extension after the backticks,
+        # it's a language identifier, not a filename. Typically used in the
+        # format ` ```python ` in output snippets
+        if filename and not matched_language and "." not in filename:
+            lang = filename
+            filename = None
+        else:
+            # Determine language from the message, either by the short
+            # language identifier or by the filename
+            lang = None
+            if matched_language:
+                lang = ecosystem_from_message(matched_language.strip())
+            if lang is None and filename:
+                filename = filename.strip()
+                # Determine language from the filename
+                lang = ecosystem_from_filepath(filename)

         snippets.append(CodeSnippet(filepath=filename, code=content, language=lang))
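A quick way to sanity-check the rewritten pattern, and to see why the filename/language disambiguation branch above is needed: for a bare ` ```python ` fence, the language group cannot complete (it requires whitespace after the language and then a newline after the metadata), so the engine backtracks and captures `python` in the filename group instead. A minimal standalone sketch, assuming only the pattern above; the sample inputs are illustrative, not from the test suite:

import re

# The pattern from extract_snippets.py, reproduced for a standalone check
CODE_BLOCK_PATTERN = re.compile(
    r"```"
    r"(?:(?P<language>[a-zA-Z0-9_+-]+)\s+)?"
    r"(?:(?P<filename>[^\s\(\n]+))?"
    r"(?:\s+\([0-9]+-[0-9]+\))?"
    r"\s*\n"
    r"(?P<content>.*?)"
    r"```",
    re.DOTALL,
)

samples = {
    "bare language": "```python\nprint('hi')\n```",
    "language + filename + lines": "```python app.py (1-10)\nx = 1\n```",
    "filename only": "```app.py\nx = 1\n```",
}
for label, text in samples.items():
    m = CODE_BLOCK_PATTERN.search(text)
    print(f"{label}: language={m.group('language')!r} filename={m.group('filename')!r}")

# bare language: language=None filename='python'   <- the "." check in
#     extract_snippets reclassifies this "filename" as a language
# language + filename + lines: language='python' filename='app.py'
# filename only: language=None filename='app.py'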
99 changes: 99 additions & 0 deletions src/codegate/pipeline/extract_snippets/output.py
@@ -0,0 +1,99 @@
from typing import Optional

import structlog
from litellm import ModelResponse
from litellm.types.utils import Delta, StreamingChoices

from codegate.pipeline.base import PipelineContext
from codegate.pipeline.extract_snippets.extract_snippets import extract_snippets
from codegate.pipeline.output import OutputPipelineContext, OutputPipelineStep

logger = structlog.get_logger("codegate")


class CodeCommentStep(OutputPipelineStep):
    """Pipeline step that adds comments after code blocks"""

    def __init__(self):
        pass

    @property
    def name(self) -> str:
        return "code-comment"

    def _create_chunk(self, original_chunk: ModelResponse, content: str) -> ModelResponse:
        """
        Creates a new chunk with the given content, preserving the original chunk's metadata
        """
        return ModelResponse(
            id=original_chunk.id,
            choices=[
                StreamingChoices(
                    finish_reason=None,
                    index=0,
                    delta=Delta(content=content, role="assistant"),
                    logprobs=None,
                )
            ],
            created=original_chunk.created,
            model=original_chunk.model,
            object="chat.completion.chunk",
        )

    def _split_chunk_at_code_end(self, content: str) -> tuple[str, str]:
        """Split content at the end of a code block (```)"""
        lines = content.split("\n")
        for i, line in enumerate(lines):
            if line.strip() == "```":
                # Return content up to and including ```, and the rest
                before = "\n".join(lines[: i + 1])
                after = "\n".join(lines[i + 1 :])
                return before, after
        return content, ""

    async def process_chunk(
        self,
        chunk: ModelResponse,
        context: OutputPipelineContext,
        input_context: Optional[PipelineContext] = None,
    ) -> list[ModelResponse]:
        """Process a single chunk of the stream"""
        if not chunk.choices[0].delta.content:
            return [chunk]

        # Get current content plus this new chunk
        current_content = "".join(context.processed_content + [chunk.choices[0].delta.content])

        # Extract snippets from current content
        snippets = extract_snippets(current_content)

        # Check if a new snippet has been completed
        if len(snippets) > len(context.snippets):
            # Get the last completed snippet
            last_snippet = snippets[-1]
            context.snippets = snippets  # Update context with new snippets

            # Split the chunk content if needed
            before, after = self._split_chunk_at_code_end(chunk.choices[0].delta.content)

            chunks = []

            # Add the chunk with content up to the end of code block
            if before:
                chunks.append(self._create_chunk(chunk, before))

            # Add the comment
            chunks.append(
                self._create_chunk(
                    chunk, f"\nThe above is a {last_snippet.language or 'unknown'} code snippet\n\n"
                )
            )

            # Add the remaining content if any
            if after:
                chunks.append(self._create_chunk(chunk, after))

            return chunks

        # Pass through all other content that does not create a new snippet
        return [chunk]
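To make the chunk-splitting behavior concrete, here is a standalone mirror of `_split_chunk_at_code_end` with a worked example (a sketch for illustration; the real method runs on litellm chunk deltas):

def split_at_code_end(content: str) -> tuple[str, str]:
    """Standalone mirror of CodeCommentStep._split_chunk_at_code_end."""
    lines = content.split("\n")
    for i, line in enumerate(lines):
        if line.strip() == "```":
            # Everything up to and including the closing fence, then the rest
            return "\n".join(lines[: i + 1]), "\n".join(lines[i + 1 :])
    return content, ""

# A streamed delta that happens to contain the closing fence mid-chunk:
before, after = split_at_code_end("app.run()\n```\nThat starts the server.")
assert before == "app.run()\n```"
assert after == "That starts the server."
# process_chunk emits `before`, then the generated comment chunk, then
# `after`, so the comment lands directly after the code block.

Note that `line.strip() == "```"` only treats a fence on a line of its own as a block end, so an opening fence carrying a language tag such as ` ```python ` is not split on.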
58 changes: 36 additions & 22 deletions src/codegate/pipeline/output.py
@@ -1,11 +1,11 @@
 from abc import ABC, abstractmethod
 from dataclasses import dataclass, field
-from typing import AsyncIterator, Optional
+from typing import AsyncIterator, List, Optional

 from litellm import ModelResponse
 from litellm.types.utils import Delta, StreamingChoices

-from codegate.pipeline.base import PipelineContext
+from codegate.pipeline.base import CodeSnippet, PipelineContext


 @dataclass
@@ -19,6 +19,10 @@ class OutputPipelineContext:
     # We store the messages that are not yet sent to the client in the buffer.
     # One reason for this might be that the buffer contains a secret that we want to de-obfuscate
     buffer: list[str] = field(default_factory=list)
+    # Store extracted code snippets
+    snippets: List[CodeSnippet] = field(default_factory=list)
+    # Store all content that has been processed by the pipeline
+    processed_content: List[str] = field(default_factory=list)


class OutputPipelineStep(ABC):
@@ -40,7 +44,7 @@ async def process_chunk(
         chunk: ModelResponse,
         context: OutputPipelineContext,
         input_context: Optional[PipelineContext] = None,
-    ) -> Optional[ModelResponse]:
+    ) -> List[ModelResponse]:
         """
         Process a single chunk of the stream.
@@ -52,8 +56,8 @@ async def process_chunk(
             obfuscated in the user message or code snippets in the user message.

         Return:
-            - None to pause the stream
-            - Modified or unmodified input chunk to pass through
+            - Empty list to pause the stream
+            - List containing one or more ModelResponse objects to emit
         """
         pass

@@ -79,14 +83,25 @@ def __init__(

     def _buffer_chunk(self, chunk: ModelResponse) -> None:
         """
-        Add chunk content to buffer.
+        Add chunk content to buffer. This is used to store content that is not yet processed
+        when a pipeline pauses streaming.
         """
         self._buffered_chunk = chunk
         for choice in chunk.choices:
             # the last choice has no delta or content, let's not buffer it
             if choice.delta is not None and choice.delta.content is not None:
                 self._context.buffer.append(choice.delta.content)

+    def _store_chunk_content(self, chunk: ModelResponse) -> None:
+        """
+        Store chunk content in processed content. This keeps track of the content that has been
+        streamed through the pipeline.
+        """
+        for choice in chunk.choices:
+            # the last choice has no delta or content, let's not buffer it
+            if choice.delta is not None and choice.delta.content is not None:
+                self._context.processed_content.append(choice.delta.content)

     async def process_stream(
         self, stream: AsyncIterator[ModelResponse]
     ) -> AsyncIterator[ModelResponse]:
@@ -99,27 +114,26 @@ async def process_stream(
             self._buffer_chunk(chunk)

             # Process chunk through each step of the pipeline
-            current_chunk = chunk
+            current_chunks = [chunk]
             for step in self._pipeline_steps:
-                if current_chunk is None:
-                    # Stop processing if a step returned None previously
-                    # this means that the pipeline step requested to pause the stream
-                    # instead, let's try again with the next chunk
+                if not current_chunks:
+                    # Stop processing if a step returned an empty list
                     break

-                processed_chunk = await step.process_chunk(
-                    current_chunk, self._context, self._input_context
-                )
-                # the returned chunk becomes the input for the next chunk in the pipeline
-                current_chunk = processed_chunk
+                processed_chunks = []
+                for c in current_chunks:
+                    step_result = await step.process_chunk(
+                        c, self._context, self._input_context
+                    )
+                    processed_chunks.extend(step_result)
+
+                current_chunks = processed_chunks

-            # we have either gone through all the steps in the pipeline and have a chunk
-            # to return or we are paused in which case we don't yield
-            if current_chunk is not None:
-                # Step processed successfully, yield the chunk and clear buffer
+            # Yield all processed chunks
+            for c in current_chunks:
+                self._store_chunk_content(c)
                 self._context.buffer.clear()
-                yield current_chunk
-            # else: keep buffering for next iteration
+                yield c

         except Exception as e:
             # Log exception and stop processing
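To illustrate what the new `List[ModelResponse]` contract buys: a step can now fan a single incoming chunk out into several outgoing ones, which the old `Optional[ModelResponse]` signature could not express. Below is a hedged sketch of a toy step — `SplitOnNewlineStep` is hypothetical, not part of this PR, and its chunk construction simply mirrors `_create_chunk` from `CodeCommentStep` above:

from typing import List, Optional

from litellm import ModelResponse
from litellm.types.utils import Delta, StreamingChoices

from codegate.pipeline.base import PipelineContext
from codegate.pipeline.output import OutputPipelineContext, OutputPipelineStep


class SplitOnNewlineStep(OutputPipelineStep):
    """Toy step: emit one chunk per line instead of one combined chunk."""

    @property
    def name(self) -> str:
        return "split-on-newline"

    def _copy_with_content(self, original: ModelResponse, content: str) -> ModelResponse:
        # Same construction as CodeCommentStep._create_chunk
        return ModelResponse(
            id=original.id,
            choices=[
                StreamingChoices(
                    finish_reason=None,
                    index=0,
                    delta=Delta(content=content, role="assistant"),
                    logprobs=None,
                )
            ],
            created=original.created,
            model=original.model,
            object="chat.completion.chunk",
        )

    async def process_chunk(
        self,
        chunk: ModelResponse,
        context: OutputPipelineContext,
        input_context: Optional[PipelineContext] = None,
    ) -> List[ModelResponse]:
        content = chunk.choices[0].delta.content
        if not content or "\n" not in content:
            return [chunk]  # pass through unchanged; [] would pause the stream
        # keepends=True preserves the exact streamed text across the split
        return [
            self._copy_with_content(chunk, line)
            for line in content.splitlines(keepends=True)
        ]

The processor's inner loop then feeds each returned chunk to the next step and yields them in order.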
10 changes: 5 additions & 5 deletions src/codegate/pipeline/secrets/secrets.py
@@ -229,7 +229,7 @@ async def process_chunk(
         chunk: ModelResponse,
         context: OutputPipelineContext,
         input_context: Optional[PipelineContext] = None,
-    ) -> Optional[ModelResponse]:
+    ) -> list[ModelResponse]:
         """Process a single chunk of the stream"""
         if not input_context:
             raise ValueError("Input context not found")
@@ -239,7 +239,7 @@
             raise ValueError("Session ID not found in input context")

         if not chunk.choices[0].delta.content:
-            return chunk
+            return [chunk]

         # Check the buffered content
         buffered_content = "".join(context.buffer)
@@ -270,13 +270,13 @@ async def process_chunk(
                     logprobs=None,
                 )
             ]
-            return chunk
+            return [chunk]

         # If we have a partial marker at the end, keep buffering
         if self.marker_start in buffered_content or self._is_partial_marker_prefix(
             buffered_content
         ):
-            return None
+            return []

         # No markers or partial markers, let pipeline handle the chunk normally
-        return chunk
+        return [chunk]
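The pause semantics deserve spelling out: when a step returns `[]`, the processor yields nothing and keeps accumulating deltas in `context.buffer` (via `_buffer_chunk`), so on the next chunk the step sees a longer prefix and can decide again. A toy sketch of that interplay, independent of the real unredaction logic (the marker literal is illustrative, not codegate's actual marker):

buffer: list[str] = []
MARKER_START = "REDACTED<"  # illustrative marker, not the real codegate literal

def toy_step(delta: str) -> list[str]:
    """Pause while a marker may still be completing; emit once it resolves."""
    buffer.append(delta)
    buffered = "".join(buffer)
    if MARKER_START in buffered and buffered.endswith(">"):
        buffer.clear()
        return [buffered]  # marker complete: emit the whole buffered run
    if MARKER_START in buffered:
        return []          # partial marker: pause the stream
    buffer.clear()
    return [delta]         # nothing marker-like: pass straight through

assert toy_step("hello ") == ["hello "]
assert toy_step("REDACTED<abc") == []            # paused, nothing reaches the client
assert toy_step("123>") == ["REDACTED<abc123>"]  # resumes with the full run

The real step additionally checks `_is_partial_marker_prefix` so it can pause even before the full marker has streamed in, and it rewrites the buffered content before emitting.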
2 changes: 2 additions & 0 deletions src/codegate/server.py
@@ -8,6 +8,7 @@
 from codegate.pipeline.codegate_context_retriever.codegate import CodegateContextRetriever
 from codegate.pipeline.codegate_system_prompt.codegate import CodegateSystemPrompt
 from codegate.pipeline.extract_snippets.extract_snippets import CodeSnippetExtractor
+from codegate.pipeline.extract_snippets.output import CodeCommentStep
 from codegate.pipeline.output import OutputPipelineProcessor, OutputPipelineStep
 from codegate.pipeline.secrets.manager import SecretsManager
 from codegate.pipeline.secrets.secrets import CodegateSecrets, SecretUnredactionStep
@@ -49,6 +50,7 @@ def init_app() -> FastAPI:

     output_steps: List[OutputPipelineStep] = [
         SecretUnredactionStep(),
+        CodeCommentStep(),
     ]
     output_pipeline = OutputPipelineProcessor(output_steps)
21 changes: 21 additions & 0 deletions tests/pipeline/extract_snippets/test_extract_snippets.py
@@ -34,6 +34,27 @@ def hello():
             CodeSnippet(language=None, filepath=None, code='print("Hello, world!")'),
         ],
     ),
+    # output code snippet with no filename
+    CodeSnippetTest(
+        input_message=""":
+```python
+@app.route('/')
+def hello():
+    GITHUB_TOKEN="ghp_RjzIRljYij9CznoS7QAnD5RaFF6yH32073uI"
+    if __name__ == '__main__':
+        app.run()
+    return "Hello, Moon!"
+```
+""",
+        expected_count=1,
+        expected=[
+            CodeSnippet(
+                language="python",
+                filepath=None,
+                code="Hello, Moon!",
+            ),
+        ],
+    ),
     # Single Python code block
     CodeSnippetTest(
         input_message="""