-
Notifications
You must be signed in to change notification settings - Fork 707
/
Copy pathcloudwatch.py
471 lines (407 loc) · 15.5 KB
/
cloudwatch.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
"""CloudWatch Logs module."""
from __future__ import annotations
import datetime
import logging
import time
from typing import Any, Dict, List, cast
import boto3
import awswrangler.pandas as pd
from awswrangler import _utils, exceptions
from awswrangler._config import apply_configs
_logger: logging.Logger = logging.getLogger(__name__)
_QUERY_WAIT_POLLING_DELAY: float = 1.0 # SECONDS
def _validate_args(
start_timestamp: int,
end_timestamp: int,
) -> None:
if start_timestamp < 0:
raise exceptions.InvalidArgument("`start_time` cannot be a negative value.")
if start_timestamp >= end_timestamp:
raise exceptions.InvalidArgumentCombination("`start_time` must be inferior to `end_time`.")
def start_query(
    query: str,
    log_group_names: list[str],
    start_time: datetime.datetime | None = None,
    end_time: datetime.datetime | None = None,
    limit: int | None = None,
    boto3_session: boto3.Session | None = None,
) -> str:
    """Run a query against AWS CloudWatchLogs Insights.

    https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html

    Parameters
    ----------
    query
        The query string.
    log_group_names
        The list of log group names or ARNs to be queried. You can include up to 50 log groups.
    start_time
        The beginning of the time range to query. Defaults to the Unix epoch (UTC).
    end_time
        The end of the time range to query. Defaults to the current time (UTC).
    limit
        The maximum number of log events to return in the query.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
    Query ID.

    Examples
    --------
    >>> import awswrangler as wr
    >>> query_id = wr.cloudwatch.start_query(
    ...     log_group_names=["loggroup"],
    ...     query="fields @timestamp, @message | sort @timestamp desc | limit 5",
    ... )
    """
    _logger.debug("log_group_names: %s", log_group_names)
    # Both defaults must be timezone-aware UTC: calling .timestamp() on a naive
    # datetime (e.g. the old `datetime.utcnow()`) interprets it in the machine's
    # *local* timezone, skewing the epoch-millisecond bounds sent to CloudWatch
    # by the local UTC offset. `utcnow()` is also deprecated since Python 3.12.
    start_time = (
        start_time if start_time else datetime.datetime(year=1970, month=1, day=1, tzinfo=datetime.timezone.utc)
    )
    end_time = end_time if end_time else datetime.datetime.now(datetime.timezone.utc)
    # CloudWatch Logs Insights expects epoch timestamps in milliseconds.
    start_timestamp: int = int(1000 * start_time.timestamp())
    end_timestamp: int = int(1000 * end_time.timestamp())
    _logger.debug("start_timestamp: %s", start_timestamp)
    _logger.debug("end_timestamp: %s", end_timestamp)
    _validate_args(
        start_timestamp=start_timestamp,
        end_timestamp=end_timestamp,
    )
    args: dict[str, Any] = {
        "logGroupIdentifiers": log_group_names,
        "startTime": start_timestamp,
        "endTime": end_timestamp,
        "queryString": query,
    }
    if limit is not None:
        args["limit"] = limit
    client_logs = _utils.client(service_name="logs", session=boto3_session)
    response = client_logs.start_query(**args)
    return response["queryId"]
