-
Notifications
You must be signed in to change notification settings - Fork 1.4k
/
vm_service.py
109 lines (89 loc) · 3.66 KB
/
vm_service.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
from dataclasses import dataclass
from azure.mgmt.compute import ComputeManagementClient
from azure.mgmt.compute.models import StorageProfile
from prowler.lib.logger import logger
from prowler.providers.azure.azure_provider import AzureProvider
from prowler.providers.azure.lib.service.service import AzureService
########################## VirtualMachines
class VirtualMachines(AzureService):
def __init__(self, provider: AzureProvider):
super().__init__(ComputeManagementClient, provider)
self.virtual_machines = self.__get_virtual_machines__()
self.disks = self.__get_disks__()
def __get_virtual_machines__(self):
logger.info("VirtualMachines - Getting virtual machines...")
virtual_machines = {}
for subscription_name, client in self.clients.items():
try:
virtual_machines_list = client.virtual_machines.list_all()
virtual_machines.update({subscription_name: {}})
for vm in virtual_machines_list:
virtual_machines[subscription_name].update(
{
vm.vm_id: VirtualMachine(
resource_id=vm.id,
resource_name=vm.name,
storage_profile=getattr(vm, "storage_profile", None),
location=vm.location,
security_profile=vm.security_profile,
)
}
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return virtual_machines
def __get_disks__(self):
logger.info("VirtualMachines - Getting disks...")
disks = {}
for subscription_name, client in self.clients.items():
try:
disks_list = client.disks.list()
disks.update({subscription_name: {}})
for disk in disks_list:
vms_attached = []
if disk.managed_by:
vms_attached.append(disk.managed_by)
if disk.managed_by_extended:
vms_attached.extend(disk.managed_by_extended)
disks[subscription_name].update(
{
disk.unique_id: Disk(
resource_id=disk.id,
resource_name=disk.name,
location=disk.location,
vms_attached=vms_attached,
encryption_type=getattr(
getattr(disk, "encryption", None), "type", None
),
)
}
)
except Exception as error:
logger.error(
f"Subscription name: {subscription_name} -- {error.__class__.__name__}[{error.__traceback__.tb_lineno}]: {error}"
)
return disks
@dataclass
class UefiSettings:
secure_boot_enabled: bool
v_tpm_enabled: bool
@dataclass
class SecurityProfile:
security_type: str
uefi_settings: UefiSettings
@dataclass
class VirtualMachine:
resource_id: str
resource_name: str
storage_profile: StorageProfile
location: str
security_profile: SecurityProfile
@dataclass
class Disk:
resource_id: str
resource_name: str
vms_attached: list[str]
encryption_type: str
location: str