Skip to content

hidb.cassandra.md

maoxiaoyue edited this page May 14, 2026 · 2 revisions

hidb.cassandra — Cassandra 5.0 ORM 與管理工具

pkg/hidb/cassandra 是 HypGo 內建的 Cassandra 5.0+ 完整驅動封裝。它建構在 github.com/gocql/gocql 之上,提供:

  • Fluent DDL builder(Keyspace / Table / Index / MV / UDT / UDF / UDA / Trigger)
  • 基於 struct tag 的 Model 對映與 AutoMigrate
  • CRUD builder(Insert / Update / Delete / Select / Batch / LWT)
  • Vector / ANN 查詢(Cassandra 5.0 內建 vector<float, N>)
  • Migration(版本化 schema 變更)
  • RBAC(role / grant / revoke)
  • Describe / Schema Introspection / Diff
  • Nodetool(CQL 虛擬表 + 可選的 exec 包裝)

目錄:連線 · Keyspace · Table · Alter Table · Index · Materialized View · UDT · UDF--uda · Trigger · Model · AutoMigrate · Insert · Update · Delete · Select · Batch--LWT · Migration · RBAC · Describe · Schema Introspection · Nodetool · Query Tracing · NoSQLBench · 預設值


連線

import "github.com/maoxiaoyue/hypgo/pkg/hidb/cassandra"

db, err := cassandra.New(cassandra.Config{
    Hosts:       []string{"10.0.0.1", "10.0.0.2"},
    Keyspace:    "app",
    Consistency: "quorum",       // any/one/two/three/quorum/all/local_quorum/each_quorum/local_one
    Port:        9042,
    Username:    "cass",
    Password:    "secret",
    ConnectTimeout: 5 * time.Second,
    Timeout:        10 * time.Second,
    NumConns:       4,
    ProtoVersion:   4,
})
if err != nil { log.Fatal(err) }
defer db.Close()

