Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions lib/cuckoo/common/abstracts.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,13 @@ def availables(self, label=None, platform=None, tags=None, arch=None, include_re
label=label, platform=platform, tags=tags, arch=arch, include_reserved=include_reserved, os_version=os_version
)

def find_machine_to_service_task(self, task):
    """Find a machine that is able to service the given task.

    This can be overridden by machinery modules for custom logic
    (e.g. machineries that can create capacity on demand).
    By default, it delegates to the database implementation.

    :param task: the task needing a machine
    :return: whatever the database lookup returns (a machine, or None if
        none is currently available)
    """
    return self.db.find_machine_to_service_task(task)

def scale_pool(self, machine: Machine) -> None:
    """Hook invoked after *machine* has been acquired.

    This can be overridden in sub-classes to scale the pool of machines
    once one has been acquired. The default implementation is a no-op.
    """
    return
Expand Down
2 changes: 1 addition & 1 deletion lib/cuckoo/core/machinery_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -206,7 +206,7 @@ def create_machinery() -> Machinery:
return machinery

def find_machine_to_service_task(self, task: Task) -> Optional[Machine]:
machine = self.db.find_machine_to_service_task(task)
machine = self.machinery.find_machine_to_service_task(task)
if machine:
log.info(
"Task #%s: found useable machine %s (arch=%s, platform=%s)",
Expand Down
177 changes: 156 additions & 21 deletions modules/machinery/az.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,22 @@
import threading
import time
import timeit
from typing import Optional, cast

import sqlalchemy

# Cuckoo-specific imports
from lib.cuckoo.common.config import Config
from lib.cuckoo.common.abstracts import Machinery
from lib.cuckoo.common.constants import CUCKOO_GUEST_PORT
from lib.cuckoo.common.exceptions import (
CuckooCriticalError,
CuckooDependencyError,
CuckooGuestCriticalTimeout,
CuckooMachineError,
CuckooUnserviceableTaskError,
)
from lib.cuckoo.core.database import TASK_PENDING, Machine, Task

HAVE_AZURE = False
cfg = Config()
Expand All @@ -27,11 +41,6 @@
print("Missing machinery-required libraries.")
print("poetry run pip install azure-identity msrest msrestazure azure-mgmt-compute azure-mgmt-network")

# Cuckoo-specific imports
from lib.cuckoo.common.abstracts import Machinery
from lib.cuckoo.common.constants import CUCKOO_GUEST_PORT
from lib.cuckoo.common.exceptions import CuckooCriticalError, CuckooDependencyError, CuckooGuestCriticalTimeout, CuckooMachineError
from lib.cuckoo.core.database import TASK_PENDING, Machine

# Only log INFO or higher from imported python packages
logging.getLogger("adal-python").setLevel(logging.INFO)
Expand Down Expand Up @@ -153,13 +162,6 @@ def _initialize(self):
if "initial_pool_size" not in scale_set_opts:
raise AttributeError("'initial_pool_size' not present in scale set configuration")

# If the initial pool size is 0, then post-initialization we will have 0 machines available for a
# scale set, which is bad for Cuckoo logic
if scale_set_opts["initial_pool_size"] <= 0:
raise CuckooCriticalError(
"The initial pool size for VMSS '%s' is 0. Please set it to a positive integer.", scale_set_id
)

# Insert the scale_set_opts into the module.scale_sets attribute
mmanager_opts["scale_sets"][scale_set_id] = scale_set_opts

Expand Down Expand Up @@ -370,6 +372,7 @@ def _process_pre_existing_vmsss(self):
"size": int(vmss.sku.capacity),
"is_scaling": False,
"is_scaling_down": False,
"has_placeholder_machine": False,
"wait": False,
}

Expand Down Expand Up @@ -424,6 +427,47 @@ def _check_cpu_cores(self):
else:
self.instance_type_cpus = self.options.az.instance_type_cores

