Skip to content

Commit

Permalink
Add in-memory only storage option (closes #14)
Browse files Browse the repository at this point in the history
  • Loading branch information
jbeemster committed Aug 18, 2018
1 parent 521dbd4 commit 5d0c22d
Show file tree
Hide file tree
Showing 5 changed files with 183 additions and 34 deletions.
15 changes: 12 additions & 3 deletions tracker/emitter.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,8 +86,10 @@ func InitEmitter(options ...func(*Emitter)) *Emitter {
}
}

// Setup Event Storage
e.Storage = *InitStorage(e.DbName)
// Setup default event storage
if e.Storage == nil {
e.Storage = *InitStorageSQLite3(e.DbName)
}

// Setup HttpClient
timeout := time.Duration(5 * time.Second)
Expand Down Expand Up @@ -132,11 +134,18 @@ func OptionByteLimitPost(byteLimitPost int) func(e *Emitter) {
return func(e *Emitter) { e.ByteLimitPost = byteLimitPost }
}

// OptionDbName sets the name of the storage database.
// OptionDbName overrides the default name of the storage database.
func OptionDbName(dbName string) func(e *Emitter) {
return func(e *Emitter) { e.DbName = dbName }
}

// OptionStorage sets a custom event Storage target which implements the Storage interface
//
// Note: If this option is used OptionDbName will be ignored
func OptionStorage(storage Storage) func(e *Emitter) {
return func(e *Emitter) { e.Storage = storage }
}

// OptionCallback sets a custom callback for the emitter loop.
func OptionCallback(callback func(successCount []CallbackResult, failureCount []CallbackResult)) func(e *Emitter) {
return func(e *Emitter) { e.Callback = callback }
Expand Down
6 changes: 6 additions & 0 deletions tracker/emitter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"testing"
"github.com/stretchr/testify/assert"
"log"
"reflect"
)

func TestEmitterInit(t *testing.T) {
Expand All @@ -33,6 +34,7 @@ func TestEmitterInit(t *testing.T) {
log.Println("Successes: " + IntToString(len(g)))
log.Println("Failures: " + IntToString(len(b)))
}),
OptionStorage(*InitStorageMemory()),
)

