Skip to content

Commit

Permalink
Active: rewrite script merger
Browse files Browse the repository at this point in the history
  • Loading branch information
p-l- committed Mar 24, 2023
1 parent 637db85 commit ef04f78
Showing 1 changed file with 117 additions and 62 deletions.
179 changes: 117 additions & 62 deletions ivre/active/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -831,35 +831,98 @@ def set_auto_tags(host: NmapHost, update_openports: bool = True) -> None:
add_tags(host, gen_auto_tags(host, update_openports=update_openports))


def merge_ja3_scripts(
curscript: NmapScript, script: NmapScript, script_id: str
) -> NmapScript:
def is_server(script_id: str) -> bool:
return script_id == "ssl-ja3-server"
SCRIPT_MERGERS = {}

def ja3_equals(a: Dict[str, Any], b: Dict[str, Any], script_id: str) -> bool:
return a["md5"] == b["md5"] and (
not is_server(script_id) or a["client"]["md5"] == b["client"]["md5"]
)

def ja3_output(ja3: Dict[str, Any], script_id: str) -> str:
output = cast(str, ja3["md5"])
if is_server(script_id):
output += " - " + ja3["client"]["md5"]
return output
class ScriptMerger:
def __init__(self, script: NmapScript) -> None:
self.script = script
self.script_id_alias = ALIASES_TABLE_ELEMS.get(script["id"], script["id"])

return _merge_scripts(curscript, script, script_id, ja3_equals, ja3_output)
@staticmethod
def output(data: Any) -> str:
raise NotImplementedError

def merge_data(self, *scripts: Any) -> Any:
raise NotImplementedError

def merge(self, *scripts: NmapScript) -> NmapScript:
data = self.merge_data(*scripts)
return {
"id": self.script["id"],
"output": self.output(data),
self.script_id_alias: data,
}


def register_merger(id_: str) -> Callable[[type[ScriptMerger]], type[ScriptMerger]]:
def decorator(cls: type[ScriptMerger]) -> type[ScriptMerger]:
SCRIPT_MERGERS[id_] = cls
return cls

return decorator


@register_merger("http-user-agent")
class ListValueScriptMerger(ScriptMerger):
def merge_data(self, *scripts: NmapScript) -> List[Any]:
data = set(self.script.get(self.script_id_alias, []))
for new_script in scripts:
data.update(new_script.get(self.script_id_alias, []))
return sorted(data)

@staticmethod
def output(data: List[Any]) -> str:
return "".join(f"{elt}\n" for elt in data)


class ListDictScriptMerger(ScriptMerger):
@staticmethod
def key(elt: Dict[str, Any]) -> Any:
raise NotImplementedError

def merge_data(self, *scripts: NmapScript) -> List[Dict[str, Any]]:
data = {self.key(elt): elt for elt in self.script.get(self.script_id_alias, [])}
for new_script in scripts:
for new_data in new_script.get(self.script_id_alias, []):
key = self.key(new_data)
if key in data:
data[key].update(new_data)
else:
data[key] = new_data
return [data[key] for key in sorted(data)]


@register_merger("ssl-ja3-client")
class Ja3ClientScriptMerger(ListDictScriptMerger):
@staticmethod
def key(elt: Dict[str, Any]) -> str:
return cast(str, elt["md5"])

@staticmethod
def output(data: List[Dict[str, Any]]) -> str:
return "".join(f"{elt['md5']}\n" for elt in data)

def merge_http_app_scripts(
curscript: NmapScript, script: NmapScript, script_id: str
) -> NmapScript:
def http_app_equals(a: Dict[str, Any], b: Dict[str, Any], script_id: str) -> bool:
return cast(
bool, a["application"] == b["application"] and a["path"] == b["path"]
)

def http_app_output(app: Dict[str, Any], script_id: str) -> str:
@register_merger("ssl-ja3-server")
class Ja3ServerScriptMerger(ListDictScriptMerger):
@staticmethod
def key(elt: Dict[str, Any]) -> Tuple[str, str]:
return (elt["md5"], elt["client"]["md5"])

@staticmethod
def output(data: List[Dict[str, Any]]) -> str:
return "".join(f"{elt['md5']} - {elt['client']['md5']}\n" for elt in data)