@apply_configs
def wait_query(
    query_id: str,
    boto3_session: boto3.Session | None = None,
    cloudwatch_query_wait_polling_delay: float = _QUERY_WAIT_POLLING_DELAY,
) -> dict[str, Any]:
    """Block until the given CloudWatch Logs Insights query reaches a final state.

    https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html

    Parameters
    ----------
    query_id
        Query ID.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.
    cloudwatch_query_wait_polling_delay
        Interval in seconds for how often the function will check if the CloudWatch query has completed.

    Returns
    -------
    Query result payload.

    Examples
    --------
    >>> import awswrangler as wr
    >>> query_id = wr.cloudwatch.start_query(
    ...     log_group_names=["loggroup"],
    ...     query="fields @timestamp, @message | sort @timestamp desc | limit 5",
    ... )
    ... response = wr.cloudwatch.wait_query(query_id=query_id)
    """
    # States after which CloudWatch will never change the query status again.
    terminal_states = ("Complete", "Failed", "Cancelled")
    client_logs = _utils.client(service_name="logs", session=boto3_session)
    response = client_logs.get_query_results(queryId=query_id)
    status = response["status"]
    # Poll until the query settles, sleeping between re-checks.
    while status not in terminal_states:
        time.sleep(cloudwatch_query_wait_polling_delay)
        response = client_logs.get_query_results(queryId=query_id)
        status = response["status"]
        _logger.debug("status: %s", status)
    if status == "Failed":
        raise exceptions.QueryFailed(f"query ID: {query_id}")
    if status == "Cancelled":
        raise exceptions.QueryCancelled(f"query ID: {query_id}")
    return cast(Dict[str, Any], response)
def run_query(
    query: str,
    log_group_names: list[str],
    start_time: datetime.datetime | None = None,
    end_time: datetime.datetime | None = None,
    limit: int | None = None,
    boto3_session: boto3.Session | None = None,
) -> list[list[dict[str, str]]]:
    """Run a query against AWS CloudWatchLogs Insights and wait the results.

    https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html

    Parameters
    ----------
    query
        The query string.
    log_group_names
        The list of log group names or ARNs to be queried. You can include up to 50 log groups.
    start_time
        The beginning of the time range to query.
    end_time
        The end of the time range to query.
    limit
        The maximum number of log events to return in the query.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
    Result.

    Examples
    --------
    >>> import awswrangler as wr
    >>> result = wr.cloudwatch.run_query(
    ...     log_group_names=["loggroup"],
    ...     query="fields @timestamp, @message | sort @timestamp desc | limit 5",
    ... )
    """
    # Kick off the query, then block until it settles and hand back its rows.
    response: dict[str, Any] = wait_query(
        query_id=start_query(
            query=query,
            log_group_names=log_group_names,
            start_time=start_time,
            end_time=end_time,
            limit=limit,
            boto3_session=boto3_session,
        ),
        boto3_session=boto3_session,
    )
    return cast(List[List[Dict[str, str]]], response["results"])
def read_logs(
    query: str,
    log_group_names: list[str],
    start_time: datetime.datetime | None = None,
    end_time: datetime.datetime | None = None,
    limit: int | None = None,
    boto3_session: boto3.Session | None = None,
) -> pd.DataFrame:
    """Run a query against AWS CloudWatchLogs Insights and convert the results to Pandas DataFrame.

    https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/CWL_QuerySyntax.html

    Parameters
    ----------
    query:
        The query string.
    log_group_names
        The list of log group names or ARNs to be queried. You can include up to 50 log groups.
    start_time
        The beginning of the time range to query.
    end_time
        The end of the time range to query.
    limit
        The maximum number of log events to return in the query.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
    Result as a Pandas DataFrame.

    Examples
    --------
    >>> import awswrangler as wr
    >>> df = wr.cloudwatch.read_logs(
    ...     log_group_names=["loggroup"],
    ...     query="fields @timestamp, @message | sort @timestamp desc | limit 5",
    ... )
    """
    rows: list[list[dict[str, str]]] = run_query(
        query=query,
        log_group_names=log_group_names,
        start_time=start_time,
        end_time=end_time,
        limit=limit,
        boto3_session=boto3_session,
    )
    # Each result row is a list of {"field": ..., "value": ...} pairs; flatten it
    # into one mapping per row, stripping the leading "@" of system fields
    # (e.g. "@timestamp" -> "timestamp").
    records: list[dict[str, str]] = [
        {(cell["field"][1:] if cell["field"].startswith("@") else cell["field"]): cell["value"] for cell in row}
        for row in rows
    ]
    df: pd.DataFrame = pd.DataFrame(records, dtype="string")
    if "timestamp" in df.columns:
        df["timestamp"] = pd.to_datetime(df["timestamp"])
    return df
