diff --git a/transformers/domains/url/models.py b/transformers/domains/url/models.py
index 5ec6dba..66b4e01 100644
--- a/transformers/domains/url/models.py
+++ b/transformers/domains/url/models.py
@@ -1,8 +1,7 @@
 """
 URL Domain Models - Unified Data Model (UDM).
 
-This module defines the canonical schema for URLs, URL collections, and
-categories within the URL domain.
+This module defines the canonical schema for URLs and URL collections.
 
 Design Principles:
 
@@ -21,21 +20,6 @@
 from pydantic import Field
 
 
-class Category(BaseModel):
-    """
-    Represents a normalized category entity.
-
-    This includes a stable identifier and taxonomic classification.
-    """
-
-    id: str = Field(..., description="Internal unique identifier for the category")
-    name: str = Field(..., description="Human-readable name of the category")
-    type: Literal["standard", "custom"] = Field(
-        ...,
-        description="Distinguishes between system-standard and user-defined categories",
-    )
-
-
 class Metadata(BaseModel):
     """
     Extensible container for enrichment data.
@@ -70,30 +54,15 @@ class URL_UDM(BaseModel):
     type: Literal["literal", "wildcard", "regex"] = Field(
         ..., description="The syntax type of the pattern"
     )
-    action: Literal["allow", "block", "monitor"] = Field(
-        ..., description="Standardized enforcement action"
-    )
-    status: Literal["enable", "disable"] = Field(
-        ..., description="Operational status of the rule"
-    )
     url_list_id: str = Field(
         ..., description="Unique ID for the parent URL list"
     )
     url_list_name: str = Field(
         ..., description="Human-readable name of the URL list"
     )
-
-    categories: List[Category] = Field(
-        default_factory=list,
-        description="Merged array of standard and custom categories",
-    )
-
     vendor: Optional[str] = Field(
         None, description="Original vendor for traceability purposes"
     )
     metadata: Optional[Metadata] = Field(
         None, description="Processing metadata and timestamps"
     )
