Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ dependencies = [
"beautifulsoup4==4.14.3",
"pygls>=2.0,<3.0",
"lsprotocol>=2024.0.0",
"mcp>=1.0",
]

[project.urls]
Expand Down
24 changes: 23 additions & 1 deletion src/reqstool/command.py
Original file line number Diff line number Diff line change
Expand Up @@ -322,6 +322,11 @@ class ComboRawTextandArgsDefaultUltimateHelpFormatter(
help="Write server logs to a file (in addition to stderr)",
)

# command: mcp
mcp_parser = subparsers.add_parser("mcp", help="Start the Model Context Protocol server (stdio)")
mcp_source_subparsers = mcp_parser.add_subparsers(dest="source", required=True)
self._add_subparsers_source(mcp_source_subparsers, include_report_options=False, include_filter_options=False)

args = self.__parser.parse_args()

return args
Expand Down Expand Up @@ -432,11 +437,26 @@ def command_lsp(self, lsp_args: argparse.Namespace):
logging.fatal("reqstool LSP server crashed: %s", exc)
sys.exit(1)

def command_mcp(self, mcp_args: argparse.Namespace):
try:
from reqstool.mcp.server import start_server
except ImportError:
print(
"MCP server requires extra dependencies: pip install 'mcp>=1.0'",
file=sys.stderr,
)
sys.exit(1)
try:
start_server(location=self._get_initial_source(mcp_args))
except Exception as exc:
logging.fatal("reqstool MCP server crashed: %s", exc)
sys.exit(1)

def print_help(self):
self.__parser.print_help(sys.stderr)


def main():
def main(): # noqa: C901
command = Command()
args = command.get_arguments()

Expand Down Expand Up @@ -466,6 +486,8 @@ def main():
exit_code = command.command_status(status_args=args)
elif args.command == "lsp":
command.command_lsp(lsp_args=args)
elif args.command == "mcp":
command.command_mcp(mcp_args=args)
else:
command.print_help()
except MissingRequirementsFileError as exc:
Expand Down
2 changes: 0 additions & 2 deletions src/reqstool/commands/generate_json/generate_json.py
Original file line number Diff line number Diff line change
@@ -1,8 +1,6 @@
# Copyright © LFV


from __future__ import annotations

import json
import logging

Expand Down
1 change: 0 additions & 1 deletion src/reqstool/commands/report/report.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Copyright © LFV

from __future__ import annotations

from enum import Enum

Expand Down
1 change: 0 additions & 1 deletion src/reqstool/commands/status/status.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
# Copyright © LFV

from __future__ import annotations

import json
import re
Expand Down
5 changes: 2 additions & 3 deletions src/reqstool/common/models/lifecycle.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,8 @@
# Copyright © LFV

from __future__ import annotations

from enum import Enum, unique
from typing import Optional
from typing import Optional, Self

from pydantic import BaseModel, ConfigDict

Expand Down Expand Up @@ -31,7 +30,7 @@ class LifecycleData(BaseModel):
state: LIFECYCLESTATE = LIFECYCLESTATE.EFFECTIVE

