Merge branch 'integration202109' of https://github.com/uc-cdis/audit-service into stable
PlanXCyborg committed Aug 30, 2021
2 parents 1a279da + b937fbf commit 8acc13a
Showing 8 changed files with 67 additions and 22 deletions.
7 changes: 3 additions & 4 deletions Jenkinsfile
@@ -1,7 +1,6 @@
 #!groovy
 
-// TODO: uncomment once we have integration tests for this service
-// @Library('cdis-jenkins-lib@master') _
+@Library('cdis-jenkins-lib@master') _
 
-// testPipeline {
-// }
+testPipeline {
+}
10 changes: 10 additions & 0 deletions docs/explanation/creating_audit_logs.md
@@ -11,3 +11,13 @@ However, it's difficult to monitor errors when using this endpoint.
 ## Pulling from a queue
 
 The audit service can also handle pulling audit logs from a queue, which allows for easier monitoring. This can be configured by turning on the `PULL_FROM_QUEUE` flag in the configuration file (enabled by default). Right now, only AWS SQS is integrated, but integrations for other types of queues can be added by adding code and extending the values accepted for the `QUEUE_CONFIG.type` field in the configuration file.
+
+## Timestamps
+
+In most cases, services should **not** provide a timestamp when creating audit logs. The timestamp is only accepted in log creation requests to allow populating the audit database with historical data, for example by parsing historical logs from before the Audit Service was deployed to a Data Commons.
+
+For other use cases, for consistency and to avoid mistakes, the timestamp should either be generated automatically by `audit-service` (when using the API's creation endpoint) or come from the message's `SentTimestamp` (when using AWS SQS).
+
+> Why rely on `SentTimestamp`?
+> Because `audit-service` sleeps before fetching multiple messages from the SQS queue, the timestamps generated by `audit-service` at the time it processes the messages may not be accurate. For example, two messages sent to the SQS queue at different times could be received by `audit-service` at the same time and end up with the same timestamp.
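To make the producer side of this flow concrete, here is a minimal, hypothetical sketch of a service pushing one audit log to the SQS queue with `boto3`. The queue URL and field values are placeholders (the fields mirror the `presigned_url` test data later in this commit), and note that no `timestamp` is sent: SQS stamps the message with `SentTimestamp` on arrival, which is exactly the fallback described above.

```python
import json

import boto3

# Hypothetical queue URL; in a real deployment it comes from
# `gen3 sqs info $(gen3 api safe-name audit-sqs)`.
SQS_URL = "https://sqs.us-east-1.amazonaws.com/123456789012/audit-sqs"

sqs = boto3.client("sqs", region_name="us-east-1")

# No "timestamp" field on purpose: SQS records SentTimestamp itself,
# and audit-service falls back to it when processing the message.
log = {
    "category": "presigned_url",
    "request_url": "/request_data/download/dg.hello/abc",
    "status_code": 200,
    "username": "audit-service_user",
    "sub": 10,
    "guid": "dg.hello/abc",
    "resource_paths": ["/my/resource/path1"],
    "action": "download",
    "protocol": "s3",
}
sqs.send_message(QueueUrl=SQS_URL, MessageBody=json.dumps(log))
```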
4 changes: 2 additions & 2 deletions docs/how-to/deployment.md
@@ -16,11 +16,11 @@ The `PUSH_AUDIT_LOGS_CONFIG` field must also be configured.
 - **Systems where the audit-service has been deployed previously:** Fence 5.1.0 or more recent might NOT work with audit-service < 1.0.0. Update audit-service, then run `kubectl delete secret audit-g3auto` and `gen3 kube-setup-audit-service`. Then configure [PUSH_AUDIT_LOGS_CONFIG](https://github.com/uc-cdis/fence/blob/5.1.0/fence/config-default.yaml#L632-L636) in the Fence config (run `gen3 sqs info $(gen3 api safe-name audit-sqs)` to get the SQS URL) and run `kubectl delete secret fence-config` and `gen3 kube-setup-fence`.
 - **Systems where the audit-service has never been deployed:** run `gen3 kube-setup-audit-service`. Then configure [ENABLE_AUDIT_LOGS](https://github.com/uc-cdis/fence/blob/5.1.0/fence/config-default.yaml#L624-L626) and [PUSH_AUDIT_LOGS_CONFIG](https://github.com/uc-cdis/fence/blob/5.1.0/fence/config-default.yaml#L632-L636) in the Fence config (run `gen3 sqs info $(gen3 api safe-name audit-sqs)` to get the SQS URL) and run `kubectl delete secret fence-config` and `gen3 kube-setup-fence`.
 
-When deploying with Gen3 cloud-automation, there is no need to configure `PUSH_AUDIT_LOGS_CONFIG.aws_sqs_config.aws_access_key_id` and `PUSH_AUDIT_LOGS_CONFIG.aws_sqs_config.aws_secret_access_key` in the Fence configuration.
+When deploying with Gen3 cloud-automation, there is no need to configure `PUSH_AUDIT_LOGS_CONFIG.aws_sqs_config.aws_cred` in the Fence configuration.
 
 See the [Fence default configuration file](https://github.com/uc-cdis/fence/blob/5.1.0/fence/config-default.yaml#L622-L638) for more details.
 
 ## Notes
 
 1. When adding audit log creation in a service for the first time, the `audit-service` deployment file `network-ingress` annotation (see [here](https://github.com/uc-cdis/cloud-automation/blob/27770776d239bc609bbbd23607689cf62de1bc66/kube/services/audit-service/audit-service-deploy.yaml#L6)) must be updated to allow the service to talk to `audit-service`.
-2. In most cases, services should **not** provide a timestamp when creating audit logs. The timestamp is only accepted in log creation requests to allow populating the audit database with historical data, for example by parsing historical logs from before the Audit Service was deployed to a Data Commons.
+2. In most cases, services should **not** provide a timestamp when creating audit logs. See [Creating audit logs, Timestamps section](../explanation/creating_audit_logs.md#timestamps).
2 changes: 1 addition & 1 deletion docs/reference/architecture.md
@@ -32,7 +32,7 @@ Filters can be added as query strings. Accepted filters include all fields for t
 
 If queries are time-boxed (depends on configuration variable `QUERY_TIMEBOX_MAX_DAYS`), (`stop` - `start`) must be lower than the configured maximum.
 
-We can populate the Audit Service database with historical data by parsing logs and making POST requests to create audit entries, because the POST endpoint accepts the timestamp as an optional parameter.
+We can populate the Audit Service database with historical data by parsing logs and making POST requests to create audit entries, because the log creation endpoint accepts the timestamp as an optional parameter.
 
 About retention: for now, there is no planned mechanism to delete old entries.
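As a sketch of that backfilling use case, assuming the log creation endpoint has the shape `POST /log/{category}` (the exact route and required auth are not shown in this commit), a historical `presigned_url` entry could be created like this:

```python
import time

import requests

AUDIT_SERVICE_URL = "http://audit-service"  # placeholder base URL

historical_log = {
    "request_url": "/request_data/download/dg.hello/abc",
    "status_code": 200,
    "username": "audit-service_user",
    "sub": 10,
    "guid": "dg.hello/abc",
    "resource_paths": ["/my/resource/path1"],
    "action": "download",
    "protocol": "s3",
    # only set an explicit timestamp when backfilling historical data
    "timestamp": int(time.time()) - 30 * 24 * 3600,  # e.g. 30 days ago
}
# assumed endpoint shape; adjust to the deployed API
requests.post(f"{AUDIT_SERVICE_URL}/log/presigned_url", json=historical_log)
```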
13 changes: 9 additions & 4 deletions src/audit/config-default.yaml
@@ -17,19 +17,24 @@ ARBORIST_URL:
 PULL_FROM_QUEUE: true
 # `QUEUE_CONFIG.type` is one of: [aws_sqs].
 # - if type == aws_sqs: logs are pulled from an SQS and `aws_sqs_config`
-# fields `sqs_url` and `region` are required. Fields `aws_access_key_id` and
-# `aws_secret_access_key` are optional.
+# fields `sqs_url` and `region` are required. Field `aws_cred` is optional;
+# if set, it must be a key in the `AWS_CREDENTIALS` section.
 QUEUE_CONFIG:
   type: aws_sqs
   aws_sqs_config:
     sqs_url:
     region:
-    aws_access_key_id:
-    aws_secret_access_key:
+    aws_cred:
 # how often to check the queue for new audit logs after
 # seeing an empty queue
 PULL_FREQUENCY_SECONDS: 300 # default: 5 min
 
+# NOTE: Remove the {} and supply creds if needed. Example in comments below
+AWS_CREDENTIALS: {}
+# CRED1:
+#   aws_access_key_id: ''
+#   aws_secret_access_key: ''
+
 ####################
 #     DATABASE     #
 ####################
4 changes: 4 additions & 0 deletions src/audit/config.py
@@ -42,6 +42,10 @@ def validate(self, logger) -> None:
                         logger.warning(
                             f"'PULL_FROM_QUEUE' is enabled with 'type' == 'aws_sqs', but config is missing 'QUEUE_CONFIG.aws_sqs_config.{key}'"
                         )
+                if "aws_cred" in aws_sqs_config and aws_sqs_config["aws_cred"]:
+                    assert (
+                        aws_sqs_config["aws_cred"] in config["AWS_CREDENTIALS"]
+                    ), f"The 'QUEUE_CONFIG.aws_sqs_config.aws_cred' value '{aws_sqs_config['aws_cred']}' is not configured in 'AWS_CREDENTIALS'"
             else:
                 raise Exception(
                     f"Config 'QUEUE_CONFIG.type': unknown queue type '{queue_type}'"
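To illustrate the contract this new validation enforces, here is a self-contained, made-up config in which the referenced credential key is missing from `AWS_CREDENTIALS`, tripping the assertion at startup rather than at queue-polling time:

```python
# Made-up config mirroring the validation above.
config = {
    "QUEUE_CONFIG": {
        "type": "aws_sqs",
        "aws_sqs_config": {"sqs_url": "...", "region": "us-east-1", "aws_cred": "CRED1"},
    },
    "AWS_CREDENTIALS": {},  # "CRED1" is missing -> validation fails
}

aws_sqs_config = config["QUEUE_CONFIG"]["aws_sqs_config"]
if "aws_cred" in aws_sqs_config and aws_sqs_config["aws_cred"]:
    assert (
        aws_sqs_config["aws_cred"] in config["AWS_CREDENTIALS"]
    ), f"'{aws_sqs_config['aws_cred']}' is not configured in 'AWS_CREDENTIALS'"
# AssertionError: 'CRED1' is not configured in 'AWS_CREDENTIALS'
```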
32 changes: 28 additions & 4 deletions src/audit/pull_from_queue.py
@@ -9,13 +9,17 @@
 from .routes.maintain import insert_row, validate_presigned_url_log, validate_login_log
 
 
-async def process_log(data):
+async def process_log(data, timestamp):
     # check log category
     category = data.pop("category")
     assert (
         category and category in CATEGORY_TO_MODEL_CLASS
     ), f"Unknown log category {category}"
 
+    # if the timestamp was not provided, default to the message's SentTimestamp
+    if not data.get("timestamp"):
+        data["timestamp"] = timestamp
+
     # validate log
     if category == "presigned_url":
         validate_presigned_url_log(data)
@@ -33,6 +37,7 @@ async def pull_from_queue(sqs):
         response = sqs.receive_message(
             QueueUrl=config["QUEUE_CONFIG"]["aws_sqs_config"]["sqs_url"],
             MaxNumberOfMessages=10,  # 10 is the max allowed by AWS
+            AttributeNames=["SentTimestamp"],
         )
         messages = response.get("Messages", [])
     except Exception as e:
@@ -43,8 +48,11 @@ async def pull_from_queue(sqs):
     for message in messages:
         data = json.loads(message["Body"])
         receipt_handle = message["ReceiptHandle"]
+        # when the message was sent to the queue
+        sent_timestamp = message["Attributes"]["SentTimestamp"]
+        timestamp = int(int(sent_timestamp) / 1000)  # ms to s
         try:
-            await process_log(data)
+            await process_log(data, timestamp)
         except Exception as e:
             failed = True
             logger.error(f"Error processing audit log: {e}")
@@ -73,11 +81,27 @@ async def pull_from_queue_loop():
     """
     logger.info("Starting to pull from queue...")
     aws_sqs_config = config["QUEUE_CONFIG"]["aws_sqs_config"]
+    # we know the cred is in AWS_CREDENTIALS (see `AuditServiceConfig.validate`)
+    aws_creds = (
+        config["AWS_CREDENTIALS"][aws_sqs_config["aws_cred"]]
+        if "aws_cred" in aws_sqs_config
+        else {}
+    )
+    if (
+        not aws_creds
+        and "aws_access_key_id" in aws_sqs_config
+        and "aws_secret_access_key" in aws_sqs_config
+    ):
+        # for backwards compatibility
+        aws_creds = {
+            "aws_access_key_id": aws_sqs_config["aws_access_key_id"],
+            "aws_secret_access_key": aws_sqs_config["aws_secret_access_key"],
+        }
     sqs = boto3.client(
         "sqs",
         region_name=aws_sqs_config["region"],
-        aws_access_key_id=aws_sqs_config.get("aws_access_key_id"),
-        aws_secret_access_key=aws_sqs_config.get("aws_secret_access_key"),
+        aws_access_key_id=aws_creds.get("aws_access_key_id"),
+        aws_secret_access_key=aws_creds.get("aws_secret_access_key"),
    )
     sleep_time = config["PULL_FREQUENCY_SECONDS"]
     while True:
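For clarity on the `SentTimestamp` conversion above: SQS returns the attribute as a string of milliseconds since the epoch, while audit logs store seconds. A worked example with a made-up value:

```python
from datetime import datetime, timezone

sent_timestamp = "1630334977000"  # milliseconds, as returned by SQS
timestamp = int(int(sent_timestamp) / 1000)  # ms -> s, as in the code above
print(timestamp)  # 1630334977
print(datetime.fromtimestamp(timestamp, tz=timezone.utc))
# 2021-08-30 14:49:37+00:00
```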
17 changes: 10 additions & 7 deletions tests/test_queue.py
@@ -19,19 +19,19 @@ async def test_process_log_success():
     # create a log
     guid = "dg.hello/abc"
     category = "presigned_url"
+    timestamp = int(time.time())
     message_data = {
         "category": category,
         "request_url": f"/request_data/download/{guid}",
         "status_code": 200,
-        "timestamp": int(time.time()),
         "username": "audit-service_user",
         "sub": 10,
         "guid": guid,
         "resource_paths": ["/my/resource/path1", "/path2"],
         "action": "download",
         "protocol": "s3",
     }
-    await process_log(message_data)
+    await process_log(message_data, timestamp)
 
     data = await db.all(db.text(f"select * from {category}"))
     assert len(data) == 1, f"1 row should have been inserted in table '{category}'"
@@ -60,11 +60,11 @@ async def test_process_log_failure():
     # attempt to create a log with a bad category
     guid = "dg.hello/abc"
     category = "this_does_not_exist"
+    timestamp = int(time.time())
     message_data = {
         "category": category,
         "request_url": f"/request_data/download/{guid}",
         "status_code": 200,
-        "timestamp": int(time.time()),
         "username": "audit-service_user",
         "sub": 10,
         "guid": guid,
@@ -73,7 +73,7 @@ async def test_process_log_failure():
         "protocol": "s3",
     }
     with pytest.raises(AssertionError, match=f"Unknown log category {category}"):
-        await process_log(message_data)
+        await process_log(message_data, timestamp)
 
     for category in CATEGORY_TO_MODEL_CLASS:
         data = await db.all(db.text(f"select * from {category}"))
@@ -82,7 +82,6 @@ async def test_process_log_failure():
     ), f"Nothing should have been inserted in table '{category}'"
 
     # attempt to create a log with missing fields
-    guid = "dg.hello/abc"
     category = "presigned_url"
     message_data = {
         "category": category,
@@ -92,7 +91,7 @@ async def test_process_log_failure():
         "action": "download",
     }
     with pytest.raises(Exception, match="null value in column"):
-        await process_log(message_data)
+        await process_log(message_data, timestamp)
 
     # make sure `process_log` did not insert any rows
     for category in CATEGORY_TO_MODEL_CLASS:
@@ -127,12 +126,13 @@ def __init__(self, messages=None) -> None:
         else:
             self.messages = messages
 
-    def receive_message(self, QueueUrl, MaxNumberOfMessages):
+    def receive_message(self, QueueUrl, MaxNumberOfMessages, AttributeNames):
         n_messages = min(MaxNumberOfMessages, len(self.messages))
         messages = [
             {
                 "Body": json.dumps(message),
                 "ReceiptHandle": "123",
+                "Attributes": {"SentTimestamp": str(int(time.time() * 1000))},
             }
             for message in self.messages[:n_messages]
         ]
@@ -142,6 +142,9 @@ def delete_message(self, QueueUrl, ReceiptHandle):
         pass
 
 
+TestQueue.__test__ = False  # prevent pytest from trying to collect it
+
+
 @pytest.mark.asyncio
 async def test_pull_from_queue_success(monkeypatch):
     """
