diff --git a/src/api/http.zig b/src/api/http.zig index 0d71eb3b..4d49777d 100644 --- a/src/api/http.zig +++ b/src/api/http.zig @@ -29,6 +29,7 @@ pub const StatusCode = enum(u16) { too_many_requests = 429, request_header_fields_too_large = 431, internal_server_error = 500, + service_unavailable = 503, pub fn phrase(self: StatusCode) []const u8 { return switch (self) { @@ -43,6 +44,7 @@ pub const StatusCode = enum(u16) { .too_many_requests => "Too Many Requests", .request_header_fields_too_large => "Request Header Fields Too Large", .internal_server_error => "Internal Server Error", + .service_unavailable => "Service Unavailable", }; } }; @@ -391,6 +393,7 @@ test "status code phrases" { try std.testing.expectEqualStrings("OK", StatusCode.ok.phrase()); try std.testing.expectEqualStrings("Not Found", StatusCode.not_found.phrase()); try std.testing.expectEqualStrings("Internal Server Error", StatusCode.internal_server_error.phrase()); + try std.testing.expectEqualStrings("Service Unavailable", StatusCode.service_unavailable.phrase()); try std.testing.expectEqualStrings("Method Not Allowed", StatusCode.method_not_allowed.phrase()); try std.testing.expectEqualStrings("Too Many Requests", StatusCode.too_many_requests.phrase()); } diff --git a/src/network/proxy/reverse_proxy.zig b/src/network/proxy/reverse_proxy.zig index 20ec1cf9..fffcf684 100644 --- a/src/network/proxy/reverse_proxy.zig +++ b/src/network/proxy/reverse_proxy.zig @@ -1,5 +1,42 @@ const std = @import("std"); +const http = @import("../../api/http.zig"); +const proxy_runtime = @import("runtime.zig"); const router = @import("router.zig"); +const upstream_mod = @import("upstream.zig"); + +pub const ProxyResponse = struct { + status: http.StatusCode, + body: []const u8, +}; + +pub const ForwardPlan = struct { + method: http.Method, + path: []const u8, + host: []const u8, + outbound_host: []const u8, + route: proxy_runtime.RouteSnapshot, + upstream: upstream_mod.Upstream, + + pub fn deinit(self: ForwardPlan, alloc: std.mem.Allocator) void { + alloc.free(self.path); + alloc.free(self.host); + alloc.free(self.outbound_host); + self.route.deinit(alloc); + self.upstream.deinit(alloc); + } +}; + +pub const HandleResult = union(enum) { + forward: ForwardPlan, + response: ProxyResponse, + + pub fn deinit(self: HandleResult, alloc: std.mem.Allocator) void { + switch (self) { + .forward => |plan| plan.deinit(alloc), + .response => {}, + } + } +}; pub const ReverseProxy = struct { allocator: std.mem.Allocator, @@ -28,8 +65,78 @@ pub const ReverseProxy = struct { pub fn isRunning(self: *const ReverseProxy) bool { return self.running; } + + pub fn handleRequest(self: *const ReverseProxy, raw_request: []const u8) !HandleResult { + const request = http.parseRequest(raw_request) catch { + return .{ .response = .{ + .status = .bad_request, + .body = "{\"error\":\"invalid request\"}", + } }; + } orelse return .{ .response = .{ + .status = .bad_request, + .body = "{\"error\":\"incomplete request\"}", + } }; + + const host_header = http.findHeaderValue(request.headers_raw, "Host") orelse return .{ .response = .{ + .status = .bad_request, + .body = "{\"error\":\"missing host header\"}", + } }; + const host = normalizeHost(host_header); + + const matched_route = router.matchRoute(self.routes, host, request.path_only) orelse return .{ .response = .{ + .status = .not_found, + .body = "{\"error\":\"route not found\"}", + } }; + const route = try cloneRouteSnapshot(self.allocator, matched_route); + errdefer route.deinit(self.allocator); + + const upstream = proxy_runtime.resolveUpstream(self.allocator, route.service) catch |err| switch (err) { + error.NoHealthyUpstream => return .{ .response = .{ + .status = .service_unavailable, + .body = "{\"error\":\"no eligible upstream\"}", + } }, + else => return err, + }; + errdefer upstream.deinit(self.allocator); + + return .{ .forward = .{ + .method = request.method, + .path = try self.allocator.dupe(u8, request.path), + .host = try self.allocator.dupe(u8, host), + .outbound_host = if (route.preserve_host) + try self.allocator.dupe(u8, host) + else + try self.allocator.dupe(u8, route.service), + .route = route, + .upstream = upstream, + } }; + } }; +fn normalizeHost(host_header: []const u8) []const u8 { + if (std.mem.indexOfScalar(u8, host_header, ':')) |port_sep| { + return host_header[0..port_sep]; + } + return host_header; +} + +fn cloneRouteSnapshot(alloc: std.mem.Allocator, route: router.Route) !proxy_runtime.RouteSnapshot { + return .{ + .name = try alloc.dupe(u8, route.name), + .service = try alloc.dupe(u8, route.service), + .vip_address = try alloc.dupe(u8, route.vip_address), + .host = try alloc.dupe(u8, route.match.host orelse ""), + .path_prefix = try alloc.dupe(u8, route.match.path_prefix), + .eligible_endpoints = route.eligible_endpoints, + .healthy_endpoints = route.healthy_endpoints, + .degraded = route.degraded, + .retries = route.retries, + .connect_timeout_ms = route.connect_timeout_ms, + .request_timeout_ms = route.request_timeout_ms, + .preserve_host = route.preserve_host, + }; +} + test "reverse proxy starts and stops" { const alloc = std.testing.allocator; const routes = [_]router.Route{ @@ -68,3 +175,205 @@ test "reverse proxy retains configured routes" { try std.testing.expectEqual(@as(usize, 1), proxy.routes.len); try std.testing.expectEqualStrings("api-v1", proxy.routes[0].name); } + +test "handleRequest returns forward plan for a routable request" { + const store = @import("../../state/store.zig"); + const service_rollout = @import("../service_rollout.zig"); + const service_registry_runtime = @import("../service_registry_runtime.zig"); + + try store.initTestDb(); + defer store.deinitTestDb(); + service_registry_runtime.resetForTest(); + defer service_registry_runtime.resetForTest(); + proxy_runtime.resetForTest(); + defer proxy_runtime.resetForTest(); + service_rollout.setForTest(.{ + .service_registry_v2 = true, + .l7_proxy_http = true, + }); + defer service_rollout.resetForTest(); + + try store.createService(.{ + .service_name = "api", + .vip_address = "10.43.0.2", + .lb_policy = "consistent_hash", + .http_proxy_host = "api.internal", + .http_proxy_path_prefix = "/v1", + .http_proxy_preserve_host = false, + .created_at = 1000, + .updated_at = 1000, + }); + try store.upsertServiceEndpoint(.{ + .service_name = "api", + .endpoint_id = "api-1", + .container_id = "ctr-1", + .node_id = null, + .ip_address = "10.42.0.9", + .port = 8080, + .weight = 1, + .admin_state = "active", + .generation = 1, + .registered_at = 1000, + .last_seen_at = 1000, + }); + proxy_runtime.bootstrapIfEnabled(); + + const routes = [_]router.Route{ + .{ + .name = "api:/v1", + .service = "api", + .vip_address = "10.43.0.2", + .match = .{ .host = "api.internal", .path_prefix = "/v1" }, + .eligible_endpoints = 1, + }, + }; + + var proxy = ReverseProxy.init(std.testing.allocator, &routes); + defer proxy.deinit(); + + const result = try proxy.handleRequest( + "GET /v1/users HTTP/1.1\r\nHost: api.internal\r\n\r\n", + ); + defer result.deinit(std.testing.allocator); + + switch (result) { + .forward => |plan| { + try std.testing.expectEqual(http.Method.GET, plan.method); + try std.testing.expectEqualStrings("/v1/users", plan.path); + try std.testing.expectEqualStrings("api", plan.route.service); + try std.testing.expectEqualStrings("10.42.0.9", plan.upstream.address); + try std.testing.expectEqualStrings("api", plan.outbound_host); + }, + .response => return error.TestUnexpectedResult, + } +} + +test "handleRequest returns bad request when Host is missing" { + const routes = [_]router.Route{ + .{ + .name = "api:/", + .service = "api", + .vip_address = "10.43.0.2", + .match = .{ .host = "api.internal", .path_prefix = "/" }, + }, + }; + + var proxy = ReverseProxy.init(std.testing.allocator, &routes); + defer proxy.deinit(); + + const result = try proxy.handleRequest( + "GET / HTTP/1.1\r\nUser-Agent: test\r\n\r\n", + ); + defer result.deinit(std.testing.allocator); + + switch (result) { + .response => |resp| try std.testing.expectEqual(http.StatusCode.bad_request, resp.status), + .forward => return error.TestUnexpectedResult, + } +} + +test "handleRequest returns not found when no route matches" { + const store = @import("../../state/store.zig"); + const service_rollout = @import("../service_rollout.zig"); + const service_registry_runtime = @import("../service_registry_runtime.zig"); + + try store.initTestDb(); + defer store.deinitTestDb(); + service_registry_runtime.resetForTest(); + defer service_registry_runtime.resetForTest(); + proxy_runtime.resetForTest(); + defer proxy_runtime.resetForTest(); + service_rollout.setForTest(.{ + .service_registry_v2 = true, + .l7_proxy_http = true, + }); + defer service_rollout.resetForTest(); + + try store.createService(.{ + .service_name = "api", + .vip_address = "10.43.0.2", + .lb_policy = "consistent_hash", + .http_proxy_host = "api.internal", + .http_proxy_path_prefix = "/v1", + .created_at = 1000, + .updated_at = 1000, + }); + proxy_runtime.bootstrapIfEnabled(); + + const routes = [_]router.Route{}; + var proxy = ReverseProxy.init(std.testing.allocator, &routes); + defer proxy.deinit(); + + const result = try proxy.handleRequest( + "GET /missing HTTP/1.1\r\nHost: unknown.internal\r\n\r\n", + ); + defer result.deinit(std.testing.allocator); + + switch (result) { + .response => |resp| try std.testing.expectEqual(http.StatusCode.not_found, resp.status), + .forward => return error.TestUnexpectedResult, + } +} + +test "handleRequest returns service unavailable when route has no eligible upstream" { + const store = @import("../../state/store.zig"); + const service_rollout = @import("../service_rollout.zig"); + const service_registry_runtime = @import("../service_registry_runtime.zig"); + + try store.initTestDb(); + defer store.deinitTestDb(); + service_registry_runtime.resetForTest(); + defer service_registry_runtime.resetForTest(); + proxy_runtime.resetForTest(); + defer proxy_runtime.resetForTest(); + service_rollout.setForTest(.{ + .service_registry_v2 = true, + .l7_proxy_http = true, + }); + defer service_rollout.resetForTest(); + + try store.createService(.{ + .service_name = "api", + .vip_address = "10.43.0.2", + .lb_policy = "consistent_hash", + .http_proxy_host = "api.internal", + .http_proxy_path_prefix = "/v1", + .created_at = 1000, + .updated_at = 1000, + }); + try store.upsertServiceEndpoint(.{ + .service_name = "api", + .endpoint_id = "api-1", + .container_id = "ctr-1", + .node_id = null, + .ip_address = "10.42.0.9", + .port = 8080, + .weight = 1, + .admin_state = "draining", + .generation = 1, + .registered_at = 1000, + .last_seen_at = 1000, + }); + proxy_runtime.bootstrapIfEnabled(); + + const routes = [_]router.Route{ + .{ + .name = "api:/v1", + .service = "api", + .vip_address = "10.43.0.2", + .match = .{ .host = "api.internal", .path_prefix = "/v1" }, + }, + }; + var proxy = ReverseProxy.init(std.testing.allocator, &routes); + defer proxy.deinit(); + + const result = try proxy.handleRequest( + "GET /v1/users HTTP/1.1\r\nHost: api.internal\r\n\r\n", + ); + defer result.deinit(std.testing.allocator); + + switch (result) { + .response => |resp| try std.testing.expectEqual(http.StatusCode.service_unavailable, resp.status), + .forward => return error.TestUnexpectedResult, + } +} diff --git a/src/network/proxy/runtime.zig b/src/network/proxy/runtime.zig index 9b00dd3d..cff6aec4 100644 --- a/src/network/proxy/runtime.zig +++ b/src/network/proxy/runtime.zig @@ -1,6 +1,7 @@ const std = @import("std"); const log = @import("../../lib/log.zig"); const router = @import("router.zig"); +const upstream_mod = @import("upstream.zig"); const service_registry_runtime = @import("../service_registry_runtime.zig"); const service_rollout = @import("../service_rollout.zig"); @@ -133,6 +134,48 @@ pub fn snapshotServiceRoutes(alloc: std.mem.Allocator, service_name: []const u8) return routes_snapshot; } +pub fn resolveRoute(alloc: std.mem.Allocator, host: []const u8, path: []const u8) !RouteSnapshot { + mutex.lock(); + defer mutex.unlock(); + + const matched = router.matchRoute(materialized_routes.items, host, path) orelse return error.RouteNotFound; + return cloneRouteSnapshot(alloc, matched); +} + +pub fn resolveUpstream(alloc: std.mem.Allocator, service_name: []const u8) !upstream_mod.Upstream { + var endpoints = try service_registry_runtime.snapshotServiceEndpoints(alloc, service_name); + defer { + for (endpoints.items) |endpoint| endpoint.deinit(alloc); + endpoints.deinit(alloc); + } + + var candidates: std.ArrayList(upstream_mod.Upstream) = .empty; + defer { + for (candidates.items) |candidate| candidate.deinit(alloc); + candidates.deinit(alloc); + } + + for (endpoints.items) |endpoint| { + const port: u16 = if (endpoint.port < 0) 0 else @intCast(endpoint.port); + try candidates.append(alloc, .{ + .service = try alloc.dupe(u8, service_name), + .endpoint_id = try alloc.dupe(u8, endpoint.endpoint_id), + .address = try alloc.dupe(u8, endpoint.ip_address), + .port = port, + .eligible = endpoint.eligible, + }); + } + + const selected = upstream_mod.selectFirstEligible(candidates.items) orelse return error.NoHealthyUpstream; + return .{ + .service = try alloc.dupe(u8, selected.service), + .endpoint_id = try alloc.dupe(u8, selected.endpoint_id), + .address = try alloc.dupe(u8, selected.address), + .port = selected.port, + .eligible = selected.eligible, + }; +} + fn cloneRouteSnapshot(alloc: std.mem.Allocator, route: router.Route) !RouteSnapshot { return .{ .name = try alloc.dupe(u8, route.name), @@ -420,3 +463,96 @@ test "materialized routes include service endpoint readiness counts" { try std.testing.expectEqual(@as(u32, 0), routes_snapshot.items[0].healthy_endpoints); try std.testing.expect(!routes_snapshot.items[0].degraded); } + +test "resolveRoute matches by host and path" { + const store = @import("../../state/store.zig"); + + try store.initTestDb(); + defer store.deinitTestDb(); + service_registry_runtime.resetForTest(); + defer service_registry_runtime.resetForTest(); + service_rollout.setForTest(.{ + .service_registry_v2 = true, + .l7_proxy_http = true, + }); + defer service_rollout.resetForTest(); + resetForTest(); + defer resetForTest(); + + try store.createService(.{ + .service_name = "api", + .vip_address = "10.43.0.2", + .lb_policy = "consistent_hash", + .http_proxy_host = "api.internal", + .http_proxy_path_prefix = "/v1", + .created_at = 1000, + .updated_at = 1000, + }); + + bootstrapIfEnabled(); + + const route = try resolveRoute(std.testing.allocator, "api.internal", "/v1/users"); + defer route.deinit(std.testing.allocator); + + try std.testing.expectEqualStrings("api", route.service); + try std.testing.expectEqualStrings("/v1", route.path_prefix); +} + +test "resolveUpstream returns the first eligible endpoint" { + const store = @import("../../state/store.zig"); + + try store.initTestDb(); + defer store.deinitTestDb(); + service_registry_runtime.resetForTest(); + defer service_registry_runtime.resetForTest(); + service_rollout.setForTest(.{ + .service_registry_v2 = true, + .l7_proxy_http = true, + }); + defer service_rollout.resetForTest(); + resetForTest(); + defer resetForTest(); + + try store.createService(.{ + .service_name = "api", + .vip_address = "10.43.0.2", + .lb_policy = "consistent_hash", + .http_proxy_host = "api.internal", + .http_proxy_path_prefix = "/v1", + .created_at = 1000, + .updated_at = 1000, + }); + try store.upsertServiceEndpoint(.{ + .service_name = "api", + .endpoint_id = "api-1", + .container_id = "ctr-1", + .node_id = null, + .ip_address = "10.42.0.9", + .port = 8080, + .weight = 1, + .admin_state = "draining", + .generation = 1, + .registered_at = 1000, + .last_seen_at = 1000, + }); + try store.upsertServiceEndpoint(.{ + .service_name = "api", + .endpoint_id = "api-2", + .container_id = "ctr-2", + .node_id = null, + .ip_address = "10.42.0.10", + .port = 8081, + .weight = 1, + .admin_state = "active", + .generation = 1, + .registered_at = 1001, + .last_seen_at = 1001, + }); + + const upstream = try resolveUpstream(std.testing.allocator, "api"); + defer upstream.deinit(std.testing.allocator); + + try std.testing.expectEqualStrings("api-2", upstream.endpoint_id); + try std.testing.expectEqualStrings("10.42.0.10", upstream.address); + try std.testing.expectEqual(@as(u16, 8081), upstream.port); +} diff --git a/src/network/proxy/upstream.zig b/src/network/proxy/upstream.zig index c5c36f5c..ac396925 100644 --- a/src/network/proxy/upstream.zig +++ b/src/network/proxy/upstream.zig @@ -2,9 +2,16 @@ const std = @import("std"); pub const Upstream = struct { service: []const u8, + endpoint_id: []const u8, address: []const u8, port: u16, eligible: bool = true, + + pub fn deinit(self: Upstream, alloc: std.mem.Allocator) void { + alloc.free(self.service); + alloc.free(self.endpoint_id); + alloc.free(self.address); + } }; pub fn selectFirstEligible(upstreams: []const Upstream) ?Upstream { @@ -16,8 +23,8 @@ pub fn selectFirstEligible(upstreams: []const Upstream) ?Upstream { test "selectFirstEligible returns first eligible upstream" { const upstreams = [_]Upstream{ - .{ .service = "api", .address = "10.0.0.2", .port = 8080, .eligible = false }, - .{ .service = "api", .address = "10.0.0.3", .port = 8080, .eligible = true }, + .{ .service = "api", .endpoint_id = "api-1", .address = "10.0.0.2", .port = 8080, .eligible = false }, + .{ .service = "api", .endpoint_id = "api-2", .address = "10.0.0.3", .port = 8080, .eligible = true }, }; const selected = selectFirstEligible(&upstreams) orelse return error.TestExpectedNonNull; @@ -26,8 +33,8 @@ test "selectFirstEligible returns first eligible upstream" { test "selectFirstEligible returns null when all upstreams are ineligible" { const upstreams = [_]Upstream{ - .{ .service = "api", .address = "10.0.0.2", .port = 8080, .eligible = false }, - .{ .service = "api", .address = "10.0.0.3", .port = 8080, .eligible = false }, + .{ .service = "api", .endpoint_id = "api-1", .address = "10.0.0.2", .port = 8080, .eligible = false }, + .{ .service = "api", .endpoint_id = "api-2", .address = "10.0.0.3", .port = 8080, .eligible = false }, }; try std.testing.expect(selectFirstEligible(&upstreams) == null);