// Assert the option builders
Expand All @@ -49,6 +51,8 @@ func TestEmitterInit(t *testing.T) {
assert.Nil(emitter.SendChannel)
assert.NotNil(emitter.Callback)
assert.NotNil(emitter.HttpClient)
assert.NotNil(emitter.Storage)
assert.Equal("tracker.StorageMemory", reflect.TypeOf(emitter.Storage).String())

// Assert defaults
emitter = InitEmitter(RequireCollectorUri("com.acme"), OptionDbName("/home/vagrant/test.db"))
Expand All @@ -65,6 +69,8 @@ func TestEmitterInit(t *testing.T) {
assert.Nil(emitter.SendChannel)
assert.Nil(emitter.Callback)
assert.NotNil(emitter.HttpClient)
assert.NotNil(emitter.Storage)
assert.Equal("tracker.StorageSQLite3", reflect.TypeOf(emitter.Storage).String())

// Assert the set functions
emitter.SetCollectorUri("com.snplow")
Expand Down
131 changes: 122 additions & 9 deletions tracker/storage.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ import (
"log"
"database/sql"
_ "github.com/mattn/go-sqlite3"
"github.com/hashicorp/go-memdb"
"sync/atomic"
)

const (
Expand All @@ -26,21 +28,132 @@ const (
DB_COLUMN_EVENT = "event"
)

type Storage struct {
DbName string
type Storage interface {
AddEventRow(payload Payload) bool
DeleteAllEventRows() int64
DeleteEventRows(ids []int) int64
GetAllEventRows() []EventRow
GetEventRowsWithinRange(eventRange int) []EventRow
}

type RawEventRow struct {
id int
event []byte
}

type RawEventRowUint struct {
id uint
event []byte
}

type EventRow struct {
id int
event Payload
}

func InitStorage(dbName string) *Storage {
// --- Memory Storage Implementation

type StorageMemory struct {
Db *memdb.MemDB
Index *uint32
}

func InitStorageMemory() *StorageMemory {
schema := &memdb.DBSchema{
Tables: map[string]*memdb.TableSchema{
DB_TABLE_NAME: &memdb.TableSchema{
Name: DB_TABLE_NAME,
Indexes: map[string]*memdb.IndexSchema{
DB_COLUMN_ID: &memdb.IndexSchema{
Name: DB_COLUMN_ID,
Unique: true,
Indexer: &memdb.UintFieldIndex{Field: DB_COLUMN_ID},
},
DB_COLUMN_EVENT: &memdb.IndexSchema{
Name: DB_COLUMN_EVENT,
Indexer: &memdb.StringFieldIndex{Field: DB_COLUMN_EVENT},
},
},
},
},
}

db, err := memdb.NewMemDB(schema); checkErr(err)

return &StorageMemory{ Db: db, Index: new(uint32) }
}

// AddEventRow adds a new event to the database
//
// NOTE: As entries are not auto-incremeneting the id is incremented manually which
// limits inserts to 4,294,967,295 in single session
func (s StorageMemory) AddEventRow(payload Payload) bool {
txn := s.Db.Txn(true)
byteBuffer := SerializeMap(payload.Get())
rer := &RawEventRowUint{ event: byteBuffer, id: uint(atomic.AddUint32(s.Index, 1)) }
err := txn.Insert(DB_TABLE_NAME, rer); checkErr(err)
txn.Commit()

return true
}

// DeleteAllEventRows removes all rows within the memory store
func (s StorageMemory) DeleteAllEventRows() int64 {
txn := s.Db.Txn(true)
result, err := txn.DeleteAll(DB_TABLE_NAME, DB_COLUMN_ID); checkErr(err)
txn.Commit()

return int64(result)
}

// DeleteEventRows removes all rows with matching identifiers
func (s StorageMemory) DeleteEventRows(ids []int) int64 {
txn := s.Db.Txn(true)
deleteCount := 0

for _, id := range ids {
result, err := txn.DeleteAll(DB_TABLE_NAME, DB_COLUMN_ID, uint(id)); checkErr(err)
deleteCount += result
}

txn.Commit()

return int64(deleteCount)
}

// GetAllEventRows returns all rows within the memory store
func (s StorageMemory) GetAllEventRows() []EventRow {
eventItems := []EventRow{}
txn := s.Db.Txn(false)
defer txn.Abort()

result, err := txn.Get(DB_TABLE_NAME, DB_COLUMN_ID); checkErr(err)
for row := result.Next(); row != nil; row = result.Next() {
item := row.(*RawEventRowUint)
eventMap, _ := DeserializeMap(item.event)
eventItems = append(eventItems, EventRow{ int(item.id), Payload{ eventMap }})
}

return eventItems
}

// GetEventRowsWithinRange returns all available events or a maximal slice
func (s StorageMemory) GetEventRowsWithinRange(eventRange int) []EventRow {
eventItems := s.GetAllEventRows()
if len(eventItems) <= eventRange {
return eventItems
} else {
return eventItems[:eventRange]
}
}

// --- SQLite3 Storage Implementation

type StorageSQLite3 struct {
DbName string
}

func InitStorageSQLite3(dbName string) *StorageSQLite3 {
db, err := getDbConn(dbName)
checkErr(err)
defer db.Close()
Expand All @@ -60,7 +173,7 @@ func InitStorage(dbName string) *Storage {
_, err2 := db.Exec(query)
checkErr(err2)

return &Storage{ DbName: dbName }
return &StorageSQLite3{ DbName: dbName }
}

func getDbConn(dbName string) (*sql.DB, error) {
Expand All @@ -70,7 +183,7 @@ func getDbConn(dbName string) (*sql.DB, error) {
// --- ADD

// Add stores an event payload in the database.
func (s Storage) AddEventRow(payload Payload) bool {
func (s StorageSQLite3) AddEventRow(payload Payload) bool {
db, err := getDbConn(s.DbName)
checkErr(err)
defer db.Close()
Expand Down Expand Up @@ -104,7 +217,7 @@ func execAddStatement(stmt *sql.Stmt, byteBuffer []byte) bool {
// --- DELETE

// DeleteAllEventRows removes all events from the database.
func (s Storage) DeleteAllEventRows() int64 {
func (s StorageSQLite3) DeleteAllEventRows() int64 {
db, err := getDbConn(s.DbName)
checkErr(err)
defer db.Close()
Expand All @@ -114,7 +227,7 @@ func (s Storage) DeleteAllEventRows() int64 {
}

// DeleteEventRows removes a range of ids from the database.
func (s Storage) DeleteEventRows(ids []int) int64 {
func (s StorageSQLite3) DeleteEventRows(ids []int) int64 {
db, err := getDbConn(s.DbName)
checkErr(err)
defer db.Close()
Expand Down Expand Up @@ -148,7 +261,7 @@ func execDeleteQuery(db *sql.DB, query string) int64 {
// --- GET

// GetAllEventRows returns all events in the database.
func (s Storage) GetAllEventRows() []EventRow {
func (s StorageSQLite3) GetAllEventRows() []EventRow {
db, err := getDbConn(s.DbName)
checkErr(err)
defer db.Close()
Expand All @@ -158,7 +271,7 @@ func (s Storage) GetAllEventRows() []EventRow {
}

// GetEventRowsWithinRange returns a specified range of events from the database.
func (s Storage) GetEventRowsWithinRange(eventRange int) []EventRow {
func (s StorageSQLite3) GetEventRowsWithinRange(eventRange int) []EventRow {
db, err := getDbConn(s.DbName)
checkErr(err)
defer db.Close()
Expand Down
61 changes: 41 additions & 20 deletions tracker/storage_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,25 @@ import (
"github.com/stretchr/testify/assert"
)

// TestSubjectSetFunctions asserts behaviour of all of the Subject setter functions.
func TestStorageInit(t *testing.T) {
// TestStorageMemoryInit asserts behaviour of memdb storage functions.
func TestStorageMemoryInit(t *testing.T) {
assert := assert.New(t)
storage := *InitStorage("/home/vagrant/test.db")
storage := InitStorageMemory()
assert.NotNil(storage)
assert.NotNil(storage.Db)
}

// TestMemoryAddGetDeletePayload asserts ability to add, delete and get payloads.
func TestMemoryAddGetDeletePayload(t *testing.T) {
assert := assert.New(t)
storage := *InitStorageMemory()
assertDatabaseAddGetDeletePayload(assert, storage)
}

// TestStorageSQLite3Init asserts behaviour of SQLite storage functions.
func TestStorageSQLite3Init(t *testing.T) {
assert := assert.New(t)
storage := *InitStorageSQLite3("/home/vagrant/test.db")
assert.NotNil(storage)
assert.Equal("/home/vagrant/test.db", storage.DbName)

Expand All @@ -30,13 +45,32 @@ func TestStorageInit(t *testing.T) {
assert.NotNil(err)
}
}()
storage = *InitStorage("~/")
storage = *InitStorageSQLite3("~/")
}

// TestAddGetDeletePayload asserts ability to add, delete and get payloads.
func TestAddGetDeletePayload(t *testing.T) {
// TestSQLite3AddGetDeletePayload asserts ability to add, delete and get payloads.
func TestSQLite3AddGetDeletePayload(t *testing.T) {
assert := assert.New(t)
storage := *InitStorage("/home/vagrant/test.db")
storage := *InitStorageSQLite3("/home/vagrant/test.db")
assertDatabaseAddGetDeletePayload(assert, storage)
}

func TestSQLite3PanicRecovery(t *testing.T) {
assert := assert.New(t)

result := execDeleteQuery(nil, "")
assert.Equal(int64(0), result)

eventRows := execGetQuery(nil, "")
assert.Equal(0, len(eventRows))

addResult := execAddStatement(nil, nil)
assert.False(addResult)
}

// --- Common

func assertDatabaseAddGetDeletePayload(assert *assert.Assertions, storage Storage) {
storage.DeleteAllEventRows()
payload := *InitPayload()
payload.Add("e", NewString("pv"))
Expand Down Expand Up @@ -69,16 +103,3 @@ func TestAddGetDeletePayload(t *testing.T) {
assert.Equal(0, len(eventRows))
assert.Equal(int64(0), storage.DeleteEventRows([]int{}))
}

func TestPanicRecovery(t *testing.T) {
assert := assert.New(t)

result := execDeleteQuery(nil, "")
assert.Equal(int64(0), result)

eventRows := execGetQuery(nil, "")
assert.Equal(0, len(eventRows))

addResult := execAddStatement(nil, nil)
assert.False(addResult)
}
4 changes: 2 additions & 2 deletions tracker/tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestTrackFunctionsGET(t *testing.T) {
RequireEmitter(InitEmitter(
RequireCollectorUri("com.acme.collector"),
OptionRequestType("GET"),
OptionDbName("/home/vagrant/test.db"),
OptionStorage(*InitStorageMemory()),
OptionCallback(func(g []CallbackResult, b []CallbackResult) {
log.Println("Successes: " + IntToString(len(g)))
log.Println("Failures: " + IntToString(len(b)))
Expand Down Expand Up @@ -282,7 +282,7 @@ func TestTrackFunctionsFailingPOST(t *testing.T) {
RequireEmitter(InitEmitter(
RequireCollectorUri("com.acme.collector"),
OptionRequestType("POST"),
OptionDbName("/home/vagrant/test.db"),
OptionStorage(*InitStorageMemory()),
OptionCallback(func(g []CallbackResult, b []CallbackResult) {
log.Println("Successes: " + IntToString(len(g)))
log.Println("Failures: " + IntToString(len(b)))
Expand Down

0 comments on commit 5d0c22d

Please sign in to comment.