@register_merger("http-app")
class HttpAppScriptMerger(ListDictScriptMerger):
@staticmethod
def key(elt: Dict[str, Any]) -> Tuple[str, str]:
return (elt["application"], elt["path"])

@staticmethod
def output_one(app: Dict[str, Any]) -> str:
output = ["%(application)s: path %(path)s" % app]
if app.get("version") is not None:
output.append(", version %(version)s" % app)
Expand All @@ -872,41 +935,30 @@ def http_app_output(app: Dict[str, Any], script_id: str) -> str:
output.append(" (%(parsed_version)s)" % app)
return "".join(output)

return _merge_scripts(
curscript, script, script_id, http_app_equals, http_app_output
)


def merge_ua_scripts(
curscript: NmapScript, script: NmapScript, script_id: str
) -> NmapScript:
def ua_equals(a: str, b: str, script_id: str) -> bool:
return a == b

def ua_output(ua: str, script_id: str) -> str:
return ua
@classmethod
def output(cls, data: List[Dict[str, Any]]) -> str:
return "".join(f"{cls.output_one(elt)}\n" for elt in data)

return _merge_scripts(curscript, script, script_id, ua_equals, ua_output)

@register_merger("ssl-cert")
@register_merger("ssl-cacert")
class SslCertScriptMerger(ListDictScriptMerger):
@staticmethod
def key(elt: Dict[str, Any]) -> str:
return cast(str, elt["sha256"])

def merge_ssl_cert_scripts(
curscript: NmapScript, script: NmapScript, script_id: str
) -> NmapScript:
def cert_equals(a: ParsedCertificate, b: ParsedCertificate, script_id: str) -> bool:
return cast(bool, a["sha256"] == b["sha256"])

def cert_output(cert: Dict[str, Any], script_id: str) -> str:
@staticmethod
def output_one(cert: Dict[str, Any]) -> str:
return "\n".join(create_ssl_output(cert))

return _merge_scripts(
curscript,
script,
script_id,
cert_equals,
cert_output,
outsep="\n------------------------------------------------------------"
"----\n",
)
@classmethod
def output(cls, data: List[Dict[str, Any]]) -> str:
return (
"\n----------------------------------------------------------------\n".join(
f"{cls.output_one(elt)}" for elt in data
)
+ "\n"
)


def merge_axfr_scripts(
Expand Down Expand Up @@ -1266,16 +1318,10 @@ def _merge_scripts(
"dns-domains": merge_dns_domains_scripts,
"dns-tls-rpt": merge_dns_tls_rpt_scripts,
"dns-zone-transfer": merge_axfr_scripts,
"http-app": merge_http_app_scripts,
"http-git": merge_http_git_scripts,
"http-nuclei": merge_nuclei_scripts,
"http-user-agent": merge_ua_scripts,
"network-nuclei": merge_nuclei_scripts,
"scanner": merge_scanner_scripts,
"ssl-cacert": merge_ssl_cert_scripts,
"ssl-cert": merge_ssl_cert_scripts,
"ssl-ja3-client": merge_ja3_scripts,
"ssl-ja3-server": merge_ja3_scripts,
}


Expand All @@ -1286,7 +1332,16 @@ def merge_scripts(
try:
func = _SCRIPT_MERGE[script_id]
except KeyError:
return {}
try:
merger = SCRIPT_MERGERS[script_id]
except KeyError:
return {}

def func(
curscript: NmapScript, script: NmapScript, script_id: str
) -> NmapScript:
return merger(curscript).merge(script)

return func(curscript, script, script_id)


Expand Down Expand Up @@ -1700,7 +1755,7 @@ def handle_http_headers(
except StopIteration:
port.setdefault("scripts", []).append(script)
else:
merge_http_app_scripts(cur_script, script, "http-app")
cur_script.update(HttpAppScriptMerger(cur_script).merge(script))
# * Add a script "http-app" for Kibana, and merge it if necessary
try:
header = next(
Expand Down Expand Up @@ -1745,4 +1800,4 @@ def handle_http_headers(
except StopIteration:
port.setdefault("scripts", []).append(script)
else:
merge_http_app_scripts(cur_script, script, "http-app")
cur_script.update(HttpAppScriptMerger(cur_script).merge(script))

0 comments on commit ef04f78

Please sign in to comment.