A Go SDK for bulk importing data into PostgreSQL via S3 external tables.
- Stream data directly to S3 without local temporary files
- Create S3 external tables in PostgreSQL
- Import data from S3 to PostgreSQL using INSERT INTO ... SELECT FROM, or INSERT INTO ... SELECT FROM ... ON CONFLICT ... when the table has primary keys
- Configurable batch size
- Automatic splitting of files based on record count
- Asynchronous processing
- Unique process ID for each task to isolate files in S3
- S3 configuration automatically loaded from the database
- Task progress tracking via checkpoint table
- Configurable error tolerance for handling failed records
- Support for data migration between main and auxiliary tables
import "github.com/powerdrillai/relyt-sdk-go/bulkprocessor"
// Initialize the bulk processor - S3 configuration is automatically loaded from database
config := bulkprocessor.Config{
// PostgreSQL configuration (required)
PostgreSQL: bulkprocessor.PostgreSQLConfig{
Host: "localhost",
Port: 5432,
Username: "postgres",
Password: "password",
Database: "mydatabase",
Table: "mytable",
Schema: "public", // Defaults to public
},
BatchSize: 100000, // Number of records per file
Concurrency: 1, // Number of concurrent imports
MaxErrorRecords: 100, // Number of error records to ignore (0 = no errors allowed)
}
processor, err := bulkprocessor.New(config)
if err != nil {
log.Fatalf("Failed to create bulk processor: %v", err)
}
defer processor.Shutdown() // Ensure processor is shut down properly
// Insert data
type MyData struct {
ID int `relyt:"id"` // relyt tag is used to determine column names
Name string `relyt:"name"`
CreatedAt time.Time `relyt:"created_at"`
}
records := []MyData{
{ID: 1, Name: "John", CreatedAt: time.Now()},
{ID: 2, Name: "Jane", CreatedAt: time.Now()},
}
// You can call Insert multiple times to insert data in batches
err = processor.Insert(records)
if err != nil {
log.Fatalf("Failed to insert data: %v", err)
}
// Get the unique processor ID
processId := processor.GetProcessId()
log.Printf("Task process ID: %s", processId)
// S3 files will be stored in: [prefix]/[date]/[processId]/[filename]
// When finished, flush all remaining data and wait for import to complete
err = processor.Flush()
if err != nil {
log.Fatalf("Failed to flush data: %v", err)
}
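Insert can be called any number of times before Flush, so larger sources are typically fed in chunks. A minimal sketch of that pattern; readNextChunk is a hypothetical helper that returns the next slice of MyData (empty when the source is exhausted):

// Stream a large source in chunks instead of building one huge slice.
// readNextChunk is a hypothetical helper, not part of the SDK.
for {
    chunk := readNextChunk(10000) // e.g. 10,000 records per call
    if len(chunk) == 0 {
        break
    }
    if err := processor.Insert(chunk); err != nil {
        log.Fatalf("Failed to insert chunk: %v", err)
    }
}
// Call Flush once at the end so the last partial file is also imported.
if err := processor.Flush(); err != nil {
    log.Fatalf("Failed to flush data: %v", err)
}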
The SDK includes comprehensive test cases to verify functionality. You can run the tests using the provided Makefile:
# Run all tests
make test
# Run specific test cases
make test-errors # Test error handling
make test-sleep # Test intermittent data writing
make test-recovery # Test PostgreSQL recovery
make test-timeout # Test import timeout
make test-migration # Test data migration
The SDK automatically loads S3 configuration from PostgreSQL. You need to create a user-defined function (UDF) in your database:
-- Create a type to represent S3 configuration
CREATE TYPE loader_s3_config AS (
    endpoint TEXT,
    region TEXT,
    bucket_name TEXT,
    prefix TEXT,
    access_key TEXT,
    secret_key TEXT,
    -- tuning fields; these must match the extra columns returned by LOADER_CONFIG() below
    concurrency INT,
    part_size INT,
    import_timeout INT,
    import_error_sleep_time INT
);
-- Create the LOADER_CONFIG function
CREATE OR REPLACE FUNCTION relyt_sys.LOADER_CONFIG()
RETURNS loader_s3_config
LANGUAGE SQL
IMMUTABLE
AS $$
SELECT
's3.amazonaws.com'::TEXT AS endpoint,
'us-west-2'::TEXT AS region,
'your-bucket'::TEXT AS bucket_name,
'import/data'::TEXT AS prefix,
'your-access-key'::TEXT AS access_key,
'your-secret-key'::TEXT AS secret_key,
20 AS concurrency,
5242880 AS part_size,
1800 AS import_timeout,
10 AS import_error_sleep_time
;
$$;
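The SDK reads this configuration itself on startup, so no S3 credentials appear in Config. If you want to check what it will see, the function can be queried like any table function. A minimal sketch using database/sql with the pgx stdlib driver (the driver choice and connection string are assumptions, not SDK requirements):

package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "github.com/jackc/pgx/v5/stdlib" // any PostgreSQL driver works; pgx is an example
)

func main() {
	// Example connection string; adjust to your environment.
	db, err := sql.Open("pgx", "postgres://postgres:password@localhost:5432/mydatabase")
	if err != nil {
		log.Fatalf("open: %v", err)
	}
	defer db.Close()

	// Only the first few fields are read here for illustration.
	var endpoint, region, bucket, prefix string
	err = db.QueryRow(`SELECT endpoint, region, bucket_name, prefix FROM relyt_sys.LOADER_CONFIG()`).
		Scan(&endpoint, &region, &bucket, &prefix)
	if err != nil {
		log.Fatalf("query LOADER_CONFIG: %v", err)
	}
	fmt.Println(endpoint, region, bucket, prefix)
}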
The SDK automatically creates and maintains a checkpoint table in PostgreSQL to track task progress:
CREATE TABLE IF NOT EXISTS relyt_sys.relyt_loader_checkpoint (
process_id TEXT PRIMARY KEY,
pg_table TEXT NOT NULL,
status TEXT NOT NULL,
start_time TIMESTAMP WITH TIME ZONE NOT NULL,
last_insert_time TIMESTAMP WITH TIME ZONE,
files_total INT DEFAULT 0,
files_imported INT DEFAULT 0,
file_details JSONB DEFAULT '[]'::jsonb,
error_message TEXT,
error_records INT DEFAULT 0
);
The checkpoint table provides the following information:
- Task Status: RUNNING, COMPLETED, or FAILED
- Timestamps: Start time and last successful insert time
- File Counts: Total files generated and files successfully imported
- File Details: JSON array containing detailed information about each file:
  - File IDs and S3 keys/URLs
  - Creation and import timestamps
  - Number of records in each file
  - File status (CREATED, FROZEN, IMPORTING, IMPORTED, ERROR)
- Error Records: Count of records that were skipped due to errors
This checkpoint table is useful for:
- Monitoring progress of long-running import tasks
- Identifying failed imports and their causes
- Resuming interrupted tasks
- Tracking S3 file usage and cleanup
You can query the checkpoint table directly:
-- Get overview of all tasks
SELECT
process_id, pg_table, status,
start_time, last_insert_time,
files_total, files_imported,
error_records
FROM relyt_sys.relyt_loader_checkpoint;
-- Get details of a specific task
SELECT * FROM relyt_sys.relyt_loader_checkpoint
WHERE process_id = 'your-process-id';
-- Get file details for a task
SELECT
  file->>'file_id'     AS file_id,
  file->>'status'      AS status,
  file->>'s3_key'      AS s3_key,
  file->>'num_records' AS num_records,
  file->>'created_at'  AS created_at,
  file->>'imported_at' AS imported_at
FROM relyt_sys.relyt_loader_checkpoint,
     jsonb_array_elements(file_details) AS file
WHERE process_id = 'your-process-id';
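The same queries can be run from Go to watch a task while it is being imported, for example from a monitoring goroutine. A minimal sketch, assuming db is a *sql.DB connected to the same database and processId comes from processor.GetProcessId():

// Poll the checkpoint row for the current task until it leaves the RUNNING state.
for {
	var status string
	var filesTotal, filesImported, errorRecords int
	err := db.QueryRow(`
		SELECT status, files_total, files_imported, error_records
		FROM relyt_sys.relyt_loader_checkpoint
		WHERE process_id = $1`, processId).
		Scan(&status, &filesTotal, &filesImported, &errorRecords)
	if err != nil {
		log.Printf("checkpoint query failed: %v", err)
		break
	}
	log.Printf("status=%s files=%d/%d error_records=%d",
		status, filesImported, filesTotal, errorRecords)
	if status != "RUNNING" {
		break
	}
	time.Sleep(5 * time.Second)
}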
The SDK provides flexible error handling with the MaxErrorRecords configuration option, which determines how many records with errors can be ignored during processing:
config := bulkprocessor.Config{
// ... other configuration options ...
MaxErrorRecords: 100, // Allow up to 100 error records
}
When errors are encountered during processing:
- If the total number of error records (including the current one) is less than or equal to MaxErrorRecords:
  - The error is logged to stderr
  - The error count is incremented in the checkpoint table
  - Processing continues with the next record
- If the error count would exceed MaxErrorRecords:
  - The error is returned to the caller
  - Processing is halted
This feature is useful for:
- Handling occasional bad records without failing entire jobs
- Maintaining a balance between data quality and processing robustness
- Tracking the number of skipped records for later analysis
The default value for MaxErrorRecords is 0, which means no errors are tolerated.
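In other words, Insert and Flush only return an error once the tolerance has been exhausted; tolerated failures are visible solely through the error_records counter. A minimal sketch of handling both outcomes:

// With MaxErrorRecords set, an error from Insert or Flush means the
// tolerance was exceeded and the task has been halted.
if err := processor.Insert(records); err != nil {
	log.Fatalf("import aborted, error tolerance exceeded: %v", err)
}
if err := processor.Flush(); err != nil {
	log.Fatalf("flush failed: %v", err)
}
// Tolerated (skipped) records can be inspected afterwards:
//   SELECT error_records FROM relyt_sys.relyt_loader_checkpoint
//   WHERE process_id = '<processId>';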
- On initialization, the SDK connects to PostgreSQL and calls the relyt_sys.LOADER_CONFIG() function
- The S3 configuration is retrieved and used for all S3 operations
- The SDK generates a unique process ID for the task and creates a checkpoint record
- When you call Insert, data is streamed directly to S3
- When the record count reaches BatchSize, the file is frozen and queued for import
- A background thread processes the queue of files:
  - Creates a temporary S3 external table in PostgreSQL
  - Executes INSERT ... SELECT to import the data
  - Drops the external table when done
- The checkpoint table is updated at each step of the process
- Calling Flush waits for all pending files to be imported
- On shutdown, the task status is marked as COMPLETED
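Because Flush blocks until every queued file has been imported, applications sometimes add their own deadline on top of the server-side import_timeout. A minimal sketch of such a wrapper; this is application code, not part of the SDK:

// Run Flush in a goroutine and give up waiting after a caller-chosen deadline.
done := make(chan error, 1)
go func() { done <- processor.Flush() }()

select {
case err := <-done:
	if err != nil {
		log.Fatalf("flush failed: %v", err)
	}
	log.Println("all pending files imported")
case <-time.After(45 * time.Minute): // example deadline, tune to your workload
	log.Fatalf("flush did not finish in time; check the checkpoint table for details")
}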
- PostgreSQL must have access to the S3 storage
- The relyt_sys.LOADER_CONFIG() function must be created in your database
- The relyt_sys.relyt_loader_checkpoint table must be created in your database
- Structs must use relyt:"column_name" tags to specify column names
Create the auxiliary table and the routing table:
- Auxiliary table with the suffix _relyt_massive_group
- Routing table with the suffix _relyt_routing, created with DISTRIBUTED NONE
-- Create an auxiliary table to store the data, must have the same columns as the main table
-- Example:
-- Main table:
CREATE TABLE IF NOT EXISTS public.table_name (
id INT PRIMARY KEY,
group_id INT NOT NULL,
ext text,
vector vecf16(3) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE
);
-- Auxiliary table: must have suffix _relyt_massive_group
CREATE TABLE IF NOT EXISTS public.table_name_relyt_massive_group (
id INT PRIMARY KEY,
routing_id INT NOT NULL,
ext text,
vector vecf16(3) NOT NULL,
created_at TIMESTAMP WITH TIME ZONE
);
-- Routing table: must have suffix _relyt_routing, and DISTRIBUTED NONE
CREATE TABLE IF NOT EXISTS relyt_sys.table_name_relyt_routing (
routing_id TEXT PRIMARY KEY,
store_table_name TEXT NOT NULL
) USING heap DISTRIBUTED NONE;
If a routing table exists, data will be inserted into either the main table or the auxiliary table, based on the group_id recorded in the routing table. The routing table is updated automatically when data is inserted into the auxiliary table.
Run the migration tool:
python3 migrate/migrate_data.py --tables public.table_name --threshold 100
The migration tool will:
- Move data from the main table to the auxiliary table based on the threshold
- Update the routing table accordingly
- Maintain data consistency during the migration process
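After a migration you can confirm which groups were routed to the auxiliary table by reading the routing table. A minimal sketch, assuming db is a *sql.DB connection to the same database:

// List the groups currently routed to an auxiliary table.
rows, err := db.Query(`SELECT routing_id, store_table_name FROM relyt_sys.table_name_relyt_routing`)
if err != nil {
	log.Fatalf("query routing table: %v", err)
}
defer rows.Close()
for rows.Next() {
	var routingID, storeTable string
	if err := rows.Scan(&routingID, &storeTable); err != nil {
		log.Fatalf("scan: %v", err)
	}
	log.Printf("group %s -> %s", routingID, storeTable)
}
if err := rows.Err(); err != nil {
	log.Fatalf("iterate routing table: %v", err)
}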