Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Empty KinesisEndpoint doesn't default to generated endpoint #5

Closed
errorhandler opened this issue Feb 17, 2022 · 2 comments · Fixed by #13
Closed

Empty KinesisEndpoint doesn't default to generated endpoint #5

errorhandler opened this issue Feb 17, 2022 · 2 comments · Fixed by #13
Labels
bug Something isn't working

Comments

@errorhandler
Copy link

Describe the bug

The comments for the AWS endpoint fields state that:

		// KinesisEndpoint is an optional endpoint URL that overrides the default generated endpoint for a Kinesis client.
		// If this is empty, the default generated endpoint will be used.
...
		// DynamoDBEndpoint is an optional endpoint URL that overrides the default generated endpoint for a DynamoDB client.
		// If this is empty, the default generated endpoint will be used.

However, if they are empty then the AWS clients are configured with the endpoints "", rather than defaulting to the default endpoint resolver.

Reproduction steps

1.Configure the KCL worker without specifying the DynamoDB and Kinesis endpoints

Expected behavior

The default endpoint resolver is used.

Additional context

No response

@errorhandler errorhandler added the bug Something isn't working label Feb 17, 2022
@pandar00
Copy link

I noticed the same thing. The DynamoDB checkpointer implementation does the empty string check and make sure to only override the Endpoint if the config's URL length is greater than 1

https://github.com/vmware/vmware-go-kcl-v2/blob/main/clientlibrary/checkpoint/dynamodb-checkpointer.go#L94

			if service == dynamodb.ServiceID && len(checkpointer.kclConfig.DynamoDBEndpoint) > 0 {
				return aws.Endpoint{
					PartitionID:   "aws",
					URL:           checkpointer.kclConfig.DynamoDBEndpoint,
					SigningRegion: checkpointer.kclConfig.RegionName,
				}, nil
			}
			// returning EndpointNotFoundError will allow the service to fallback to it's default resolution
			return aws.Endpoint{}, &aws.EndpointNotFoundError{}

But the worker which reads from Kinesis doesn't do this
https://github.com/vmware/vmware-go-kcl-v2/blob/main/clientlibrary/worker/worker.go#L165

		resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
			return aws.Endpoint{
				PartitionID:   "aws",
				URL:           w.kclConfig.KinesisEndpoint,
				SigningRegion: w.regionName,
			}, nil
		})

AWS specifies resolver to return EndpointNotFoundError to fall back to default implementation
https://github.com/aws/aws-sdk-go-v2/blob/c214cb61990441aa165e216a3f7e845c50d21939/aws/endpoints.go#L187

// EndpointResolverWithOptions is an endpoint resolver that can be used to provide or
// override an endpoint for the given service, region, and the service client's EndpointOptions. API clients will
// attempt to use the EndpointResolverWithOptions first to resolve an endpoint if
// available. If the EndpointResolverWithOptions returns an EndpointNotFoundError error,
// API clients will fallback to attempting to resolve the endpoint using its
// internal default endpoint resolver.

@pranay-1995
Copy link

pranay-1995 commented Jun 10, 2022

I managed to achieve this by doing workaround like this,

func CreateKinesisClient(kclConfig *cfg.KinesisClientLibConfiguration) *kinesis.Client {
	ctx := context.Background()
	resolver := aws.EndpointResolverWithOptionsFunc(func(service, region string, options ...interface{}) (aws.Endpoint, error) {
		if service == kinesis.ServiceID && len(kclConfig.KinesisEndpoint) > 0 {
			return aws.Endpoint{
				PartitionID:   "aws",
				URL:           kclConfig.KinesisEndpoint,
				SigningRegion: kclConfig.RegionName,
			}, nil
		}
		// returning EndpointNotFoundError will allow the service to fallback to it's default resolution
		return aws.Endpoint{}, &aws.EndpointNotFoundError{}
	})

	cfg, err := config.LoadDefaultConfig(ctx, config.WithEndpointResolverWithOptions(resolver))
	if err != nil {
		log.Panic().Err(err).Msg("Failed loading default config for kinesis client")
	}

	kc := kinesis.NewFromConfig(cfg)
	return kc
}
worker := wk.NewWorker(&RecordProcessorFactory{EventsProcessor: processor}, kclConfig).WithKinesis(CreateKinesisClient(kclConfig))

But ideally this code should be added to the part mentioned by pandar00 - https://github.com/vmware/vmware-go-kcl-v2/blob/main/clientlibrary/worker/worker.go#L165