def describe_log_streams(
    log_group_name: str,
    log_stream_name_prefix: str | None = None,
    order_by: str | None = "LogStreamName",
    descending: bool | None = False,
    limit: int | None = 50,
    boto3_session: boto3.Session | None = None,
) -> pd.DataFrame:
    """List the log streams for the specified log group, return results as a Pandas DataFrame.

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.describe_log_streams

    Parameters
    ----------
    log_group_name
        The name of the log group.
    log_stream_name_prefix
        The prefix to match log streams' name
    order_by
        If the value is LogStreamName , the results are ordered by log stream name.
        If the value is LastEventTime , the results are ordered by the event time.
        The default value is LogStreamName .
    descending
        If the value is True, results are returned in descending order.
        If the value is to False, results are returned in ascending order.
        The default value is False.
    limit
        The maximum number of items returned. The default is up to 50 items.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
    Result as a Pandas DataFrame.

    Examples
    --------
    >>> import awswrangler as wr
    >>> df = wr.cloudwatch.describe_log_streams(
    ...     log_group_name="aws_sdk_pandas_log_group",
    ...     log_stream_name_prefix="aws_sdk_pandas_log_stream",
    ... )
    """
    client_logs = _utils.client(service_name="logs", session=boto3_session)
    args: dict[str, Any] = {
        "logGroupName": log_group_name,
        "descending": descending,
        "orderBy": order_by,
        "limit": limit,
    }
    if log_stream_name_prefix:
        # The API only supports a name prefix when ordering by stream name.
        if order_by == "LastEventTime":
            raise exceptions.InvalidArgumentCombination(
                "Cannot call describe_log_streams with both `log_stream_name_prefix` and order_by equal 'LastEventTime'"
            )
        if order_by == "LogStreamName":
            args["logStreamNamePrefix"] = log_stream_name_prefix
    # Drain every page of results, following the pagination token.
    streams: list[dict[str, Any]] = []
    response = client_logs.describe_log_streams(**args)
    while True:
        streams += cast(List[Dict[str, Any]], response["logStreams"])
        if "nextToken" not in response:
            break
        response = client_logs.describe_log_streams(
            **args,
            nextToken=response["nextToken"],
        )
    if not streams:
        return pd.DataFrame()
    df: pd.DataFrame = pd.DataFrame(streams)
    df["logGroupName"] = log_group_name
    return df
def _filter_log_events(
    log_group_name: str,
    log_stream_names: list[str],
    start_timestamp: int | None = None,
    end_timestamp: int | None = None,
    filter_pattern: str | None = None,
    limit: int | None = 10000,
    boto3_session: boto3.Session | None = None,
) -> list[dict[str, Any]]:
    """Call ``filter_log_events`` for one chunk of stream names, draining all pages."""
    client_logs = _utils.client(service_name="logs", session=boto3_session)
    args: dict[str, Any] = {
        "logGroupName": log_group_name,
        "logStreamNames": log_stream_names,
        "limit": limit,
    }
    # Only forward the optional filters that were actually provided (truthy).
    optional_args = {
        "startTime": start_timestamp,
        "endTime": end_timestamp,
        "filterPattern": filter_pattern,
    }
    args.update({key: value for key, value in optional_args.items() if value})
    # Follow the pagination token until the service stops returning one.
    events: list[dict[str, Any]] = []
    response = client_logs.filter_log_events(**args)
    while True:
        events += cast(List[Dict[str, Any]], response["events"])
        if "nextToken" not in response:
            break
        response = client_logs.filter_log_events(
            **args,
            nextToken=response["nextToken"],
        )
    return events
