squashme: suggestions & domain list download refactor
half-duplex committed May 15, 2022
1 parent 0544f91 commit 9e51e47
Showing 1 changed file with 193 additions and 68 deletions.
261 changes: 193 additions & 68 deletions sopel/modules/safety.py
@@ -8,21 +8,22 @@
from __future__ import annotations

from base64 import urlsafe_b64encode
from datetime import datetime, timedelta, timezone
import json
import logging
import os.path
import re
import threading
import time
from time import sleep
from typing import Dict, Optional
from urllib.parse import urlparse
from urllib.request import urlretrieve

import requests

from sopel import formatting, plugin, tools
from sopel import plugin, tools
from sopel.bot import Sopel
from sopel.config import Config, types
from sopel.formatting import bold, color, colors
from sopel.trigger import Trigger


@@ -41,7 +42,7 @@ class SafetySection(types.StaticSection):
default_mode = types.ValidatedAttribute("default_mode")
"""Which mode to use in channels without a mode set."""
known_good = types.ListAttribute('known_good')
"""List of "known good" domains to ignore."""
"""List of "known good" domains or regexes to consider trusted."""
vt_api_key = types.ValidatedAttribute('vt_api_key')
"""Optional VirusTotal API key (improves malicious URL detection)."""
domain_blocklist_url = types.ValidatedAttribute("domain_blocklist_url")
@@ -53,7 +54,7 @@ def configure(settings: Config):
| name | example | purpose |
| ---- | ------- | ------- |
| default\\_mode | on | Which mode to use in channels without a mode set. |
| known\\_good | sopel.chat,dftba.net | List of "known good" domains or regexes to ignore. This can save VT API calls. |
| known\\_good | sopel.chat,dftba.net | List of "known good" domains or regexes to consider trusted. This can save VT API calls. |
| vt\\_api\\_key | 0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdef | Optional VirusTotal API key to improve malicious URL detection. |
| domain\\_blocklist\\_url | https://example.com/bad-hosts.txt | Optional hosts-file formatted domain blocklist to use instead of StevenBlack's. |
"""
@@ -68,7 +69,7 @@ def configure(settings: Config):
)
settings.safety.configure_setting(
'known_good',
"Enter any domains to allowlist",
"Enter any domains or regexes to consider trusted",
)
settings.safety.configure_setting(
'vt_api_key',
@@ -101,17 +102,7 @@ def setup(bot: Sopel):
for item in bot.settings.safety.known_good:
known_good.append(re.compile(item, re.I))

update_local_cache(bot, init=True)


def update_local_cache(bot: Sopel, init: bool = False):
"""Download the current malware domain list and load it into memory.
:param init: Load the file even if it's unchanged
"""

malware_domains = set()

# clean up old files
old_file = os.path.join(bot.settings.homedir, "malwaredomains.txt")
if os.path.exists(old_file) and os.path.isfile(old_file):
LOGGER.info('Removing old malwaredomains file from %s', old_file)
@@ -122,19 +113,67 @@ def update_local_cache(bot: Sopel, init: bool = False):
# Python on Windows throws an exception if the file is in use
LOGGER.info('Could not delete %s: %s', old_file, str(err))

loc = os.path.join(bot.settings.homedir, "unsafedomains.txt")
if not os.path.isfile(loc) or os.path.getmtime(loc) < time.time() - 24 * 60 * 60:
# File doesn't exist or is older than one day — update it
url = bot.settings.safety.domain_blocklist_url
if url is None or not url.startswith("http"):
url = "https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts"
LOGGER.info("Downloading malicious domain list from %s", url)
# TODO: Can we use a cache header to avoid the download if it's unmodified?
urlretrieve(url, loc)
elif not init:
update_local_cache(bot, init=True)


def zwsp(text: str) -> str:
"""Insert a zero-width space between each character.
Useful for reducing highlights and clickability of links.
"""
return "\u200B".join(text)


def download_domain_list(bot: Sopel, path: str) -> bool:
"""Download the current unsafe domain list.
:param path: Where to save the unsafe domain list
:returns: True if the list was updated
"""
url = bot.settings.safety.domain_blocklist_url
if url is None or not url.startswith("http"):
url = "https://raw.githubusercontent.com/StevenBlack/hosts/master/hosts"
LOGGER.info("Downloading unsafe domain list from %s", url)
old_etag = bot.db.get_plugin_value("safety", "unsafe_domain_list_etag")
if old_etag:
r = requests.head(url)
if r.headers["ETag"] == old_etag and os.path.isfile(path):
LOGGER.debug("Unsafe domain list unchanged, skipping")
return False

r = requests.get(url, stream=True)
try:
r.raise_for_status()
with open(path + ".new", "wb") as f:
for data in r.iter_content(None):
f.write(data)
except Exception:
# don't bother handling, we'll try again tomorrow
LOGGER.warning("Unsafe domain list download failed, using cache")
return False
# .new+move so we don't clobber it if the download fails in the middle
os.rename(path + ".new", path)
bot.db.set_plugin_value("safety", "unsafe_domain_list_etag", r.headers.get("etag"))
return True
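
The HEAD-plus-ETag check in download_domain_list above is one answer to the old TODO about cache headers. A conditional GET with If-None-Match is an equivalent single-request variant; a hedged sketch of that pattern, not the committed code (fetch_if_changed is an illustrative name):

from typing import Optional

import requests

def fetch_if_changed(url: str, old_etag: Optional[str]) -> Optional[bytes]:
    """Return the response body if it changed since old_etag, else None."""
    headers = {"If-None-Match": old_etag} if old_etag else {}
    r = requests.get(url, headers=headers)
    if r.status_code == 304:
        # The server confirms our cached copy is still current
        return None
    r.raise_for_status()
    return r.content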


def update_local_cache(bot: Sopel, init: bool = False):
"""Download the current malware domain list and load it into memory.
:param init: Load the file even if it's unchanged
"""
path = os.path.join(bot.settings.homedir, "unsafedomains.txt")
updated = download_domain_list(bot, path)
if not os.path.isfile(path):
LOGGER.warning("Could not load unsafe domain list")
return

if not updated and not init:
return

with open(loc, 'r') as f:
LOGGER.debug("Loading new unsafedomains list")
malware_domains = set()
with open(path, "r") as f:
for line in f:
clean_line = str(line).strip().lower()
if not clean_line or clean_line[0] == '#':
@@ -151,7 +190,6 @@ def update_local_cache(bot: Sopel, init: bool = False):
if '.' in domain:
# only publicly routable domains matter; skip loopback/link-local stuff
malware_domains.add(domain)

bot.memory["safety_cache_local"] = malware_domains


@@ -188,7 +226,7 @@ def url_handler(bot: Sopel, trigger: Trigger):
pass # Invalid address
else:
if any(regex.search(netloc) for regex in known_good):
continue # explicitly allowed
continue # explicitly trusted

if netloc in bot.memory["safety_cache_local"]:
LOGGER.debug("[local] domain in blocklist: %r", netloc)
@@ -212,80 +250,167 @@ def url_handler(bot: Sopel, trigger: Trigger):
)
bot.say(
"{} {} of {} engine{} flagged a link {} posted as malicious".format(
formatting.bold(formatting.color("WARNING:", "red")),
bold(color("WARNING:", colors.RED)),
positives,
total,
"" if total == 1 else "s",
formatting.bold(trigger.nick),
bold(trigger.nick),
)
)
if strict:
bot.kick(trigger.nick, trigger.sender, "Posted a malicious link")
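
Put together, a hit from the local list or from VirusTotal produces a channel warning shaped roughly like the line below (prefix, counts, and nick are illustrative), followed by a kick when the channel is in strict mode:

[safety] WARNING: 12 of 67 engines flagged a link half-duplex posted as malicious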


def virustotal_lookup(bot: Sopel, url: str, local_only: bool = False) -> Optional[Dict]:
def virustotal_lookup(
bot: Sopel,
url: str,
local_only: bool = False,
max_cache_age: Optional[timedelta] = None,
) -> Optional[Dict]:
"""Check VirusTotal for flags on a URL as malicious.
:param url: The URL to look up
:param local_only: If set, only check cache, do not make a new request.
:returns: A dict containing information about findings, or None if not found.
:param local_only: If set, only check cache, do not make a new request
:param max_cache_age: If set, don't use cache older than this value.
:returns: A dict containing information about findings, or None if not found
"""
if url.startswith("hxxp"):
url = "htt" + url[3:]
elif not url.startswith("http"):
# VT only does http/https URLs
return None

safe_url = "hxx" + url[3:]

if url in bot.memory["safety_cache"]:
oldest_cache = datetime(1970, 1, 1, tzinfo=timezone.utc) # default: use any cache available
if max_cache_age is not None:
oldest_cache = datetime.now(timezone.utc) - max_cache_age
cache = bot.memory["safety_cache"]
if url in cache and cache[url]["fetched"] > oldest_cache:
LOGGER.debug("[VirusTotal] Using cached data for %r", safe_url)
return bot.memory["safety_cache"].get(url)
if local_only:
return None

LOGGER.debug("[VirusTotal] Looking up %r", safe_url)
url_id = urlsafe_b64encode(url.encode("utf-8")).rstrip(b"=").decode("ascii")
try:
r = requests.get(
VT_API_URL + "/" + url_id,
headers={"x-apikey": bot.settings.safety.vt_api_key},
)

if r.status_code == 404:
# Not analyzed - submit new
LOGGER.debug("[VirusTotal] No scan for %r, requesting", safe_url)
# TODO: handle checking back for results from queued scans
r = requests.post(
VT_API_URL,
data={"url": url},
attempts = 5
requested = False
while attempts > 0:
attempts -= 1
try:
r = requests.get(
VT_API_URL + "/" + url_id,
headers={"x-apikey": bot.settings.safety.vt_api_key},
)
if r.status_code == 200:
vt_data = r.json()
last_analysis = vt_data["data"]["attributes"]["last_analysis_stats"]
# VT returns 200s for recent submissions before scan results are in...
if not requested or sum(last_analysis.values()) > 0:
break
elif not requested and r.status_code == 404:
# Not analyzed - submit new
LOGGER.debug("[VirusTotal] No scan for %r, requesting", safe_url)
r = requests.post(
VT_API_URL,
data={"url": url},
headers={"x-apikey": bot.settings.safety.vt_api_key},
)
requested = True
sleep(2) # Scans seem to take ~5s minimum, so add 2s
except requests.exceptions.RequestException:
# Ignoring exceptions with VT so domain list will always work
LOGGER.debug(
"[VirusTotal] Error obtaining response for %r", safe_url, exc_info=True
)
return None
r.raise_for_status()
vt_data = r.json()
except requests.exceptions.RequestException:
# Ignoring exceptions with VT so domain list will always work
LOGGER.debug(
"[VirusTotal] Error obtaining response for %r", safe_url, exc_info=True
)
except json.JSONDecodeError:
# Ignoring exceptions with VT so domain list will always work
LOGGER.debug(
"[VirusTotal] Malformed response (invalid JSON) for %r",
safe_url,
exc_info=True,
)
fetched = time.time()
last_analysis = vt_data["data"]["attributes"]["last_analysis_stats"]
except json.JSONDecodeError:
# Ignoring exceptions with VT so domain list will always work
LOGGER.debug(
"[VirusTotal] Malformed response (invalid JSON) for %r",
safe_url,
exc_info=True,
)
return None
sleep(3)
else: # Still no results
LOGGER.debug("[VirusTotal] Scan failed or unfinished for %r", safe_url)
return None
fetched = datetime.now(timezone.utc)
# Only count strong opinions (ignore suspicious/timeout/undetected)
result = {
"positives": last_analysis["malicious"],
"total": last_analysis["malicious"] + last_analysis["harmless"],
"fetched": fetched,
"virustotal_data": vt_data,
# Format is subject to change!
"virustotal_data": vt_data["data"]["attributes"],
}
bot.memory["safety_cache"][url] = result
if len(bot.memory["safety_cache"]) >= (2 * CACHE_LIMIT):
_clean_cache(bot)
return result
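
VirusTotal's v3 API identifies a URL by the unpadded urlsafe base64 of the URL string, which is what the url_id line above computes. A standalone check using only the standard library:

from base64 import urlsafe_b64encode

url = "http://example.com/"
url_id = urlsafe_b64encode(url.encode("utf-8")).rstrip(b"=").decode("ascii")
print(url_id)  # aHR0cDovL2V4YW1wbGUuY29tLw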


@plugin.command("virustotal")
@plugin.example(".virustotal https://malware.wicar.org/")
@plugin.example(".virustotal hxxps://malware.wicar.org/")
@plugin.output_prefix("[safety][VirusTotal] ")
def vt_command(bot: Sopel, trigger: Trigger):
"""Look up VT results on demand"""
if not bot.settings.safety.vt_api_key:
bot.reply("Sorry, I don't have a VirusTotal API key configured.")
return

url = trigger.group(2)
zwsp_safe_url = zwsp("hxx" + url[3:])

result = virustotal_lookup(bot, url, max_cache_age=timedelta(minutes=1))
if not result:
bot.reply("Sorry, an error occurred while looking that up.")
return

analysis = result["virustotal_data"]["last_analysis_stats"]

result_types = {
"malicious": colors.RED,
"suspicious": colors.YELLOW,
"harmless": colors.GREEN,
"undetected": colors.GREY,
}
result_strs = []
for result_type, result_color in result_types.items():
if analysis[result_type] == 0:
result_strs.append("0 " + result_type)
else:
result_strs.append(
bold(
color(str(analysis[result_type]) + " " + result_type, result_color)
)
)
results_str = ", ".join(result_strs)

vt_scan_time = datetime.fromtimestamp(
result["virustotal_data"]["last_analysis_date"],
timezone.utc,
)
bot.reply(
"Results for {}: {} as of {}".format(
zwsp_safe_url,
results_str,
tools.time.format_time(
bot.db,
bot.config,
nick=trigger.nick,
channel=trigger.sender,
time=vt_scan_time,
),
)
)
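
Assuming a completed scan, the .virustotal reply comes out roughly like the line below (counts and timestamp are illustrative; the URL is rendered with hxx and zero-width spaces so it stays unclickable and avoids highlighting anyone):

[safety][VirusTotal] nick: Results for hxxps://malware.wicar.org/: 12 malicious, 2 suspicious, 55 harmless, 20 undetected as of 2022-05-15 - 12:00:00UTC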


@plugin.command('safety')
@plugin.example(".safety on")
@plugin.output_prefix(PLUGIN_OUTPUT_PREFIX)
def toggle_safety(bot: Sopel, trigger: Trigger):
"""Set safety setting for channel"""
@@ -328,9 +453,9 @@ def _clean_cache(bot: Sopel):

if bot.memory['safety_cache_lock'].acquire(False):
LOGGER.info('Starting safety cache cleanup...')
cutoff = datetime.now(timezone.utc) - timedelta(days=7)
try:
# clean up by age first
cutoff = time.time() - (7 * 24 * 60 * 60) # 7 days ago
old_keys = []
for key, data in bot.memory['safety_cache'].items():
if data['fetched'] <= cutoff:
@@ -339,7 +464,7 @@ def _clean_cache(bot: Sopel):
bot.memory['safety_cache'].pop(key, None)

# clean up more values if the cache is still too big
overage = len(bot.memory['safety_cache']) - CACHE_LIMIT
overage = len(bot.memory["safety_cache"]) - CACHE_LIMIT
if overage > 0:
extra_keys = sorted(
(data["fetched"], key)

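The eviction policy in _clean_cache is two-phase: entries older than seven days go first, and if the cache still exceeds CACHE_LIMIT, the oldest remaining entries by fetch time go next. A toy sketch of the same policy on a plain dict (an illustration under those assumptions, not the plugin code):

from datetime import datetime, timedelta, timezone
from typing import Dict

def clean_cache(cache: Dict[str, dict], limit: int) -> None:
    """Evict by age first, then by size, oldest first."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=7)
    for key in [k for k, v in cache.items() if v["fetched"] <= cutoff]:
        cache.pop(key, None)
    overage = len(cache) - limit
    if overage > 0:
        oldest = sorted((v["fetched"], k) for k, v in cache.items())[:overage]
        for _, key in oldest:
            cache.pop(key, None)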