Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(logs-destinations): support Amazon Data Firehose logs destination #33683

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Data Firehose destination support
  • Loading branch information
Tietew committed Mar 4, 2025
commit 0097c320f736e267896784bb85bcf74546ed1692
61 changes: 61 additions & 0 deletions packages/aws-cdk-lib/aws-logs-destinations/lib/firehose.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
import { Construct } from 'constructs';
import * as iam from '../../aws-iam';
import * as firehose from '../../aws-kinesisfirehose';
import * as logs from '../../aws-logs';
import { Stack } from '../../core';

/**
* Customize the Amazon Data Firehose Logs Destination
*/
export interface FirehoseDestinationProps {
/**
* The role to assume to write log events to the destination
*
* @default - A new Role is created
*/
readonly role?: iam.IRole;
}

/**
* Use a Data Firehose delivery stream as the destination for a log subscription
*/
export class FirehoseDestination implements logs.ILogSubscriptionDestination {
/**
* @param stream The Data Firehose delivery stream to use as destination
* @param props The Data Firehose Destination properties
*
*/
constructor(private readonly stream: firehose.IDeliveryStream, private readonly props: FirehoseDestinationProps = {}) {
}

public bind(scope: Construct, _sourceLogGroup: logs.ILogGroup): logs.LogSubscriptionDestinationConfig {
// Following example from https://docs.aws.amazon.com/AmazonCloudWatch/latest/logs/SubscriptionFilters.html#FirehoseExample
// Create a role to be assumed by CWL that can write to this stream.
const id = 'CloudWatchLogsCanPutRecords';
const role = this.props.role ?? scope.node.tryFindChild(id) as iam.IRole ?? new iam.Role(scope, id, {
assumedBy: new iam.ServicePrincipal('logs.amazonaws.com', {
conditions: {
StringLike: {
'aws:SourceArn': Stack.of(scope).formatArn({ service: 'logs', resource: '*' }),
},
},
}),
});
role.addToPrincipalPolicy(new iam.PolicyStatement({
actions: ['firehose:PutRecord'],
resources: [this.stream.deliveryStreamArn],
}));

const policy = role.node.tryFindChild('DefaultPolicy') as iam.CfnPolicy;
if (policy) {
// Remove circular dependency
const cfnRole = role.node.defaultChild as iam.CfnRole;
cfnRole.addOverride('DependsOn', undefined);

// Ensure policy is created before subscription filter
scope.node.addDependency(policy);
}

return { arn: this.stream.deliveryStreamArn, role };
}
}
1 change: 1 addition & 0 deletions packages/aws-cdk-lib/aws-logs-destinations/lib/index.ts
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
export * from './lambda';
export * from './kinesis';
export * from './firehose';
178 changes: 178 additions & 0 deletions packages/aws-cdk-lib/aws-logs-destinations/test/firehose.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,178 @@
import { Template } from '../../assertions';
import * as iam from '../../aws-iam';
import * as firehose from '../../aws-kinesisfirehose';
import * as logs from '../../aws-logs';
import * as s3 from '../../aws-s3';
import * as cdk from '../../core';
import * as dests from '../lib';

test('stream can be subscription destination', () => {
// GIVEN
const stack = new cdk.Stack();
const bucket = new s3.Bucket(stack, 'MyBucket');
const stream = new firehose.DeliveryStream(stack, 'MyStream', { destination: new firehose.S3Bucket(bucket) });
const logGroup = new logs.LogGroup(stack, 'LogGroup');

// WHEN
new logs.SubscriptionFilter(stack, 'Subscription', {
logGroup,
destination: new dests.FirehoseDestination(stream),
filterPattern: logs.FilterPattern.allEvents(),
});

// THEN: subscription target is Stream
Template.fromStack(stack).hasResourceProperties('AWS::Logs::SubscriptionFilter', {
DestinationArn: { 'Fn::GetAtt': ['MyStream5C050E93', 'Arn'] },
RoleArn: { 'Fn::GetAtt': ['SubscriptionCloudWatchLogsCanPutRecords9C1223EC', 'Arn'] },
});

// THEN: we have a role to write to the Stream
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', {
AssumeRolePolicyDocument: {
Version: '2012-10-17',
Statement: [{
Action: 'sts:AssumeRole',
Effect: 'Allow',
Principal: {
Service: 'logs.amazonaws.com',
},
Condition: {
StringLike: {
'aws:SourceArn': { 'Fn::Join': ['', ['arn:', { Ref: 'AWS::Partition' }, ':logs:', { Ref: 'AWS::Region' }, ':', { Ref: 'AWS::AccountId' }, ':*']] },
},
},
}],
},
});

Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
PolicyDocument: {
Version: '2012-10-17',
Statement: [
{
Action: 'firehose:PutRecord',
Effect: 'Allow',
Resource: { 'Fn::GetAtt': ['MyStream5C050E93', 'Arn'] },
},
],
},
});
});