-    notes: Optional[str] = Field(
-        None, description="Optional justifications or comments"
-    )
diff --git a/transformers/domains/url/vendors/fortinet.py b/transformers/domains/url/vendors/fortinet.py
index bc443aa..720442d 100644
--- a/transformers/domains/url/vendors/fortinet.py
+++ b/transformers/domains/url/vendors/fortinet.py
@@ -1,21 +1,25 @@
 """
 Fortinet URL Domain Integration.
 
-This module implements the transformer, mapper, and exporter for Fortinet,
-converting between Fortinet-specific configurations and the Unified Data
-Model (UDM).
+This module implements the Transformer, Mapper, and Exporter for Fortinet,
+converting between Fortinet-specific configurations and the Pydantic
+Unified Data Model (UDM).
""" from datetime import datetime from typing import Any +from typing import Dict from typing import List from typing import Optional import jmespath +# Domain Model imports from transformers.domains.url.models import URL_UDM from transformers.domains.url.models import Category from transformers.domains.url.models import Metadata +# Framework imports - Absolute paths +from transformers.framework.udm_transformers.action_mapper import ActionMapper from transformers.framework.udm_transformers.category_mapper import \ CategoryMapper from transformers.framework.udm_transformers.metadata_enricher import \ @@ -24,19 +28,7 @@ PatternNormalizer from transformers.framework.udm_transformers.type_mapper import TypeMapper -FORTINET_ACTION_MAP = { - "allow": "allow", - "block": "block", - "monitor": "monitor", - "exempt": "allow", -} - -FORTINET_CATEGORY_MAP = { - "3": "malware", - "4": "phishing", - "5": "gambling", - "default": "uncategorized", -} +# ---------------- FORTINET MAPPINGS ---------------- FORTINET_TYPE_MAP = { "simple": "literal", @@ -44,19 +36,19 @@ "regex": "regex", } +# ---------------- EXTRACTION LAYER ---------------- + + JMESPATH_FLATTEN_URLS = """ *.urls.*.{ pattern: url, - action: action, - status: status, type: type, url_id: url_id } """ - -def flatten_fortinet_jmespath(raw_data: Dict[str, Any]) -> List[Dict[str, Any]]: - """Flatten nested Fortinet URL data into normalized records.""" +def flatten_fortinet_jmespath(raw_data): + """Flatten nested Fortinet dict into a list of record dictionaries.""" flat = [] for _, url_list in raw_data.items(): @@ -64,105 +56,79 @@ def flatten_fortinet_jmespath(raw_data: Dict[str, Any]) -> List[Dict[str, Any]]: list_name = url_list["filter_name"] for _, item in url_list["urls"].items(): - flat.append( - { - "pattern": item["url"], - "action": item["action"], - "status": item["status"], - "type": item["type"], - "url_id": item["url_id"], - "list_id": list_id, - "list_name": list_name, - "category_id": "Uncategorized", - } - ) + flat.append({ + "pattern": item["url"], + "type": item["type"], + "url_id": item["url_id"], + "list_id": list_id, + "list_name": list_name, + }) return flat +# ---------------- MAPPER & EXPORTER ---------------- class FortinetMapper: - """Map transformed dictionaries into URL_UDM instances.""" + """Handle semantic alignment and Pydantic UDM instantiation.""" def to_udm(self, item: Dict[str, Any]) -> URL_UDM: - """Convert a transformed dictionary into a validated URL_UDM.""" - cat_id = item.get("category_id", "uncategorized") - categories = [ - Category( - id=cat_id, - name=cat_id.capitalize(), - type="standard", - ) - ] - + """Convert a transformed dictionary into a validated URL_UDM instance.""" + # Construct the Metadata model + # MetadataEnricher provides the ISO timestamp string meta = Metadata( - processed_at=datetime.fromisoformat( - item["metadata"]["processed_at"] - ), - source=item["metadata"].get("source"), + processed_at=datetime.fromisoformat(item["metadata"]["processed_at"]), ) return URL_UDM( pattern=item["pattern"], type=item["type"], - action=item["action"], - status="enable", url_list_id=str(item["list_id"]), url_list_name=item["list_name"], - categories=categories, vendor=item["vendor"], metadata=meta, - notes=item.get("notes"), ) - class FortinetExporter: - """Export URL_UDM records into Fortinet format.""" + """Universal Model -> Fortinet Format.""" def transform(self, udm: URL_UDM) -> Dict[str, Any]: """Reconstruct Fortinet-specific pattern and type syntax.""" - reverse_type_map = { - value: key for key, 
value in FORTINET_TYPE_MAP.items() - } + # Reverse mapping for Type + reverse_type_map = {v: k for k, v in FORTINET_TYPE_MAP.items()} return { "url": udm.pattern, - "type": reverse_type_map.get(udm.type, "simple"), + "type": reverse_type_map.get(udm.type, "simple") } - -def run_universal_to_fortinet_pipeline( - records: List[URL_UDM], -) -> List[Dict[str, Any]]: - """Transform universal records into Fortinet export records.""" +def run_universal_to_fortinet_pipeline(records: List[URL_UDM]) -> List[dict]: + """Execute the pipeline to convert UDM records back to Fortinet dicts.""" output = [] - for record in records: - output.append( - { - "pattern": record.pattern, - "type": record.type, - "action": record.action, - "list_id": record.url_list_id, - "list_name": record.url_list_name, - } - ) + for r in records: + output.append({ + "pattern": r.pattern, + "type": r.type, + "list_id": r.url_list_id, + "list_name": r.url_list_name + }) return output -def export_fortinet_json(records: List[Dict[str, Any]]) -> Dict[str, Any]: - """Convert flat Fortinet records into grouped Fortinet JSON.""" +def export_fortinet_json(records: List[dict]) -> dict: + """Group flat records into the Fortinet-specific JSON structure.""" grouped = {} counters = {} - for record in records: - key = record["list_id"] + for r in records: + key = r["list_id"] if key not in grouped: grouped[key] = { - "object_id": record["list_id"], - "filter_name": record["list_name"], - "urls": {}, + "object_id": r["list_id"], + "filter_name": r["list_name"], + "urls": {} } counters[key] = 0 @@ -171,41 +137,42 @@ def export_fortinet_json(records: List[Dict[str, Any]]) -> Dict[str, Any]: grouped[key]["urls"][idx] = { "url_id": str(counters[key]), - "url": record["pattern"], - "type": record["type"], - "action": record["action"], - "status": "enable", + "url": r["pattern"], + "type": r["type"], } return grouped +# ---------------- EXECUTION PIPELINE ---------------- -def run_fortinet_to_universal_pipeline( - raw_data: Dict[str, Any], -) -> List[URL_UDM]: - """Run the full Fortinet-to-universal transformation pipeline.""" +def run_fortinet_to_universal_pipeline(raw_data: Dict[str, Any]) -> List[URL_UDM]: + """Orchestrate deterministic flow from raw Fortinet data to UDM objects.""" + # 1. Extraction flat_data = flatten_fortinet_jmespath(raw_data) + # 2. Transformation Pipeline steps = [ - ActionMapper(FORTINET_ACTION_MAP), PatternNormalizer(), TypeMapper(FORTINET_TYPE_MAP), - CategoryMapper(FORTINET_CATEGORY_MAP), - MetadataEnricher("fortinet"), + MetadataEnricher("fortinet") ] mapper = FortinetMapper() udm_records = [] for record in flat_data: + # Apply each modular transformation unit for step in steps: record = step.transform(record) + # 3. Validation & Pydantic Conversion udm_records.append(mapper.to_udm(record)) return udm_records +# ---------------- REGISTRATION ---------------- +# This is what debugPythonScript.py is looking for VENDOR_TO_UNIVERSAL_PIPELINES = { - "fortinet": run_fortinet_to_universal_pipeline, + "fortinet": run_fortinet_to_universal_pipeline } diff --git a/transformers/domains/url/vendors/netskope.py b/transformers/domains/url/vendors/netskope.py index 7a7a67e..a0ac54f 100644 --- a/transformers/domains/url/vendors/netskope.py +++ b/transformers/domains/url/vendors/netskope.py @@ -1,21 +1,26 @@ """ Netskope URL Domain Integration. 
-This module implements the transformer, mapper, and exporter for Netskope,
+This module implements the Transformer, Mapper, and Exporter for Netskope,
 converting between Netskope-specific configurations and the Pydantic
 Unified Data Model (UDM).
 """
 
+import re
+from collections import defaultdict
 from datetime import datetime
+from datetime import timezone
 from typing import Any
+from typing import Dict
 from typing import List
 from typing import Optional
 
 import jmespath
 
+# Domain Model imports
 from transformers.domains.url.models import URL_UDM
-from transformers.domains.url.models import Category
 from transformers.domains.url.models import Metadata
 
-from transformers.framework.udm_transformers.category_mapper import \
-    CategoryMapper
+# Framework imports - Absolute paths
 from transformers.framework.udm_transformers.metadata_enricher import \
@@ -24,24 +29,23 @@ PatternNormalizer
 from transformers.framework.udm_transformers.type_mapper import TypeMapper
 
-NETSKOPE_ACTION_MAP = {
-    "block": "deny",
-    "allow": "allow",
-    "monitor": "allow",
-}
+# ---------------- NETSKOPE MAPPINGS ----------------
 
-NETSKOPE_CATEGORY_MAP = {
-    "malware": "malware",
-    "phishing": "phishing",
-    "gambling": "gambling",
-    "uncategorized": "uncategorized",
-}
 
 NETSKOPE_TO_UNIVERSAL_TYPE_MAP = {
     "exact": "literal",
     "regex": "regex",
 }
 
+UNIVERSAL_TO_NETSKOPE_TYPE_MAP = {
+    "literal": "exact",
+    "regex": "regex",
+    "wildcard": "regex",
+    "substring": "regex",
+}
+
+# ---------------- EXTRACTION LAYER ----------------
+
 JMESPATH_NETSKOPE = """
 values(@)[?modify_type!='Deleted'].{
     list_name: name,
@@ -51,155 +55,154 @@
 }
 """
 
 
 def flatten_netskope_jmespath(url_lists: Dict[str, Any]) -> List[Dict[str, Any]]:
-    """Flatten Netskope hierarchical data using JMESPath extraction."""
+    """Flatten the Netskope hierarchy into flat record dicts via JMESPath."""
     extracted = jmespath.search(JMESPATH_NETSKOPE, url_lists) or []
     flat = []
 
+    # Timezone-aware timestamp (datetime.utcnow() is deprecated)
+    now_iso = datetime.now(timezone.utc).isoformat()
+
     for lst in extracted:
         for entry in lst.get("urls", []):
             url = entry.get("url")
 
             if not url:
                 continue
 
-            flat.append(
-                {
-                    "pattern": url,
-                    "action": "allow",
-                    "category_id": "Uncategorized",
-                    "list_name": lst["list_name"],
-                    "list_id": str(lst["list_id"]),
-                    "type": lst["type"],
-                }
-            )
+            flat.append({
+                "pattern": url,
+                "list_name": lst["list_name"],
+                "list_id": str(lst["list_id"]),
+                "type": lst["type"],
+                "metadata": {"processed_at": now_iso},
+            })
 
     return flat
 
 
+# ---------------- TRANSFORMERS ----------------
+
 class NetskopePatternNormalizer(BaseTransformer):
-    """Normalize Netskope URL patterns for universal compatibility."""
+    """Normalize universal patterns into Netskope-compatible syntax."""
 
     def wildcard_to_regex(self, pattern: str) -> str:
-        """Convert wildcard pattern to regex format."""
-        if pattern.startswith("*."):
-            domain = pattern[2:].replace(".", r"\.")
-            return rf"^([^.]+\.)*{domain}$"
-        return pattern
+        """Convert a wildcard pattern to a regex string."""
+        if not pattern.startswith("*."):
+            return pattern
+
+        domain = re.escape(pattern[2:])
+        return rf"^([^.]+\.)*{domain}$"
 
     def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
-        """Normalize pattern into Netskope-compatible format."""
+        """Rewrite pattern and type into Netskope-compatible values."""
+        item = item.copy()
+
         raw_pattern = item.get("pattern", "")
         universal_type = item.get("type", "literal")
 
         if universal_type in ("literal", "exact"):
             item["pattern"] = raw_pattern
-            item["netskope_type"] = "exact"
+            item["type"] = "exact"
+
         elif universal_type in ("wildcard", "regex"):
             item["pattern"] = self.wildcard_to_regex(raw_pattern)
-            item["netskope_type"] = "regex"
+            item["type"] = "regex"
+
         else:
             item["pattern"] = raw_pattern
-            item["netskope_type"] = "exact"
+            item["type"] = "exact"
 
         return item
 
 
 class NetskopePatternDenormalizer(BaseTransformer):
-    """Convert Netskope patterns back to universal format."""
+    """Convert Netskope patterns back to universal model patterns."""
 
     def regex_to_wildcard(self, pattern: str) -> Optional[str]:
-        """Attempt to convert regex to wildcard pattern."""
-        wildcard_regex = r"^\^\(\[\^\.\]\+\\\.\)\*(.+)\\\.([a-zA-Z0-9\-]+)\$$"
-        match = re.match(wildcard_regex, pattern)
-        if match:
-            domain = f"{match.group(1)}.{match.group(2)}"
+        """Attempt to convert a regex back to a wildcard string."""
+        prefix = "^([^.]+\\.)*"
+        suffix = "$"
+
+        if pattern.startswith(prefix) and pattern.endswith(suffix):
+            domain = pattern[len(prefix):-len(suffix)]
+            domain = domain.replace("\\.", ".")
             return f"*.{domain}"
+
         return None
 
     def is_regex(self, pattern: str) -> bool:
-        """Check if a pattern contains regex syntax."""
+        """Check if a pattern contains regex special characters."""
         regex_markers = ("^", "$", "(", ")", "[", "]", "+", "?", "|", "{", "}")
         return any(marker in pattern for marker in regex_markers)
 
     def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
-        """Denormalize Netskope pattern into universal format."""
+        """Denormalize patterns into the standard UDM type and format."""
         pattern = item.get("pattern", "").replace("\\\\", "\\")
 
+        # Already in wildcard form
         if pattern.startswith("*.") and pattern.count("*") == 1:
             item["type"] = "wildcard"
 
-        elif "*" in pattern:
-            item["type"] = "regex"
+        # Regex patterns must be checked before bare wildcard syntax
         elif self.is_regex(pattern):
             wildcard = self.regex_to_wildcard(pattern)
 
             if wildcard:
                 item["type"] = "wildcard"
                 pattern = wildcard
             else:
                 item["type"] = "regex"
 
+        # Non-regex wildcard syntax
+        elif "*" in pattern:
+            item["type"] = "wildcard"
+
         else:
-            item["type"] = "exact"
+            item["type"] = "literal"
 
         item["pattern"] = pattern
         item.pop("netskope_type", None)
 
         return item
 
 
+# ---------------- MAPPER & EXPORTER ----------------
+
 class NetskopeMapper:
-    """Handles semantic mapping into the Unified Data Model."""
+    """Handle semantic alignment and Pydantic UDM instantiation."""
 
     def to_udm(self, item: Dict[str, Any]) -> URL_UDM:
-        """Convert transformed dictionary into URL_UDM instance."""
-        cat_id = item.get("category_id", "uncategorized")
-        categories = [
-            Category(id=cat_id, name=cat_id.capitalize(), type="standard")
-        ]
-
+        """Convert a transformed dictionary into a validated URL_UDM instance."""
         meta = Metadata(
-            processed_at=datetime.fromisoformat(
-                item["metadata"]["processed_at"]
-            ),
-            source="netskope",
+            processed_at=datetime.fromisoformat(item["metadata"]["processed_at"]),
        )
 
         return URL_UDM(
             pattern=item["pattern"],
             type=item["type"],
-            action=item["action"],
-            status="enable",
             url_list_id=item["list_id"],
             url_list_name=item["list_name"],
-            categories=categories,
-            vendor=item["vendor"],
-            metadata=meta,
-            notes=item.get("notes"),
+            vendor="netskope",
+            metadata=meta,
         )
 
 
 class NetskopeExporter:
-    """Convert UDM objects into Netskope format."""
+    """Universal Model -> Netskope format."""
 
-    def transform(self, udm: URL_UDM) -> Dict[str, Any]:
-        """Convert UDM object into Netskope-compatible structure."""
+    def transform(self, item: Dict[str, Any]) -> Dict[str, Any]:
+        """Convert grouped UDM fields into the Netskope-specific schema."""
         return {
-            "pattern": udm.pattern,
-            "type": udm.type,
-            "action": udm.action,
+            "object_id": item.get("url_list_id"),
+            "name": item.get("url_list_name"),
+            "data_type": item.get("type"),
+            "data_urls": item.get("urls", []),
         }
 
 
+# ---------------- EXECUTION PIPELINE ----------------
+
 def run_netskope_to_universal_pipeline(
     raw_data: Dict[str, Any],
 ) -> List[URL_UDM]:
-    """Run full Netskope ingestion pipeline into UDM objects."""
+    """Orchestrate the flow from raw Netskope data to UDM objects."""
+    # 1. Extraction
     flat_data = flatten_netskope_jmespath(raw_data)
 
+    # 2. Transformation pipeline
     steps = [
-        ActionMapper(NETSKOPE_ACTION_MAP),
         TypeMapper(NETSKOPE_TO_UNIVERSAL_TYPE_MAP),
         NetskopePatternDenormalizer(),
-        CategoryMapper(NETSKOPE_CATEGORY_MAP),
-        MetadataEnricher("netskope"),
     ]
 
     mapper = NetskopeMapper()
@@ -211,3 +214,67 @@ def run_netskope_to_universal_pipeline(
         udm_records.append(mapper.to_udm(record))
 
     return udm_records
+
+
+def run_universal_to_netskope_pipeline(
+    udm_records: List[URL_UDM],
+) -> List[Dict[str, Any]]:
+    """Convert UDM records into the structured Netskope payload."""
+    if not udm_records:
+        return []
+
+    steps = [
+        TypeMapper(UNIVERSAL_TO_NETSKOPE_TYPE_MAP),
+        NetskopePatternNormalizer(),
+        MetadataEnricher("netskope"),
+    ]
+
+    grouped = defaultdict(
+        lambda: {
+            "name": "",
+            "data_type": "exact",
+            "data_urls": set(),
+        }
+    )
+
+    for entry in udm_records:
+        # The transformer steps operate on plain dicts, so dump the
+        # Pydantic model first (model_dump on v2, dict() on v1)
+        transformed = (
+            entry.model_dump() if hasattr(entry, "model_dump") else entry.dict()
+        )
+
+        # Apply all transformers sequentially
+        for step in steps:
+            transformed = step.transform(transformed)
+
+        obj_id = str(transformed.get("url_list_id", "0"))
+        name = transformed.get("url_list_name", "Default_List")
+        url_val = transformed.get("pattern", "")
+        d_type = transformed.get("type", "exact")
+
+        if not url_val:
+            continue
+
+        group = grouped[obj_id]
+        group["name"] = name
+        group["data_urls"].add(url_val)
+
+        # A single regex entry forces the whole list to the regex type
+        if d_type in ("wildcard", "regex") or "*" in url_val:
+            group["data_type"] = "regex"
+
+    final_payload = []
+
+    for oid, data in grouped.items():
+        final_payload.append({
+            "object_id": int(oid) if oid.isdigit() else oid,
+            "name": data["name"],
+            "data_type": data["data_type"],
+            "data_urls": sorted(data["data_urls"]),
+        })
+
+    return final_payload
+
+
+# ---------------- REGISTRATION ----------------
+VENDOR_TO_UNIVERSAL_PIPELINES = {
+    "netskope": run_netskope_to_universal_pipeline,
+}
+
+UNIVERSAL_TO_VENDOR_PIPELINES = {
+    "netskope": run_universal_to_netskope_pipeline,
+}