Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(kinesisfirehose): add HTTP Endpoint and Datadog destination #33657

Open
wants to merge 20 commits into
base: main
Choose a base branch
from
Open
Prev Previous commit
Next Next commit
fix: refactor
  • Loading branch information
Benjamin Pottier authored and benjaminpottier committed Mar 5, 2025
commit 11214bfab717a992435754602c3f219d0a3b5b57
136 changes: 21 additions & 115 deletions packages/aws-cdk-lib/aws-kinesisfirehose/lib/datadog.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,6 @@
import { Construct } from 'constructs';
import { BackupMode, CommonDestinationProps } from './common';
import { DestinationBindOptions, DestinationConfig, IDestination } from './destination';
import { CfnDeliveryStream } from './kinesisfirehose.generated';
import * as iam from '../../aws-iam';
import { Attribute, HTTPBackupMode, HTTPEndpoint } from './http-endpoint';
import { ISecret } from '../../aws-secretsmanager';
import { Duration, Size } from '../../core';
import { createBackupConfig, createLoggingOptions, createProcessingConfig } from './private/helpers';
import { Bucket } from '../../aws-s3';

/**
* Datadog logs HTTP endpoint URLs
@@ -90,40 +84,10 @@ export enum DatadogConfigurationsEndpointUrl {
DATADOG_CONFIGURATION_US_GOV = 'https://cloudplatform-intake.ddog-gov.com/api/v2/cloudchanges?dd-protocol=aws-kinesis-firehose',
}

/**
* The buffering options that can be used before data is delivered to the specified destination.
*/
export interface BufferHints {
/**
* The higher interval allows more time to collect data and the size of data may be bigger. The lower interval sends the data more frequently and may be more advantageous when looking at shorter cycles of data activity.
* @default 60 seconds
*/
readonly interval?: Duration;
/**
* The higher buffer size may be lower in cost with higher latency. The lower buffer size will be faster in delivery with higher cost and less latency.
* @default 4 MiB
*/
readonly size?: Size;
}

/**
* Datadog tag
*/
export interface DatadogTag {
/**
* Tag key
*/
readonly key: string;
/**
* Tag value
*/
readonly value: string;
}

