diff --git a/Makefile b/Makefile index 694ddc59..349e9ea7 100644 --- a/Makefile +++ b/Makefile @@ -103,8 +103,8 @@ help: @printf " $(BOLD)$(GREEN)supv:api:logs-err$(NC): Show the the API service supervisor error logs.\n\n" @printf "$(BOLD)$(BLUE)Caddy Commands:$(NC)\n" - @printf " $(BOLD)$(GREEN)caddy-gen-cert$(NC) : Generate the caddy's mtls certificates.\n" - @printf " $(BOLD)$(GREEN)caddy-del-cert$(NC) : Remove the caddy's mtls certificates.\n" + @printf " $(BOLD)$(GREEN)caddy-gen-certs$(NC) : Generate the caddy's mtls certificates.\n" + @printf " $(BOLD)$(GREEN)caddy-del-certs$(NC) : Remove the caddy's mtls certificates.\n" @printf " $(BOLD)$(GREEN)caddy-validate$(NC) : Validates caddy's files syntax.\n\n" @printf "$(BOLD)$(BLUE)Monitoring Commands:$(NC)\n" diff --git a/database/seeder/importer/runner.go b/database/seeder/importer/runner.go index 3ddb2ad6..762fb69b 100644 --- a/database/seeder/importer/runner.go +++ b/database/seeder/importer/runner.go @@ -643,10 +643,38 @@ func executeCopy(ctx context.Context, tx *sql.Tx, stmt statement) error { insertSQL := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", table, strings.Join(columns, ", "), strings.Join(placeholder, ", ")) - for _, row := range rows { + for idx, row := range rows { + rowNumber := idx + 1 + rowSavepoint := fmt.Sprintf("importer_copy_row_%d", rowNumber) + if _, err := tx.ExecContext(ctx, "SAVEPOINT "+rowSavepoint); err != nil { + return fmt.Errorf("importer: begin savepoint for COPY row %d of %s failed: %w", rowNumber, table, err) + } + if _, err := tx.ExecContext(ctx, insertSQL, row...); err != nil { + if skip, reason := shouldSkipCopyInsertError(table, err); skip { + if rbErr := rollbackToSavepoint(ctx, tx, rowSavepoint); rbErr != nil { + return errors.Join(fmt.Errorf("importer: skipped COPY row for %s: %s", table, reason), rbErr) + } + + fmt.Fprintf(os.Stderr, "importer: skipped COPY row for %s: %s\n", table, reason) + + if _, err := tx.ExecContext(ctx, "RELEASE SAVEPOINT "+rowSavepoint); err != nil { + return fmt.Errorf("importer: release copy savepoint %s failed: %w", rowSavepoint, err) + } + + continue + } + + if rbErr := rollbackToSavepoint(ctx, tx, rowSavepoint); rbErr != nil { + return errors.Join(fmt.Errorf("importer: insert from COPY failed: %w", err), rbErr) + } + return fmt.Errorf("importer: insert from COPY failed: %w", err) } + + if _, err := tx.ExecContext(ctx, "RELEASE SAVEPOINT "+rowSavepoint); err != nil { + return fmt.Errorf("importer: release copy savepoint %s failed: %w", rowSavepoint, err) + } } return nil @@ -683,10 +711,8 @@ func shouldSkipExecError(stmt statement, err error) (bool, string) { case "23505": if strings.HasPrefix(upper, "INSERT INTO ") { target, _ := statementTarget(stmt.sql) - normalized := normalizeQualifiedIdentifier(target) - switch normalized { - case "schema_migrations", "public.schema_migrations": - return true, fmt.Sprintf("duplicate migration row skipped (%s)", message) + if skip, reason := shouldSkipDuplicateSchemaMigrationInsert(target, message); skip { + return true, reason } } case "42704": @@ -702,6 +728,29 @@ func shouldSkipExecError(stmt statement, err error) (bool, string) { return false, "" } +func shouldSkipCopyInsertError(table string, err error) (bool, string) { + code, message := sqlStateFromError(err) + if code == "" { + return false, "" + } + + if code != "23505" { + return false, "" + } + + return shouldSkipDuplicateSchemaMigrationInsert(table, message) +} + +func shouldSkipDuplicateSchemaMigrationInsert(target, message string) (bool, string) { + normalized := normalizeQualifiedIdentifier(target) + switch normalized { + case "schema_migrations", "public.schema_migrations": + return true, fmt.Sprintf("duplicate migration row skipped (%s)", message) + } + + return false, "" +} + func shouldSkipStatement(stmt statement, skipTables map[string]struct{}) (bool, string) { if len(skipTables) == 0 { return false, "" diff --git a/database/seeder/importer/runner_sql_test.go b/database/seeder/importer/runner_sql_test.go index bdf180bf..0e7f24bf 100644 --- a/database/seeder/importer/runner_sql_test.go +++ b/database/seeder/importer/runner_sql_test.go @@ -109,9 +109,13 @@ func TestExecuteCopyInsertsRows(t *testing.T) { defer sqlDB.Close() mock.ExpectBegin() + mock.ExpectExec("SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0)) insert := regexp.QuoteMeta("INSERT INTO public.widgets (id, name) VALUES ($1, $2)") mock.ExpectExec(insert).WithArgs("1", "foo").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("RELEASE SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectExec("SAVEPOINT importer_copy_row_2").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec(insert).WithArgs("2", "bar").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("RELEASE SAVEPOINT importer_copy_row_2").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectRollback() tx, err := sqlDB.Begin() @@ -150,7 +154,9 @@ func TestExecuteCopyResolvesColumns(t *testing.T) { WithArgs("public", "widgets"). WillReturnRows(sqlmock.NewRows([]string{"column_name"}).AddRow("id").AddRow("name")) insert := regexp.QuoteMeta("INSERT INTO public.widgets (id, name) VALUES ($1, $2)") + mock.ExpectExec("SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec(insert).WithArgs("1", "foo").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("RELEASE SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectRollback() tx, err := sqlDB.Begin() @@ -187,9 +193,11 @@ func TestExecuteCopyHandlesLargeRows(t *testing.T) { mock.ExpectBegin() largePayload := strings.Repeat("a", 70_000) insert := regexp.QuoteMeta("INSERT INTO public.widgets (id, payload) VALUES ($1, $2)") + mock.ExpectExec("SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectExec(insert). WithArgs("1", largePayload). WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("RELEASE SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0)) mock.ExpectRollback() tx, err := sqlDB.Begin() @@ -216,6 +224,53 @@ func TestExecuteCopyHandlesLargeRows(t *testing.T) { } } +func TestExecuteCopySkipsDuplicateCopyRows(t *testing.T) { + sqlDB, mock, err := sqlmock.New() + if err != nil { + t.Fatalf("sqlmock: %v", err) + } + defer sqlDB.Close() + + mock.ExpectBegin() + insert := regexp.QuoteMeta("INSERT INTO public.schema_migrations (version, dirty) VALUES ($1, $2)") + mock.ExpectExec("SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectExec(insert).WithArgs("41", "t").WillReturnResult(sqlmock.NewResult(1, 1)) + mock.ExpectExec("RELEASE SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectExec("SAVEPOINT importer_copy_row_2").WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectExec(insert). + WithArgs("41", "t"). + WillReturnError(errors.New(`ERROR: duplicate key value violates unique constraint "schema_migrations_pkey" (SQLSTATE 23505)`)) + mock.ExpectExec("ROLLBACK TO SAVEPOINT importer_copy_row_2").WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectExec("RELEASE SAVEPOINT importer_copy_row_2").WillReturnResult(sqlmock.NewResult(0, 0)) + mock.ExpectRollback() + + tx, err := sqlDB.Begin() + if err != nil { + t.Fatalf("begin: %v", err) + } + + stmt := statement{ + sql: "COPY public.schema_migrations (version, dirty) FROM stdin", + copyData: []byte(strings.Join([]string{ + "41\tt", + "41\tt", + }, "\n")), + isCopy: true, + } + + if err := executeCopy(context.Background(), tx, stmt); err != nil { + t.Fatalf("executeCopy unexpected error: %v", err) + } + + if err := tx.Rollback(); err != nil { + t.Fatalf("rollback: %v", err) + } + + if err := mock.ExpectationsWereMet(); err != nil { + t.Fatalf("unmet expectations: %v", err) + } +} + func TestResolveCopyColumnsErrorsOnEmptyMetadata(t *testing.T) { sqlDB, mock, err := sqlmock.New() if err != nil { diff --git a/database/seeder/importer/runner_test.go b/database/seeder/importer/runner_test.go index c91286d0..2b25cdc4 100644 --- a/database/seeder/importer/runner_test.go +++ b/database/seeder/importer/runner_test.go @@ -406,6 +406,40 @@ func TestSeedFromFileSkipsDuplicateSchemaMigrationsInserts(t *testing.T) { } } +func TestSeedFromFileSkipsDuplicateSchemaMigrationsCopy(t *testing.T) { + conn, environment, cleanup := setupPostgresConnection(t) + t.Cleanup(cleanup) + + contents := strings.Join([]string{ + "CREATE TABLE IF NOT EXISTS public.schema_migrations (version BIGINT PRIMARY KEY, dirty BOOLEAN NOT NULL);", + "COPY public.schema_migrations (version, dirty) FROM stdin;", + "42\tf", + "42\tf", + "\\.", + "", + }, "\n") + + fileName := writeStorageFile(t, withSuffix(t, ".sql"), contents) + + if err := importer.SeedFromFile(conn, environment, fileName); err != nil { + t.Fatalf("seed from file: %v", err) + } + + // Run the seed a second time to ensure COPY duplicate handling stays idempotent. + if err := importer.SeedFromFile(conn, environment, fileName); err != nil { + t.Fatalf("second seed from file: %v", err) + } + + var migrationCount int64 + if err := conn.Sql().Table("schema_migrations").Where("version = ?", 42).Count(&migrationCount).Error; err != nil { + t.Fatalf("count schema_migrations: %v", err) + } + + if migrationCount != 1 { + t.Fatalf("expected 1 schema_migrations row for version 42, got %d", migrationCount) + } +} + func TestSeedFromFileSkipsDuplicateCreates(t *testing.T) { conn, environment, cleanup := setupPostgresConnection(t) t.Cleanup(cleanup) diff --git a/docker-compose.yml b/docker-compose.yml index 16048754..4406edf4 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -88,7 +88,6 @@ services: - api ports: - "18080:80" - - "8443:443" - "127.0.0.1:2019:2019" # Admin API - localhost only for debugging # --- Dedicated /metrics endpoint for Prometheus (internal network only) @@ -396,6 +395,8 @@ services: api-db: condition: service_healthy expose: + # local: use Caddy on port 18080 (handles /api prefix like production) + # prod : Caddy handles all traffic (no direct API access) - ${ENV_HTTP_PORT} networks: oullin_net: {} diff --git a/go.mod b/go.mod index 837d0b83..dc69d057 100644 --- a/go.mod +++ b/go.mod @@ -14,7 +14,6 @@ require ( github.com/klauspost/compress v1.18.0 github.com/lib/pq v1.10.9 github.com/prometheus/client_golang v1.20.5 - github.com/rs/cors v1.11.1 github.com/testcontainers/testcontainers-go v0.39.0 github.com/testcontainers/testcontainers-go/modules/postgres v0.39.0 golang.org/x/crypto v0.43.0 diff --git a/go.sum b/go.sum index f2d5a7aa..2fe990b9 100644 --- a/go.sum +++ b/go.sum @@ -150,8 +150,6 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk= github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ= github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc= -github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA= -github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU= github.com/shirou/gopsutil/v4 v4.25.9 h1:JImNpf6gCVhKgZhtaAHJ0serfFGtlfIlSC08eaKdTrU= github.com/shirou/gopsutil/v4 v4.25.9/go.mod h1:gxIxoC+7nQRwUl/xNhutXlD8lq+jxTgpIkEf3rADHL8= github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ= diff --git a/infra/caddy/Caddyfile.local b/infra/caddy/Caddyfile.local index 9b2b70df..01489e4f 100644 --- a/infra/caddy/Caddyfile.local +++ b/infra/caddy/Caddyfile.local @@ -44,16 +44,21 @@ respond 204 } - # Block protected paths - @protected path /metrics /generate-signature* - handle @protected { - respond 403 + # API handler - strips /api prefix before forwarding + handle_path /api/* { + # Block protected paths (after /api prefix is stripped) + @protected path /metrics + handle @protected { + respond 403 + } + + reverse_proxy api:8080 } - # Reverse proxy all incoming requests to the 'api' service. - # - The service name 'api' is resolved by Docker's internal DNS to the correct container IP on the 'caddy_net' network. - # - The API container listens on port 8080 (from the ENV_HTTP_PORT). - reverse_proxy api:8080 + # Fallback handler for non-API routes + handle { + respond 404 + } } # INTERNAL metrics endpoint for Prometheus scraping diff --git a/pkg/endpoint/server.go b/pkg/endpoint/server.go index 1e0cf010..c7630cc4 100644 --- a/pkg/endpoint/server.go +++ b/pkg/endpoint/server.go @@ -8,11 +8,8 @@ import ( "net/http" "os" "os/signal" - "strings" "syscall" "time" - - "github.com/rs/cors" ) // RunServer starts the provided HTTP server, listens for shutdown signals, and @@ -84,9 +81,8 @@ type ServerHandlerConfig struct { } // NewServerHandler constructs the HTTP handler using the provided configuration. -// In development environments it applies permissive CORS settings so the -// client app can communicate with the API, and it optionally wraps the handler -// with Sentry instrumentation when supplied. +// CORS is handled by Caddy reverse proxy to avoid duplicate headers. +// The handler is optionally wrapped with Sentry instrumentation when supplied. func NewServerHandler(cfg ServerHandlerConfig) http.Handler { if cfg.Mux == nil { return http.NotFoundHandler() @@ -94,43 +90,6 @@ func NewServerHandler(cfg ServerHandlerConfig) http.Handler { handler := cfg.Mux - if !cfg.IsProduction { - headers := []string{ - "Accept", - "Authorization", - "Content-Type", - "X-CSRF-Token", - "User-Agent", - "X-API-Key", - "X-API-Username", - "X-API-Signature", - "X-API-Timestamp", - "X-API-Nonce", - "X-Request-ID", - "If-None-Match", - "X-API-Intended-Origin", - } - - origins := []string{"http://localhost:5173"} - if host := cfg.DevHost; host != "" { - if !strings.Contains(host, "://") { - host = "http://" + host - } - - origins = append(origins, host) - } - - c := cors.New(cors.Options{ - AllowedOrigins: origins, - AllowedMethods: []string{http.MethodGet, http.MethodPost, http.MethodPut, http.MethodDelete, http.MethodOptions}, - AllowedHeaders: headers, - AllowCredentials: true, - Debug: true, - }) - - handler = c.Handler(handler) - } - if cfg.Wrap != nil { handler = cfg.Wrap(handler) }