hidb.cassandra_en.md

maoxiaoyue edited this page May 14, 2026 · 1 revision

hidb.cassandra — Cassandra 5.0 ORM & Admin Toolkit

pkg/hidb/cassandra is HypGo's full-featured Cassandra 5.0+ driver wrapper. Built on top of github.com/gocql/gocql, it offers:

  • Fluent DDL builders (Keyspace / Table / Index / MV / UDT / UDF / UDA / Trigger)
  • Struct-tag based model mapping and AutoMigrate
  • CRUD builders (Insert / Update / Delete / Select / Batch / LWT)
  • Vector / ANN search (native vector<float, N> in Cassandra 5.0)
  • Migration (versioned schema changes)
  • RBAC (role / grant / revoke)
  • Describe / Schema Introspection / Diff
  • Nodetool (CQL virtual-table queries + optional exec wrapper)

Contents: Connection · Keyspace · Table · Alter Table · Index · Materialized View · UDT · UDF / UDA · Trigger · Model · AutoMigrate · Insert · Update · Delete · Select · Batch & LWT · Migration · RBAC · Describe · Schema Introspection · Nodetool · Query Tracing · NoSQLBench · Defaults


Connection

import (
    "log"
    "time"

    "github.com/maoxiaoyue/hypgo/pkg/hidb/cassandra"
)

db, err := cassandra.New(cassandra.Config{
    Hosts:       []string{"10.0.0.1", "10.0.0.2"},
    Keyspace:    "app",
    Consistency: "quorum",       // any/one/two/three/quorum/all/local_quorum/each_quorum/local_one
    Port:        9042,
    Username:    "cass",
    Password:    "secret",
    ConnectTimeout: 5 * time.Second,
    Timeout:        10 * time.Second,
    NumConns:       4,
    ProtoVersion:   4,
})
if err != nil {
    log.Fatal(err)
}
defer db.Close()

Other APIs:

  • cassandra.NewWithoutConnect(cfg) — deferred connection (call db.Connect() to open)
  • db.Session() — get the underlying *gocql.Session (thread-safe; nil after Close)
  • db.Ping(ctx) / db.IsConnected()
  • db.Exec(ctx, stmt) — execute a single DDL/DML statement
  • db.ExecScript(ctx, script) — execute a multi-statement CQL script (handles ; splitting while preserving '...' and $$...$$)
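The splitting rule that ExecScript describes can be sketched as follows. This is a minimal illustration, not the package's actual implementation: the name splitCQL is hypothetical, and the sketch assumes the script contains no CQL comments (-- or //), which a real splitter would also need to skip.

```go
package main

import "strings"

// splitCQL is a hypothetical helper, not part of hidb.cassandra.
// It splits a script on ';' while leaving semicolons inside '...'
// string literals and $$...$$ bodies untouched.
func splitCQL(script string) []string {
    var stmts []string
    var cur strings.Builder
    inQuote, inDollar := false, false

    // flush stores the current statement (if non-empty) and resets the buffer.
    flush := func() {
        if s := strings.TrimSpace(cur.String()); s != "" {
            stmts = append(stmts, s)
        }
        cur.Reset()
    }

    for i := 0; i < len(script); i++ {
        c := script[i]
        switch {
        case !inQuote && c == '$' && i+1 < len(script) && script[i+1] == '$':
            inDollar = !inDollar
            cur.WriteString("$$")
            i++ // consume the second '$'
        case !inDollar && c == '\'':
            // An escaped quote ('') toggles twice, so the state ends up unchanged.
            inQuote = !inQuote
            cur.WriteByte(c)
        case c == ';' && !inQuote && !inDollar:
            flush()
        default:
            cur.WriteByte(c)
        }
    }
    flush()
    return stmts
}
```

Each returned statement could then be passed to db.Exec one at a time, which is roughly what a script runner has to do because CQL has no multi-statement request.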

Production hardening fields (added 2026-04-29):

cfg := cassandra.Config{
    Hosts:    []string{"10.0.0.1", "10.0.0.2"},
    Keyspace: "app",

    // === Consistency ===
    Consistency:       "local_quorum",
    StrictConsistency: true,        // unknown string → error (recommended);
                                    // false (default) → fallback to LocalOne instead of
                                    // the old silent-Quorum behavior that hung dev clusters

    // === Retry / Reconnect (omit to use defaults) ===
    NumRetries:        3,                       // -1 disables retry
    RetryMinDelay:     100 * time.Millisecond,
    RetryMaxDelay:     10 * time.Second,
    ReconnectMax:      10,                      // -1 disables reconnect
    ReconnectInterval: 1 * time.Second,

    // === TLS / mTLS ===
    TLS: cassandra.TLSConfig{
        Enabled:    true,
        CertFile:   "/etc/cass/client.crt",     // mTLS
        KeyFile:    "/etc/cass/client.key",
        CaFile:     "/etc/cass/ca.crt",
        ServerName: "cassandra.prod",
        // InsecureSkipVerify: true,            // dev only
        // Config: customTLSConfig,             // inject a custom *tls.Config directly
    },

    // === Observability ===
    QueryObserver: myObserver,                  // gocql.QueryObserver (latency / error metrics)
}

| Behavior | Before | After |
| --- | --- | --- |
| Unknown consistency string | Silent fallback to Quorum (hangs single-node dev) | Default fallback to LocalOne; StrictConsistency=true returns error |
| Retry / reconnect | Not configured → any blip fails | Default ExponentialBackoff (3 retries) + ConstantReconnect (10 × 1s) |
| Close() | Could panic on concurrent calls (double close) | sync.Mutex guarded, idempotent; a subsequent Connect() returns an error |
| Connect() repeat | Overwrote session, leaking the old one | Idempotent: no-op if alive, replaces only if old session is closed |
| TLS | Unsupported | TLSConfig (file paths or inject *tls.Config) |
| QueryObserver | Not exposed | Config.QueryObserver |
| Init(map) | Only parsed hosts/auth/keyspace/port | Full set incl. timeouts / retry / nested TLS block |
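The strict versus fallback handling of an unknown consistency string can be sketched like this. It is an illustration under assumptions, not the package's code: the Consistency type stands in for gocql.Consistency, and resolveConsistency and the known map are hypothetical names.

```go
package main

import (
    "fmt"
    "strings"
)

// Consistency stands in for gocql.Consistency so the sketch has no dependencies.
type Consistency string

// known lists the names accepted by Config.Consistency.
var known = map[string]Consistency{
    "any": "ANY", "one": "ONE", "two": "TWO", "three": "THREE",
    "quorum": "QUORUM", "all": "ALL", "local_quorum": "LOCAL_QUORUM",
    "each_quorum": "EACH_QUORUM", "local_one": "LOCAL_ONE",
}

// resolveConsistency (hypothetical) maps a config string to a level.
// With strict=true an unknown name is an error; otherwise it falls
// back to LOCAL_ONE, which a single-node dev cluster can still satisfy.
func resolveConsistency(name string, strict bool) (Consistency, error) {
    key := strings.ToLower(strings.TrimSpace(name))
    if c, ok := known[key]; ok {
        return c, nil
    }
    if strict {
        return "", fmt.Errorf("unknown consistency level %q", name)
    }
    return known["local_one"], nil
}
```

Failing fast in strict mode surfaces a typo such as "qorum" at startup instead of silently running production traffic at a weaker level than intended.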

Keyspace

// SimpleStrategy (single datacenter)
db.Keyspace("app").Simple(3).Create(ctx)

// NetworkTopologyStrategy (production)
db.Keyspace("app").
    NetworkTopology(map[string]int{"dc1": 3, "dc2": 2}).
    DurableWrites(true).
    Create(ctx)

// Get the CQL string
cql := db.Keyspace("app").Simple(3).CreateCQL()

// Alter
db.Keyspace("app").Simple(5).Alter(ctx)

// Drop
db.Keyspace("app").Drop(ctx)  // DROP KEYSPACE IF EXISTS app

// Switch default keyspace
db.Use(ctx, "app")

Defaults: replication / DURABLE_WRITES are auto-filled when not specified (see Defaults). Disable via .NoDefaults().


Table

Complete Cassandra 5.0 CREATE TABLE options:

db.Table("events").
    Column("tenant_id", cassandra.TypeUUID).
    Column("bucket", cassandra.TypeInt).
    Column("ts", cassandra.TypeTimestamp).
    Column("payload", cassandra.TypeText).
    Static("tenant_name", cassandra.TypeText).
    PartitionKey("tenant_id", "bucket").
    ClusteringKey("ts").
    ClusteringOrder("ts", cassandra.Desc).
    WithTTL(604800).
    WithComment("append-only log").
    WithCompaction(cassandra.CompactionOptions{
        Class: cassandra.CompactionTWCS,
        Extra: map[string]interface{}{
            "compaction_window_size": 1,
            "compaction_window_unit": "DAYS",
        },
    }).
    WithCompression(cassandra.CompressionOptions{
        SSTableCompression: cassandra.CompressionZstd,
        ChunkLengthKB:      16,
    }).
    WithCaching(cassandra.CachingOptions{Keys: "ALL", RowsPerPartition: "NONE"}).
    WithCDC(true).
    Create(ctx)

Common types

| Constant | CQL type |
| --- | --- |
| TypeText, TypeVarchar, TypeAscii | text / varchar / ascii |
| TypeInt, TypeBigInt, TypeSmallInt, TypeTinyInt, TypeVarInt | int / bigint / smallint / tinyint / varint |
| TypeFloat, TypeDouble, TypeDecimal | float / double / decimal |
| TypeBoolean, TypeUUID, TimeUUID, TypeInet, TypeBlob, TypeDuration | boolean / uuid / timeuuid / inet / blob / duration |
| TypeTimestamp, TypeDate, TypeTime | timestamp / date / time |
| TypeCounter | counter |

Vector serialization caveat (Cassandra 5.0)

vector<float, N> is a Cassandra 5.0 type that gocql v1.7.0 does not yet handle natively (it arrives as a CUSTOM type at the wire level). Binding []float32 directly may be marshalled as list<float>, causing a server-side type mismatch.

Workaround:

// Insert — serialize manually to 4N bytes big-endian binary32
blob, _ := cassandra.MarshalVectorFloat32(emb, 384)
db.Insert("embeddings").
    Value("id", id).
    Value("vector", blob).      // bind as []byte
    Exec(ctx)

// Read — decode the []byte
var raw []byte
db.Select("vector").From("embeddings").Where("id", "=", id).
    One(ctx, &raw)
vec, _ := cassandra.UnmarshalVectorFloat32(raw, 384)

Once the driver lands native vector<float, N> support these two helpers can be retired and []float32 bound directly. Smoke-test driver behavior before standardizing on either path in a new project.
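For reference, the 4N-byte big-endian binary32 layout mentioned above can be reproduced with the standard library alone. This is a sketch of the wire layout only; encodeVector and decodeVector are hypothetical names, and the package's MarshalVectorFloat32 / UnmarshalVectorFloat32 may differ in validation and error handling.

```go
package main

import (
    "encoding/binary"
    "fmt"
    "math"
)

// encodeVector (hypothetical) packs dim float32 values into 4*dim bytes,
// each element as a big-endian IEEE 754 binary32.
func encodeVector(v []float32, dim int) ([]byte, error) {
    if len(v) != dim {
        return nil, fmt.Errorf("vector has %d elements, want %d", len(v), dim)
    }
    buf := make([]byte, 4*dim)
    for i, f := range v {
        binary.BigEndian.PutUint32(buf[4*i:], math.Float32bits(f))
    }
    return buf, nil
}

// decodeVector (hypothetical) reverses encodeVector.
func decodeVector(b []byte, dim int) ([]float32, error) {
    if len(b) != 4*dim {
        return nil, fmt.Errorf("payload is %d bytes, want %d", len(b), 4*dim)
    }
    out := make([]float32, dim)
    for i := range out {
        out[i] = math.Float32frombits(binary.BigEndian.Uint32(b[4*i:]))
    }
    return out, nil
}
```

Checking the dimension on both sides catches a mismatch between the model and the column definition before the server rejects the write.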

Collections / generics

cassandra.List(cassandra.TypeInt)                        // list<int>
cassandra.Set(cassandra.TypeText)                        // set<text>
cassandra.Map(cassandra.TypeText, cassandra.TypeInt)     // map<text, int>
cassandra.Frozen(cassandra.Set(cassandra.TypeText))      // frozen<set<text>>
cassandra.Tuple(cassandra.TypeText, cassandra.TypeInt)   // tuple<text, int>
cassandra.UDT("address")                                 // frozen<address>
cassandra.Vector(cassandra.TypeFloat, 384)               // vector<float, 384>
cassandra.VectorFloat(384)                               // shorthand for the line above

Compaction constants

CompactionUnified (recommended on 5.0) / CompactionSTCS / CompactionLCS / CompactionTWCS / CompactionIncremental

Compression constants

CompressionLZ4 / CompressionSnappy / CompressionDeflate / CompressionZstd / CompressionNone


Alter Table

db.AlterTable("users").
    AddColumn("bio", cassandra.TypeText).
    DropColumn("legacy").
    RenameColumn("fullname", "full_name").
    WithOptions(cassandra.TableOptions{DefaultTTL: 3600}).
    Exec(ctx)

// Or just produce the CQL strings
cql := db.AlterTable("users").AddColumn("bio", cassandra.TypeText).CQL()
// ALTER TABLE users ADD bio text;

Index

SAI (Storage-Attached Index, recommended on Cassandra 5.0):

db.Index("users_name_idx").
    On("users", "name").
    SAI().
    Option("case_sensitive", false).
    Create(ctx)

Legacy 2i:

db.Index("users_email_idx").On("users", "email").Create(ctx)

Collection-column index:

db.Index("tags_idx").On("items", "tags").
    Target(cassandra.IndexTargetKeys).    // KEYS / VALUES / ENTRIES / FULL
    Create(ctx)

Custom index:

db.Index("custom_idx").On("t", "col").
    Custom("com.example.CustomIndex").
    Create(ctx)

Materialized View

db.MaterializedView("users_by_email").
    FromTable("users").
    Select("*").
    WhereNotNull("email", "id").
    PartitionKey("email").
    ClusteringKey("id").
    Create(ctx)

UDT

db.Type("address").
    Field("street", cassandra.TypeText).
    Field("city", cassandra.TypeText).
    Field("zip", cassandra.TypeText).
    Create(ctx)

// Alter
db.Type("address").AddField("country", cassandra.TypeText).Alter(ctx)
db.Type("address").Rename("zip", "postal_code").Alter(ctx)

UDF / UDA

// User-defined function
db.Function("state_add").
    Arg("s", cassandra.TypeInt).
    Arg("val", cassandra.TypeInt).
    Returns(cassandra.TypeInt).
    Language("java").
    Body("return s + val;").
    Deterministic(true).
    Monotonic(true).
    CalledOnNullInput(false).
    Create(ctx)

// User-defined aggregate
db.Aggregate("my_sum").
    Arg(cassandra.TypeInt).
    SFunc("state_add").
    StateType(cassandra.TypeInt).
    InitCond("0").
    Create(ctx)

Trigger

// After deploying the Java trigger class into each node's
// $CASSANDRA_HOME/conf/triggers directory:
db.Trigger("audit").
    On("logs.events").
    Using("com.example.triggers.AuditTrigger").
    Create(ctx)

db.Trigger("audit").On("logs.events").Drop(ctx)

Model

Declare mappings via struct tags:

type User struct {
    ID        gocql.UUID `cql:"id,pk"`
    CreatedAt time.Time  `cql:"created_at,ck,order=desc"`
    Email     string     `cql:"email,omitempty"`
    Tags      []string   `cql:"tags,type=set<text>"`
    Vector    []float32  `cql:"vector,type=vector<float, 384>"`
}

func (User) TableName() string { return "users" }

Tag grammar

cql:"name[,kind][,type=<cql>][,order=asc|desc][,position=N][,omitempty][,static][,counter]"

| Keyword | Meaning |
| --- | --- |
| pk / partition / partition_key | Partition key |
| ck / clustering / clustering_key | Clustering key |
| static | Static column |
| counter | Counter column |
| type=... | Explicit CQL type (required for collections / tuple / UDT / vector) |
| `order=asc\|desc` | Clustering order for a clustering key |
| position=N | Order in a composite key |
| omitempty | Skip zero values on Insert/Update |
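To make the grammar concrete, here is a minimal sketch of reading such tags with reflect. It is not the package's ParseModel: columnTag, splitTag, parseTag, and tagsOf are hypothetical names, and only a subset of the keywords is handled. The one subtle point it shows is that commas inside <...> (as in vector<float, 384>) must not split the tag.

```go
package main

import (
    "reflect"
    "strings"
)

// columnTag is a hypothetical parsed form of one `cql` struct tag.
type columnTag struct {
    Name      string
    Kind      string // "pk", "ck", or "" for a regular column
    Type      string
    Order     string
    OmitEmpty bool
}

// splitTag splits on commas that are not nested inside <...>.
func splitTag(tag string) []string {
    var parts []string
    depth, start := 0, 0
    for i, r := range tag {
        switch r {
        case '<':
            depth++
        case '>':
            depth--
        case ',':
            if depth == 0 {
                parts = append(parts, strings.TrimSpace(tag[start:i]))
                start = i + 1
            }
        }
    }
    return append(parts, strings.TrimSpace(tag[start:]))
}

// parseTag interprets the name and a subset of the documented keywords.
func parseTag(tag string) columnTag {
    parts := splitTag(tag)
    ct := columnTag{Name: parts[0]}
    for _, p := range parts[1:] {
        switch {
        case p == "pk" || p == "partition" || p == "partition_key":
            ct.Kind = "pk"
        case p == "ck" || p == "clustering" || p == "clustering_key":
            ct.Kind = "ck"
        case p == "omitempty":
            ct.OmitEmpty = true
        case strings.HasPrefix(p, "type="):
            ct.Type = strings.TrimPrefix(p, "type=")
        case strings.HasPrefix(p, "order="):
            ct.Order = strings.TrimPrefix(p, "order=")
        }
    }
    return ct
}

// tagsOf reads the `cql` tag of every field of a struct value (not a pointer).
func tagsOf(model interface{}) []columnTag {
    t := reflect.TypeOf(model)
    var out []columnTag
    for i := 0; i < t.NumField(); i++ {
        if tag, ok := t.Field(i).Tag.Lookup("cql"); ok {
            out = append(out, parseTag(tag))
        }
    }
    return out
}
```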

API

info, _ := cassandra.ParseModel(&User{})
fmt.Println(info.Table, info.PartitionKey, info.Clustering, info.Columns())

tb, _ := db.TableFromModel(&User{})
tb.Create(ctx)

AutoMigrate

Create multiple model-mapped tables in one call:

db.AutoMigrate(ctx, &User{}, &Event{}, &Embedding{})

Insert

// Manual
stmt, args := db.Insert("users").
    Value("id", id).Value("email", "a@b.c").
    IfNotExists().
    TTL(3600).
    CQL()

// Execute
err := db.Insert("users").
    Value("id", id).Value("email", "a@b.c").
    Exec(ctx)

// LWT (CAS) — IF NOT EXISTS, returns whether the write applied
applied, err := db.Insert("users").
    Value("id", id).Value("email", "a@b.c").
    IfNotExists().
    ExecCAS(ctx)

// Insert from a model struct
db.Save(ctx, &User{ID: id, Email: "a@b.c"})

// With options
db.Save(ctx, &user, cassandra.SaveTTL(3600), cassandra.SaveIfNotExists())

Update

// Basic update
db.Update("users").
    Set("email", "new@x.y").
    Where("id", "=", id).
    Exec(ctx)

// LWT (conditional update): returns whether the write applied
applied, _ := db.Update("users").
    Set("email", "new@x.y").
    Where("id", "=", id).
    If("email", "=", "old@x.y").
    ExecCAS(ctx)

// Counter
db.Update("page_views").Increment("views", 1).Where("page", "=", "/home").Exec(ctx)
db.Update("page_views").Decrement("views", 1).Where("page", "=", "/home").Exec(ctx)

// Collection ops
db.Update("items").Append("tags", []string{"new"}).Where("id", "=", id).Exec(ctx)
db.Update("items").Prepend("tags", []string{"first"}).Where("id", "=", id).Exec(ctx)
db.Update("items").Remove("tags", []string{"old"}).Where("id", "=", id).Exec(ctx)

Delete

// Whole row
db.Delete().From("users").Where("id", "=", id).Exec(ctx)

// Specific columns
db.Delete().Columns("email").From("users").Where("id", "=", id).Exec(ctx)

// One element from a collection
db.Delete().ElementAt("tags", 0).From("items").Where("id", "=", id).Exec(ctx)

// LWT
applied, _ := db.Delete().From("users").Where("id", "=", id).IfExists().ExecCAS(ctx)

Select

// One
var u User
err := db.Select("id", "email").From("users").
    Where("id", "=", id).
    One(ctx, &u)

// All
var users []User
db.Select("*").From("users").All(ctx, &users)

// Count
n, _ := db.Select().From("users").Count(ctx)

// IN / LIMIT / ALLOW FILTERING
db.Select("*").From("users").
    WhereIn("id", ids).
    Limit(100).
    AllowFiltering().
    All(ctx, &users)

// ANN (vector similarity search)
var results []Embedding
db.Select("*").From("embeddings").
    OrderByANN("vector", queryVector).
    Limit(10).
    All(ctx, &results)

// Pagination (page state)
iter := db.Select("*").From("users").PageSize(100).Iter(ctx)

Batch & LWT

// Logged batch
b := db.BatchBuilder(gocql.LoggedBatch)
b.Insert("users").Value("id", id1).Value("email", "a@b")
b.Update("events").Set("status", "done").Where("id", "=", id2)
b.Exec(ctx)

// Counter batch (separate variable so both snippets compile in one scope)
cb := db.BatchBuilder(gocql.CounterBatch)
cb.Update("counters").Increment("views", 1).Where("page", "=", "/")
cb.Exec(ctx)

// Batch CAS: call ExecCAS instead of Exec when the batch holds conditional statements
applied, _ := b.ExecCAS(ctx)

Migration

mig := cassandra.NewMigrator(db, "app", "schema_migrations")
mig.Register(
    cassandra.Migration{
        Version: 202604170001,
        Name:    "create_users",
        Up:   func(ctx context.Context, db *cassandra.CassandraDB) error { /* ... */ },
        Down: func(ctx context.Context, db *cassandra.CassandraDB) error { /* ... */ },
    },
)
mig.Up(ctx)          // apply all pending
mig.Down(ctx)        // roll back the latest
report, _ := mig.Status(ctx)
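The bookkeeping behind Up and Down can be sketched without a database. This illustrates the usual versioned-migration logic under assumptions; the migration type and the pending / latestApplied helpers are hypothetical and are not taken from the Migrator's source.

```go
package main

import "sort"

// migration is a hypothetical, reduced form of cassandra.Migration.
type migration struct {
    Version int64
    Name    string
}

// pending returns the registered migrations that have not been applied yet,
// in ascending version order, which is the order an "up" run would use.
func pending(registered []migration, applied map[int64]bool) []migration {
    var out []migration
    for _, m := range registered {
        if !applied[m.Version] {
            out = append(out, m)
        }
    }
    sort.Slice(out, func(i, j int) bool { return out[i].Version < out[j].Version })
    return out
}

// latestApplied returns the highest applied version, which is the one a
// single-step rollback would target. ok is false when nothing is applied.
func latestApplied(applied map[int64]bool) (version int64, ok bool) {
    for v, done := range applied {
        if done && (!ok || v > version) {
            version, ok = v, true
        }
    }
    return version, ok
}
```

Timestamp-style versions such as 202604170001 sort chronologically, so registration order does not matter as long as each version is unique.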

RBAC

Role

db.Role("alice").
    Password("secret").
    Login(true).
    Superuser(false).
    Create(ctx)

db.Role("alice").Password("newpw").Alter(ctx)
db.Role("alice").Drop(ctx)

Grant / Revoke

// Permission constants (all in package cassandra):
// PermAll / PermSelect / PermModify / PermCreate / PermAlter / PermDrop /
// PermAuthorize / PermDescribe / PermExecute / PermUnmask / PermSelectMask

// Resources
cassandra.AllKeyspaces()
cassandra.KeyspaceResource("app")
cassandra.TableResource("app.users")
cassandra.RoleResource("alice")

// Grant
db.Grant(ctx, cassandra.PermSelect, cassandra.TableResource("app.users"), "reader")
db.Revoke(ctx, cassandra.PermModify, cassandra.KeyspaceResource("app"), "writer")

// Role hierarchy
db.GrantRole(ctx, "admin", "alice")   // GRANT admin TO alice
db.RevokeRole(ctx, "admin", "alice")

// Just get the CQL string
grantStmt, _ := cassandra.GrantCQL(cassandra.PermSelect, cassandra.TableResource("app.users"), "reader")
rolesStmt := cassandra.ListRolesCQL()
permsStmt := cassandra.ListPermissionsCQL("alice")

Describe

// Build a DESCRIBE statement
cassandra.DescribeCQL(cassandra.DescTable, "logs.events")
// → DESCRIBE TABLE logs.events
cassandra.DescribeCQL(cassandra.DescFullSchema, "")

// Fetch structured data from system_schema
ks, _ := db.DescribeKeyspace(ctx, "app")
tables, _ := db.DescribeTables(ctx, "app")
cols, _ := db.DescribeColumns(ctx, "app", "users")
tbl, cols, _ := db.DescribeTable(ctx, "app", "users")

// Reconstruct CREATE TABLE from system_schema rows
ddl := cassandra.RenderTableDDL(*tbl, cols)

Schema Introspection

Full dump of system_schema.*:

// All keyspaces (system_* excluded by default)
schema, _ := db.Introspect(ctx, cassandra.IntrospectOptions{})

// Single keyspace
ks, _ := db.IntrospectKeyspace(ctx, "app")

// JSON output (for AI context / migration diff)
raw, _ := schema.MarshalJSON()

// Diff: compute changes from old → new
changes := cassandra.SchemaDiff(oldSchema, newSchema)
for _, c := range changes {
    fmt.Printf("%s  %s.%s.%s  %s\n", c.Kind, c.Keyspace, c.Table, c.Column, c.Detail)
}

Change kinds: ChangeAddKeyspace / DropKeyspace / AlterKeyspace / AddTable / DropTable / AlterTable / AddColumn / DropColumn / AlterColumn
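The column-level part of such a diff can be sketched as a comparison of two name-to-type maps. This is an illustration only: tableCols and diffColumns are hypothetical, and the real SchemaDiff returns structured change values rather than strings.

```go
package main

import (
    "fmt"
    "sort"
)

// tableCols is a hypothetical snapshot of one table: column name → CQL type.
type tableCols map[string]string

// diffColumns reports added, altered, and dropped columns going from oldT to newT.
func diffColumns(oldT, newT tableCols) []string {
    var changes []string
    for name, typ := range newT {
        oldTyp, existed := oldT[name]
        switch {
        case !existed:
            changes = append(changes, fmt.Sprintf("AddColumn %s %s", name, typ))
        case oldTyp != typ:
            changes = append(changes, fmt.Sprintf("AlterColumn %s %s -> %s", name, oldTyp, typ))
        }
    }
    for name := range oldT {
        if _, kept := newT[name]; !kept {
            changes = append(changes, "DropColumn "+name)
        }
    }
    // Map iteration order is random, so sort for stable output.
    sort.Strings(changes)
    return changes
}
```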


Nodetool

Option B: pure CQL (system_views virtual tables)

// nodetool info + status subset
info, _ := db.LocalInfo(ctx)
peers, _ := db.Peers(ctx)
snap, _ := db.ClusterStatus(ctx)

// nodetool tpstats
pools, _ := db.ThreadPools(ctx)

// nodetool compactionstats
tasks, _ := db.CompactionTasks(ctx)

// nodetool clientstats
clients, _ := db.Clients(ctx)

// nodetool getlogginglevels / get*
all, _ := db.Settings(ctx)
val, _ := db.Setting(ctx, "compaction_throughput_mb_per_sec")

// cache info
caches, _ := db.Caches(ctx)

// tablestats (size portion)
totals, _ := db.TableStats(ctx, "app", "users")

// Wait until compactions finish
elapsed, _ := db.PollCompactionsUntilIdle(ctx, time.Second)

// Generic MapScan
rows, _ := db.RawSystemViewRow(ctx, "thread_pools", nil)

Option A: wrap the nodetool binary (action commands)

nt := &cassandra.NodetoolExec{
    Binary: "nodetool",  // default "nodetool"
    Host:   "10.0.0.1",
    Port:   7199,
    Username: "admin",
    Password: "secret",
}

nt.Flush(ctx, "app", "events")
nt.Compact(ctx, "app", "events")
nt.Repair(ctx, "app", nil, cassandra.RepairOptions{Full: true, PrimaryRange: true})
nt.Cleanup(ctx, "app")
nt.Drain(ctx)
nt.Snapshot(ctx, "backup-2026-04-17", "app", "events")
nt.ClearSnapshot(ctx, "backup-2026-04-17", "app")
nt.Refresh(ctx, "app", "events")
nt.SetCompactionThroughput(ctx, 64)
nt.SetLoggingLevel(ctx, "org.apache.cassandra.db", "DEBUG")
out, _ := nt.Status(ctx, "")

Security: uses exec.CommandContext with argument vectors — no shell interpretation, no command-injection exposure.


Query Tracing

Enable Cassandra's native tracing (rows written to system_traces.sessions / system_traces.events) to inspect coordinator path, per-stage timing, replica dispatch, etc.

One-shot probe (most common)

tr, err := db.TraceProbe(ctx, 2*time.Second,
    "SELECT * FROM app.users WHERE id = ?", uid)
if err != nil { log.Fatal(err) }

fmt.Println(tr.Format())
// → Trace <uuid>
//     command      : QUERY
//     coordinator  : 10.0.0.1
//     duration     : 1.523ms (server-reported)
//     total elapsed: 1.41ms (max source_elapsed)
//     events:
//       [     120µs] 10.0.0.1        Native-Transport-1   Parsing SELECT ...
//       [     340µs] 10.0.0.1        ReadStage-1          Executing single-partition query
//       ...

Attach tracing to an existing gocql.Query

q := db.Session().Query("SELECT * FROM app.users").WithContext(ctx)
tracer := cassandra.TraceQuery(q)      // attach MemTracer
_ = q.Iter().Close()                   // run query normally

if id, ok := tracer.Last(); ok {
    trace, _ := db.WaitForTrace(ctx, id, 2*time.Second, 50*time.Millisecond)
    fmt.Println(trace.Format())
}

Access structured data

type Trace struct {
    Session TraceSession    // command / coordinator / duration / request / client
    Events  []TraceEvent    // sorted by source_elapsed
}

tr, _ := db.GetTrace(ctx, sessionID)
fmt.Println(tr.Duration(), tr.TotalElapsed())
for _, e := range tr.Events {
    fmt.Printf("%s @%dµs: %s\n", e.Source, e.SourceElapsed, e.Activity)
}

cqlsh-style live output

tracer := cassandra.NewTraceWriter(db.Session(), os.Stdout)
q := db.Session().Query("SELECT * FROM users").WithContext(ctx)
q.Trace(tracer)
_ = q.Iter().Close()
// Each event is printed directly to stdout as it arrives.

API

| Function / type | Purpose |
| --- | --- |
| db.TraceProbe(ctx, wait, stmt, args...) | Simplest path: execute + wait for trace flush + return *Trace |
| db.GetTrace(ctx, sessionID) | Read system_traces.* directly |
| db.WaitForTrace(ctx, id, timeout, interval) | Poll until trace is visible (server writes with delay) |
| cassandra.TraceQuery(q) | Attach a MemTracer to *gocql.Query |
| MemTracer.Last() / .Sessions() / .Reset() | Get / reset collected session IDs |
| cassandra.NewTraceWriter(sess, w) | Print each event to a writer as it's received (cqlsh style) |
| Trace.Format() | Human-readable multi-line output |
| Trace.Duration() / .TotalElapsed() | Server-reported vs max source_elapsed |

Notes

  • Cassandra writes system_traces.* asynchronously — calling GetTrace too soon returns ErrNotFound. Use WaitForTrace or TraceProbe.
  • Query traces expire after 24 h by default (tracetype_query_ttl); repair traces are kept for 7 days by default (tracetype_repair_ttl).
  • Tracing adds coordinator overhead; don't enable it everywhere. Sample (e.g. 1%) or scope to slow queries only.
  • TraceProbe discards query results — it's for diagnostics only, not for fetching data.
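One way to follow the sampling advice is a counter-based sampler that traces one query in every N. This is a sketch under assumptions: traceSampler is a hypothetical type, not part of the package, and a random or latency-based policy would work equally well.

```go
package main

import "sync/atomic"

// traceSampler (hypothetical) selects one call in every `every` calls.
type traceSampler struct {
    count uint64 // kept first so it stays 64-bit aligned for atomic access
    every uint64
}

// newTraceSampler returns a sampler; every=100 corresponds to roughly 1%.
func newTraceSampler(every uint64) *traceSampler {
    if every == 0 {
        every = 1 // trace everything rather than divide by zero
    }
    return &traceSampler{every: every}
}

// shouldTrace is safe for concurrent use.
func (s *traceSampler) shouldTrace() bool {
    return atomic.AddUint64(&s.count, 1)%s.every == 0
}
```

A caller would then attach a tracer only when shouldTrace() returns true, for example by calling cassandra.TraceQuery(q) on that query and leaving the others untouched.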

NoSQLBench (nb5)

pkg/hidb/cassandra/nb5.go provides a safe exec wrapper around NoSQLBench v5 (nb5 binary) for stress testing, benchmarking, and capacity planning against Cassandra 5.0.

Prerequisite: install nb5 separately and make it available on PATH (or set Binary to the full path). nb5 is a standalone Java fat-jar tool — it is not shipped with HypGo. Launches use exec.CommandContext with an argv vector, so there is no shell interpretation and no command-injection risk.

Basic setup

nb := &cassandra.NB5Exec{
    Binary:   "nb5",                 // default "nb5"
    Host:     "10.0.0.1",
    Port:     9042,
    LocalDC:  "dc1",                 // required by modern drivers
    Driver:   "cqld4",               // nb5 default
    Username: "cass",
    Password: "secret",
    WorkingDir: "/var/benchmarks",   // cwd (useful when loading local YAMLs)
}

// Discovery
out, _ := nb.Version(ctx)
out, _  = nb.ListWorkloads(ctx)
out, _  = nb.ListDrivers(ctx)

Simplest: run a phase of a bundled workload

// schema / rampup / main / read / write
out, _ := nb.RunPhase(ctx, "cql-iot", cassandra.PhaseSchema, "", 1, nil)
out, _  = nb.RunPhase(ctx, "cql-iot", cassandra.PhaseRampup, "100000", 0, nil)
out, _  = nb.RunPhase(ctx, "cql-iot", cassandra.PhaseMain, "1M", 32, map[string]string{
    "keycount":  "1000000",
    "cyclerate": "10000",
})

Fluent scenario builder (recommended)

out, err := nb.Scenario("cql-iot").
    Phase(cassandra.PhaseMain).
    Cycles("1M").
    Rampup("100k").
    Threads(32).
    CycleRate("10000").              // fixed ops/s target
    Param("keycount", "1000000").
    Errors("count").                 // count | warn | stop | retry | ignore
    Alias("steady-state").
    Extra("--report-summary-to", "stdout:60s").
    Extra("--log-histograms", "metrics.hdr:.*:10s").
    Run(ctx)

// Debug: print the full command line first
fmt.Println(nb.Scenario("cql-iot").Phase(cassandra.PhaseSchema).CommandLine())

Your own workload YAML

out, _ := nb.RunActivity(ctx, "/var/bench/my-workload.yaml", map[string]string{
    "tags":    "block:main",
    "cycles":  "5M",
    "threads": "64",
})

Generate an inline workload (quick smoke test)

path, _ := cassandra.WriteInlineWorkload("/tmp/kv.yaml", "app", "kv")

// 1. create schema
nb.Scenario(path).Phase(cassandra.PhaseSchema).Run(ctx)

// 2. rampup writes
nb.Scenario(path).Phase(cassandra.PhaseRampup).
    Cycles("100000").Threads(16).Run(ctx)

// 3. mixed read/write
out, _ := nb.Scenario(path).Phase(cassandra.PhaseMain).
    Cycles("1M").Threads(32).CycleRate("20000").Run(ctx)

The template contains scenarios, bindings, and blocks: schema / rampup / main with {seq_key}, {rw_key}, {payload} bindings predefined.

Parse the summary

summary := cassandra.ParseSummary(out)
fmt.Printf("cycles=%d rate=%.0f ops/s duration=%s errors=%d\n",
    summary.TotalCycles, summary.OpsPerSec, summary.Duration, summary.Errors)

Full metrics: ParseSummary only extracts the headline on stdout. For detailed latency percentiles / timer histograms use nb5's built-in exporters:

  • --report-summary-to stdout:60s — periodic summary
  • --log-histograms metrics.hdr:.*:10s — HDR histogram file
  • --report-graphite-to graphite:2003 / --report-prometheus ...

API at a glance

| Type / method | Purpose |
| --- | --- |
| NB5Exec | Configuration (binary / host / localdc / driver / auth) |
| .Run(ctx, args...) | Raw invocation, returns stdout |
| .RunActivity(ctx, activity, params) | Single activity, params sorted deterministically |
| .RunPhase(ctx, workload, phase, cycles, threads, extra) | Most common: target a specific phase |
| .Scenario(activity) | Fluent builder |
| .Version / ListWorkloads / ListDrivers | Discovery |
| Phase constants | PhaseSchema, PhaseRampup, PhaseMain, PhaseRead, PhaseWrite |
| WriteInlineWorkload(path, ks, tbl) | Emit an inline workload YAML |
| ParseSummary(stdout) | Extract cycles / rate / duration / errors |
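"Params sorted deterministically" matters because Go randomizes map iteration order, so an unsorted map would produce a different command line on every run. A minimal sketch of the idea, with paramArgs as a hypothetical helper that renders nb5-style key=value arguments:

```go
package main

import "sort"

// paramArgs (hypothetical) renders a parameter map as "key=value"
// arguments in sorted key order, so the same map always yields the
// same command line.
func paramArgs(params map[string]string) []string {
    keys := make([]string, 0, len(params))
    for k := range params {
        keys = append(keys, k)
    }
    sort.Strings(keys)

    args := make([]string, 0, len(keys))
    for _, k := range keys {
        args = append(args, k+"="+params[k])
    }
    return args
}
```

A stable argument order makes logged command lines comparable between benchmark runs and keeps tests on the generated command reproducible.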

Suggested workflow

1. Introspect existing schema (or design the model first)
2. WriteInlineWorkload or author a workload YAML
3. RunPhase(..., PhaseSchema, ...)  → create tables
4. RunPhase(..., PhaseRampup, ...)  → seed baseline data
5. Scenario builder for main phase (fixed cyclerate + threads)
6. ParseSummary or collect HDR histograms
7. Meanwhile watch ThreadPools / CompactionTasks for cluster load

Defaults

Package-level variables defined in pkg/hidb/cassandra/defaults.go (override in init()):

| Variable | Default | Purpose |
| --- | --- | --- |
| DefaultKeyspaceReplication | SimpleStrategy RF=1 | Keyspace without explicit replication |
| DefaultDurableWrites | true | Keyspace without explicit DURABLE_WRITES |
| DefaultCompaction | UnifiedCompactionStrategy | Table without explicit compaction (recommended on 5.0) |
| DefaultCompression | LZ4 + 16 KB chunks | Table without explicit compression |
| DefaultCaching | keys=ALL, rows=NONE | Table without explicit caching |
| DefaultGCGraceSeconds | 864000 (10 days) | Table without explicit gc_grace_seconds |
| DefaultBloomFilterFPChance | 0.01 | Table without explicit bloom_filter_fp_chance |
| DefaultSpeculativeRetry | "99p" | Table without explicit speculative_retry |

Behavior:

  • Defaults are auto-applied inside KeyspaceBuilder.CreateCQL() and TableBuilder.CreateCQL().
  • Values explicitly set by the caller are never overwritten.
  • Call .NoDefaults() to opt out (produces CQL with only what you explicitly set).

// Global override (example: switch to LCS)
cassandra.DefaultCompaction = cassandra.CompactionOptions{Class: cassandra.CompactionLCS}

// Per-builder opt-out
db.Table("t").Column("id", cassandra.TypeUUID).PartitionKey("id").NoDefaults().CreateCQL()
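The three rules above (auto-apply, never overwrite explicit values, opt out with NoDefaults) can be sketched as a small merge function. The tableOptions type, the lower-case default variables, and withDefaults are hypothetical simplifications and do not mirror the builders' internals.

```go
package main

// tableOptions is a hypothetical, reduced set of table options.
type tableOptions struct {
    Compaction     string
    GCGraceSeconds int
    NoDefaults     bool
}

// Package-level defaults, overridable by the application at startup.
var (
    defaultCompaction     = "UnifiedCompactionStrategy"
    defaultGCGraceSeconds = 864000
)

// withDefaults fills only the fields the caller left at their zero value.
// With NoDefaults set, the options are returned untouched.
func withDefaults(o tableOptions) tableOptions {
    if o.NoDefaults {
        return o
    }
    if o.Compaction == "" {
        o.Compaction = defaultCompaction
    }
    if o.GCGraceSeconds == 0 {
        o.GCGraceSeconds = defaultGCGraceSeconds
    }
    return o
}
```

One design limitation of this zero-value approach is that an explicit 0 cannot be told apart from "not set"; an implementation that needs that distinction would typically use pointer fields or a separate set of "was set" flags.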

Full example

func bootstrap(ctx context.Context) {
    // Connect without a default keyspace, since "app" may not exist yet.
    db, err := cassandra.New(cassandra.Config{
        Hosts: []string{"127.0.0.1"},
    })
    if err != nil {
        log.Fatal(err)
    }
    defer db.Close()

    // 1. Keyspace
    db.Keyspace("app").NetworkTopology(map[string]int{"dc1": 3}).Create(ctx)
    db.Use(ctx, "app")

    // 2. UDT
    db.Type("address").
        Field("city", cassandra.TypeText).
        Field("zip", cassandra.TypeText).
        Create(ctx)

    // 3. AutoMigrate from models
    db.AutoMigrate(ctx, &User{}, &Event{})

    // 4. SAI index
    db.Index("users_email_idx").On("users", "email").SAI().Create(ctx)

    // 5. Write + read
    u := User{ID: gocql.TimeUUID(), Email: "a@b.c"}
    db.Save(ctx, &u)

    var got User
    db.Select("*").From("users").Where("id", "=", u.ID).One(ctx, &got)

    // 6. Observe cluster state
    pools, _ := db.ThreadPools(ctx)
    for _, p := range pools { fmt.Println(p.Name, p.ActiveTasks) }

    // 7. Snapshot schema for AI usage
    schema, _ := db.Introspect(ctx, cassandra.IntrospectOptions{})
    raw, _ := schema.MarshalJSON()
    os.WriteFile("schema.json", raw, 0644)
}

References

  • Source: workspace/pkg/hidb/cassandra/
  • Dependency: github.com/gocql/gocql v1.7.0
  • Tests: go test ./pkg/hidb/cassandra/... -count=1
