
Enable spot instances #11

Merged
merged 8 commits into pangeo-forge:main on Jan 27, 2024

Conversation

@thodson-usgs (Contributor) commented Jan 14, 2024

This PR adds optional support for spot instances. Ref #10.

NOTE: This deploys without error, but I haven't started testing the checkpointing yet.

Ideas for checkpoint tuning

References

@thodson-usgs thodson-usgs marked this pull request as ready for review January 21, 2024 15:21
@thodson-usgs (Contributor, Author) commented Jan 21, 2024

@ranchodeluxe, @yuvipanda
I've come to understand that Flink does not use checkpointing in batch mode, so that extra configuration is unnecessary. Running the cluster on spot instances may be as simple as setting capacity_type="SPOT". However, I have left the default as "ON_DEMAND" and will continue testing "SPOT" on the USGS runner.
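
For concreteness, a minimal sketch of how that toggle could be exposed in the Terraform config; the variable name, default, and validation here are assumptions, not necessarily what this repo uses:

    # Hypothetical variable; name and default are illustrative only.
    variable "capacity_type" {
      description = "Capacity type for the EKS node group: ON_DEMAND or SPOT"
      type        = string
      default     = "ON_DEMAND"

      validation {
        condition     = contains(["ON_DEMAND", "SPOT"], var.capacity_type)
        error_message = "capacity_type must be either ON_DEMAND or SPOT."
      }
    }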

One thing that is still a little unclear is whether we need to configure the job manager and autoscaler to always run "ON_DEMAND". Some blogs suggest this as a best practice, but that fault tolerance may already be covered by the managed Kubernetes service (EKS).

P.S. One more thought
My limited understanding is that in batch mode, Flink will restart the job (task?) on failure. So, if a recipe includes a particularly long and costly job, like a big rechunk and transfer, it might be advisable to stick with "ON_DEMAND".

@thodson-usgs (Contributor, Author) commented Jan 21, 2024

Digging in a little further, I think we would:

  1. leave the core node group as on-demand,
  2. create a second node group of spot instances, and
  3. configure the autoscaler to scale each node group: job managers to core and task managers to spot. I'll need to investigate this last piece further; see the rough sketch below.
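
A rough Terraform sketch of step 2; the resource references (cluster, IAM role, subnets), labels, and scaling numbers are assumptions and may not match this repo:

    # Hypothetical spot node group for task managers; values are illustrative.
    resource "aws_eks_node_group" "spot_nodes" {
      cluster_name    = aws_eks_cluster.cluster.name   # placeholder reference
      node_group_name = "${var.cluster_name}-spot"
      node_role_arn   = aws_iam_role.node.arn          # placeholder reference
      subnet_ids      = aws_subnet.private[*].id       # placeholder reference

      instance_types = [var.instance_type]
      capacity_type  = "SPOT"

      scaling_config {
        desired_size = 0
        min_size     = 0
        max_size     = 10
      }

      # Task manager pods could target this group via a nodeSelector on this label,
      # while job manager pods stay on the on-demand core group.
      labels = {
        "capacity-type" = "spot"
      }
    }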

@thodson-usgs (Contributor, Author) commented Jan 22, 2024

Learning more, we might want to take advantage of Flink's High Availability (HA) Kubernetes Services:
"The Operator supports both Kubernetes HA Services and Zookeeper HA Services for providing High-availability for Flink jobs. The HA solution can benefit form using additional Standby replicas, it will result in a faster recovery time, but Flink jobs will still restart when the Leader JobManager goes down."

Essentially, we would deploy an all-spot cluster and use standby replicas in case an instance terminates. So flink-config.yaml would include something like:

      # Enable HA cluster
      "high-availability.type" : "kubernetes",
      "high-availability.storageDir" : "s3://${aws_s3_bucket.flink_store.id}/recovery",
      "kubernetes.cluster-id" :  <cluster_id>,

where <cluster_id> is the id of the job manager (?). I don't think we need to worry about flink-operator because it's on the control plane, which is managed by AWS.
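
To actually get a standby replica, one more entry would likely be needed alongside the keys above. This is an assumption on my part: for native Kubernetes deployments the option is kubernetes.jobmanager.replicas, and the operator also exposes the same knob as spec.jobManager.replicas on the FlinkDeployment.

      # Assumption: request 2 JobManagers so a standby can take over
      # if the leader's spot node is reclaimed.
      "kubernetes.jobmanager.replicas" : "2",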

@@ -38,6 +38,8 @@ resource "aws_eks_node_group" "core_nodes" {

instance_types = [var.instance_type]

capacity_type = "SPOT"
Review comment (Collaborator):

I assume this should in fact be using the variable var.capacity_type?

Reply (@thodson-usgs, author):

Yes. Fixed.
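
For reference, the corrected block presumably ends up along these lines (a sketch, assuming a capacity_type variable like the one sketched earlier):

    resource "aws_eks_node_group" "core_nodes" {
      # ...
      instance_types = [var.instance_type]
      capacity_type  = var.capacity_type   # "ON_DEMAND" by default, "SPOT" to opt in
      # ...
    }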

@@ -1,3 +1,8 @@
resource "aws_s3_bucket" "flink_store" {
bucket = "${var.cluster_name}-flink-store"
force_destroy = true
Review comment (Collaborator):

I generally suggest leaving this out, and hand-editing it in when you want to actually destroy it.

Reply (@thodson-usgs, author):

Removing this bucket for now, as the cluster seemed unable to write to it. Eventually, we'll want to add a shared filesystem to the cluster where Flink can cache metadata for restarting failed jobs. https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/ha/kubernetes_ha/

@thodson-usgs thodson-usgs changed the title Switch instance type to spot Enable spot instances Jan 27, 2024
@thodson-usgs (Contributor, Author) commented:
This PR will enable spot instances for testing. Before this is used in production, we'll want to make some additional changes to handle failures (these changes will benefit "on demand" clusters too).

@yuvipanda yuvipanda merged commit 306c5d4 into pangeo-forge:main Jan 27, 2024
@yuvipanda (Collaborator) commented:
Thanks @thodson-usgs

@thodson-usgs thodson-usgs mentioned this pull request Jan 28, 2024