Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 58 additions & 2 deletions zebra/lib/zebra/workers/agent/hosted_agent.ex
Original file line number Diff line number Diff line change
Expand Up @@ -62,13 +62,15 @@ defmodule Zebra.Workers.Agent.HostedAgent do
def occupy(job) do
Task.async(fn ->
Watchman.benchmark("zebra.external.chmura.occupy", fn ->
{machine_type, machine_os_image} = translate_machine(job)

request =
OccupyAgentRequest.new(
request_id: job.id,
machine:
InternalApi.Chmura.Agent.Machine.new(
type: job.machine_type,
os_image: job.machine_os_image
type: machine_type,
os_image: machine_os_image
)
)

Expand Down Expand Up @@ -122,4 +124,58 @@ defmodule Zebra.Workers.Agent.HostedAgent do
{:ok, channel} = GRPC.Stub.connect(endpoint)
channel
end

def translate_machine(job) do
original_type = job.machine_type || ""
original_os_image = job.machine_os_image || ""

cond do
not e1_family?(original_type) ->
{original_type, original_os_image}

not migration_enabled?(job.organization_id) ->
{original_type, original_os_image}

true ->
{new_type, new_os_image} = map_machine_type(original_type, original_os_image)

Watchman.increment(
{"zebra.occupy.translation", [original_type, new_type, job.organization_id]}
)

{new_type, new_os_image}
end
end

@spec map_machine_type(String.t(), String.t()) :: {String.t(), String.t()}
defp map_machine_type(original_type, original_os_image) do
original_type
|> case do
"e1-standard-2" ->
{"f1-standard-2", original_os_image}

"e1-standard-4" ->
{"f1-standard-2", original_os_image}

"e1-standard-8" ->
{"f1-standard-4", original_os_image}

_ ->
{original_type, original_os_image}
end
end

@spec migration_enabled?(nil | String.t()) :: boolean()
defp migration_enabled?(nil), do: false

defp migration_enabled?(org_id) do
FeatureProvider.feature_enabled?("e1_to_f1_migration", param: org_id)
end

@spec e1_family?(String.t()) :: boolean()
defp e1_family?(machine_type) when is_binary(machine_type) do
String.starts_with?(machine_type, "e1-")
end

defp e1_family?(_), do: false
end
10 changes: 6 additions & 4 deletions zebra/lib/zebra/workers/scheduler/selector.ex
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,9 @@ defmodule Zebra.Workers.Scheduler.Selector do

defp select(jobs = [job | rest], org, state, result) do
alias __MODULE__.{State, Result}
{machine_type, _} = Zebra.Workers.Agent.HostedAgent.translate_machine(job)

running_jobs_for_machine_type = State.running_jobs(state, job.machine_type)
running_jobs_for_machine_type = State.running_jobs(state, machine_type)
machine_quota = get_machine_quota_for_job(job, org.id)
org_id = org.id

Expand Down Expand Up @@ -94,7 +95,7 @@ defmodule Zebra.Workers.Scheduler.Selector do
# can be scheduled, however we continue in order to look up all the jobs
# that need to be force finished.
#
result = Result.add_no_capacity(result, job.machine_type)
result = Result.add_no_capacity(result, machine_type)
select(rest, org, state, result)

running_jobs_for_machine_type < machine_quota ->
Expand All @@ -106,7 +107,7 @@ defmodule Zebra.Workers.Scheduler.Selector do

running_jobs_for_machine_type >= machine_quota ->
# Can't run this job, continue selection on the rest of the jobs.
result = Result.add_no_capacity(result, job.machine_type)
result = Result.add_no_capacity(result, machine_type)
select(rest, org, state, result)

true ->
Expand All @@ -122,7 +123,8 @@ defmodule Zebra.Workers.Scheduler.Selector do
0
end
else
with {:ok, machine} <- FeatureProvider.find_machine(job.machine_type, param: org_id),
with {machine_type, _} <- Zebra.Workers.Agent.HostedAgent.translate_machine(job),
{:ok, machine} <- FeatureProvider.find_machine(machine_type, param: org_id),
true <- FeatureProvider.Machine.enabled?(machine),
true <- Enum.member?(machine.available_os_images, job.machine_os_image),
quota <- FeatureProvider.Machine.quota(machine) do
Expand Down
10 changes: 9 additions & 1 deletion zebra/test/support/stubbed_provider.ex
Original file line number Diff line number Diff line change
@@ -1,17 +1,22 @@
defmodule Support.StubbedProvider do
use FeatureProvider.Provider

@e1_to_f1_org_id "org-e1-to-f1-enabled"

@impl FeatureProvider.Provider
def provide_features(org_id \\ nil, _opts \\ []) do
{:ok,
[
feature("max_paralellism_in_org", [:enabled, {:quantity, 500}]),
feature("cache_cli_parallel_archive_method", [:hidden]),
feature("some_custom_feature", [:hidden]),
max_job_time_limit_feature(org_id)
max_job_time_limit_feature(org_id),
feature("e1_to_f1_migration", e1_to_f1_traits(org_id))
]}
end

def e1_to_f1_org_id, do: @e1_to_f1_org_id

defp max_job_time_limit_feature("enabled_30") do
feature("max_job_execution_time_limit", [:enabled, {:quantity, 30}])
end
Expand All @@ -24,6 +29,9 @@ defmodule Support.StubbedProvider do
feature("max_job_execution_time_limit", [:hidden])
end

defp e1_to_f1_traits(@e1_to_f1_org_id), do: [:enabled]
defp e1_to_f1_traits(_org_id), do: [:hidden]

@impl FeatureProvider.Provider
def provide_machines(_org_id \\ nil, _opts \\ []) do
{:ok,
Expand Down
117 changes: 117 additions & 0 deletions zebra/test/zebra/workers/agent/hosted_agent.exs
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,123 @@ defmodule Zebra.Workers.Agent.HostedAgentTest do
assert Agent.occupy(job) == {:error, error.message}
end
end

test "uses original machine values when migration flag is disabled" do
{:ok, job} =
Support.Factories.Job.create(:scheduled, %{
machine_type: "e1-standard-2",
machine_os_image: "ubuntu2004"
})

response = %InternalApi.Chmura.OccupyAgentResponse{
agent: %InternalApi.Chmura.Agent{id: @agent_id}
}

with_mock Stub,
occupy_agent: fn _, request, _ ->
assert request.machine.type == "e1-standard-2"
assert request.machine.os_image == "ubuntu2004"
{:ok, response}
end do
assert Agent.occupy(job) == {:ok, Agent.construct_agent(response)}
end
end

test "remaps e1-standard-2 machines to f1-standard-2 when migration flag is enabled" do
org_id = Support.StubbedProvider.e1_to_f1_org_id()

{:ok, job} =
Support.Factories.Job.create(:scheduled, %{
organization_id: org_id,
machine_type: "e1-standard-2",
machine_os_image: "ubuntu2004"
})

response = %InternalApi.Chmura.OccupyAgentResponse{
agent: %InternalApi.Chmura.Agent{id: @agent_id}
}

with_mock Stub,
occupy_agent: fn _, request, _ ->
assert request.machine.type == "f1-standard-2"
assert request.machine.os_image == "ubuntu2004"
{:ok, response}
end do
assert Agent.occupy(job) == {:ok, Agent.construct_agent(response)}
end
end

test "remaps e1-standard-4 machines to f1-standard-2 when migration flag is enabled" do
org_id = Support.StubbedProvider.e1_to_f1_org_id()

{:ok, job} =
Support.Factories.Job.create(:scheduled, %{
organization_id: org_id,
machine_type: "e1-standard-4",
machine_os_image: "ubuntu2004"
})

response = %InternalApi.Chmura.OccupyAgentResponse{
agent: %InternalApi.Chmura.Agent{id: @agent_id}
}

with_mock Stub,
occupy_agent: fn _, request, _ ->
assert request.machine.type == "f1-standard-2"
assert request.machine.os_image == "ubuntu2004"
{:ok, response}
end do
assert Agent.occupy(job) == {:ok, Agent.construct_agent(response)}
end
end

test "remaps e1-standard-8 machines to f1-standard-4 when migration flag is enabled" do
org_id = Support.StubbedProvider.e1_to_f1_org_id()

{:ok, job} =
Support.Factories.Job.create(:scheduled, %{
organization_id: org_id,
machine_type: "e1-standard-8",
machine_os_image: "ubuntu2004"
})

response = %InternalApi.Chmura.OccupyAgentResponse{
agent: %InternalApi.Chmura.Agent{id: @agent_id}
}

with_mock Stub,
occupy_agent: fn _, request, _ ->
assert request.machine.type == "f1-standard-4"
assert request.machine.os_image == "ubuntu2004"
{:ok, response}
end do
assert Agent.occupy(job) == {:ok, Agent.construct_agent(response)}
end
end

test "does not remap different machine types when migration flag is enabled" do
org_id = Support.StubbedProvider.e1_to_f1_org_id()

{:ok, job} =
Support.Factories.Job.create(:scheduled, %{
organization_id: org_id,
machine_type: "e2-standard-2",
machine_os_image: "ubuntu2004"
})

response = %InternalApi.Chmura.OccupyAgentResponse{
agent: %InternalApi.Chmura.Agent{id: @agent_id}
}

with_mock Stub,
occupy_agent: fn _, request, _ ->
assert request.machine.type == "e2-standard-2"
assert request.machine.os_image == "ubuntu2004"
{:ok, response}
end do
assert Agent.occupy(job) == {:ok, Agent.construct_agent(response)}
end
end
end

describe ".release" do
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ defmodule Zebra.Workers.FeatureProviderInvalidatorWorkerTest do
Worker.features_changed(callback_message)

{:ok, features} = FeatureProvider.list_features()
assert length(features) == 4
assert length(features) == 5
end

test "when the organization feature state changes, organization feature caches are invalidated" do
Expand Down