Skip to content

Commit f3eacc6

Browse files
authored
[azure][feat] Add Microsoft Graph collector (#2133)
1 parent 9e329d4 commit f3eacc6

19 files changed

+2182
-496
lines changed

plugins/azure/fix_plugin_azure/__init__.py

Lines changed: 93 additions & 45 deletions
Original file line numberDiff line numberDiff line change
@@ -1,26 +1,48 @@
11
import logging
22
import multiprocessing
33
from collections import namedtuple
4-
from concurrent.futures import as_completed, ProcessPoolExecutor
5-
from typing import Optional, Tuple, Any
6-
7-
from attr import evolve
8-
9-
from fix_plugin_azure.collector import AzureSubscriptionCollector
10-
from fix_plugin_azure.config import AzureConfig, AzureAccountConfig
11-
from fix_plugin_azure.resource.base import AzureSubscription
4+
from concurrent.futures import as_completed, ThreadPoolExecutor
5+
from enum import Enum
6+
from typing import Optional, Tuple, Any, List, TypeVar, Type
7+
8+
from fix_plugin_azure.azure_client import MicrosoftClient
9+
from fix_plugin_azure.collector import (
10+
AzureSubscriptionCollector,
11+
MicrosoftGraphOrganizationCollector,
12+
MicrosoftBaseCollector,
13+
)
14+
from fix_plugin_azure.config import AzureConfig, AzureAccountConfig, AzureCredentials
15+
from fix_plugin_azure.resource.base import AzureSubscription, MicrosoftResource
16+
from fix_plugin_azure.resource.microsoft_graph import MicrosoftGraphOrganization
1217
from fixlib.baseplugin import BaseCollectorPlugin
13-
from fixlib.baseresources import Cloud
18+
from fixlib.baseresources import Cloud, BaseAccount
1419
from fixlib.config import Config
1520
from fixlib.core.actions import CoreFeedback
1621
from fixlib.core.progress import ProgressTree, ProgressDone
1722
from fixlib.graph import Graph, MaxNodesExceeded
1823
from fixlib.proc import collector_initializer
1924

2025
log = logging.getLogger("fix.plugin.azure")
21-
22-
AzureSubscriptionArg = namedtuple(
23-
"AzureSubscriptionArg", ["config", "cloud", "subscription", "account_config", "core_feedback", "task_data"]
26+
T = TypeVar("T", bound=MicrosoftResource)
27+
28+
29+
class AzureCollectorKind(Enum):
30+
subscription = "subscription"
31+
microsoft_graph = "microsoft_graph"
32+
33+
34+
AzureCollectorArg = namedtuple(
35+
"AzureCollectorArg",
36+
[
37+
"collector_kind",
38+
"config",
39+
"cloud",
40+
"account",
41+
"account_config",
42+
"core_feedback",
43+
"task_data",
44+
"max_resources_per_account",
45+
],
2446
)
2547

2648

@@ -49,36 +71,55 @@ def collect(self) -> None:
4971
account_configs = config.accounts or {"default": AzureAccountConfig()}
5072

5173
# Gather all subscriptions
52-
args_by_subscription_id = {
53-
subscription.subscription_id: AzureSubscriptionArg(
74+
subscription_args = {
75+
subscription.subscription_id: AzureCollectorArg(
76+
AzureCollectorKind.subscription,
5477
config,
5578
cloud,
56-
evolve(subscription, account_name=name),
79+
subscription,
5780
ac,
5881
self.core_feedback.with_context(cloud.id, subscription.safe_name),
5982
self.task_data,
83+
self.max_resources_per_account,
6084
)
6185
for name, ac in account_configs.items()
62-
for subscription in AzureSubscription.list_subscriptions(config, ac.credentials())
86+
for subscription in list_all(AzureSubscription, config, ac.credentials())
6387
if ac.allowed(subscription.subscription_id)
6488
}
65-
args = list(args_by_subscription_id.values())
89+
# Gather all organizations
90+
microsoft_graph = Cloud(id="microsoft-graph")
91+
organization_args = {
92+
org.id: AzureCollectorArg(
93+
AzureCollectorKind.microsoft_graph,
94+
config,
95+
microsoft_graph,
96+
org,
97+
ac,
98+
self.core_feedback.with_context(cloud.id, org.safe_name),
99+
self.task_data,
100+
self.max_resources_per_account,
101+
)
102+
for name, ac in account_configs.items()
103+
for org in list_all(MicrosoftGraphOrganization, config, ac.credentials())
104+
if ac.collect_microsoft_graph
105+
}
106+
args = list(subscription_args.values()) + list(organization_args.values())
66107

67108
# Send initial progress
68109
progress = ProgressTree(self.cloud)
69110
for sub in args:
70-
progress.add_progress(ProgressDone(sub.subscription.subscription_id, 0, 1))
71-
log.debug(f"Found {sub.subscription.subscription_id}")
111+
progress.add_progress(ProgressDone(sub.account.id, 0, 1))
112+
log.debug(f"Found {sub.account.id}")
72113
self.core_feedback.progress(progress)
73114

74-
# Collect all subscriptions
75-
with ProcessPoolExecutor(max_workers=config.subscription_pool_size) as executor:
76-
wait_for = [executor.submit(collect_in_process, sub, self.max_resources_per_account) for sub in args]
115+
# Collect all subscriptions and organizations
116+
with ThreadPoolExecutor(max_workers=config.subscription_pool_size) as executor:
117+
wait_for = [executor.submit(collect_in_process, sub) for sub in args]
77118
for future in as_completed(wait_for):
78119
subscription, graph = future.result()
79-
progress.add_progress(ProgressDone(subscription.subscription_id, 1, 1, path=[cloud.id]))
120+
progress.add_progress(ProgressDone(subscription.id, 1, 1, path=[cloud.id]))
80121
if not isinstance(graph, Graph):
81-
log.debug(f"Skipping subscription graph of invalid type {type(graph)}")
122+
log.debug(f"Skipping account graph of invalid type {type(graph)}")
82123
continue
83124
try:
84125
self.send_account_graph(graph)
@@ -87,34 +128,41 @@ def collect(self) -> None:
87128
del graph
88129

89130

90-
def collect_account_proxy(subscription_collector_arg: AzureSubscriptionArg, queue: multiprocessing.Queue, max_resources_per_account: Optional[int] = None) -> None: # type: ignore
131+
def list_all(resource: Type[T], config: AzureConfig, credentials: AzureCredentials) -> List[T]:
132+
if resource.api_spec is None:
133+
return []
134+
client = MicrosoftClient.create(config, credentials, "global")
135+
return [resource.from_api(js) for js in client.list(resource.api_spec)]
136+
137+
138+
def collect_account_proxy(collector_arg: AzureCollectorArg, queue: multiprocessing.Queue) -> None: # type: ignore
91139
collector_initializer()
92-
config, cloud, subscription, account_config, core_feedback, task_data = subscription_collector_arg
93-
subscription_collector = AzureSubscriptionCollector(
94-
config, cloud, subscription, account_config.credentials(), core_feedback, task_data, max_resources_per_account
95-
)
140+
kind, config, cloud, account, account_config, core_feedback, task_data, max_resources = collector_arg
141+
log.info(f"Start collecting {kind}: {account.id}")
142+
mbc: MicrosoftBaseCollector
143+
if kind == AzureCollectorKind.subscription:
144+
mbc = AzureSubscriptionCollector(
145+
config, cloud, account, account_config.credentials(), core_feedback, task_data, max_resources
146+
)
147+
elif kind == AzureCollectorKind.microsoft_graph:
148+
mbc = MicrosoftGraphOrganizationCollector(
149+
config, cloud, account, account_config.credentials(), core_feedback, task_data, max_resources
150+
)
151+
else:
152+
queue.put((collector_arg.account, None)) # signal done
153+
raise ValueError(f"Invalid collector kind {kind}")
96154
try:
97-
subscription_collector.collect()
98-
queue.put((subscription_collector_arg.subscription, subscription_collector.graph))
155+
mbc.collect()
156+
queue.put((collector_arg.account, mbc.graph))
99157
except Exception as e:
100-
log.exception(f"Error collecting subscription {subscription.subscription_id}: {e}. Give up.")
101-
queue.put((subscription_collector_arg.subscription, None)) # signal done
158+
log.exception(f"Error collecting account {account.id}: {e}. Give up.")
159+
queue.put((collector_arg.account, None)) # signal done
102160

103161

104-
def collect_in_process(
105-
subscription_collector_arg: AzureSubscriptionArg,
106-
max_resources_per_account: Optional[int] = None,
107-
) -> Tuple[AzureSubscription, Graph]:
162+
def collect_in_process(collector_arg: AzureCollectorArg) -> Tuple[BaseAccount, Graph]:
108163
ctx = multiprocessing.get_context("spawn")
109164
queue = ctx.Queue()
110-
process = ctx.Process(
111-
target=collect_account_proxy,
112-
kwargs={
113-
"subscription_collector_arg": subscription_collector_arg,
114-
"queue": queue,
115-
"max_resources_per_account": max_resources_per_account,
116-
},
117-
)
165+
process = ctx.Process(target=collect_account_proxy, kwargs={"collector_arg": collector_arg, "queue": queue})
118166
process.start()
119167
result = queue.get()
120168
process.join()

0 commit comments

Comments
 (0)