feat: user can view index change of streams #7

Conversation
Walkthrough

Adds a new MCP tool, get_index_change, which computes percentage index changes over a time interval for primitive and composed streams.
Sequence Diagram(s)

```mermaid
sequenceDiagram
autonumber
actor Client
participant Server as MCP Server
participant DB as Postgres
participant Primitive as PrimitiveStreamTool
participant Composed as ComposedStreamTool
participant General as GeneralStreamTool
Client->>Server: get_index_change(data_provider, stream_id, from,to,interval, base_time?, frozen_at?, use_cache?)
Server->>DB: SELECT stream_type FROM main.streams WHERE (data_provider, stream_id)
DB-->>Server: stream_type
alt stream_type == primitive
Server->>Primitive: get_index(... from,to ...) → current_series
Server->>Primitive: get_index(... shifted by interval ...) → prev_series
else stream_type == composed
Server->>Composed: get_index(... from,to ...) → current_series
Server->>Composed: get_index(... shifted by interval ...) → prev_series
end
Server->>General: get_index_change(current_series, prev_series, interval)
General-->>Server: changes[]
Server-->>Client: {success, stream_type, data_provider, stream_id, time_interval, change_count, changes, query_parameters, message?}
```
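To make the final comparison step concrete, here is a minimal sketch of what the diagram's get_index_change call implies: pair each current point with the latest previous-window point at or before event_time - interval, then emit a percentage change. This is illustrative only; the function name, dict shapes, and field names are assumptions, not the repository's actual code.

```python
from decimal import Decimal

def index_changes(current, prev, interval):
    """Illustrative sketch: pair each current point with the latest prev point
    at or before event_time - interval, then emit the percentage change."""
    prev_sorted = sorted(prev, key=lambda r: int(r["event_time"]))
    changes, j = [], -1
    for row in sorted(current, key=lambda r: int(r["event_time"])):
        target = int(row["event_time"]) - interval
        # Two-pointer walk: advance to the latest prev point <= target.
        while j + 1 < len(prev_sorted) and int(prev_sorted[j + 1]["event_time"]) <= target:
            j += 1
        if j < 0:
            continue  # nothing earlier to compare against
        prev_value = Decimal(str(prev_sorted[j]["value"]))
        if prev_value == 0:
            continue  # avoid division by zero
        current_value = Decimal(str(row["value"]))
        change = (current_value - prev_value) * Decimal("100") / prev_value
        changes.append({"event_time": row["event_time"], "value": str(change)})
    return changes
```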
Estimated code review effort: 🎯 4 (Complex) | ⏱️ ~60 minutes
waiting for coderabbit review
Actionable comments posted: 2
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (1)
src/postgres_mcp/truf/primitive_stream.py (1)
250-257: Exact base-time match detection is not exact

exact_records uses _get_record_primitive(from=to=base_time), but that query returns the anchor at or before base_time, not strictly an exact event_time match. This collapses the “exact match” and “latest before” branches.
Fix: check equality explicitly before falling back.
```diff
-        # Try to find exact match at base_time
-        exact_records = await self._get_record_primitive(
-            data_provider, stream_id, effective_base_time, effective_base_time, frozen_at
-        )
-
-        if exact_records:
-            return Decimal(str(exact_records[0]["value"]))
+        # Try to find exact match at base_time
+        effective_frozen_at = frozen_at if frozen_at is not None else 9223372036854775000
+        exact_rows = await SafeSqlDriver.execute_param_query(
+            self.sql_driver,
+            """
+            SELECT pe.value
+            FROM main.primitive_events pe
+            WHERE pe.stream_ref = {}
+              AND pe.event_time = {}
+              AND pe.created_at <= {}
+            ORDER BY pe.created_at DESC
+            LIMIT 1;
+            """,
+            [stream_ref, effective_base_time, effective_frozen_at]
+        )
+        if exact_rows:
+            return Decimal(str(exact_rows[0].cells.get("value", "0")))
```

Also applies to: 258-266, 267-275
🧹 Nitpick comments (11)
src/postgres_mcp/truf/query.py (4)
439-446: Minor: consistent aliasing and readability

stream_ref_calc uses the alias btc for base_time_calc. It’s correct, but given the frequent CROSS JOIN stream_ref_calc src usage later, consider aligning the aliasing (btc -> base_calc) to make later references obvious in EXPLAIN output.
500-559: Weight normalization logic: clarify top-level normalization and bounded recursion
- effective_weight at the base case is set to the raw tts.weight_for_segment (not normalized among siblings). Normalization is applied only in the recursive step via division by the siblings’ sum. This is fine because the final aggregation divides by cum_sw, but a short comment here would prevent future confusion.
- Recursion depth is capped at h.level < 10. If deeper hierarchies are possible in production data, consider making this a parameter or increasing to a safer upper bound to avoid silent truncation.
Apply documentation-only change:
```diff
             tts.weight_for_segment AS effective_weight,
+            -- Note: base-level weights are not normalized; normalization occurs in the recursive step.
+            -- Final aggregation divides by cum_sw to normalize the overall result.
```
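For intuition, a toy recomputation of this two-stage normalization in plain Python (purely illustrative; the production logic lives in the recursive SQL, and these names are not from the codebase):

```python
from decimal import Decimal

# Two siblings under one parent with raw segment weights 2 and 3.
raw_weights = {"child_a": Decimal("2"), "child_b": Decimal("3")}

# Recursive step: each child's effective weight is divided by the siblings' sum.
sibling_sum = sum(raw_weights.values())
normalized = {k: w / sibling_sum for k, w in raw_weights.items()}  # 0.4 and 0.6

# Final aggregation: weighted values are divided by cum_sw (the cumulative sum
# of weights actually present), so a missing child does not deflate the index.
values = {"child_a": Decimal("110"), "child_b": Decimal("90")}
cum_sw = sum(normalized[k] for k in values)  # 1.0 here; < 1.0 if a child had no data
index = sum(values[k] * normalized[k] for k in values) / cum_sw
print(index)  # 110 * 0.4 + 90 * 0.6 = 98
```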
586-635: cleaned_event_times can be simplified and sped up

The anchor_event subquery finds the max event_time before effective_from via a UNION and ORDER BY/LIMIT 1. On large primitive_events tables, this can be heavy. Consider:
- A windowed approach or two targeted index-friendly lookups:
- one for max(pe.event_time) before effective_from constrained by relevant stream_refs (via primitive_weights),
- unioned with relevant group_sequence_start candidates.
Ensure indexes exist:
- main.primitive_events(stream_ref, event_time, created_at)
- main.taxonomies(stream_ref, start_time, group_sequence, disabled_at)
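If those indexes are missing, hypothetical DDL along these lines would cover the suggested lookups (index names are illustrative, and the column order should be verified against the real schema before running):

```python
# Hypothetical DDL matching the index suggestions above; verify against the
# actual schema and query plans before applying.
SUGGESTED_INDEX_DDL = """
CREATE INDEX IF NOT EXISTS primitive_events_ref_time_created_idx
    ON main.primitive_events (stream_ref, event_time, created_at);
CREATE INDEX IF NOT EXISTS taxonomies_ref_start_seq_idx
    ON main.taxonomies (stream_ref, start_time, group_sequence, disabled_at);
"""
```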
984-986: Deterministic dedup and ordering

SELECT DISTINCT event_time, value can still emit multiple rows for the same event_time when values differ (e.g., due to LOCF mapping differences). If the intent is deterministically “last value per event_time”, derive it upstream or switch to DISTINCT ON (event_time) with a clear ORDER BY that ensures the chosen value is consistent.
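A generic shape for the DISTINCT ON variant, shown as a Python string constant in the codebase's style (table and tie-break column are placeholders, not the query's real names):

```python
# "Last value per event_time", chosen deterministically by created_at.
# Placeholder table/column names; adapt to the actual CTE.
LAST_VALUE_PER_EVENT_TIME = """
SELECT DISTINCT ON (event_time) event_time, value
FROM source_rows
ORDER BY event_time, created_at DESC
"""
```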
src/postgres_mcp/truf/general.py (2)
68-71: Defensive parsing for event_time/value

If upstream ever emits non-numeric strings, int(...) and Decimal(...) will raise. Consider a small guard to skip malformed records rather than fail the whole computation.
```diff
-        current_sorted = sorted(current_data, key=lambda x: int(x["event_time"]))
-        prev_sorted = sorted(prev_data, key=lambda x: int(x["event_time"]))
+        def safe_int(v):
+            try:
+                return int(v)
+            except Exception:
+                return None
+
+        def safe_dec(v):
+            try:
+                return Decimal(str(v))
+            except Exception:
+                return None
+
+        current_sorted = sorted(
+            (r for r in current_data
+             if safe_int(r.get("event_time")) is not None and safe_dec(r.get("value")) is not None),
+            key=lambda x: int(x["event_time"])
+        )
+        prev_sorted = sorted(
+            (r for r in prev_data
+             if safe_int(r.get("event_time")) is not None and safe_dec(r.get("value")) is not None),
+            key=lambda x: int(x["event_time"])
+        )
```
91-99: Normalize output scale to 18 decimals

To match NUMERIC(36,18) semantics and avoid very long strings, quantize the percentage to 18 decimal places.
```diff
-            change_percent = ((current_value - prev_value) * Decimal('100')) / prev_value
+            change_percent = ((current_value - prev_value) * Decimal('100')) / prev_value
+            # quantize to 18 decimal places
+            change_percent = change_percent.quantize(Decimal('0.000000000000000001'))
```

src/postgres_mcp/server.py (1)
910-951: Parallelize current vs. previous fetches to reduce latency

Both calls are I/O bound. Use asyncio.gather to fetch the current and previous windows concurrently.
```diff
-    if stream_type == "composed":
-        composed_tool = ComposedStreamTool(sql_driver)
-        current_data = await composed_tool.get_index(
+    if stream_type == "composed":
+        composed_tool = ComposedStreamTool(sql_driver)
+        current_coro = composed_tool.get_index(
             data_provider=data_provider,
             stream_id=stream_id,
             from_time=from_time,
             to_time=to_time,
             frozen_at=frozen_at,
             base_time=base_time,
             use_cache=use_cache
         )
-        # Get previous data
-        prev_data = await composed_tool.get_index(
+        prev_coro = composed_tool.get_index(
             data_provider=data_provider,
             stream_id=stream_id,
             from_time=earliest_prev,
             to_time=latest_prev,
             frozen_at=frozen_at,
             base_time=base_time,
             use_cache=use_cache
         )
+        current_data, prev_data = await asyncio.gather(current_coro, prev_coro)
     elif stream_type == "primitive":
         primitive_tool = PrimitiveStreamTool(sql_driver)
-        current_data = await primitive_tool.get_index(
+        current_coro = primitive_tool.get_index(
             data_provider=data_provider,
             stream_id=stream_id,
             from_time=from_time,
             to_time=to_time,
             frozen_at=frozen_at,
             base_time=base_time
         )
-        # Get previous data
-        prev_data = await primitive_tool.get_index(
+        prev_coro = primitive_tool.get_index(
             data_provider=data_provider,
             stream_id=stream_id,
             from_time=earliest_prev,
             to_time=latest_prev,
             frozen_at=frozen_at,
             base_time=base_time
         )
+        current_data, prev_data = await asyncio.gather(current_coro, prev_coro)
```

src/postgres_mcp/truf/primitive_stream.py (4)
86-93: Nit: base_value is already Decimal

base_value is a Decimal from _get_base_value. Wrapping it in Decimal(str(...)) is redundant and adds overhead.
```diff
-        base_value_decimal = Decimal(str(base_value))
+        base_value_decimal = base_value
```
102-111: Nit: avoid redundant Decimal conversions and keep consistent

Same as above, within the loop.
```diff
-                base_value_decimal = Decimal(str(base_value))
+                base_value_decimal = base_value
                 for record in records:
                     record_value = Decimal(str(record["value"]))
                     indexed_value = (record_value * Decimal('100')) / base_value_decimal
```
171-173: Consider empty-range behavior

Currently _get_record_primitive raises on empty results. For index-calculation callers, it might be preferable to return [] and let the caller decide (e.g., to produce “no data” gracefully). If raising is intentional for diagnostics, ignore this.
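If the softer behavior is preferred, a sketch of the shape (the internal fetch helper name is hypothetical; only the return-empty-instead-of-raise pattern is the point):

```python
# Hypothetical variant: return [] instead of raising, letting callers
# such as the index-change path report "no data" gracefully.
async def _get_record_primitive(self, *args, **kwargs):
    rows = await self._fetch_primitive_rows(*args, **kwargs)  # assumed internal fetch
    if not rows:
        return []  # caller decides whether empty is an error
    return rows
```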
116-118: Minor log message mismatch

The log says get_index_primitive but the method is get_index.
```diff
-            logger.error(f"Error in get_index_primitive for {data_provider}/{stream_id}: {e}")
+            logger.error(f"Error in get_index for {data_provider}/{stream_id}: {e}")
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (4)
- src/postgres_mcp/server.py (2 hunks)
- src/postgres_mcp/truf/general.py (1 hunks)
- src/postgres_mcp/truf/primitive_stream.py (7 hunks)
- src/postgres_mcp/truf/query.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (2)
src/postgres_mcp/truf/general.py (1)
- src/postgres_mcp/server.py (1)
  - get_index_change (848-985)

src/postgres_mcp/server.py (4)
- src/postgres_mcp/truf/general.py (2)
  - GeneralStreamTool (15-100)
  - get_index_change (22-58)
- src/postgres_mcp/sql/safe_sql.py (2)
  - SafeSqlDriver (88-1033)
  - execute_param_query (1027-1033)
- src/postgres_mcp/truf/composed_stream.py (2)
  - ComposedStreamTool (13-198)
  - get_index (132-198)
- src/postgres_mcp/truf/primitive_stream.py (1)
  - get_index (25-118)
🔇 Additional comments (2)
src/postgres_mcp/truf/query.py (1)
398-414: Update get_index params to match COMPOSED_STREAM_INDEX_QUERY placeholders

The SQL constant COMPOSED_STREAM_INDEX_QUERY in src/postgres_mcp/truf/query.py declares 10 substitution placeholders:
- data_provider
- stream_id
- from_param
- to_param
- frozen_at_param
- base_time_param
- use_cache_param
- effective_from
- effective_to
- effective_frozen_at
However, in ComposedStreamTool.get_index (src/postgres_mcp/truf/composed_stream.py), the params list only supplies 9 values:
- data_provider.lower()
- stream_id
- from_time
- to_time
- frozen_at
- use_cache
- from_time
- to_time
- frozen_at
This omits base_time, causing every placeholder after the 5th to bind to the wrong argument.

Action: insert base_time between frozen_at and use_cache so that the 6th placeholder (base_time_param) is correctly populated, e.g.:

```diff
 async def get_index(…):
     # …
     params = [
         data_provider.lower(),  # data_provider
         stream_id,              # stream_id
         from_time,              # from_param
         to_time,                # to_param
         frozen_at,              # frozen_at_param
+        base_time,              # base_time_param
         use_cache,              # use_cache_param
         from_time,              # effective_from
         to_time,                # effective_to
         frozen_at,              # effective_frozen_at
     ]
```

This will properly align all 10 parameters with their corresponding placeholders.
Likely an incorrect or invalid review comment.
src/postgres_mcp/truf/general.py (1)
60-66: LGTM: Two-pointer algorithm with Decimal precision

The linear two-pointer walk over the time-aligned series is correct and efficient (O(N+M)), and the Decimal usage aligns with NUMERIC(36,18).
Actionable comments posted: 0
♻️ Duplicate comments (1)
src/postgres_mcp/server.py (1)
919-942: Guard unknown stream_type explicitly (don’t treat unknowns as primitive)

Right now anything not equal to "composed" falls through to primitive. This can mask data issues and return misleading results. Make the primitive check explicit and return an error for unknown values. (This echoes an earlier suggestion on a prior commit.)
```diff
-    # Get current index data based on stream type
-    if stream_type == "composed":
+    # Get current index data based on stream type
+    if stream_type == "composed":
         ...
-    else:  # primitive
+    elif stream_type == "primitive":
         primitive_tool = PrimitiveStreamTool(sql_driver)
         current_data = await primitive_tool.get_index(
             data_provider=data_provider,
             stream_id=stream_id,
             from_time=from_time,
             to_time=to_time,
             frozen_at=frozen_at,
             base_time=base_time
         )
-        # Get previous data
-        prev_data = await primitive_tool.get_index(
-            data_provider=data_provider,
-            stream_id=stream_id,
-            from_time=earliest_prev,
-            to_time=latest_prev,
-            frozen_at=frozen_at,
-            base_time=base_time
-        )
+        # Get previous data (set below after checking current_data)
+    else:
+        return format_error_response(f"Unsupported stream_type '{stream_type}' for {data_provider}/{stream_id}")
```
🧹 Nitpick comments (4)
src/postgres_mcp/server.py (4)
847-857: Enforce runtime type checking with @validate_call

Add @validate_call to catch incorrect parameter types early (e.g., strings passed for timestamps) and to align with other tools already using it.
```diff
-@mcp.tool(description="Calculate index change percentage over time interval - returns percentage change values (e.g., 2.147 = 2.147% change)")
+@mcp.tool(description="Calculate index change percentage over time interval - returns percentage change values (e.g., 2.147 = 2.147% change)")
+@validate_call
 async def get_index_change(
```
889-896: Add ID shape validation (consistent UX with other tools)

get_composed_stream_records validates data_provider and stream_id format up front; mirroring that here gives earlier, clearer errors and avoids unnecessary DB hits.
```diff
     if time_interval <= 0:
         return format_error_response(
             f"time_interval must be > 0 (got {time_interval})"
         )
+    # Basic identifier validation (align with other tools)
+    if not data_provider.startswith('0x') or len(data_provider) != 42:
+        return format_error_response("data_provider must be 0x followed by 40 hex characters")
+    if not stream_id.startswith('st') or len(stream_id) != 32:
+        return format_error_response("stream_id must start with 'st' and be 32 characters total")
+
     sql_driver = await get_sql_driver()
```
921-959: Skip the second query when there’s no current data

If current_data is empty, fetching prev_data is unnecessary. Short-circuit, or at least avoid the prev fetch, to save a heavy round trip.
```diff
     if stream_type == "composed":
         composed_tool = ComposedStreamTool(sql_driver)
         current_data = await composed_tool.get_index(
             data_provider=data_provider,
             stream_id=stream_id,
             from_time=from_time,
             to_time=to_time,
             frozen_at=frozen_at,
             base_time=base_time,
             use_cache=use_cache
         )
-        # Get previous data
-        prev_data = await composed_tool.get_index(
-            data_provider=data_provider,
-            stream_id=stream_id,
-            from_time=earliest_prev,
-            to_time=latest_prev,
-            frozen_at=frozen_at,
-            base_time=base_time,
-            use_cache=use_cache
-        )
+        # Only fetch previous data if we have current data
+        if current_data:
+            prev_data = await composed_tool.get_index(
+                data_provider=data_provider,
+                stream_id=stream_id,
+                from_time=earliest_prev,
+                to_time=latest_prev,
+                frozen_at=frozen_at,
+                base_time=base_time,
+                use_cache=use_cache
+            )
+        else:
+            prev_data = []
     elif stream_type == "primitive":
         primitive_tool = PrimitiveStreamTool(sql_driver)
         current_data = await primitive_tool.get_index(
             data_provider=data_provider,
             stream_id=stream_id,
             from_time=from_time,
             to_time=to_time,
             frozen_at=frozen_at,
             base_time=base_time
         )
-        # Get previous data
-        prev_data = await primitive_tool.get_index(
-            data_provider=data_provider,
-            stream_id=stream_id,
-            from_time=earliest_prev,
-            to_time=latest_prev,
-            frozen_at=frozen_at,
-            base_time=base_time
-        )
+        # Only fetch previous data if we have current data
+        if current_data:
+            prev_data = await primitive_tool.get_index(
+                data_provider=data_provider,
+                stream_id=stream_id,
+                from_time=earliest_prev,
+                to_time=latest_prev,
+                frozen_at=frozen_at,
+                base_time=base_time
+            )
+        else:
+            prev_data = []
```
969-985: Trim noisy None fields and expose prev window for observability
- Avoid returning "use_cache": None for primitive streams.
- Include earliest_prev/latest_prev to help callers verify the comparison window.
```diff
     result = {
         "success": True,
         "stream_type": stream_type,
         "data_provider": data_provider,
         "stream_id": stream_id,
         "time_interval": time_interval,
         "change_count": len(changes),
         "changes": changes,
         "query_parameters": {
             "from_time": from_time,
             "to_time": to_time,
             "time_interval": time_interval,
             "frozen_at": frozen_at,
-            "base_time": base_time,
-            "use_cache": use_cache if stream_type == "composed" else None
+            "base_time": base_time,
+            "earliest_prev": earliest_prev,
+            "latest_prev": latest_prev,
+            **({"use_cache": use_cache} if stream_type == "composed" else {})
         }
     }
```
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
💡 Knowledge Base configuration:
- MCP integration is disabled by default for public repositories
- Jira integration is disabled by default for public repositories
- Linear integration is disabled by default for public repositories
You can enable these sources in your CodeRabbit configuration.
📒 Files selected for processing (1)
- src/postgres_mcp/server.py (2 hunks)
🧰 Additional context used
🧬 Code graph analysis (1)
src/postgres_mcp/server.py (4)
- src/postgres_mcp/truf/general.py (2)
  - GeneralStreamTool (15-100)
  - get_index_change (22-58)
- src/postgres_mcp/sql/safe_sql.py (2)
  - SafeSqlDriver (88-1033)
  - execute_param_query (1027-1033)
- src/postgres_mcp/truf/composed_stream.py (1)
  - get_index (132-198)
- src/postgres_mcp/truf/primitive_stream.py (1)
  - get_index (25-118)
🔇 Additional comments (3)
src/postgres_mcp/server.py (3)
38-38: Good addition: centralizing change math via GeneralStreamTool

Importing GeneralStreamTool keeps percentage-change logic in one place and avoids duplication across tools.
889-896: Input validation: nice guardrails

The checks for from_time <= to_time and time_interval > 0 are correct and prevent wasted queries and confusing results. Good call.
847-895: No duplicate @mcp.tool get_index_change found

A repository-wide search shows only one @mcp.tool-decorated get_index_change, in src/postgres_mcp/server.py. The other occurrence, in src/postgres_mcp/truf/general.py, is not decorated and thus not exported as a tool. Everything looks correct.
resolves: https://github.com/trufnetwork/truf-network/issues/1132
resolves: https://github.com/trufnetwork/truf-network/issues/1120
Test