Skip to content

Commit

Permalink
feat(cli): add statement-log-file option
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmitry Kropachev committed Dec 18, 2023
1 parent 16a7413 commit 35c1511
Show file tree
Hide file tree
Showing 9 changed files with 236 additions and 56 deletions.
22 changes: 10 additions & 12 deletions cmd/gemini/root.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ var (
requestTimeout time.Duration
connectTimeout time.Duration
profilingPort int
statementLogFile string
)

func interactive() bool {
Expand Down Expand Up @@ -133,14 +134,6 @@ func readSchema(confFile string, schemaConfig typedef.SchemaConfig) (*typedef.Sc
return schemaBuilder.Build(), nil
}

type createBuilder struct {
stmt string
}

func (cb createBuilder) ToCql() (stmt string, names []string) {
return cb.stmt, nil
}

func run(_ *cobra.Command, _ []string) error {
logger := createLogger(level)
globalStatus := status.NewGlobalStatus(1000)
Expand Down Expand Up @@ -219,6 +212,7 @@ func run(_ *cobra.Command, _ []string) error {
MaxRetriesMutate: maxRetriesMutate,
MaxRetriesMutateSleep: maxRetriesMutateSleep,
UseServerSideTimestamps: useServerSideTimestamps,
LogStatementsFile: statementLogFile,
}
var tracingFile *os.File
if tracingOutFile != "" {
Expand All @@ -243,22 +237,25 @@ func run(_ *cobra.Command, _ []string) error {
defer utils.IgnoreError(st.Close)

if dropSchema && mode != jobs.ReadMode {
for _, stmt := range generators.GetDropSchema(schema) {
for _, stmt := range generators.GetDropKeyspace(schema) {
logger.Debug(stmt)
if err = st.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil {
if err = st.Mutate(context.Background(), typedef.SimpleStmt(stmt, typedef.DropKeyspaceStatementType)); err != nil {
return errors.Wrap(err, "unable to drop schema")
}
}
}

testKeyspace, oracleKeyspace := generators.GetCreateKeyspaces(schema)
if err = st.Create(context.Background(), createBuilder{stmt: testKeyspace}, createBuilder{stmt: oracleKeyspace}); err != nil {
if err = st.Create(
context.Background(),
typedef.SimpleStmt(testKeyspace, typedef.CreateKeyspaceStatementType),
typedef.SimpleStmt(oracleKeyspace, typedef.CreateKeyspaceStatementType)); err != nil {
return errors.Wrap(err, "unable to create keyspace")
}

for _, stmt := range generators.GetCreateSchema(schema) {
logger.Debug(stmt)
if err = st.Mutate(context.Background(), createBuilder{stmt: stmt}); err != nil {
if err = st.Mutate(context.Background(), typedef.SimpleStmt(stmt, typedef.CreateSchemaStatementType)); err != nil {
return errors.Wrap(err, "unable to create schema")
}
}
Expand Down Expand Up @@ -531,6 +528,7 @@ func init() {
rootCmd.Flags().DurationVarP(&connectTimeout, "connect-timeout", "", 30*time.Second, "Duration of waiting connection established")
rootCmd.Flags().IntVarP(&profilingPort, "profiling-port", "", 0, "If non-zero starts pprof profiler on given port at 'http://0.0.0.0:<port>/profile'")
rootCmd.Flags().IntVarP(&maxErrorsToStore, "max-errors-to-store", "", 1000, "Maximum number of errors to store and output at the end")
rootCmd.Flags().StringVarP(&statementLogFile, "statement-log-file", "l", "", "File to write statements flow to")
}

func printSetup(seed, schemaSeed uint64) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/generators/statement_generator.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func GetCreateSchema(s *typedef.Schema) []string {
return stmts
}

func GetDropSchema(s *typedef.Schema) []string {
func GetDropKeyspace(s *typedef.Schema) []string {
return []string{
fmt.Sprintf("DROP KEYSPACE IF EXISTS %s", s.Keyspace.Name),
}
Expand Down
8 changes: 3 additions & 5 deletions pkg/jobs/jobs.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func ddl(
if w := logger.Check(zap.DebugLevel, "ddl statement"); w != nil {
w.Write(zap.String("pretty_cql", ddlStmt.PrettyCQL()))
}
if err = s.Mutate(ctx, ddlStmt.Query); err != nil {
if err = s.Mutate(ctx, ddlStmt); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
Expand Down Expand Up @@ -376,13 +376,11 @@ func mutation(
}
return err
}
mutateQuery := mutateStmt.Query
mutateValues := mutateStmt.Values

if w := logger.Check(zap.DebugLevel, "mutation statement"); w != nil {
w.Write(zap.String("pretty_cql", mutateStmt.PrettyCQL()))
}
if err = s.Mutate(ctx, mutateQuery, mutateValues...); err != nil {
if err = s.Mutate(ctx, mutateStmt); err != nil {
if errors.Is(err, context.Canceled) {
return nil
}
Expand Down Expand Up @@ -425,7 +423,7 @@ func validation(
attempt := 1
for {
lastErr = err
err = s.Check(ctx, table, stmt.Query, attempt == maxAttempts, stmt.Values...)
err = s.Check(ctx, table, stmt, attempt == maxAttempts)

if err == nil {
if attempt > 1 {
Expand Down
117 changes: 117 additions & 0 deletions pkg/stmtlogger/filelogger.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,117 @@
// Copyright 2019 ScyllaDB
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package stmtlogger

import (
"fmt"
"log"
"os"
"sync"
"sync/atomic"
"time"

"github.com/pkg/errors"

"github.com/scylladb/gemini/pkg/typedef"
)

const (
defaultChanSize = 1000
errorsOnFileLimit = 5
)

type FileLogger struct {

Check failure on line 35 in pkg/stmtlogger/filelogger.go

View workflow job for this annotation

GitHub Actions / Lint Test and Build

fieldalignment: struct with 72 pointer bytes could be 32 (govet)
fd *os.File
filename string
isFileNonOperational bool
mt sync.RWMutex

Check failure on line 39 in pkg/stmtlogger/filelogger.go

View workflow job for this annotation

GitHub Actions / Lint Test and Build

field `mt` is unused (unused)
activeChannel atomic.Pointer[chan *typedef.Stmt]
channel chan *typedef.Stmt
}

func (fl *FileLogger) LogStmt(stmt *typedef.Stmt) {
ch := fl.activeChannel.Load()
if ch != nil {
*ch <- stmt
}
}

func (fl *FileLogger) Close() error {
return fl.fd.Close()
}

func (fl *FileLogger) committer() {
defer func() {
fl.activeChannel.Swap(nil)
close(fl.channel)
}()

errsAtRow := 0

for stmt := range fl.channel {
if fl.isFileNonOperational {
continue
}

stmtStr := stmt.PrettyCQL()
_, err := fl.fd.Write([]byte(fmt.Sprintf("%s: %s;\n", time.Now().Format(time.RFC1123Z), stmtStr)))
if err == nil {
errsAtRow = 0
continue
}

if errors.Is(err, os.ErrClosed) {
fl.isFileNonOperational = true
return
}

errsAtRow++
if errsAtRow > errorsOnFileLimit {
fl.isFileNonOperational = true
}
log.Printf("failed to write to file %q: %s", fl.filename, err.Error())
return
}
}

func NewFileLogger(filename string) (*FileLogger, error) {
fd, err := os.OpenFile(filename, os.O_CREATE|os.O_TRUNC|os.O_WRONLY, 0644)
if err != nil {
return nil, err
}

ch := make(chan *typedef.Stmt, defaultChanSize)

out := &FileLogger{
filename: filename,
fd: fd,
channel: ch,
}
out.activeChannel.Store(&ch)

go out.committer()
return out, nil
}

func drain(ch chan *typedef.Stmt) {

Check failure on line 108 in pkg/stmtlogger/filelogger.go

View workflow job for this annotation

GitHub Actions / Lint Test and Build

func `drain` is unused (unused)
loop:
for {
select {
case <-ch:
default:
break loop
}
}
}
25 changes: 12 additions & 13 deletions pkg/store/cqlstore.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,14 +45,14 @@ func (cs *cqlStore) name() string {
return cs.system
}

func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...interface{}) (err error) {
func (cs *cqlStore) mutate(ctx context.Context, stmt *typedef.Stmt) (err error) {
var i int
for i = 0; i < cs.maxRetriesMutate; i++ {
// retry with new timestamp as list modification with the same ts
// will produce duplicated values, see https://github.com/scylladb/scylladb/issues/7937
err = cs.doMutate(ctx, builder, time.Now(), values...)
err = cs.doMutate(ctx, stmt, time.Now())
if err == nil {
cs.ops.WithLabelValues(cs.system, opType(builder)).Inc()
cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc()
return nil
}
select {
Expand All @@ -67,10 +67,9 @@ func (cs *cqlStore) mutate(ctx context.Context, builder qb.Builder, values ...in
return err
}

func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Time, values ...interface{}) error {
queryBody, _ := builder.ToCql()

query := cs.session.Query(queryBody, values...).WithContext(ctx)
func (cs *cqlStore) doMutate(ctx context.Context, builder *typedef.Stmt, ts time.Time) error {
queryBody, _ := builder.Query.ToCql()
query := cs.session.Query(queryBody, builder.Values...).WithContext(ctx)
if cs.useServerSideTimestamps {
query = query.DefaultTimestamp(false)
} else {
Expand All @@ -90,10 +89,10 @@ func (cs *cqlStore) doMutate(ctx context.Context, builder qb.Builder, ts time.Ti
return nil
}

func (cs *cqlStore) load(ctx context.Context, builder qb.Builder, values []interface{}) (result []map[string]interface{}, err error) {
query, _ := builder.ToCql()
iter := cs.session.Query(query, values...).WithContext(ctx).Iter()
cs.ops.WithLabelValues(cs.system, opType(builder)).Inc()
func (cs *cqlStore) load(ctx context.Context, stmt *typedef.Stmt) (result []map[string]interface{}, err error) {
query, _ := stmt.Query.ToCql()
iter := cs.session.Query(query, stmt.Values...).WithContext(ctx).Iter()
cs.ops.WithLabelValues(cs.system, opType(stmt)).Inc()
return loadSet(iter), iter.Close()
}

Expand Down Expand Up @@ -126,8 +125,8 @@ func ignore(err error) bool {
}
}

func opType(builder qb.Builder) string {
switch builder.(type) {
func opType(stmt *typedef.Stmt) string {
switch stmt.Query.(type) {
case *qb.InsertBuilder:
return "insert"
case *qb.DeleteBuilder:
Expand Down
Loading

0 comments on commit 35c1511

Please sign in to comment.