calebstewart added a commit to calebstewart/vmware-go-kcl-v2 that referenced this issue Oct 13, 2022
This commit fixes vmware#5 by returning `aws.EndpointNotFoundError` from the
endpoint resolver when no `KinesisEndpoint` is defined, which will
resolve the default AWS endpoint. This is the same process used by the
DynamoDB checkpointer to resolve the default endpoint.

Signed-off-by: Caleb Stewart <caleb.stewart94@gmail.com>
@vmwjc vmwjc closed this as completed in #13 Apr 4, 2023
magiusdarrigo added a commit to magiusdarrigo/vmware-go-kcl-v2 that referenced this issue Jun 20, 2023
* fixing infinite worker loop

Signed-off-by: Mike Monaghan <mike_monaghan@live.ca>

* Automatically resolve default KinesisEndpoint

This commit fixes vmware#5 by returning `aws.EndpointNotFoundError` from the
endpoint resolver when no `KinesisEndpoint` is defined, which will
resolve the default AWS endpoint. This is the same process used by the
DynamoDB checkpointer to resolve the default endpoint.

Signed-off-by: Caleb Stewart <caleb.stewart94@gmail.com>

* fix: catch DynamoDB Scan error when trying to scan nonexistent table/index in syncLeases()

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* chore: Adding periods to copyright comment to satisfy gofmt

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* feat: Sending renewed lease metric

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* fix: add DeleteMetricMillisBehindLatest for error case

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* Refactor in prep for testing rate limiting improvements

Signed-off-by: John Calixto <jcalixto@vmware.com>

* fix: add getRecords TPS rate limiting

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* fix: add hard cap maxRetries for getRecord errors

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* fix: add maxBytes per second getRecord check

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* chore: log RemoveLeaseOwner errors with debug instead of error

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* fix: add check for GetRecords error within callGetRecordsAPI

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* fix: use nanosecond precision in lease comparisons

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* chore: add info logs in sleep case for kinesis backoff errors

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* fix: Check token bucket corner cases correctly.

Signed-off-by: John Calixto <jcalixto@vmware.com>

* Bump github.com/prometheus/client_golang from 1.11.0 to 1.11.1

Bumps [github.com/prometheus/client_golang](https://github.com/prometheus/client_golang) from 1.11.0 to 1.11.1.
- [Release notes](https://github.com/prometheus/client_golang/releases)
- [Changelog](https://github.com/prometheus/client_golang/blob/main/CHANGELOG.md)
- [Commits](prometheus/client_golang@v1.11.0...v1.11.1)

---
updated-dependencies:
- dependency-name: github.com/prometheus/client_golang
  dependency-type: direct:production
...

Signed-off-by: dependabot[bot] <support@github.com>

* Bump golang.org/x/sys from 0.0.0-20211216021012-1d35b9e2eb4e to 0.1.0

Bumps [golang.org/x/sys](https://github.com/golang/sys) from 0.0.0-20211216021012-1d35b9e2eb4e to 0.1.0.
- [Release notes](https://github.com/golang/sys/releases)
- [Commits](https://github.com/golang/sys/commits/v0.1.0)

---
updated-dependencies:
- dependency-name: golang.org/x/sys
  dependency-type: indirect
...

Signed-off-by: dependabot[bot] <support@github.com>

* fix: add shutdown and leaseExpired error cases for checkpoint function

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* feat: make lease renewal async

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* fix: return err log in case of ErrLeaseNotAcquired

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* chore: Remove extraneous err check

After checking the scan result above this line, checking err here no
longer has any effect.

Signed-off-by: John Calixto <jcalixto@vmware.com>

* fix: pass in ctx with cancel for renewLease

Signed-off-by: Shiva Pentakota <spentakota@vmware.com>

* ran go mod tidy

---------

Signed-off-by: Mike Monaghan <mike_monaghan@live.ca>
Signed-off-by: Caleb Stewart <caleb.stewart94@gmail.com>
Signed-off-by: Shiva Pentakota <spentakota@vmware.com>
Signed-off-by: John Calixto <jcalixto@vmware.com>
Signed-off-by: dependabot[bot] <support@github.com>
Co-authored-by: Mike Monaghan <mike_monaghan@live.ca>
Co-authored-by: Caleb Stewart <caleb.stewart94@gmail.com>
Co-authored-by: Shiva Pentakota <spentakota@vmware.com>
Co-authored-by: spentakota <120056013+spentakota@users.noreply.github.com>
Co-authored-by: John Calixto <jcalixto@vmware.com>
Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: cmckelvey-vmware <85366153+cmckelvey-vmware@users.noreply.github.com>
Co-authored-by: vmwjc <108959326+vmwjc@users.noreply.github.com>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working
Projects
None yet
Development

Successfully merging a pull request may close this issue.

3 participants