|
5 | 5 | from collections.abc import Callable
|
6 | 6 | from typing import Any
|
7 | 7 |
|
8 |
| -from mcp_scan.models import CrossRefResult, ScanError, ScanPathResult, ServerScanResult |
| 8 | +from mcp_scan.models import ScanError, ScanPathResult, ServerScanResult |
9 | 9 |
|
10 | 10 | from .mcp_client import check_server_with_timeout, scan_mcp_config_file
|
11 | 11 | from .StorageFile import StorageFile
|
12 |
| -from .utils import calculate_distance |
13 | 12 | from .verify_api import verify_scan_path
|
14 | 13 |
|
15 | 14 | # Set up logger for this module
|
@@ -192,39 +191,9 @@ async def scan_path(self, path: str, inspect_only: bool = False) -> ScanPathResu
|
192 | 191 | path_result.servers[i] = await self.scan_server(server, inspect_only)
|
193 | 192 | logger.debug("Verifying server path: %s", path)
|
194 | 193 | path_result = await verify_scan_path(path_result, base_url=self.base_url, run_locally=self.local_only)
|
195 |
| - path_result.cross_ref_result = await self.check_cross_references(path_result) |
196 | 194 | await self.emit("path_scanned", path_result)
|
197 | 195 | return path_result
|
198 | 196 |
|
199 |
| - async def check_cross_references(self, path_result: ScanPathResult) -> CrossRefResult: |
200 |
| - logger.info("Checking cross references for path: %s", path_result.path) |
201 |
| - cross_ref_result = CrossRefResult(found=False) |
202 |
| - for server in path_result.servers: |
203 |
| - other_servers = [s for s in path_result.servers if s != server] |
204 |
| - other_server_names = [s.name for s in other_servers] |
205 |
| - other_entity_names = [e.name for s in other_servers for e in s.entities] |
206 |
| - flagged_names = set(map(str.lower, other_server_names + other_entity_names)) |
207 |
| - logger.debug("Found %d potential cross-reference names", len(flagged_names)) |
208 |
| - |
209 |
| - if len(flagged_names) < 1: |
210 |
| - logger.debug("No flagged names found, skipping cross-reference check") |
211 |
| - continue |
212 |
| - |
213 |
| - for entity in server.entities: |
214 |
| - tokens = (entity.description or "").lower().split() |
215 |
| - for token in tokens: |
216 |
| - best_distance = calculate_distance(reference=token, responses=list(flagged_names))[0] |
217 |
| - if ((best_distance[1] <= 2) and (len(token) >= 5)) or (token in flagged_names): |
218 |
| - logger.warning("Cross-reference found: %s with token %s", entity.name, token) |
219 |
| - cross_ref_result.found = True |
220 |
| - cross_ref_result.sources.append(f"{entity.name}:{token}") |
221 |
| - |
222 |
| - if cross_ref_result.found: |
223 |
| - logger.info("Cross references detected with %d sources", len(cross_ref_result.sources)) |
224 |
| - else: |
225 |
| - logger.debug("No cross references found") |
226 |
| - return cross_ref_result |
227 |
| - |
228 | 197 | async def scan(self) -> list[ScanPathResult]:
|
229 | 198 | logger.info("Starting scan of %d paths", len(self.paths))
|
230 | 199 | if self.context_manager is not None:
|
|
0 commit comments