Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -103,8 +103,8 @@ help:
@printf " $(BOLD)$(GREEN)supv:api:logs-err$(NC): Show the the API service supervisor error logs.\n\n"

@printf "$(BOLD)$(BLUE)Caddy Commands:$(NC)\n"
@printf " $(BOLD)$(GREEN)caddy-gen-cert$(NC) : Generate the caddy's mtls certificates.\n"
@printf " $(BOLD)$(GREEN)caddy-del-cert$(NC) : Remove the caddy's mtls certificates.\n"
@printf " $(BOLD)$(GREEN)caddy-gen-certs$(NC) : Generate the caddy's mtls certificates.\n"
@printf " $(BOLD)$(GREEN)caddy-del-certs$(NC) : Remove the caddy's mtls certificates.\n"
@printf " $(BOLD)$(GREEN)caddy-validate$(NC) : Validates caddy's files syntax.\n\n"

@printf "$(BOLD)$(BLUE)Monitoring Commands:$(NC)\n"
Expand Down
59 changes: 54 additions & 5 deletions database/seeder/importer/runner.go
Original file line number Diff line number Diff line change
Expand Up @@ -643,10 +643,38 @@ func executeCopy(ctx context.Context, tx *sql.Tx, stmt statement) error {

insertSQL := fmt.Sprintf("INSERT INTO %s (%s) VALUES (%s)", table, strings.Join(columns, ", "), strings.Join(placeholder, ", "))

for _, row := range rows {
for idx, row := range rows {
rowNumber := idx + 1
rowSavepoint := fmt.Sprintf("importer_copy_row_%d", rowNumber)
if _, err := tx.ExecContext(ctx, "SAVEPOINT "+rowSavepoint); err != nil {
return fmt.Errorf("importer: begin savepoint for COPY row %d of %s failed: %w", rowNumber, table, err)
}

if _, err := tx.ExecContext(ctx, insertSQL, row...); err != nil {
if skip, reason := shouldSkipCopyInsertError(table, err); skip {
if rbErr := rollbackToSavepoint(ctx, tx, rowSavepoint); rbErr != nil {
return errors.Join(fmt.Errorf("importer: skipped COPY row for %s: %s", table, reason), rbErr)
}

fmt.Fprintf(os.Stderr, "importer: skipped COPY row for %s: %s\n", table, reason)

if _, err := tx.ExecContext(ctx, "RELEASE SAVEPOINT "+rowSavepoint); err != nil {
return fmt.Errorf("importer: release copy savepoint %s failed: %w", rowSavepoint, err)
}

continue
}

if rbErr := rollbackToSavepoint(ctx, tx, rowSavepoint); rbErr != nil {
return errors.Join(fmt.Errorf("importer: insert from COPY failed: %w", err), rbErr)
}

return fmt.Errorf("importer: insert from COPY failed: %w", err)
}

if _, err := tx.ExecContext(ctx, "RELEASE SAVEPOINT "+rowSavepoint); err != nil {
return fmt.Errorf("importer: release copy savepoint %s failed: %w", rowSavepoint, err)
}
}

return nil
Expand Down Expand Up @@ -683,10 +711,8 @@ func shouldSkipExecError(stmt statement, err error) (bool, string) {
case "23505":
if strings.HasPrefix(upper, "INSERT INTO ") {
target, _ := statementTarget(stmt.sql)
normalized := normalizeQualifiedIdentifier(target)
switch normalized {
case "schema_migrations", "public.schema_migrations":
return true, fmt.Sprintf("duplicate migration row skipped (%s)", message)
if skip, reason := shouldSkipDuplicateSchemaMigrationInsert(target, message); skip {
return true, reason
}
}
case "42704":
Expand All @@ -702,6 +728,29 @@ func shouldSkipExecError(stmt statement, err error) (bool, string) {
return false, ""
}

func shouldSkipCopyInsertError(table string, err error) (bool, string) {
code, message := sqlStateFromError(err)
if code == "" {
return false, ""
}

if code != "23505" {
return false, ""
}

return shouldSkipDuplicateSchemaMigrationInsert(table, message)
}

func shouldSkipDuplicateSchemaMigrationInsert(target, message string) (bool, string) {
normalized := normalizeQualifiedIdentifier(target)
switch normalized {
case "schema_migrations", "public.schema_migrations":
return true, fmt.Sprintf("duplicate migration row skipped (%s)", message)
}

return false, ""
}

func shouldSkipStatement(stmt statement, skipTables map[string]struct{}) (bool, string) {
if len(skipTables) == 0 {
return false, ""
Expand Down
55 changes: 55 additions & 0 deletions database/seeder/importer/runner_sql_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,9 +109,13 @@ func TestExecuteCopyInsertsRows(t *testing.T) {
defer sqlDB.Close()

mock.ExpectBegin()
mock.ExpectExec("SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0))
insert := regexp.QuoteMeta("INSERT INTO public.widgets (id, name) VALUES ($1, $2)")
mock.ExpectExec(insert).WithArgs("1", "foo").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("RELEASE SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec("SAVEPOINT importer_copy_row_2").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec(insert).WithArgs("2", "bar").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("RELEASE SAVEPOINT importer_copy_row_2").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectRollback()

tx, err := sqlDB.Begin()
Expand Down Expand Up @@ -150,7 +154,9 @@ func TestExecuteCopyResolvesColumns(t *testing.T) {
WithArgs("public", "widgets").
WillReturnRows(sqlmock.NewRows([]string{"column_name"}).AddRow("id").AddRow("name"))
insert := regexp.QuoteMeta("INSERT INTO public.widgets (id, name) VALUES ($1, $2)")
mock.ExpectExec("SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec(insert).WithArgs("1", "foo").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("RELEASE SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectRollback()

tx, err := sqlDB.Begin()
Expand Down Expand Up @@ -187,9 +193,11 @@ func TestExecuteCopyHandlesLargeRows(t *testing.T) {
mock.ExpectBegin()
largePayload := strings.Repeat("a", 70_000)
insert := regexp.QuoteMeta("INSERT INTO public.widgets (id, payload) VALUES ($1, $2)")
mock.ExpectExec("SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec(insert).
WithArgs("1", largePayload).
WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("RELEASE SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectRollback()

tx, err := sqlDB.Begin()
Expand All @@ -216,6 +224,53 @@ func TestExecuteCopyHandlesLargeRows(t *testing.T) {
}
}

func TestExecuteCopySkipsDuplicateCopyRows(t *testing.T) {
sqlDB, mock, err := sqlmock.New()
if err != nil {
t.Fatalf("sqlmock: %v", err)
}
defer sqlDB.Close()

mock.ExpectBegin()
insert := regexp.QuoteMeta("INSERT INTO public.schema_migrations (version, dirty) VALUES ($1, $2)")
mock.ExpectExec("SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec(insert).WithArgs("41", "t").WillReturnResult(sqlmock.NewResult(1, 1))
mock.ExpectExec("RELEASE SAVEPOINT importer_copy_row_1").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec("SAVEPOINT importer_copy_row_2").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec(insert).
WithArgs("41", "t").
WillReturnError(errors.New(`ERROR: duplicate key value violates unique constraint "schema_migrations_pkey" (SQLSTATE 23505)`))
mock.ExpectExec("ROLLBACK TO SAVEPOINT importer_copy_row_2").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectExec("RELEASE SAVEPOINT importer_copy_row_2").WillReturnResult(sqlmock.NewResult(0, 0))
mock.ExpectRollback()

tx, err := sqlDB.Begin()
if err != nil {
t.Fatalf("begin: %v", err)
}

stmt := statement{
sql: "COPY public.schema_migrations (version, dirty) FROM stdin",
copyData: []byte(strings.Join([]string{
"41\tt",
"41\tt",
}, "\n")),
isCopy: true,
}

if err := executeCopy(context.Background(), tx, stmt); err != nil {
t.Fatalf("executeCopy unexpected error: %v", err)
}

if err := tx.Rollback(); err != nil {
t.Fatalf("rollback: %v", err)
}

if err := mock.ExpectationsWereMet(); err != nil {
t.Fatalf("unmet expectations: %v", err)
}
}

func TestResolveCopyColumnsErrorsOnEmptyMetadata(t *testing.T) {
sqlDB, mock, err := sqlmock.New()
if err != nil {
Expand Down
34 changes: 34 additions & 0 deletions database/seeder/importer/runner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -406,6 +406,40 @@ func TestSeedFromFileSkipsDuplicateSchemaMigrationsInserts(t *testing.T) {
}
}

func TestSeedFromFileSkipsDuplicateSchemaMigrationsCopy(t *testing.T) {
conn, environment, cleanup := setupPostgresConnection(t)
t.Cleanup(cleanup)

contents := strings.Join([]string{
"CREATE TABLE IF NOT EXISTS public.schema_migrations (version BIGINT PRIMARY KEY, dirty BOOLEAN NOT NULL);",
"COPY public.schema_migrations (version, dirty) FROM stdin;",
"42\tf",
"42\tf",
"\\.",
"",
}, "\n")

fileName := writeStorageFile(t, withSuffix(t, ".sql"), contents)

if err := importer.SeedFromFile(conn, environment, fileName); err != nil {
t.Fatalf("seed from file: %v", err)
}

// Run the seed a second time to ensure COPY duplicate handling stays idempotent.
if err := importer.SeedFromFile(conn, environment, fileName); err != nil {
t.Fatalf("second seed from file: %v", err)
}

var migrationCount int64
if err := conn.Sql().Table("schema_migrations").Where("version = ?", 42).Count(&migrationCount).Error; err != nil {
t.Fatalf("count schema_migrations: %v", err)
}

if migrationCount != 1 {
t.Fatalf("expected 1 schema_migrations row for version 42, got %d", migrationCount)
}
}

func TestSeedFromFileSkipsDuplicateCreates(t *testing.T) {
conn, environment, cleanup := setupPostgresConnection(t)
t.Cleanup(cleanup)
Expand Down
3 changes: 2 additions & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,6 @@ services:
- api
ports:
- "18080:80"
- "8443:443"
- "127.0.0.1:2019:2019" # Admin API - localhost only for debugging

# --- Dedicated /metrics endpoint for Prometheus (internal network only)
Expand Down Expand Up @@ -396,6 +395,8 @@ services:
api-db:
condition: service_healthy
expose:
# local: use Caddy on port 18080 (handles /api prefix like production)
# prod : Caddy handles all traffic (no direct API access)
- ${ENV_HTTP_PORT}
networks:
oullin_net: {}
Expand Down
1 change: 0 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ require (
github.com/klauspost/compress v1.18.0
github.com/lib/pq v1.10.9
github.com/prometheus/client_golang v1.20.5
github.com/rs/cors v1.11.1
github.com/testcontainers/testcontainers-go v0.39.0
github.com/testcontainers/testcontainers-go/modules/postgres v0.39.0
golang.org/x/crypto v0.43.0
Expand Down
2 changes: 0 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,6 @@ github.com/prometheus/procfs v0.15.1 h1:YagwOFzUgYfKKHX6Dr+sHT7km/hxC76UB0leargg
github.com/prometheus/procfs v0.15.1/go.mod h1:fB45yRUv8NstnjriLhBQLuOUt+WW4BsoGhij/e3PBqk=
github.com/rogpeppe/go-internal v1.14.1 h1:UQB4HGPB6osV0SQTLymcB4TgvyWu6ZyliaW0tI/otEQ=
github.com/rogpeppe/go-internal v1.14.1/go.mod h1:MaRKkUm5W0goXpeCfT7UZI6fk/L7L7so1lCWt35ZSgc=
github.com/rs/cors v1.11.1 h1:eU3gRzXLRK57F5rKMGMZURNdIG4EoAmX8k94r9wXWHA=
github.com/rs/cors v1.11.1/go.mod h1:XyqrcTp5zjWr1wsJ8PIRZssZ8b/WMcMf71DJnit4EMU=
github.com/shirou/gopsutil/v4 v4.25.9 h1:JImNpf6gCVhKgZhtaAHJ0serfFGtlfIlSC08eaKdTrU=
github.com/shirou/gopsutil/v4 v4.25.9/go.mod h1:gxIxoC+7nQRwUl/xNhutXlD8lq+jxTgpIkEf3rADHL8=
github.com/sirupsen/logrus v1.9.3 h1:dueUQJ1C2q9oE3F7wvmSGAaVtTmUizReu6fjN8uqzbQ=
Expand Down
21 changes: 13 additions & 8 deletions infra/caddy/Caddyfile.local
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,21 @@
respond 204
}

# Block protected paths
@protected path /metrics /generate-signature*
handle @protected {
respond 403
# API handler - strips /api prefix before forwarding
handle_path /api/* {
# Block protected paths (after /api prefix is stripped)
@protected path /metrics
handle @protected {
respond 403
}

reverse_proxy api:8080
}

# Reverse proxy all incoming requests to the 'api' service.
# - The service name 'api' is resolved by Docker's internal DNS to the correct container IP on the 'caddy_net' network.
# - The API container listens on port 8080 (from the ENV_HTTP_PORT).
reverse_proxy api:8080
# Fallback handler for non-API routes
handle {
respond 404
}
}

# INTERNAL metrics endpoint for Prometheus scraping
Expand Down
45 changes: 2 additions & 43 deletions pkg/endpoint/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,8 @@ import (
"net/http"
"os"
"os/signal"
"strings"
"syscall"
"time"

"github.com/rs/cors"
)

// RunServer starts the provided HTTP server, listens for shutdown signals, and
Expand Down Expand Up @@ -84,53 +81,15 @@ type ServerHandlerConfig struct {
}

// NewServerHandler constructs the HTTP handler using the provided configuration.
// In development environments it applies permissive CORS settings so the
// client app can communicate with the API, and it optionally wraps the handler
// with Sentry instrumentation when supplied.
// CORS is handled by Caddy reverse proxy to avoid duplicate headers.
// The handler is optionally wrapped with Sentry instrumentation when supplied.
func NewServerHandler(cfg ServerHandlerConfig) http.Handler {
if cfg.Mux == nil {
return http.NotFoundHandler()
}

handler := cfg.Mux

if !cfg.IsProduction {
headers := []string{
"Accept",
"Authorization",
"Content-Type",
"X-CSRF-Token",
"User-Agent",
"X-API-Key",
"X-API-Username",
"X-API-Signature",
"X-API-Timestamp",
"X-API-Nonce",
"X-Request-ID",
"If-None-Match",
"X-API-Intended-Origin",
}

origins := []string{"http://localhost:5173"}
if host := cfg.DevHost; host != "" {
if !strings.Contains(host, "://") {
host = "http://" + host
}

origins = append(origins, host)
}

c := cors.New(cors.Options{
AllowedOrigins: origins,
AllowedMethods: []string{http.MethodGet, http.MethodPost, http.MethodPut, http.MethodDelete, http.MethodOptions},
AllowedHeaders: headers,
AllowCredentials: true,
Debug: true,
})

handler = c.Handler(handler)
}

if cfg.Wrap != nil {
handler = cfg.Wrap(handler)
}
Expand Down
Loading