其他 API:

  • cassandra.NewWithoutConnect(cfg) — 延遲連線(呼叫 db.Connect() 才連)
  • db.Session() — 取得底層 *gocql.Session(thread-safe;已關閉時回 nil)
  • db.Ping(ctx) / db.IsConnected()
  • db.Exec(ctx, stmt) — 執行單一 DDL/DML
  • db.ExecScript(ctx, script) — 執行多語句 CQL 腳本(支援 ; 分隔、保留 '...'$$...$$

Production 硬化欄位(2026-04-29 補強):

cfg := cassandra.Config{
    Hosts:    []string{"10.0.0.1", "10.0.0.2"},
    Keyspace: "app",

    // === Consistency ===
    Consistency:       "local_quorum",
    StrictConsistency: true,        // 拼錯字串直接回 error(推薦);
                                    // false(預設)→ fallback 為 LocalOne,不再像舊版悄悄變 Quorum

    // === Retry / Reconnect(不設值即用預設)===
    NumRetries:        3,                       // -1 = 停用 retry
    RetryMinDelay:     100 * time.Millisecond,
    RetryMaxDelay:     10 * time.Second,
    ReconnectMax:      10,                      // -1 = 停用 reconnect
    ReconnectInterval: 1 * time.Second,

    // === TLS / mTLS ===
    TLS: cassandra.TLSConfig{
        Enabled:    true,
        CertFile:   "/etc/cass/client.crt",     // mTLS 用
        KeyFile:    "/etc/cass/client.key",
        CaFile:     "/etc/cass/ca.crt",
        ServerName: "cassandra.prod",
        // InsecureSkipVerify: true,            // 僅 dev
        // Config: customTLSConfig,             // 直接注入 *tls.Config
    },

    // === 可觀測性 ===
    QueryObserver: myObserver,                  // gocql.QueryObserver(latency/error metrics)
}
行為 修正前 修正後
Consistency 拼錯 悄悄 fallback 成 Quorum,單節點 dev 卡死 預設 fallback 為 LocalOne;StrictConsistency=true 直接 error
Retry / Reconnect 完全沒設定 → 抖動即失敗 預設 ExponentialBackoff (3 retries) + ConstantReconnect (10 retries × 1s)
Close() 多 goroutine 同呼會 panic(double close) sync.Mutex 守護;可重複呼叫;closed 後 Connect 直接 error
Connect() 重複呼叫 直接覆寫 session,舊的 leak idempotent — 已連線回 nil,舊 session 關閉後重連
TLS 完全無 TLSConfig 支援檔案路徑或直接注入 *tls.Config
QueryObserver 無暴露 Config.QueryObserver 直接傳入
Init(map) 只解析 hosts/auth/keyspace/port 全欄位(含 timeout / retry / TLS nested block)

Keyspace

// SimpleStrategy(適合單資料中心)
db.Keyspace("app").Simple(3).Create(ctx)

// NetworkTopologyStrategy(production)
db.Keyspace("app").
    NetworkTopology(map[string]int{"dc1": 3, "dc2": 2}).
    DurableWrites(true).
    Create(ctx)

// 取得 CQL 字串
cql := db.Keyspace("app").Simple(3).CreateCQL()

// Alter
db.Keyspace("app").Simple(5).Alter(ctx)

// Drop
db.Keyspace("app").Drop(ctx)  // DROP KEYSPACE IF EXISTS app

// 切換預設 keyspace
db.Use(ctx, "app")

預設值行為: 未指定 replication / DURABLE_WRITES 時自動帶入(見 預設值)。使用 .NoDefaults() 關閉。


Table

完整 Cassandra 5.0 CREATE TABLE 選項:

db.Table("events").
    Column("tenant_id", cassandra.TypeUUID).
    Column("bucket", cassandra.TypeInt).
    Column("ts", cassandra.TypeTimestamp).
    Column("payload", cassandra.TypeText).
    Static("tenant_name", cassandra.TypeText).
    PartitionKey("tenant_id", "bucket").
    ClusteringKey("ts").
    ClusteringOrder("ts", cassandra.Desc).
    WithTTL(604800).
    WithComment("append-only log").
    WithCompaction(cassandra.CompactionOptions{
        Class: cassandra.CompactionTWCS,
        Extra: map[string]interface{}{
            "compaction_window_size": 1,
            "compaction_window_unit": "DAYS",
        },
    }).
    WithCompression(cassandra.CompressionOptions{
        SSTableCompression: cassandra.CompressionZstd,
        ChunkLengthKB:      16,
    }).
    WithCaching(cassandra.CachingOptions{Keys: "ALL", RowsPerPartition: "NONE"}).
    WithCDC(true).
    Create(ctx)

常用型別

常數 CQL 型別
TypeText, TypeVarchar, TypeAscii text / varchar / ascii
TypeInt, TypeBigInt, TypeSmallInt, TypeTinyInt, TypeVarInt int / bigint / smallint / tinyint / varint
TypeFloat, TypeDouble, TypeDecimal float / double / decimal
TypeBoolean, TypeUUID, TimeUUID, TypeInet, TypeBlob, TypeDuration
TypeTimestamp, TypeDate, TypeTime
TypeCounter counter

Vector 序列化注意事項(Cassandra 5.0)

vector<float, N> 是 Cassandra 5.0 新增類型,gocql v1.7.0 尚未原生支援(在 wire 層以 CUSTOM type 傳輸)。直接 session.Query(..., []float32{...}) 可能會被 driver 編成 list<float> 而 server 回 type mismatch。

對策:

// 寫入:手動序列化成 4N bytes big-endian binary32
blob, _ := cassandra.MarshalVectorFloat32(emb, 384)
db.Insert("embeddings").
    Value("id", id).
    Value("vector", blob).      // bind 成 []byte 即可
    Exec(ctx)

// 讀出:從 []byte 解
var raw []byte
db.Select("vector").From("embeddings").Where("id", "=", id).
    One(ctx, &raw)
vec, _ := cassandra.UnmarshalVectorFloat32(raw, 384)

等 gocql 升到原生支援後即可直接傳 []float32;屆時這兩個 helper 可下線。建議在新專案先 smoke test 一下 driver 行為。

集合 / 泛型

cassandra.List(cassandra.TypeInt)            // list<int>
cassandra.Set(cassandra.TypeText)            // set<text>
cassandra.Map(cassandra.TypeText, TypeInt)   // map<text, int>
cassandra.Frozen(cassandra.Set(TypeText))    // frozen<set<text>>
cassandra.Tuple(TypeText, TypeInt)           // tuple<text, int>
cassandra.UDT("address")                     // frozen<address>
cassandra.Vector(TypeFloat, 384)             // vector<float, 384>
cassandra.VectorFloat(384)                   // 簡寫

Compaction 常數

CompactionUnified (5.0 推薦) / CompactionSTCS / CompactionLCS / CompactionTWCS / CompactionIncremental

Compression 常數

CompressionLZ4 / CompressionSnappy / CompressionDeflate / CompressionZstd / CompressionNone


Alter Table

db.AlterTable("users").
    AddColumn("bio", cassandra.TypeText).
    DropColumn("legacy").
    RenameColumn("fullname", "full_name").
    WithOptions(cassandra.TableOptions{DefaultTTL: 3600}).
    Exec(ctx)

// 或單純產出多條 CQL 字串
cql := db.AlterTable("users").AddColumn("bio", cassandra.TypeText).CQL()
// ALTER TABLE users ADD bio text;

Index

SAI(Storage-Attached Index,Cassandra 5.0 推薦):

db.Index("users_name_idx").
    On("users", "name").
    SAI().
    Option("case_sensitive", false).
    Create(ctx)

傳統 2i:

db.Index("users_email_idx").On("users", "email").Create(ctx)

集合欄位索引:

db.Index("tags_idx").On("items", "tags").
    Target(cassandra.IndexTargetKeys).    // KEYS / VALUES / ENTRIES / FULL
    Create(ctx)

自訂索引:

db.Index("custom_idx").On("t", "col").
    Custom("com.example.CustomIndex").
    Create(ctx)

Materialized View

db.MaterializedView("users_by_email").
    FromTable("users").
    Select("*").
    WhereNotNull("email", "id").
    PartitionKey("email").
    ClusteringKey("id").
    Create(ctx)

UDT

db.Type("address").
    Field("street", cassandra.TypeText).
    Field("city", cassandra.TypeText).
    Field("zip", cassandra.TypeText).
    Create(ctx)

// Alter
db.Type("address").AddField("country", cassandra.TypeText).Alter(ctx)
db.Type("address").Rename("zip", "postal_code").Alter(ctx)

UDF / UDA

// User-defined function
db.Function("state_add").
    Arg("s", cassandra.TypeInt).
    Arg("val", cassandra.TypeInt).
    Returns(cassandra.TypeInt).
    Language("java").
    Body("return s + val;").
    Deterministic(true).
    Monotonic(true).
    CalledOnNullInput(false).
    Create(ctx)

// User-defined aggregate
db.Aggregate("my_sum").
    Arg(cassandra.TypeInt).
    SFunc("state_add").
    StateType(cassandra.TypeInt).
    InitCond("0").
    Create(ctx)

Trigger

// 部署 Java 實作類到每個節點的 $CASSANDRA_HOME/conf/triggers 後:
db.Trigger("audit").
    On("logs.events").
    Using("com.example.triggers.AuditTrigger").
    Create(ctx)

db.Trigger("audit").On("logs.events").Drop(ctx)

Model

用 struct tag 宣告對映:

type User struct {
    ID        gocql.UUID `cql:"id,pk"`
    CreatedAt time.Time  `cql:"created_at,ck,order=desc"`
    Email     string     `cql:"email,omitempty"`
    Tags      []string   `cql:"tags,type=set<text>"`
    Vector    []float32  `cql:"vector,type=vector<float, 384>"`
}

func (User) TableName() string { return "users" }

標籤語法

cql:"name[,kind][,type=<cql>][,order=asc|desc][,position=N][,omitempty][,static][,counter]"
關鍵字 說明
pk / partition / partition_key Partition key
ck / clustering / clustering_key Clustering key
static 靜態欄位
counter counter 欄位
type=... 明確指定 CQL 型別(集合/tuple/UDT/vector 必填)
`order=asc desc`
position=N 複合 key 的順序
omitempty Insert/Update 時零值跳過

API

info, _ := cassandra.ParseModel(&User{})
fmt.Println(info.Table, info.PartitionKey, info.Clustering, info.Columns())

tb, _ := db.TableFromModel(&User{})
tb.Create(ctx)

AutoMigrate

一次把多個 model 對應的 table 建出來:

db.AutoMigrate(ctx, &User{}, &Event{}, &Embedding{})

Insert

// 手動
stmt, args := db.Insert("users").
    Value("id", id).Value("email", "a@b.c").
    IfNotExists().
    TTL(3600).
    CQL()

// 執行
err := db.Insert("users").
    Value("id", id).Value("email", "a@b.c").
    Exec(ctx)

// LWT(CAS)— IF NOT EXISTS,回傳是否真的寫入
applied, err := db.Insert("users").
    Value("id", id).Value("email", "a@b.c").
    IfNotExists().
    ExecCAS(ctx)

// 從 model struct 插入
db.Save(ctx, &User{ID: id, Email: "a@b.c"})

// 或用 model 的 options
db.Save(ctx, &user, cassandra.SaveTTL(3600), cassandra.SaveIfNotExists())

Update

// 一般更新
db.Update("users").
    Set("email", "new@x.y").
    Where("id", "=", id).
    Exec(ctx)

// TTL + LWT
applied, _ := db.Update("users").
    Set("email", "new@x.y").
    Where("id", "=", id).
    If("email", "=", "old@x.y").
    ExecCAS(ctx)

// Counter
db.Update("page_views").Increment("views", 1).Where("page", "=", "/home").Exec(ctx)
db.Update("page_views").Decrement("views", 1).Where("page", "=", "/home").Exec(ctx)

// Collection 操作
db.Update("items").Append("tags", []string{"new"}).Where("id", "=", id).Exec(ctx)
db.Update("items").Prepend("tags", []string{"first"}).Where("id", "=", id).Exec(ctx)
db.Update("items").Remove("tags", []string{"old"}).Where("id", "=", id).Exec(ctx)

Delete

// 整行
db.Delete().From("users").Where("id", "=", id).Exec(ctx)

// 只刪欄位
db.Delete().Columns("email").From("users").Where("id", "=", id).Exec(ctx)

// 刪集合中某元素
db.Delete().ElementAt("tags", 0).From("items").Where("id", "=", id).Exec(ctx)

// LWT
applied, _ := db.Delete().From("users").Where("id", "=", id).IfExists().ExecCAS(ctx)

Select

// One
var u User
err := db.Select("id", "email").From("users").
    Where("id", "=", id).
    One(ctx, &u)

// All
var users []User
db.Select("*").From("users").All(ctx, &users)

// Count
n, _ := db.Select().From("users").Count(ctx)

// IN / LIMIT / ALLOW FILTERING
db.Select("*").From("users").
    WhereIn("id", ids).
    Limit(100).
    AllowFiltering().
    All(ctx, &users)

// ANN(vector similarity search)
var results []Embedding
db.Select("*").From("embeddings").
    OrderByANN("vector", queryVector).
    Limit(10).
    All(ctx, &results)

// Pagination (page state)
iter := db.Select("*").From("users").PageSize(100).Iter(ctx)

Batch & LWT

b := db.BatchBuilder(gocql.LoggedBatch)
b.Insert("users").Value("id", id1).Value("email", "a@b")
b.Update("events").Set("status", "done").Where("id", "=", id2)
b.Exec(ctx)

// Counter batch
b := db.BatchBuilder(gocql.CounterBatch)
b.Update("counters").Increment("views", 1).Where("page", "=", "/")
b.Exec(ctx)

// Batch CAS
applied, _ := b.ExecCAS(ctx)

Migration

mig := cassandra.NewMigrator(db, "app", "schema_migrations")
mig.Register(
    cassandra.Migration{
        Version: 202604170001,
        Name:    "create_users",
        Up:   func(ctx context.Context, db *cassandra.CassandraDB) error { /* ... */ },
        Down: func(ctx context.Context, db *cassandra.CassandraDB) error { /* ... */ },
    },
)
mig.Up(ctx)          // 套用所有未套用的
mig.Down(ctx)        // 回滾最後一筆
report, _ := mig.Status(ctx)

RBAC

Role

db.Role("alice").
    Password("secret").
    Login(true).
    Superuser(false).
    Create(ctx)

db.Role("alice").Password("newpw").Alter(ctx)
db.Role("alice").Drop(ctx)

Grant / Revoke

// 權限等級
cassandra.PermAll / PermSelect / PermModify / PermCreate / PermAlter / PermDrop / PermAuthorize / PermDescribe / PermExecute / PermUnmask / PermSelectMask

// 資源
cassandra.AllKeyspaces()
cassandra.KeyspaceResource("app")
cassandra.TableResource("app.users")
cassandra.RoleResource("alice")

// Grant
db.Grant(ctx, cassandra.PermSelect, cassandra.TableResource("app.users"), "reader")
db.Revoke(ctx, cassandra.PermModify, cassandra.KeyspaceResource("app"), "writer")

// Role hierarchy
db.GrantRole(ctx, "admin", "alice")   // GRANT admin TO alice
db.RevokeRole(ctx, "admin", "alice")

// 僅取 CQL 字串
stmt, _ := cassandra.GrantCQL(cassandra.PermSelect, cassandra.TableResource("app.users"), "reader")
stmt := cassandra.ListRolesCQL()
stmt := cassandra.ListPermissionsCQL("alice")

Describe

// DESCRIBE 字串產生
cassandra.DescribeCQL(cassandra.DescTable, "logs.events")
// → DESCRIBE TABLE logs.events
cassandra.DescribeCQL(cassandra.DescFullSchema, "")

// 透過 system_schema 取得結構化資料
ks, _ := db.DescribeKeyspace(ctx, "app")
tables, _ := db.DescribeTables(ctx, "app")
cols, _ := db.DescribeColumns(ctx, "app", "users")
tbl, cols, _ := db.DescribeTable(ctx, "app", "users")

// 從 system_schema 結果重建 CREATE TABLE 語句
ddl := cassandra.RenderTableDDL(*tbl, cols)

Schema Introspection

完整抓取 system_schema.*

// 全部 keyspace(預設排除 system_*)
schema, _ := db.Introspect(ctx, cassandra.IntrospectOptions{})

// 單一 keyspace
ks, _ := db.IntrospectKeyspace(ctx, "app")

// JSON 輸出(AI context / migration diff 用)
raw, _ := schema.MarshalJSON()

// Diff:算出從 old 變到 new 的變更
changes := cassandra.SchemaDiff(oldSchema, newSchema)
for _, c := range changes {
    fmt.Printf("%s  %s.%s.%s  %s\n", c.Kind, c.Keyspace, c.Table, c.Column, c.Detail)
}

變更類型:ChangeAddKeyspace / DropKeyspace / AlterKeyspace / AddTable / DropTable / AlterTable / AddColumn / DropColumn / AlterColumn


Nodetool

方案 B:純 CQL(system_views 虛擬表)

// nodetool info + status subset
info, _ := db.LocalInfo(ctx)
peers, _ := db.Peers(ctx)
snap, _ := db.ClusterStatus(ctx)

// nodetool tpstats
pools, _ := db.ThreadPools(ctx)

// nodetool compactionstats
tasks, _ := db.CompactionTasks(ctx)

// nodetool clientstats
clients, _ := db.Clients(ctx)

// nodetool getlogginglevels / get*
all, _ := db.Settings(ctx)
val, _ := db.Setting(ctx, "compaction_throughput_mb_per_sec")

// cache info
caches, _ := db.Caches(ctx)

// tablestats(size 部分)
totals, _ := db.TableStats(ctx, "app", "users")

// 等待 compaction 跑完
elapsed, _ := db.PollCompactionsUntilIdle(ctx, time.Second)

// 泛型 MapScan
rows, _ := db.RawSystemViewRow(ctx, "thread_pools", nil)

方案 A:包裝 nodetool 二進位(動作類)

nt := &cassandra.NodetoolExec{
    Binary: "nodetool",  // 預設 "nodetool"
    Host:   "10.0.0.1",
    Port:   7199,
    Username: "admin",
    Password: "secret",
}

nt.Flush(ctx, "app", "events")
nt.Compact(ctx, "app", "events")
nt.Repair(ctx, "app", nil, cassandra.RepairOptions{Full: true, PrimaryRange: true})
nt.Cleanup(ctx, "app")
nt.Drain(ctx)
nt.Snapshot(ctx, "backup-2026-04-17", "app", "events")
nt.ClearSnapshot(ctx, "backup-2026-04-17", "app")
nt.Refresh(ctx, "app", "events")
nt.SetCompactionThroughput(ctx, 64)
nt.SetLoggingLevel(ctx, "org.apache.cassandra.db", "DEBUG")
out, _ := nt.Status(ctx, "")

安全性: 使用 exec.CommandContext 傳參,無 shell 解譯,不受命令注入影響。


Query Tracing

啟用 Cassandra 原生 tracing(寫入 system_traces.sessions / system_traces.events),可查每條查詢的 coordinator 路徑、每個階段耗時、replica 分派等。

一次性探針(最常用)

tr, err := db.TraceProbe(ctx, 2*time.Second,
    "SELECT * FROM app.users WHERE id = ?", uid)
if err != nil { log.Fatal(err) }

fmt.Println(tr.Format())
// → Trace <uuid>
//     command      : QUERY
//     coordinator  : 10.0.0.1
//     duration     : 1.523ms (server-reported)
//     total elapsed: 1.41ms (max source_elapsed)
//     events:
//       [     120µs] 10.0.0.1        Native-Transport-1   Parsing SELECT ...
//       [     340µs] 10.0.0.1        ReadStage-1          Executing single-partition query
//       ...

針對既有 gocql.Query 開啟 tracing

q := db.Session().Query("SELECT * FROM app.users").WithContext(ctx)
tracer := cassandra.TraceQuery(q)      // 掛上 MemTracer
_ = q.Iter().Close()                   // 正常執行 query

if id, ok := tracer.Last(); ok {
    trace, _ := db.WaitForTrace(ctx, id, 2*time.Second, 50*time.Millisecond)
    fmt.Println(trace.Format())
}

取得結構化資料

type Trace struct {
    Session TraceSession    // command / coordinator / duration / request / client
    Events  []TraceEvent    // 依 source_elapsed 排序
}

tr, _ := db.GetTrace(ctx, sessionID)
fmt.Println(tr.Duration(), tr.TotalElapsed())
for _, e := range tr.Events {
    fmt.Printf("%s @%dµs: %s\n", e.Source, e.SourceElapsed, e.Activity)
}

cqlsh 風格即時輸出

tracer := cassandra.NewTraceWriter(db.Session(), os.Stdout)
q := db.Session().Query("SELECT * FROM users").WithContext(ctx)
q.Trace(tracer)
_ = q.Iter().Close()
// 每個 event 直接 print 到 stdout

API

函式 / 型別 用途
db.TraceProbe(ctx, wait, stmt, args...) 最簡介面:執行 + 等 trace flush + 回傳 *Trace
db.GetTrace(ctx, sessionID) 直接讀 system_traces.*
db.WaitForTrace(ctx, id, timeout, interval) 輪詢等待 trace 可見(server 寫入有延遲)
cassandra.TraceQuery(q) *gocql.Query 上掛 MemTracer
MemTracer.Last() / .Sessions() / .Reset() 取得 / 重設收集到的 session ID
cassandra.NewTraceWriter(sess, w) 邊跑邊 print 每個 event(cqlsh 風格)
Trace.Format() 人類可讀多行輸出
Trace.Duration() / .TotalElapsed() 伺服器自報 / 最大 source_elapsed

注意事項

  • Cassandra 寫入 system_traces.*非同步的,直接 GetTrace 可能拿到 ErrNotFound;用 WaitForTraceTraceProbe
  • Tracing 預設 TTL 24 小時(tracetype_query_ttl / tracetype_repair_ttl)。
  • 開 tracing 會加 coordinator 負擔;不要全量開,做 sampling(例如 1%)或只針對慢查詢診斷。
  • TraceProbe 會丟棄查詢結果,只供診斷使用;不要拿來取資料。

NoSQLBench (nb5)

pkg/hidb/cassandra/nb5.go 提供 NoSQLBench v5 (nb5 二進位) 的安全 exec wrapper,用來對 Cassandra 5.0 做壓力測試 / benchmark / 容量規劃。

前置: 需另外安裝 nb5,並可在 PATH 上找到(或設定 Binary 指定完整路徑)。nb5 是獨立的 Java fat-jar 工具,不隨 HypGo 發佈。使用 exec.CommandContext 以 argv 方式傳參,不經 shell,無命令注入風險。

基礎設定

nb := &cassandra.NB5Exec{
    Binary:   "nb5",                 // 預設 "nb5"
    Host:     "10.0.0.1",
    Port:     9042,
    LocalDC:  "dc1",                 // 現代 driver 必填
    Driver:   "cqld4",               // nb5 預設
    Username: "cass",
    Password: "secret",
    WorkingDir: "/var/benchmarks",   // cwd(讀本地 YAML 時有用)
}

// 探查
out, _ := nb.Version(ctx)
out, _  = nb.ListWorkloads(ctx)
out, _  = nb.ListDrivers(ctx)

最簡:跑 bundled workload 的某個階段

// schema / rampup / main / read / write
out, _ := nb.RunPhase(ctx, "cql-iot", cassandra.PhaseSchema, "", 1, nil)
out, _  = nb.RunPhase(ctx, "cql-iot", cassandra.PhaseRampup, "100000", 0, nil)
out, _  = nb.RunPhase(ctx, "cql-iot", cassandra.PhaseMain, "1M", 32, map[string]string{
    "keycount":  "1000000",
    "cyclerate": "10000",
})

Fluent scenario builder(推薦)

out, err := nb.Scenario("cql-iot").
    Phase(cassandra.PhaseMain).
    Cycles("1M").
    Rampup("100k").
    Threads(32).
    CycleRate("10000").              // 固定 ops/s 目標
    Param("keycount", "1000000").
    Errors("count").                 // count | warn | stop | retry | ignore
    Alias("steady-state").
    Extra("--report-summary-to", "stdout:60s").
    Extra("--log-histograms", "metrics.hdr:.*:10s").
    Run(ctx)

// 除錯:先印出完整命令列
fmt.Println(nb.Scenario("cql-iot").Phase(cassandra.PhaseSchema).CommandLine())

自己的 workload YAML

out, _ := nb.RunActivity(ctx, "/var/bench/my-workload.yaml", map[string]string{
    "tags":    "block:main",
    "cycles":  "5M",
    "threads": "64",
})

即生成 inline workload(快速打煙霧測試)

path, _ := cassandra.WriteInlineWorkload("/tmp/kv.yaml", "app", "kv")

// 1. 建 schema
nb.Scenario(path).Phase(cassandra.PhaseSchema).Run(ctx)

// 2. rampup 寫入
nb.Scenario(path).Phase(cassandra.PhaseRampup).
    Cycles("100000").Threads(16).Run(ctx)

// 3. 混合讀寫
out, _ := nb.Scenario(path).Phase(cassandra.PhaseMain).
    Cycles("1M").Threads(32).CycleRate("20000").Run(ctx)

模板包含 scenariosbindingsblocks: schema / rampup / main,並定義 {seq_key}{rw_key}{payload} 三個 binding。

解析結果摘要

summary := cassandra.ParseSummary(out)
fmt.Printf("cycles=%d rate=%.0f ops/s duration=%s errors=%d\n",
    summary.TotalCycles, summary.OpsPerSec, summary.Duration, summary.Errors)

完整指標: ParseSummary 只擷取 stdout 結尾的 headline。詳細 latency 分位數 / timer histogram 請用 nb5 內建匯出器:

  • --report-summary-to stdout:60s — 每 60 秒印
  • --log-histograms metrics.hdr:.*:10s — HDR histogram 檔
  • --report-graphite-to graphite:2003 / --report-prometheus ...

API 一覽

型別 / 方法 用途
NB5Exec 組態(binary / host / localdc / driver / auth)
.Run(ctx, args...) 原始呼叫,回傳 stdout
.RunActivity(ctx, activity, params) 單 activity,params 自動排序
.RunPhase(ctx, workload, phase, cycles, threads, extra) 最常用:直接指定某階段
.Scenario(activity) Fluent builder
.Version / ListWorkloads / ListDrivers 探查
Phase 常數 PhaseSchemaPhaseRampupPhaseMainPhaseReadPhaseWrite
WriteInlineWorkload(path, ks, tbl) 產生 inline workload YAML
ParseSummary(stdout) 擷取 cycles / rate / duration / errors

建議工作流

1. Introspect 既有 schema(或設計好 model)
2. WriteInlineWorkload 或撰寫 workload YAML
3. RunPhase(..., PhaseSchema, ...) 建表
4. RunPhase(..., PhaseRampup, ...) 寫入 baseline 資料
5. 用 Scenario builder 跑 main phase(固定 cyclerate + threads)
6. ParseSummary 或直接收集 HDR histogram
7. 期間同時用 ThreadPools / CompactionTasks 觀察叢集負載

預設值

pkg/hidb/cassandra/defaults.go 定義的全域預設變數(可在 init() 覆寫):

變數 預設值 用途
DefaultKeyspaceReplication SimpleStrategy RF=1 Keyspace 未指定 replication 時
DefaultDurableWrites true Keyspace 未指定 DURABLE_WRITES 時
DefaultCompaction UnifiedCompactionStrategy Table 未指定 compaction 時(5.0 推薦)
DefaultCompression LZ4 + 16 KB chunk Table 未指定 compression 時
DefaultCaching keys=ALL, rows=NONE Table 未指定 caching 時
DefaultGCGraceSeconds 864000(10 天)
DefaultBloomFilterFPChance 0.01
DefaultSpeculativeRetry "99p"

行為:

  • 預設自動套用KeyspaceBuilder.CreateCQL()TableBuilder.CreateCQL()
  • 呼叫者已顯式指定的值絕不會被覆寫
  • .NoDefaults() 關閉(產生「只含顯式設定」的 CQL)
// 全域覆寫(舉例改為 LCS)
cassandra.DefaultCompaction = cassandra.CompactionOptions{Class: cassandra.CompactionLCS}

// 單 builder 關閉
db.Table("t").Column("id", cassandra.TypeUUID).PartitionKey("id").NoDefaults().CreateCQL()

完整範例

func bootstrap(ctx context.Context) {
    db, _ := cassandra.New(cassandra.Config{
        Hosts: []string{"127.0.0.1"}, Keyspace: "app",
    })
    defer db.Close()

    // 1. 建 keyspace
    db.Keyspace("app").NetworkTopology(map[string]int{"dc1": 3}).Create(ctx)
    db.Use(ctx, "app")

    // 2. 建 UDT
    db.Type("address").
        Field("city", cassandra.TypeText).
        Field("zip", cassandra.TypeText).
        Create(ctx)

    // 3. 用 model AutoMigrate
    db.AutoMigrate(ctx, &User{}, &Event{})

    // 4. SAI 索引
    db.Index("users_email_idx").On("users", "email").SAI().Create(ctx)

    // 5. 寫入 + 查詢
    u := User{ID: gocql.TimeUUID(), Email: "a@b.c"}
    db.Save(ctx, &u)

    var got User
    db.Select("*").From("users").Where("id", "=", u.ID).One(ctx, &got)

    // 6. 觀察叢集狀態
    pools, _ := db.ThreadPools(ctx)
    for _, p := range pools { fmt.Println(p.Name, p.ActiveTasks) }

    // 7. Snapshot 一份 schema 備 AI 用
    schema, _ := db.Introspect(ctx, cassandra.IntrospectOptions{})
    os.WriteFile("schema.json", mustJSON(schema), 0644)
}

HypGo

繁體中文 | English


中文文件

設計文件

套件

AI 協作工具鏈

CLI 命令


English Docs

Design Docs

Packages

AI Collaboration Toolchain

CLI Commands

Clone this wiki locally