def _insert_placeholder_machine(self, vmss_name, vmss_vals):
    """
    Insert a placeholder machine into the DB.

    Only used when "initial_pool_size" is set to 0 in the configs.
    The placeholder machine is used to have available the machine tag for a VMSS's "pool_tag".
    Without the tag present at task submission, an "incorrect tag" error will result.
    The placeholder machine is removed when a task is submitted with the matching tag.

    :param vmss_name: name of the VMSS the placeholder represents
    :param vmss_vals: dict of required values for this VMSS (reads "platform" and "tag")
    """
    try:
        # Drop any stale placeholder first so at most one exists per VMSS.
        self._remove_placeholder_machine(vmss_name)
        self.db.add_machine(
            name=f"{vmss_name}_placeholder",
            label=f"{vmss_name}_placeholder",
            ip=f"{vmss_name}_placeholder_ip",
            platform=vmss_vals["platform"].lower(),
            tags=vmss_vals["tag"],
            arch=self.options.az.scale_sets[vmss_name].arch,
            interface=self.options.az.interface,
            snapshot=f"{vmss_name}_placeholder_snapshot",
            resultserver_ip=self.options.az.resultserver_ip,
            resultserver_port=self.options.az.resultserver_port,
            # Locked + reserved so the scheduler never assigns a real task to it.
            locked=True,
            reserved=True,
        )
        machine_pools[vmss_name]["has_placeholder_machine"] = True
    except Exception as e:
        # Best-effort: a missing placeholder only affects tag visibility at
        # submission time, not any running analysis.
        log.debug("Could not add placeholder machine for %s VMSS", vmss_name)
        log.exception(repr(e))

def _remove_placeholder_machine(self, vmss_name):
    """Delete the placeholder machine entry for *vmss_name* from the DB.

    Best-effort: any failure is logged at debug level and swallowed, since a
    leftover placeholder is harmless to running analyses.
    """
    placeholder_label = f"{vmss_name}_placeholder"
    try:
        self.db.delete_machine(placeholder_label)
        machine_pools[vmss_name]["has_placeholder_machine"] = False
    except Exception as exc:
        log.debug("Could not remove placeholder machine for %s VMSS", vmss_name)
        log.exception(repr(exc))

def _update_or_create_vmsss(self, vmsss_dict):
"""
Reimage or scale up existing VMSSs. Create non-existant required VMSSs.
Expand All @@ -433,8 +477,12 @@ def _update_or_create_vmsss(self, vmsss_dict):
vmss_reimage_threads = []
for vmss, vals in vmsss_dict.items():
if vals["exists"] and not self.options.az.just_start:
if machine_pools[vmss]["size"] == 0:
self._thr_scale_machine_pool(self.options.az.scale_sets[vmss].pool_tag, True if vals["platform"] else False)
# If we want to start with 0 machines on this VMSS.
if vals["initial_pool_size"] == 0:
self._insert_placeholder_machine(vmss, vals)
# If the VMSS shows no VMs instantiated on Azure's side but we want to start with 1+
elif machine_pools[vmss]["size"] == 0:
self._thr_scale_machine_pool(vals["tag"], True)
else:
# Reimage VMSS!
thr = threading.Thread(
Expand Down Expand Up @@ -462,7 +510,8 @@ def _check_locked_machines(self):
if len(running) > 0:
log.info("%d machines found locked on initialize, unlocking.", len(running))
for machine in running:
self.db.unlock_machine(machine)
if "_placeholder" not in machine.name:
self.db.unlock_machine(machine)

def _create_batch_threads(self):
"""
Expand Down Expand Up @@ -552,6 +601,72 @@ def availables(self, label=None, platform=None, tags=None, arch=None, include_re
label=label, platform=platform, tags=tags, arch=arch, include_reserved=include_reserved, os_version=os_version
)