@classmethod
def from_dict(cls, data: Optional[dict]) -> LifecycleData:
def from_dict(cls, data: Optional[dict]) -> Self:
if data is None:
return cls(state=LIFECYCLESTATE.EFFECTIVE, reason=None)
return cls(
Expand Down
91 changes: 91 additions & 0 deletions src/reqstool/common/project_session.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
# Copyright © LFV


import logging

from reqstool.common.validators.lifecycle_validator import LifecycleValidator
from reqstool.common.validators.semantic_validator import SemanticValidator
from reqstool.common.validator_error_holder import ValidationErrorHolder
from reqstool.locations.location import LocationInterface
from reqstool.model_generators.combined_raw_datasets_generator import CombinedRawDatasetsGenerator
from reqstool.storage.database import RequirementsDatabase
from reqstool.storage.database_filter_processor import DatabaseFilterProcessor
from reqstool.storage.requirements_repository import RequirementsRepository

logger = logging.getLogger(__name__)


class ProjectSession:
"""Long-lived database session for a reqstool project loaded from any LocationInterface.

Keeps the SQLite database open for the lifetime of the session (unlike the
build_database() context manager which closes on exit). Suitable for servers
(MCP, LSP) that need persistent read access after a one-time build.
"""

def __init__(self, location: LocationInterface):
self._location = location
self._db: RequirementsDatabase | None = None
self._repo: RequirementsRepository | None = None
self._urn_source_paths: dict[str, dict[str, str]] = {}
self._ready: bool = False
self._error: str | None = None

@property
def ready(self) -> bool:
return self._ready

@property
def error(self) -> str | None:
return self._error

@property
def repo(self) -> RequirementsRepository | None:
return self._repo

@property
def urn_source_paths(self) -> dict[str, dict[str, str]]:
return self._urn_source_paths

def build(self) -> None:
self.close()
self._error = None
db = RequirementsDatabase()
try:
holder = ValidationErrorHolder()
semantic_validator = SemanticValidator(validation_error_holder=holder)

crdg = CombinedRawDatasetsGenerator(
initial_location=self._location,
semantic_validator=semantic_validator,
database=db,
)
crd = crdg.combined_raw_datasets

DatabaseFilterProcessor(db, crd.raw_datasets).apply_filters()
LifecycleValidator(RequirementsRepository(db))

self._db = db
self._repo = RequirementsRepository(db)
self._urn_source_paths = dict(crd.urn_source_paths)
self._ready = True
logger.info("Built project session for %s", self._location)
except SystemExit as e:
logger.warning("build() called sys.exit(%s) for %s", e.code, self._location)
self._error = f"Pipeline error (exit code {e.code})"
db.close()
except Exception as e:
logger.error("Failed to build project session for %s: %s", self._location, e)
self._error = str(e)
db.close()

def rebuild(self) -> None:
self.build()

def close(self) -> None:
if self._db is not None:
self._db.close()
self._db = None
self._repo = None
self._urn_source_paths = {}
self._ready = False
1 change: 1 addition & 0 deletions src/reqstool/common/queries/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
# Copyright © LFV
187 changes: 187 additions & 0 deletions src/reqstool/common/queries/details.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,187 @@
# Copyright © LFV


from reqstool.common.models.urn_id import UrnId
from reqstool.storage.requirements_repository import RequirementsRepository


def _svc_test_summary(svc_urn_id: UrnId, repo: RequirementsRepository) -> dict:
test_results = repo.get_test_results_for_svc(svc_urn_id)
return {
"passed": sum(1 for t in test_results if t.status.value == "passed"),
"failed": sum(1 for t in test_results if t.status.value == "failed"),
"skipped": sum(1 for t in test_results if t.status.value == "skipped"),
"missing": sum(1 for t in test_results if t.status.value == "missing"),
}


def get_requirement_details(
raw_id: str,
repo: RequirementsRepository,
urn_source_paths: dict[str, dict[str, str]] | None = None,
) -> dict | None:
initial_urn = repo.get_initial_urn()
urn_id = UrnId.assure_urn_id(initial_urn, raw_id)
all_reqs = repo.get_all_requirements()
req = all_reqs.get(urn_id)
if req is None:
return None

svc_urn_ids = repo.get_svcs_for_req(req.id)
all_svcs = repo.get_all_svcs()
svcs = [all_svcs[uid] for uid in svc_urn_ids if uid in all_svcs]

impls = repo.get_annotations_impls_for_req(req.id)
references = [str(ref_id) for rd in (req.references or []) for ref_id in rd.requirement_ids]

paths = urn_source_paths or {}
return {
"type": "requirement",
"id": req.id.id,
"urn": req.id.urn,
"title": req.title,
"significance": req.significance.value,
"description": req.description,
"rationale": req.rationale or "",
"revision": str(req.revision),
"lifecycle": {
"state": req.lifecycle.state.value,
"reason": req.lifecycle.reason or "",
},
"categories": [c.value for c in req.categories],
"implementation": req.implementation.value,
"references": references,
"implementations": [{"element_kind": a.element_kind, "fqn": a.fully_qualified_name} for a in impls],
"svcs": [
{
"id": s.id.id,
"urn": s.id.urn,
"title": s.title,
"verification": s.verification.value,
"lifecycle_state": s.lifecycle.state.value,
"test_summary": _svc_test_summary(s.id, repo),
}
for s in svcs
],
"location": repo.get_urn_location(req.id.urn),
"source_paths": paths.get(req.id.urn, {}),
}


def get_svc_details(
raw_id: str,
repo: RequirementsRepository,
urn_source_paths: dict[str, dict[str, str]] | None = None,
) -> dict | None:
initial_urn = repo.get_initial_urn()
urn_id = UrnId.assure_urn_id(initial_urn, raw_id)
all_svcs = repo.get_all_svcs()
svc = all_svcs.get(urn_id)
if svc is None:
return None

mvr_urn_ids = repo.get_mvrs_for_svc(svc.id)
all_mvrs = repo.get_all_mvrs()
mvrs = [all_mvrs[uid] for uid in mvr_urn_ids if uid in all_mvrs]

test_annotations = repo.get_annotations_tests_for_svc(svc.id)
test_results = repo.get_test_results_for_svc(svc.id)

all_reqs = repo.get_all_requirements()

paths = urn_source_paths or {}
return {
"type": "svc",
"id": svc.id.id,
"urn": svc.id.urn,
"title": svc.title,
"description": svc.description or "",
"verification": svc.verification.value,
"instructions": svc.instructions or "",
"revision": str(svc.revision),
"lifecycle": {
"state": svc.lifecycle.state.value,
"reason": svc.lifecycle.reason or "",
},
"requirement_ids": [
{
"id": r.id,
"urn": r.urn,
"title": all_reqs[r].title if r in all_reqs else "",
"lifecycle_state": all_reqs[r].lifecycle.state.value if r in all_reqs else "",
}
for r in svc.requirement_ids
],
"test_annotations": [{"element_kind": a.element_kind, "fqn": a.fully_qualified_name} for a in test_annotations],
"test_results": [{"fqn": t.fully_qualified_name, "status": t.status.value} for t in test_results],
"test_summary": {
"passed": sum(1 for t in test_results if t.status.value == "passed"),
"failed": sum(1 for t in test_results if t.status.value == "failed"),
"skipped": sum(1 for t in test_results if t.status.value == "skipped"),
"missing": sum(1 for t in test_results if t.status.value == "missing"),
},
"mvrs": [
{
"id": m.id.id,
"urn": m.id.urn,
"passed": m.passed,
"comment": m.comment or "",
}
for m in mvrs
],
"location": repo.get_urn_location(svc.id.urn),
"source_paths": paths.get(svc.id.urn, {}),
}


def get_mvr_details(
raw_id: str,
repo: RequirementsRepository,
urn_source_paths: dict[str, dict[str, str]] | None = None,
) -> dict | None:
initial_urn = repo.get_initial_urn()
urn_id = UrnId.assure_urn_id(initial_urn, raw_id)
all_mvrs = repo.get_all_mvrs()
mvr = all_mvrs.get(urn_id)
if mvr is None:
return None

paths = urn_source_paths or {}
return {
"type": "mvr",
"id": mvr.id.id,
"urn": mvr.id.urn,
"passed": mvr.passed,
"comment": mvr.comment or "",
"svc_ids": [{"id": s.id, "urn": s.urn} for s in mvr.svc_ids],
"location": repo.get_urn_location(mvr.id.urn),
"source_paths": paths.get(mvr.id.urn, {}),
}


def get_requirement_status(raw_id: str, repo: RequirementsRepository) -> dict | None:
"""Lightweight status check — avoids the full detail lookup."""
initial_urn = repo.get_initial_urn()
urn_id = UrnId.assure_urn_id(initial_urn, raw_id)
req = repo.get_all_requirements().get(urn_id)
if req is None:
return None

svc_urn_ids = repo.get_svcs_for_req(req.id)
test_summary = {"passed": 0, "failed": 0, "skipped": 0, "missing": 0}
for svc_uid in svc_urn_ids:
for t in repo.get_test_results_for_svc(svc_uid):
key = t.status.value
if key in test_summary:
test_summary[key] += 1

# skipped tests are not counted as failures; a requirement only "meets" if
# it has at least one implementation and no failed or missing test results
all_passing = test_summary["failed"] == 0 and test_summary["missing"] == 0
return {
"id": req.id.id,
"lifecycle_state": req.lifecycle.state.value,
"implementation": req.implementation.value,
"test_summary": test_summary,
"meets_requirements": req.implementation.value != "not_implemented" and all_passing,
}
Loading
Loading