/**
* Props for defining a Datadog destination of a Kinesis Data Firehose delivery stream.
*/
export interface DatadogProps extends CommonDestinationProps {
export interface DatadogProps {
/**
* The API key required to enable data delivery from Amazon Data Firehose.
*/
@@ -132,93 +96,35 @@ export interface DatadogProps extends CommonDestinationProps {
* The URL of the Datadog endpoint.
*/
readonly url: DatadogLogsEndpointUrl | DatadogMetricsEndpointUrl | DatadogConfigurationsEndpointUrl;
/**
* Amazon Data Firehose buffers incoming records before delivering them to your Datadog domain.
* @default - 60 second interval with 4MiB size.
*/
readonly bufferHints?: BufferHints;
/**
* The time period during which Amazon Data Firehose retries sending data to the selected HTTP endpoint.
* @default 60 seconds
*/
readonly retryDuration?: Duration;
/**
* Datadog tags to apply for filtering.
* @default - No tags.
*/
readonly tags?: DatadogTag[];
readonly tags?: Attribute[];
/**
* @default - failed only
*/
readonly backupMode?: HTTPBackupMode;
}

/**
* A Datadog destination for data from a Kinesis Data Firehose delivery stream.
*/
export class Datadog implements IDestination {
constructor(private readonly props: DatadogProps) { }

bind(scope: Construct, _options: DestinationBindOptions): DestinationConfig {
const role = this.props.role ?? new iam.Role(scope, 'Datadog Destination Role', {
assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
});

const { loggingOptions, dependables: loggingDependables } = createLoggingOptions(scope, {
loggingConfig: this.props.loggingConfig,
role,
streamId: 'DatadogDestination',
}) ?? {};

const bucket = new Bucket(scope, 'S3 Configuration', {});
bucket.grantReadWrite(role);

const { dependables: backupDependables } = createBackupConfig(scope, role, this.props.s3Backup) ?? {};

this.props.apiKey.grantRead(role);

return {
httpEndpointDestinationConfiguration: {
cloudWatchLoggingOptions: loggingOptions,
processingConfiguration: createProcessingConfig(scope, role, this.props.processor),
roleArn: role.roleArn,
s3Configuration: {
bucketArn: bucket.bucketArn,
roleArn: role.roleArn,
},
s3BackupMode: this.getS3BackupMode(),
endpointConfiguration: {
url: this.props.url,
},
requestConfiguration: {
contentEncoding: 'GZIP',
commonAttributes: this.createAttributesFromTags(),
},
retryOptions: {
durationInSeconds: this.props.retryDuration?.toSeconds() ?? 60,
},
bufferingHints: {
sizeInMBs: this.props.bufferHints?.size?.toMebibytes() ?? 4,
intervalInSeconds: this.props.bufferHints?.interval?.toSeconds() ?? 60,
},
secretsManagerConfiguration: {
enabled: true,
roleArn: role.roleArn,
secretArn: this.props.apiKey.secretArn,
},
export class Datadog extends HTTPEndpoint {
constructor(props: DatadogProps) {
super({
endpoint: {
url: props.url,
secret: props.apiKey,
},
dependables: [...(loggingDependables ?? []), ...(backupDependables ?? [])],
};
}
private getS3BackupMode(): string | undefined {
return this.props.s3Backup?.bucket || this.props.s3Backup?.mode === BackupMode.ALL
? 'Enabled'
: undefined;
}
private createAttributesFromTags(): CfnDeliveryStream.HttpEndpointCommonAttributeProperty[] {
let attributes: any = [];
this.props.tags?.forEach((tag) => {
attributes.push({
attributeName: tag.key,
attributeValue: tag.value,
});
bufferingHints: {
interval: Duration.seconds(60),
size: Size.mebibytes(4),
},
retryOptions: {
duration: Duration.seconds(60),
},
attributes: props.tags ?? [],
});
return attributes;
}
}
194 changes: 194 additions & 0 deletions packages/aws-cdk-lib/aws-kinesisfirehose/lib/http-endpoint.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,194 @@
import { Construct } from 'constructs';
import { CommonDestinationProps, BackupMode as S3BackupMode } from './common';
import { DestinationBindOptions, DestinationConfig, IDestination } from './destination';
import * as iam from '../../aws-iam';
import { createBackupConfig, createLoggingOptions, createProcessingConfig } from './private/helpers';
import { ISecret } from '../../aws-secretsmanager';
import { Duration, Size } from '../../core';

/**
*
*/
export enum HTTPCompression {
/**
*
*/
GZIP = 'GZIP',
/**
*
*/
NONE = 'NONE',
}

/**
*
*/
export enum HTTPBackupMode {
/**
*
*/
FAILED = 'FailedDataOnly',
/**
*
*/
ALL = 'AllData',
}

/**
* The buffering options that can be used before data is delivered to the specified destination.
*/
export interface BufferingHints {
/**
* The higher interval allows more time to collect data and the size of data may be bigger. The lower interval sends the data more frequently and may be more advantageous when looking at shorter cycles of data activity.
* @default 60 seconds
*/
readonly interval?: Duration;
/**
* The higher buffer size may be lower in cost with higher latency. The lower buffer size will be faster in delivery with higher cost and less latency.
* @default 4 MiB
*/
readonly size?: Size;
}

/**
*
*/
export interface RetryOptions {
/**
*
*/
readonly duration: Duration;
}

/**
*
*/
export interface Endpoint {
/**
*
*/
readonly url: string;
/**
* @default - Not used
*/
readonly accessKey?: string;
/**
*
* @default - Not used
*/
readonly secret?: ISecret;
/**
* @default - Not used
*/
readonly name?: string;
}

/**
*
*/
export interface Attribute {
/**
*
*/
readonly name: string;
/**
*
*/
readonly value: string;
}

/**
*
*/
export interface HTTPEndpointProps extends CommonDestinationProps {
/**
*
*/
readonly endpoint: Endpoint;
/**
* @default - FailedDataOnly
*/
readonly backupMode?: HTTPBackupMode;
/**
* @default - GZIP
*/
readonly requestCompression?: HTTPCompression;
/**
* @default -
*/
readonly bufferingHints?: BufferingHints;
/**
* @default -
*/
readonly retryOptions?: RetryOptions;
/**
* @default - None
*/
readonly attributes?: Attribute[];
}

/**
*
*/
export class HTTPEndpoint implements IDestination {
constructor(private readonly props: HTTPEndpointProps) {}
bind(scope: Construct, _options: DestinationBindOptions): DestinationConfig {
const role = this.props.role ?? new iam.Role(scope, 'HTTP Destination Role', {
assumedBy: new iam.ServicePrincipal('firehose.amazonaws.com'),
});

const { loggingOptions, dependables: loggingDependables } = createLoggingOptions(scope, {
loggingConfig: this.props.loggingConfig,
role,
streamId: 'HTTPDestination',
}) ?? {};

const { backupConfig, dependables: backupDependables } = createBackupConfig(scope, role, {
mode: S3BackupMode.FAILED,
})!; // Probably not a good idea?

if (this.props.endpoint.secret) {
this.props.endpoint.secret.grantRead(role);
}

return {
httpEndpointDestinationConfiguration: {
endpointConfiguration: {
url: this.props.endpoint.url,
...this.props.endpoint.accessKey && { accessKey: this.props.endpoint.accessKey },
...this.props.endpoint.name && { name: this.props.endpoint.name },
},
...this.props.retryOptions && {
retryOptions: {
durationInSeconds: this.props.retryOptions?.duration.toSeconds(),
},
},
...this.props.bufferingHints && {
bufferingHints: {
...this.props.bufferingHints.interval && { intervalInSeconds: this.props.bufferingHints.interval.toSeconds() },
...this.props.bufferingHints.size && { sizeInMBs: this.props.bufferingHints.size.toMebibytes() },
},
},
requestConfiguration: {
contentEncoding: this.props.requestCompression ?? HTTPCompression.GZIP,
...this.props.attributes && {
commonAttributes: [...this.props.attributes.map(attr => ({ attributeName: attr.name, attributeValue: attr.value }))],
},
},
cloudWatchLoggingOptions: loggingOptions,
processingConfiguration: createProcessingConfig(scope, role, this.props.processor),
roleArn: role.roleArn,
s3BackupMode: this.props.backupMode ?? HTTPBackupMode.FAILED,
s3Configuration: backupConfig,
...this.props.endpoint.secret && {
secretsManagerConfiguration: {
secretArn: this.props.endpoint.secret.secretArn,
enabled: true,
roleArn: role.roleArn,
},
},
},
dependables: [...(loggingDependables ?? []), ...(backupDependables ?? [])],
};
}
}
1 change: 1 addition & 0 deletions packages/aws-cdk-lib/aws-kinesisfirehose/lib/index.ts
Original file line number Diff line number Diff line change
@@ -8,6 +8,7 @@ export * from './common';
export * from './s3-bucket';
export * from './logging-config';
export * from './datadog';
export * from './http-endpoint';

// AWS::KinesisFirehose CloudFormation Resources:
export * from './kinesisfirehose.generated';
Loading
Oops, something went wrong.