Skip to content

Commit

Permalink
squashme: suggestions and stuff
Browse files Browse the repository at this point in the history
  • Loading branch information
half-duplex committed May 16, 2022
1 parent 9e51e47 commit 57ec25c
Show file tree
Hide file tree
Showing 2 changed files with 26 additions and 25 deletions.
47 changes: 24 additions & 23 deletions sopel/modules/safety.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
import threading
from time import sleep
from typing import Dict, Optional
from urllib.parse import urlparse
from urllib.parse import urlparse, urlunparse

import requests

Expand Down Expand Up @@ -116,12 +116,13 @@ def setup(bot: Sopel):
update_local_cache(bot, init=True)


def zwsp(text: str) -> str:
"""Insert a zero-width space between each character.
Useful for reducing highlights and clickability of links.
"""
return "\u200B".join(text)
def safeify_url(url: str) -> str:
"""Replace bits of a URL to make it hard to browse to."""
parts = urlparse(url)
scheme = "hxx" + parts.scheme[3:] # hxxp
netloc = parts.netloc.replace(".", "[.]") # google[.]com and IPv4
netloc = netloc.replace(":", "[:]") # IPv6 addresses (bad lazy method)
return urlunparse((scheme, netloc) + parts[2:])


def download_domain_list(bot: Sopel, path: str) -> bool:
Expand Down Expand Up @@ -171,8 +172,8 @@ def update_local_cache(bot: Sopel, init: bool = False):
if not updated and not init:
return

LOGGER.debug("Loading new unsafedomains list")
malware_domains = set()
LOGGER.debug("Loading new unsafe domain list")
unsafe_domains = set()
with open(path, "r") as f:
for line in f:
clean_line = str(line).strip().lower()
Expand All @@ -189,8 +190,8 @@ def update_local_cache(bot: Sopel, init: bool = False):

if '.' in domain:
# only publicly routable domains matter; skip loopback/link-local stuff
malware_domains.add(domain)
bot.memory["safety_cache_local"] = malware_domains
unsafe_domains.add(domain)
bot.memory["safety_cache_local"] = unsafe_domains


def shutdown(bot: Sopel):
Expand All @@ -215,21 +216,21 @@ def url_handler(bot: Sopel, trigger: Trigger):
strict = "strict" in mode

for url in tools.web.search_urls(trigger):
safe_url = "hxx" + url[3:]
safe_url = safeify_url(url)

positives = 0 # Number of engines saying it's malicious
total = 0 # Number of total engines

try:
netloc = urlparse(url).netloc.lower()
hostname = urlparse(url).hostname.lower()
except ValueError:
pass # Invalid address
else:
if any(regex.search(netloc) for regex in known_good):
if any(regex.search(hostname) for regex in known_good):
continue # explicitly trusted

if netloc in bot.memory["safety_cache_local"]:
LOGGER.debug("[local] domain in blocklist: %r", netloc)
if hostname in bot.memory["safety_cache_local"]:
LOGGER.debug("[local] domain in blocklist: %r", hostname)
positives += 1
total += 1

Expand Down Expand Up @@ -280,9 +281,10 @@ def virustotal_lookup(
# VT only does http/https URLs
return None

safe_url = "hxx" + url[3:]
safe_url = safeify_url(url)

oldest_cache = datetime(1970, 1, 1, 0, 0) # default: use any cache available
# default: use any cache available
oldest_cache = datetime(1970, 1, 1, 0, 0, tzinfo=timezone.utc)
if max_cache_age is not None:
oldest_cache = datetime.now(timezone.utc) - max_cache_age
cache = bot.memory["safety_cache"]
Expand Down Expand Up @@ -312,7 +314,7 @@ def virustotal_lookup(
elif not requested and r.status_code == 404:
# Not analyzed - submit new
LOGGER.debug("[VirusTotal] No scan for %r, requesting", safe_url)
r = requests.post(
requests.post(
VT_API_URL,
data={"url": url},
headers={"x-apikey": bot.settings.safety.vt_api_key},
Expand Down Expand Up @@ -343,7 +345,6 @@ def virustotal_lookup(
"positives": last_analysis["malicious"],
"total": last_analysis["malicious"] + last_analysis["harmless"],
"fetched": fetched,
# Subject to change formats!
"virustotal_data": vt_data["data"]["attributes"],
}
bot.memory["safety_cache"][url] = result
Expand All @@ -363,7 +364,7 @@ def vt_command(bot: Sopel, trigger: Trigger):
return

url = trigger.group(2)
zwsp_safe_url = zwsp("hxx" + url[3:])
safe_url = safeify_url(url)

result = virustotal_lookup(bot, url, max_cache_age=timedelta(minutes=1))
if not result:
Expand Down Expand Up @@ -395,8 +396,7 @@ def vt_command(bot: Sopel, trigger: Trigger):
timezone.utc,
)
bot.reply(
"Results for {}: {} as of {}".format(
zwsp_safe_url,
"Results: {} at {} for {}".format(
results_str,
tools.time.format_time(
bot.db,
Expand All @@ -405,6 +405,7 @@ def vt_command(bot: Sopel, trigger: Trigger):
channel=trigger.sender,
time=vt_scan_time,
),
safe_url,
)
)

Expand Down
4 changes: 2 additions & 2 deletions sopel/modules/url.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,8 +312,8 @@ def title_auto(bot, trigger):
if url in bot.memory.get("safety_cache", {}):
if bot.memory["safety_cache"][url]["positives"] > 0:
continue
netloc = urlparse(url).netloc.lower()
if netloc in bot.memory.get("safety_cache_local", {}):
hostname = urlparse(url).hostname.lower()
if hostname in bot.memory.get("safety_cache_local", {}):
continue
urls.append(url)

Expand Down

0 comments on commit 57ec25c

Please sign in to comment.