test('stream can be subscription destination twice, without duplicating permissions', () => {
// GIVEN
const stack = new cdk.Stack();
const bucket = new s3.Bucket(stack, 'MyBucket');
const stream = new firehose.DeliveryStream(stack, 'MyStream', { destination: new firehose.S3Bucket(bucket) });
const logGroup1 = new logs.LogGroup(stack, 'LogGroup');
const logGroup2 = new logs.LogGroup(stack, 'LogGroup2');

// WHEN
new logs.SubscriptionFilter(stack, 'Subscription', {
logGroup: logGroup1,
destination: new dests.FirehoseDestination(stream),
filterPattern: logs.FilterPattern.allEvents(),
});

new logs.SubscriptionFilter(stack, 'Subscription2', {
logGroup: logGroup2,
destination: new dests.FirehoseDestination(stream),
filterPattern: logs.FilterPattern.allEvents(),
});

// THEN: subscription target is Stream
Template.fromStack(stack).hasResourceProperties('AWS::Logs::SubscriptionFilter', {
DestinationArn: { 'Fn::GetAtt': ['MyStream5C050E93', 'Arn'] },
RoleArn: { 'Fn::GetAtt': ['SubscriptionCloudWatchLogsCanPutRecords9C1223EC', 'Arn'] },
});

// THEN: we have a role to write to the Stream
Template.fromStack(stack).hasResourceProperties('AWS::IAM::Role', {
AssumeRolePolicyDocument: {
Version: '2012-10-17',
Statement: [{
Action: 'sts:AssumeRole',
Effect: 'Allow',
Principal: {
Service: 'logs.amazonaws.com',
},
Condition: {
StringLike: {
'aws:SourceArn': { 'Fn::Join': ['', ['arn:', { Ref: 'AWS::Partition' }, ':logs:', { Ref: 'AWS::Region' }, ':', { Ref: 'AWS::AccountId' }, ':*']] },
},
},
}],
},
});

Template.fromStack(stack).hasResourceProperties('AWS::IAM::Policy', {
PolicyDocument: {
Version: '2012-10-17',
Statement: [
{
Action: 'firehose:PutRecord',
Effect: 'Allow',
Resource: { 'Fn::GetAtt': ['MyStream5C050E93', 'Arn'] },
},
],
},
});
});

test('an existing IAM role can be passed to new destination instance instead of auto-created', ()=> {
// GIVEN
const stack = new cdk.Stack();
const bucket = new s3.Bucket(stack, 'MyBucket');
const stream = new firehose.DeliveryStream(stack, 'MyStream', { destination: new firehose.S3Bucket(bucket) });
const logGroup = new logs.LogGroup(stack, 'LogGroup');

const importedRole = iam.Role.fromRoleArn(stack, 'ImportedRole', 'arn:aws:iam::123456789012:role/ImportedRoleFirehoseDestinationTest');

const firehoseDestination = new dests.FirehoseDestination(stream, { role: importedRole });

new logs.SubscriptionFilter(logGroup, 'MySubscriptionFilter', {
logGroup: logGroup,
destination: firehoseDestination,
filterPattern: logs.FilterPattern.allEvents(),
});

// THEN
const template = Template.fromStack(stack);
template.resourceCountIs('AWS::IAM::Role', 1);
template.hasResourceProperties('AWS::Logs::SubscriptionFilter', {
RoleArn: importedRole.roleArn,
});
});

test('creates a new IAM Role if not passed on new destination instance', ()=> {
// GIVEN
const stack = new cdk.Stack();
const bucket = new s3.Bucket(stack, 'MyBucket');
const stream = new firehose.DeliveryStream(stack, 'MyStream', { destination: new firehose.S3Bucket(bucket) });
const logGroup = new logs.LogGroup(stack, 'LogGroup');

const firehoseDestination = new dests.FirehoseDestination(stream);

new logs.SubscriptionFilter(logGroup, 'MySubscriptionFilter', {
logGroup: logGroup,
destination: firehoseDestination,
filterPattern: logs.FilterPattern.allEvents(),
});

// THEN
const template = Template.fromStack(stack);
template.resourceCountIs('AWS::IAM::Role', 2);
template.hasResourceProperties('AWS::Logs::SubscriptionFilter', {
RoleArn: {
'Fn::GetAtt': [
'LogGroupMySubscriptionFilterCloudWatchLogsCanPutRecords9112BD02',
'Arn',
],
},
});

// THEN: SubscriptionFilter depends on the default Role's Policy
template.hasResource('AWS::Logs::SubscriptionFilter', {
DependsOn: ['LogGroupMySubscriptionFilterCloudWatchLogsCanPutRecordsDefaultPolicyEC6729D5'],
});
});
17 changes: 17 additions & 0 deletions packages/aws-cdk-lib/aws-logs/README.md
Original file line number Diff line number Diff line change
@@ -149,6 +149,23 @@ new logs.SubscriptionFilter(this, 'Subscription', {
});
```

When you use `FirehoseDestination`, you can choose the method used to
distribute log data to the destination by setting the `distribution` property.

```ts
import * as destinations from 'aws-cdk-lib/aws-logs-destinations';
import * as firehose from 'aws-cdk-lib/aws-kinesisfirehose';

declare const deliveryStream: firehose.IDeliveryStream;
declare const logGroup: logs.LogGroup;

new logs.SubscriptionFilter(this, 'Subscription', {
logGroup,
destination: new destinations.FirehoseDestination(deliveryStream),
filterPattern: logs.FilterPattern.allEvents(),
});
```

## Metric Filters

CloudWatch Logs can extract and emit metrics based on a textual log stream.