def find_machine_to_service_task(self, task: Task) -> Optional[Machine]:
    """
    Overloading abstracts.py:find_machine_to_service_task to allow VMSS to scale down to 0 VMs.

    Returns a machine (selected FOR UPDATE) able to service *task*, or None when
    no machine is available right now. When nothing in the DB can service the
    task at all, a background scale-up from zero is kicked off via
    _scale_from_zero (which raises CuckooUnserviceableTaskError if no VMSS
    could ever run the task).
    """
    # Architectures and tags the task requires, derived by the DB helper.
    task_archs, task_tags = self.db._task_arch_tags_helper(task)
    # OS-version constraint implied by the analysis package.
    # NOTE(review): annotated downstream as str — confirm the helper's actual return type.
    os_version = self.db._package_vm_requires_check(task.package)

    def get_first_machine(query: sqlalchemy.orm.Query) -> Optional[Machine]:
        # Select for update a machine, preferring one that is available and was the one that was used the
        # longest time ago. This will give us a machine that can get locked or, if there are none that are
        # currently available, we'll at least know that the task is serviceable.
        return cast(
            Optional[Machine], query.order_by(Machine.locked, Machine.locked_changed_on).with_for_update(of=Machine).first()
        )

    # Eager-load tags to avoid per-machine lazy loads during filtering.
    machines = self.db.session.query(Machine).options(sqlalchemy.orm.joinedload(Machine.tags))
    filter_kwargs = {
        "machines": machines,
        "label": task.machine,
        "tags": task_tags,
        "archs": task_archs,
        "os_version": os_version,
    }
    # First pass: consider only non-reserved machines.
    filtered_machines = self.db.filter_machines_to_task(include_reserved=False, **filter_kwargs)
    machine = get_first_machine(filtered_machines)
    if machine is None and not task.machine and task_tags:
        # The task was given at least 1 tag, but there are no non-reserved machines
        # that could satisfy the request. So let's see if there are any "reserved"
        # machines that can satisfy it.
        filtered_machines = self.db.filter_machines_to_task(include_reserved=True, **filter_kwargs)
        machine = get_first_machine(filtered_machines)

    if machine is None:
        # Nothing in the DB can ever service this task as-is; try scaling the
        # matching VMSS up from zero (may raise CuckooUnserviceableTaskError).
        self._scale_from_zero(task, os_version, task_tags)
    if machine and machine.locked:
        # There aren't any machines that can service the task NOW, but there is at least one in the pool
        # that could service it once it's available.
        return None
    return machine

def _scale_from_zero(self, task: Task, os_version: str, tags):
    """
    Scale up a VMSS that currently has 0 instances and is able to run the task.

    Picks the first required VMSS whose tag matches the task's OS-version
    requirement or single tag, or whose platform matches the task's platform,
    and starts a background thread to scale that pool up.

    :param task: the task that could not be serviced by any existing machine
    :param os_version: OS-version requirement derived from the analysis package
    :param tags: the task's tags (only a single-tag match is attempted)
    :raises CuckooUnserviceableTaskError: when no required VMSS can run the task
    """
    assignable_vmss = None
    # Get the first VMSS that can run this task on an instance.
    # (.values() — the scale-set name key is not needed here.)
    for vals in self.required_vmsss.values():
        if os_version and os_version == vals["tag"]:
            assignable_vmss = vals
            break
        if tags and len(tags) == 1 and vals["tag"] == tags[0]:
            assignable_vmss = vals
            break
        if vals["platform"] == task.platform:
            assignable_vmss = vals
            break

    if assignable_vmss is None:
        # No scale set could run this task even after scaling up.
        raise CuckooUnserviceableTaskError

    # VMSS able to run the task exists but has no relevant machines. Scale up from zero.
    threading.Thread(
        target=self._thr_scale_machine_pool,
        args=(assignable_vmss["tag"], True),
    ).start()

