Merge pull request #82 from scheduler0/service_tests
Tests for service modules
victorlenerd committed Jul 4, 2023
2 parents bd26981 + 7af9b96 commit 101ea3d
Showing 67 changed files with 5,881 additions and 706 deletions.
2 changes: 1 addition & 1 deletion cmd/create.go
@@ -60,7 +60,7 @@ var credentialCmd = &cobra.Command{
}
req, err := http.NewRequest(
"POST",
fmt.Sprintf("%s://%s:%s/credentials", configs.Protocol, configs.Host, configs.Port),
fmt.Sprintf("%s://%s:%s/v1/credentials", configs.Protocol, configs.Host, configs.Port),
bytes.NewReader(data),
)

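The CLI's credential-creation call now targets the versioned /v1/credentials path. A minimal sketch of the request construction, with placeholder parameters standing in for the real configs values (illustration only, not code from the repository):

```go
package main

import (
	"bytes"
	"fmt"
	"net/http"
)

// newCredentialRequest mirrors the pattern in cmd/create.go: the URL is built
// from the configured protocol, host and port, and now includes the /v1 prefix.
func newCredentialRequest(protocol, host, port string, body []byte) (*http.Request, error) {
	url := fmt.Sprintf("%s://%s:%s/v1/credentials", protocol, host, port)
	return http.NewRequest(http.MethodPost, url, bytes.NewReader(body))
}
```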
89 changes: 25 additions & 64 deletions config/config.go
@@ -9,7 +9,6 @@ import (
"path"
"scheduler0/constants"
"strconv"
"sync"
)

// RaftNode represents a node in a raft cluster, providing the necessary
@@ -45,7 +44,6 @@ type Scheduler0Configurations struct {
RaftAddress string `json:"raftAddress" yaml:"RaftAddress"` // Address used for raft communication
RaftTransportMaxPool uint64 `json:"raftTransportMaxPool" yaml:"RaftTransportMaxPool"` // Maximum size of the raft transport pool
RaftTransportTimeout uint64 `json:"raftTransportTimeout" yaml:"RaftTransportTimeout"` // Timeout for raft transport operations
RaftApplyTimeout uint64 `json:"raftApplyTimeout" yaml:"RaftApplyTimeout"` // Timeout for applying raft log entries
RaftSnapshotInterval uint64 `json:"raftSnapshotInterval" yaml:"RaftSnapshotInterval"` // Interval between raft snapshots
RaftSnapshotThreshold uint64 `json:"raftSnapshotThreshold" yaml:"RaftSnapshotThreshold"` // Threshold for raft snapshot creation
RaftHeartbeatTimeout uint64 `json:"raftHeartbeatTimeout" yaml:"RaftHeartbeatTimeout"` // Timeout for raft heartbeat
@@ -57,56 +55,46 @@ type Scheduler0Configurations struct {
JobExecutionRetryMax uint64 `json:"jobExecutionRetryMax" yaml:"JobExecutionRetryMax"` // Maximum number of retries for job execution
MaxWorkers uint64 `json:"maxWorkers" yaml:"MaxWorkers"` // Maximum number of concurrent workers
MaxQueue uint64 `json:"maxQueue" yaml:"MaxQueue"` // Maximum size of the job queue
JobQueueDebounceDelay uint64 `json:"jobQueueDebounceDelay" yaml:"JobQueueDebounceDelay"` // Delay for debouncing the job queue
MaxMemory uint64 `json:"maxMemory" yaml:"MaxMemory"` // Maximum amount of memory to be used by the scheduler
ExecutionLogFetchFanIn uint64 `json:"executionLogFetchFanIn" yaml:"ExecutionLogFetchFanIn"` // Fan-in factor for fetching execution logs
ExecutionLogFetchIntervalSeconds uint64 `json:"executionLogFetchIntervalSeconds" yaml:"ExecutionLogFetchIntervalSeconds"` // Interval between log fetches, in seconds
JobInvocationDebounceDelay uint64 `json:"jobInvocationDebounceDelay" yaml:"JobInvocationDebounceDelay"` // Delay for debouncing job invocation
HTTPExecutorPayloadMaxSizeMb uint64 `json:"httpExecutorPayloadMaxSizeMb" yaml:"HTTPExecutorPayloadMaxSizeMb"` // Maximum payload size for HTTP executor, in megabytes
}

var cachedConfig *Scheduler0Configurations
var once sync.Once

// GetConfigurations returns the cached Scheduler0Configurations if it exists,
// otherwise it reads the configuration file and caches it.
func (_ Scheduler0Configurations) GetConfigurations() *Scheduler0Configurations {
// Check if cachedConfig is not nil, then return it
if cachedConfig != nil {
return cachedConfig
}

// Ensure that the configuration is read and cached only once
once.Do(func() {
// Get the binary path
binPath := getBinPath()

// Create a new file system
fs := afero.NewOsFs()
// Read the configuration file
data, err := afero.ReadFile(fs, binPath+"/"+constants.ConfigFileName)
// If there is an error and it's not due to the file not existing, panic
if err != nil && !os.IsNotExist(err) {
panic(err)
}
// Get the binary path
binPath := getBinPath()

// Initialize an empty Scheduler0Configurations struct
config := Scheduler0Configurations{}
// Create a new file system
fs := afero.NewOsFs()
// Read the configuration file
data, err := afero.ReadFile(fs, binPath+"/"+constants.ConfigFileName)
// If there is an error and it's not due to the file not existing, panic
if err != nil && !os.IsNotExist(err) {
panic(err)
}

// If the error is due to the file not existing, get the configuration from environment variables
if os.IsNotExist(err) {
config = *getConfigFromEnv()
}
// Initialize an empty Scheduler0Configurations struct
config := Scheduler0Configurations{}

// Unmarshal the YAML data into the config struct
err = yaml.Unmarshal(data, &config)
// If there is an error in unmarshaling, panic
if err != nil {
panic(err)
}
// Cache the configuration
cachedConfig = &config
})
// If the error is due to the file not existing, get the configuration from environment variables
if os.IsNotExist(err) {
config = *getConfigFromEnv()
}

// Unmarshal the YAML data into the config struct
err = yaml.Unmarshal(data, &config)
// If there is an error in unmarshaling, panic
if err != nil {
panic(err)
}
// Cache the configuration
cachedConfig = &config

// Return the cached configuration
return cachedConfig
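
GetConfigurations no longer guards the file read with sync.Once: if the package-level cachedConfig is already set it is returned directly; otherwise the config file next to the binary (constants.ConfigFileName) is read, with environment variables as the fallback when the file does not exist, and the result is cached. A minimal caller sketch, assuming the NewScheduler0Config constructor and the scheduler0/config import path used elsewhere in this codebase:

```go
package main

import (
	"fmt"

	"scheduler0/config" // import path assumed from the repo's module layout
)

func main() {
	// Constructor name assumed from its usage elsewhere in the codebase.
	configs := config.NewScheduler0Config().GetConfigurations()

	// The first call reads the config file (or falls back to env vars) and
	// caches the parsed struct; later calls return the cached pointer.
	fmt.Println(configs.Protocol, configs.Host, configs.Port)
}
```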
@@ -219,15 +207,6 @@ func getConfigFromEnv() *Scheduler0Configurations {
config.RaftTransportTimeout = parsed
}

// Set RaftApplyTimeout
if val, ok := os.LookupEnv("SCHEDULER0_RAFT_APPLY_TIMEOUT"); ok {
parsed, err := strconv.ParseUint(val, 10, 64)
if err != nil {
log.Fatalf("Error parsing SCHEDULER0_RAFT_APPLY_TIMEOUT: %v", err)
}
config.RaftApplyTimeout = parsed
}

// Set RaftSnapshotInterval
if val, ok := os.LookupEnv("SCHEDULER0_RAFT_SNAPSHOT_INTERVAL"); ok {
parsed, err := strconv.ParseUint(val, 10, 64)
@@ -318,15 +297,6 @@ func getConfigFromEnv() *Scheduler0Configurations {
config.MaxWorkers = parsed
}

// Set JobQueueDebounceDelay
if val, ok := os.LookupEnv("SCHEDULER0_JOB_QUEUE_DEBOUNCE_DELAY"); ok {
parsed, err := strconv.ParseUint(val, 10, 64)
if err != nil {
log.Fatalf("Error parsing SCHEDULER0_JOB_QUEUE_DEBOUNCE_DELAY: %v", err)
}
config.JobQueueDebounceDelay = parsed
}

// Set MaxMemory
if val, ok := os.LookupEnv("SCHEDULER0_MAX_MEMORY"); ok {
parsed, err := strconv.ParseUint(val, 10, 64)
@@ -354,15 +324,6 @@ func getConfigFromEnv() *Scheduler0Configurations {
config.ExecutionLogFetchIntervalSeconds = parsed
}

// Set JobInvocationDebounceDelay
if val, ok := os.LookupEnv("SCHEDULER0_JOB_INVOCATION_DEBOUNCE_DELAY"); ok {
parsed, err := strconv.ParseUint(val, 10, 64)
if err != nil {
log.Fatalf("Error parsing SCHEDULER0_JOB_INVOCATION_DEBOUNCE_DELAY: %v", err)
}
config.JobInvocationDebounceDelay = parsed
}

// Set HTTPExecutorPayloadMaxSizeMb
if val, ok := os.LookupEnv("SCHEDULER0_HTTP_EXECUTOR_PAYLOAD_MAX_SIZE_MB"); ok {
parsed, err := strconv.ParseUint(val, 10, 64)
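The removed fields (RaftApplyTimeout, JobQueueDebounceDelay, JobInvocationDebounceDelay) take their corresponding environment-variable blocks with them; the surviving fields keep the same LookupEnv-then-ParseUint shape. A sketch of that repeated pattern as a hypothetical helper (the repository inlines it per field):

```go
package config

import (
	"log"
	"os"
	"strconv"
)

// envUint64 illustrates the pattern getConfigFromEnv repeats for each numeric
// field: read the variable if present, parse it as uint64, fail fast otherwise.
func envUint64(key string, target *uint64) {
	if val, ok := os.LookupEnv(key); ok {
		parsed, err := strconv.ParseUint(val, 10, 64)
		if err != nil {
			log.Fatalf("Error parsing %s: %v", key, err)
		}
		*target = parsed
	}
}

// Example for a field that survives this commit:
//   envUint64("SCHEDULER0_RAFT_SNAPSHOT_INTERVAL", &config.RaftSnapshotInterval)
```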
37 changes: 31 additions & 6 deletions config/config_test.go
@@ -23,35 +23,63 @@ func TestGetConfigurations(t *testing.T) {
func TestGetConfigFromEnv(t *testing.T) {
// Set environment variables
os.Setenv("SCHEDULER0_LOGLEVEL", "info")
defer os.Remove("SCHEDULER0_LOGLEVEL")
os.Setenv("SCHEDULER0_PROTOCOL", "http")
defer os.Remove("SCHEDULER0_PROTOCOL")
os.Setenv("SCHEDULER0_HOST", "localhost")
defer os.Remove("SCHEDULER0_HOST")
os.Setenv("SCHEDULER0_PORT", "8080")
defer os.Remove("SCHEDULER0_PORT")
os.Setenv("SCHEDULER0_REPLICAS", `[{"nodeId":1, "address":"localhost:12345"}]`)
defer os.Remove("SCHEDULER0_REPLICAS")
os.Setenv("SCHEDULER0_PEER_AUTH_REQUEST_TIMEOUT_MS", "5000")
defer os.Remove("SCHEDULER0_PEER_AUTH_REQUEST_TIMEOUT_MS")
os.Setenv("SCHEDULER0_PEER_CONNECT_RETRY_MAX", "3")
defer os.Remove("SCHEDULER0_PEER_CONNECT_RETRY_MAX")
os.Setenv("SCHEDULER0_PEER_CONNECT_RETRY_DELAY_SECONDS", "1")
defer os.Remove("SCHEDULER0_PEER_CONNECT_RETRY_DELAY_SECONDS")
os.Setenv("SCHEDULER0_BOOTSTRAP", "true")
os.Setenv("SCHEDULER0_NODE_ID", "1")
defer os.Remove("SCHEDULER0_BOOTSTRAP")
os.Setenv("SCHEDULER0_NODE_ID", "2")
defer os.Remove("SCHEDULER0_NODE_ID")
os.Setenv("SCHEDULER0_RAFT_ADDRESS", "localhost:12345")
defer os.Remove("SCHEDULER0_RAFT_ADDRESS")
os.Setenv("SCHEDULER0_RAFT_TRANSPORT_MAX_POOL", "3")
defer os.Remove("SCHEDULER0_RAFT_TRANSPORT_MAX_POOL")
os.Setenv("SCHEDULER0_RAFT_TRANSPORT_TIMEOUT", "1000")
os.Setenv("SCHEDULER0_RAFT_APPLY_TIMEOUT", "1000")
defer os.Remove("SCHEDULER0_RAFT_TRANSPORT_TIMEOUT")
os.Setenv("SCHEDULER0_RAFT_SNAPSHOT_INTERVAL", "1000")
defer os.Remove("SCHEDULER0_RAFT_SNAPSHOT_INTERVAL")
os.Setenv("SCHEDULER0_RAFT_SNAPSHOT_THRESHOLD", "1000")
defer os.Remove("SCHEDULER0_RAFT_SNAPSHOT_THRESHOLD")
os.Setenv("SCHEDULER0_RAFT_HEARTBEAT_TIMEOUT", "1000")
defer os.Remove("SCHEDULER0_RAFT_HEARTBEAT_TIMEOUT")
os.Setenv("SCHEDULER0_RAFT_ELECTION_TIMEOUT", "1000")
defer os.Remove("SCHEDULER0_RAFT_ELECTION_TIMEOUT")
os.Setenv("SCHEDULER0_RAFT_COMMIT_TIMEOUT", "1000")
defer os.Remove("SCHEDULER0_RAFT_COMMIT_TIMEOUT")
os.Setenv("SCHEDULER0_RAFT_MAX_APPEND_ENTRIES", "1000")
defer os.Remove("SCHEDULER0_RAFT_MAX_APPEND_ENTRIES")
os.Setenv("SCHEDULER0_JOB_EXECUTION_TIMEOUT", "1000")
defer os.Remove("SCHEDULER0_JOB_EXECUTION_TIMEOUT")
os.Setenv("SCHEDULER0_JOB_EXECUTION_RETRY_DELAY", "1000")
defer os.Remove("SCHEDULER0_JOB_EXECUTION_RETRY_DELAY")
os.Setenv("SCHEDULER0_JOB_EXECUTION_RETRY_MAX", "3")
defer os.Remove("SCHEDULER0_JOB_EXECUTION_RETRY_MAX")
os.Setenv("SCHEDULER0_MAX_WORKERS", "10")
defer os.Remove("SCHEDULER0_MAX_WORKERS")
os.Setenv("SCHEDULER0_JOB_QUEUE_DEBOUNCE_DELAY", "1000")
defer os.Remove("SCHEDULER0_JOB_QUEUE_DEBOUNCE_DELAY")
os.Setenv("SCHEDULER0_MAX_MEMORY", "1024")
defer os.Remove("SCHEDULER0_MAX_MEMORY")
os.Setenv("SCHEDULER0_EXECUTION_LOG_FETCH_FAN_IN", "2")
defer os.Remove("SCHEDULER0_EXECUTION_LOG_FETCH_FAN_IN")
os.Setenv("SCHEDULER0_EXECUTION_LOG_FETCH_INTERVAL_SECONDS", "10")
defer os.Remove("SCHEDULER0_EXECUTION_LOG_FETCH_INTERVAL_SECONDS")
os.Setenv("SCHEDULER0_JOB_INVOCATION_DEBOUNCE_DELAY", "1000")
defer os.Remove("SCHEDULER0_JOB_INVOCATION_DEBOUNCE_DELAY")
os.Setenv("SCHEDULER0_HTTP_EXECUTOR_PAYLOAD_MAX_SIZE_MB", "5")
defer os.Remove("SCHEDULER0_HTTP_EXECUTOR_PAYLOAD_MAX_SIZE_MB")

// Get configuration from environment variables
config := getConfigFromEnv()
@@ -70,11 +98,10 @@ func TestGetConfigFromEnv(t *testing.T) {
assert.Equal(t, uint64(3), config.PeerConnectRetryMax)
assert.Equal(t, uint64(1), config.PeerConnectRetryDelaySeconds)
assert.Equal(t, true, config.Bootstrap)
assert.Equal(t, uint64(1), config.NodeId)
assert.Equal(t, uint64(2), config.NodeId)
assert.Equal(t, "localhost:12345", config.RaftAddress)
assert.Equal(t, uint64(3), config.RaftTransportMaxPool)
assert.Equal(t, uint64(1000), config.RaftTransportTimeout)
assert.Equal(t, uint64(1000), config.RaftApplyTimeout)
assert.Equal(t, uint64(1000), config.RaftSnapshotInterval)
assert.Equal(t, uint64(1000), config.RaftSnapshotThreshold)
assert.Equal(t, uint64(1000), config.RaftHeartbeatTimeout)
@@ -85,10 +112,8 @@ func TestGetConfigFromEnv(t *testing.T) {
assert.Equal(t, uint64(1000), config.JobExecutionRetryDelay)
assert.Equal(t, uint64(3), config.JobExecutionRetryMax)
assert.Equal(t, uint64(10), config.MaxWorkers)
assert.Equal(t, uint64(1000), config.JobQueueDebounceDelay)
assert.Equal(t, uint64(1024), config.MaxMemory)
assert.Equal(t, uint64(2), config.ExecutionLogFetchFanIn)
assert.Equal(t, uint64(10), config.ExecutionLogFetchIntervalSeconds)
assert.Equal(t, uint64(1000), config.JobInvocationDebounceDelay)
assert.Equal(t, uint64(5), config.HTTPExecutorPayloadMaxSizeMb)
}
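
The test sets every variable with os.Setenv and schedules cleanup with defer os.Remove; in the standard library os.Remove deletes files, while os.Unsetenv clears environment variables, and Go 1.17+ tests can use t.Setenv, which restores the previous value automatically. A minimal sketch of that alternative (not part of this commit), asserting only on a field visible in this diff:

```go
package config

import "testing"

func TestGetConfigFromEnvSketch(t *testing.T) {
	// t.Setenv registers automatic cleanup: the previous value is restored
	// (or the variable unset) when the test returns.
	t.Setenv("SCHEDULER0_MAX_WORKERS", "10")

	config := getConfigFromEnv()
	if config.MaxWorkers != 10 {
		t.Fatalf("expected MaxWorkers=10, got %d", config.MaxWorkers)
	}
}
```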
30 changes: 18 additions & 12 deletions db/db.go
@@ -18,6 +18,7 @@ type dataStore struct {
dbFilePath string
fileLock sync.Mutex

isInMemDb bool
connectionLock sync.Mutex
connection *sql.DB

@@ -42,6 +43,7 @@ func NewSqliteDbConnection(logger hclog.Logger, dbFilePath string) DataStore {
return &dataStore{
dbFilePath: dbFilePath,
logger: logger,
isInMemDb: false,
}
}

@@ -109,21 +111,24 @@ func (db *dataStore) BeginTx(ctx context.Context, opts *sql.TxOptions) (*sql.Tx,
}

func (db *dataStore) RunMigration() {
fs := afero.NewOsFs()

err := fs.Remove(db.dbFilePath)
if err != nil && !os.IsNotExist(err) {
log.Fatalln(fmt.Errorf("Fatal failed to remove db file path error: %s \n", err))
}
_, err = fs.Create(db.dbFilePath)
if err != nil {
log.Fatalln(fmt.Errorf("Fatal db file creation error: %s \n", err))
}
if !db.isInMemDb {
fs := afero.NewOsFs()

datastore := NewSqliteDbConnection(db.logger, db.dbFilePath)
conn := datastore.OpenConnectionToExistingDB()
err := fs.Remove(db.dbFilePath)
if err != nil && !os.IsNotExist(err) {
log.Fatalln(fmt.Errorf("Fatal failed to remove db file path error: %s \n", err))
}
_, err = fs.Create(db.dbFilePath)
if err != nil {
log.Fatalln(fmt.Errorf("Fatal db file creation error: %s \n", err))
}

dbConnection := conn.(*sql.DB)
datastore := NewSqliteDbConnection(db.logger, db.dbFilePath)
db.connection = datastore.OpenConnectionToExistingDB().(*sql.DB)
}

dbConnection := db.connection

trx, dbConnErr := dbConnection.Begin()
if dbConnErr != nil {
@@ -181,6 +186,7 @@ func GetDBMEMConnection(logger hclog.Logger) DataStore {
logger.Error("ping error: failed to create in memory db: %v", err)
}
return &dataStore{
isInMemDb: true,
connection: conn,
}
}
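dataStore now records whether it is backed by an in-memory database, and RunMigration only removes and recreates the on-disk file for file-backed stores, reusing the store's own connection in both cases. A usage sketch built from the constructors visible in this diff (logger setup is illustrative, and RunMigration is assumed to be exposed on the DataStore interface):

```go
package main

import (
	"github.com/hashicorp/go-hclog"

	"scheduler0/db" // import path assumed from the repo's module layout
)

func main() {
	logger := hclog.New(&hclog.LoggerOptions{Name: "scheduler0"})

	// File-backed store: RunMigration removes and recreates the db file,
	// then opens a fresh connection before applying the schema.
	fileStore := db.NewSqliteDbConnection(logger, "scheduler0.db")
	fileStore.RunMigration()

	// In-memory store (isInMemDb=true): the file handling is skipped and the
	// schema is applied over the existing in-memory connection.
	memStore := db.GetDBMEMConnection(logger)
	memStore.RunMigration()
}
```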
7 changes: 3 additions & 4 deletions fsm/fsm_actions.go
@@ -14,6 +14,7 @@ import (
"scheduler0/db"
"scheduler0/models"
"scheduler0/protobuffs"
"scheduler0/scheduler0time"
"scheduler0/shared_repo"
"scheduler0/utils"
"time"
@@ -74,9 +75,7 @@ func (_ *scheduler0RaftActions) WriteCommandToRaftLog(
return nil, utils.HTTPGenericError(http.StatusInternalServerError, err.Error())
}

configs := config.NewScheduler0Config().GetConfigurations()

af := rft.Apply(createCommandData, time.Second*time.Duration(configs.RaftApplyTimeout)).(raft.ApplyFuture)
af := rft.Apply(createCommandData, time.Duration(15)*time.Second).(raft.ApplyFuture)
if af.Error() != nil {
if af.Error() == raft.ErrNotLeader {
return nil, utils.HTTPGenericError(http.StatusInternalServerError, "server not raft leader")
@@ -250,7 +249,7 @@ func insertJobQueue(logger hclog.Logger, command *protobuffs.Command, db db.Data
upperBound := jobIds[1].(float64)
lastVersion := jobIds[2].(float64)

schedulerTime := utils.GetSchedulerTime()
schedulerTime := scheduler0time.GetSchedulerTime()
now := schedulerTime.GetTime(time.Now())

insertBuilder := sq.Insert(constants.JobQueuesTableName).
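WriteCommandToRaftLog no longer reads RaftApplyTimeout from configuration; the raft apply now uses a fixed 15-second timeout, and the follow-up error handling (including the raft.ErrNotLeader case) is unchanged. A minimal sketch of that apply pattern against the hashicorp/raft API; the wrapper name is hypothetical:

```go
package main

import (
	"errors"
	"time"

	"github.com/hashicorp/raft"
)

// applyCommand shows the shape of the call in fsm/fsm_actions.go: submit
// serialized command bytes to the raft log with a fixed timeout and surface
// leadership errors distinctly.
func applyCommand(rft *raft.Raft, data []byte) error {
	af := rft.Apply(data, 15*time.Second) // fixed timeout after this commit
	if err := af.Error(); err != nil {
		if errors.Is(err, raft.ErrNotLeader) {
			return errors.New("server not raft leader")
		}
		return err
	}
	return nil
}
```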
11 changes: 5 additions & 6 deletions fsm/fsm_actions_test.go
@@ -39,7 +39,7 @@ func Test_WriteCommandToRaftLog_Executes_SQL(t *testing.T) {
sqliteDb.OpenConnectionToExistingDB()
scheduler0Store := NewFSMStore(logger, scheduler0RaftActions, sqliteDb)
cluster := raft.MakeClusterCustom(t, &raft.MakeClusterOpts{
Peers: 3,
Peers: 1,
Bootstrap: true,
Conf: raft.DefaultConfig(),
ConfigStoreFSM: false,
@@ -123,7 +123,7 @@ func Test_WriteCommandToRaftLog_Job_Queue(t *testing.T) {
sqliteDb.OpenConnectionToExistingDB()
scheduler0Store := NewFSMStore(logger, scheduler0RaftActions, sqliteDb)
cluster := raft.MakeClusterCustom(t, &raft.MakeClusterOpts{
Peers: 3,
Peers: 1,
Bootstrap: true,
Conf: raft.DefaultConfig(),
ConfigStoreFSM: false,
@@ -222,7 +222,7 @@ func Test_WriteCommandToRaftLog_Local_Data_Commit(t *testing.T) {

scheduler0Store := NewFSMStore(logger, scheduler0RaftActions, sqliteDb)
cluster := raft.MakeClusterCustom(t, &raft.MakeClusterOpts{
Peers: 3,
Peers: 1,
Bootstrap: true,
Conf: raft.DefaultConfig(),
ConfigStoreFSM: false,
@@ -261,7 +261,6 @@ func Test_WriteCommandToRaftLog_Local_Data_Commit(t *testing.T) {
}

params := models.CommitLocalData{
Address: "",
Data: models.LocalData{
ExecutionLogs: jobExecutionLogs,
AsyncTasks: asyncTasks,
@@ -314,7 +313,7 @@ func Test_WriteCommandToRaftLog_Stop_All_Jobs(t *testing.T) {
sqliteDb.OpenConnectionToExistingDB()
scheduler0Store := NewFSMStore(logger, scheduler0RaftActions, sqliteDb)
cluster := raft.MakeClusterCustom(t, &raft.MakeClusterOpts{
Peers: 3,
Peers: 1,
Bootstrap: true,
Conf: raft.DefaultConfig(),
ConfigStoreFSM: false,
@@ -356,7 +355,7 @@ func Test_WriteCommandToRaftLog_Recovery_All_Jobs(t *testing.T) {
sqliteDb.OpenConnectionToExistingDB()
scheduler0Store := NewFSMStore(logger, scheduler0RaftActions, sqliteDb)
cluster := raft.MakeClusterCustom(t, &raft.MakeClusterOpts{
Peers: 3,
Peers: 1,
Bootstrap: true,
Conf: raft.DefaultConfig(),
ConfigStoreFSM: false,
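Each FSM test now starts a single-node cluster (Peers: 1) instead of three; with Bootstrap set, the lone node elects itself leader, so the test can apply commands without waiting on peer agreement. A sketch of that setup, assuming hashicorp/raft's exported test-cluster helpers behave as they do in these tests (the FSM here defaults to the library's mock rather than scheduler0's store):

```go
package fsm_test

import (
	"testing"
	"time"

	"github.com/hashicorp/raft"
)

func TestSingleNodeClusterSketch(t *testing.T) {
	cluster := raft.MakeClusterCustom(t, &raft.MakeClusterOpts{
		Peers:     1,
		Bootstrap: true,
		Conf:      raft.DefaultConfig(),
	})
	defer cluster.Close()

	// With a single bootstrapped peer, Leader() returns promptly.
	leader := cluster.Leader()
	af := leader.Apply([]byte("{}"), 2*time.Second)
	if err := af.Error(); err != nil {
		t.Fatalf("apply failed: %v", err)
	}
}
```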