def filter_log_events(
    log_group_name: str,
    log_stream_name_prefix: str | None = None,
    log_stream_names: list[str] | None = None,
    filter_pattern: str | None = None,
    start_time: datetime.datetime | None = None,
    end_time: datetime.datetime | None = None,
    boto3_session: boto3.Session | None = None,
) -> pd.DataFrame:
    """List log events from the specified log group. The results are returned as Pandas DataFrame.

    https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.filter_log_events

    Note
    ----
    Cannot call ``filter_log_events`` with both ``log_stream_names`` and ``log_stream_name_prefix``.

    Parameters
    ----------
    log_group_name
        The name of the log group.
    log_stream_name_prefix
        Filters the results to include only events from log streams that have names starting with this prefix.
    log_stream_names
        Filters the results to only logs from the log streams in this list.
    filter_pattern
        The filter pattern to use. If not provided, all the events are matched.
    start_time
        Events with a timestamp before this time are not returned.
    end_time
        Events with a timestamp later than this time are not returned.
    boto3_session
        The default boto3 session will be used if **boto3_session** is ``None``.

    Returns
    -------
    Result as a Pandas DataFrame.

    Examples
    --------
    Get all log events from log group 'aws_sdk_pandas_log_group' that have log stream prefix 'aws_sdk_pandas_log_stream'

    >>> import awswrangler as wr
    >>> df = wr.cloudwatch.filter_log_events(
    ...     log_group_name="aws_sdk_pandas_log_group",
    ...     log_stream_name_prefix="aws_sdk_pandas_log_stream",
    ... )

    Get all log events contains 'REPORT' from log stream
    'aws_sdk_pandas_log_stream_one' and 'aws_sdk_pandas_log_stream_two'
    from log group 'aws_sdk_pandas_log_group'

    >>> import awswrangler as wr
    >>> df = wr.cloudwatch.filter_log_events(
    ...     log_group_name="aws_sdk_pandas_log_group",
    ...     log_stream_names=["aws_sdk_pandas_log_stream_one","aws_sdk_pandas_log_stream_two"],
    ...     filter_pattern="REPORT",
    ... )
    """
    if log_stream_name_prefix and log_stream_names:
        raise exceptions.InvalidArgumentCombination(
            "Cannot call `filter_log_events` with both `log_stream_names` and `log_stream_name_prefix`"
        )
    _logger.debug("log_group_name: %s", log_group_name)
    # No explicit stream list: resolve it from the log group (optionally by prefix).
    if not log_stream_names:
        describe_args: dict[str, Any] = {
            "log_group_name": log_group_name,
        }
        if boto3_session:
            describe_args["boto3_session"] = boto3_session
        if log_stream_name_prefix:
            describe_args["log_stream_name_prefix"] = log_stream_name_prefix
        streams_df = describe_log_streams(**describe_args)
        log_stream_names = streams_df["logStreamName"].tolist() if len(streams_df.index) else []
    # Forward only the filters that were supplied; timestamps go out as epoch ms.
    filter_args: dict[str, Any] = {
        "log_group_name": log_group_name,
    }
    if start_time:
        filter_args["start_timestamp"] = int(1000 * start_time.timestamp())
    if end_time:
        filter_args["end_timestamp"] = int(1000 * end_time.timestamp())
    if filter_pattern:
        filter_args["filter_pattern"] = filter_pattern
    if boto3_session:
        filter_args["boto3_session"] = boto3_session
    # The API accepts at most 50 stream names per call, so query in chunks.
    chunk_size: int = 50
    events: list[dict[str, Any]] = []
    for offset in range(0, len(log_stream_names), chunk_size):
        chunk = log_stream_names[offset : offset + chunk_size]
        events += _filter_log_events(**filter_args, log_stream_names=chunk)
    if not events:
        return pd.DataFrame()
    df: pd.DataFrame = pd.DataFrame(events)
    df["logGroupName"] = log_group_name
    return df