def _add_machines_to_db(self, vmss_name):
"""
Adding machines to database that did not exist there before.
Expand Down Expand Up @@ -620,8 +735,9 @@ def _add_machines_to_db(self, vmss_name):

private_ip = vmss_vm_nic.ip_configurations[0].private_ip_address
if private_ip in db_machine_ips:
log.error("The IP '%s' is already associated with a machine in the DB. Moving on...", private_ip)
continue
existing_machines = [machine for machine in machines_in_db if machine.ip == private_ip]
vmss_name, _ = existing_machines[0].label.split("_")
self._delete_machines_from_db_if_missing(vmss_name)

# Add machine to DB.
# TODO: What is the point of name vs label?
Expand Down Expand Up @@ -767,7 +883,7 @@ def _azure_api_call(*args, **kwargs):
# For ClientRequestErrors, they do not have the attribute 'error'
error = exc.error.error if getattr(exc, "error", False) else exc
log.warning(
"Failed to 5s due to the Azure error '%s': '%s'.", str(api_call), str(error), f"{exc.message if hasattr(exc, 'message') else repr(exc)}"
"Failed to '%s' due to the Azure error '%s': '%s'.", str(api_call), str(error), f"{exc.message if hasattr(exc, 'message') else repr(exc)}"
)
if "NotFound" in repr(exc) or (hasattr(exc, "status_code") and exc.status_code == 404):
# Note that this exception is used to represent if an Azure resource
Expand Down Expand Up @@ -883,8 +999,17 @@ def _thr_create_vmss(self, vmss_name, vmss_image_ref, vmss_image_os):
"wait": False,
}
self.required_vmsss[vmss_name]["exists"] = True
with self.db.session.begin():
self._add_machines_to_db(vmss_name)
try:
with self.db.session.begin():
if machine_pools[vmss_name]["size"] == 0:
self._insert_placeholder_machine(vmss_name, self.required_vmsss[vmss_name])
else:
self._add_machines_to_db(vmss_name)
except sqlalchemy.exc.InvalidRequestError:
if machine_pools[vmss_name]["size"] == 0:
self._insert_placeholder_machine(vmss_name, self.required_vmsss[vmss_name])
else:
self._add_machines_to_db(vmss_name)

def _thr_reimage_vmss(self, vmss_name):
"""
Expand Down Expand Up @@ -914,7 +1039,10 @@ def _thr_reimage_vmss(self, vmss_name):
else:
log.exception(repr(e))
raise
with self.db.session.begin():
try:
with self.db.session.begin():
self._add_machines_to_db(vmss_name)
except sqlalchemy.exc.InvalidRequestError:
self._add_machines_to_db(vmss_name)

def _thr_scale_machine_pool(self, tag, per_platform=False):
Expand Down Expand Up @@ -1029,6 +1157,8 @@ def _scale_machine_pool(self, tag, per_platform=False):
# Azure will delete a machine in a VMSS that has not been used in a while. So the machine_pools value
# will not be up-to-date
self._delete_machines_from_db_if_missing(vmss_name)
if len(self.db.list_machines(tags=[tag], include_reserved=True)) == 0:
self._insert_placeholder_machine(vmss_name, self.required_vmsss[vmss_name])
# Update the VMSS size accordingly
machine_pools[vmss_name]["size"] = len(self._get_relevant_machines(tag))
log.debug("The size of the machine pool %s is already the size that we want", vmss_name)
Expand Down Expand Up @@ -1147,9 +1277,13 @@ def _scale_machine_pool(self, tag, per_platform=False):
# Alter the database based on if we scaled up or down
log.debug("Updated %s capacity: %s; Initial capacity: %s", vmss_name, str(number_of_relevant_machines_required), str(initial_capacity))
if number_of_relevant_machines_required > initial_capacity:
if machine_pools[vmss_name]["has_placeholder_machine"]:
self._remove_placeholder_machine(vmss_name)
self._add_machines_to_db(vmss_name)
else:
self._delete_machines_from_db_if_missing(vmss_name)
if len(self.db.list_machines(tags=[tag], include_reserved=True)) == 0:
self._insert_placeholder_machine(vmss_name, self.required_vmsss[vmss_name])

# I release you from your earthly bonds!
machine_pools[vmss_name]["wait"] = False
Expand Down Expand Up @@ -1403,6 +1537,7 @@ def _thr_delete_list_reader(self):
self.options.az.sandbox_resource_group,
vmss_to_delete_from,
models.VirtualMachineScaleSetVMInstanceIDs(instance_ids=instance_ids),
force_deletion=True,
polling_interval=1,
operation=self.compute_client.virtual_machine_scale_sets.begin_delete_instances,
)
Expand Down