Skip to content
Browse files

more stats and job touching

  • Loading branch information...
1 parent 6c959bc commit 22b57f0a889f2c26989afa6573f3b40813d0cc3a @manveru committed
Showing with 145 additions and 113 deletions.
  1. +24 −22 cmd.go
  2. +60 −74 gostalkc/gostalkc.go
  3. +12 −12 gostalkc/gostalkc_test.go
  4. +5 −0 job.go
  5. +4 −0 ready_jobs.go
  6. +7 −0 reserved_jobs.go
  7. +33 −5 tube.go
View
46 cmd.go
@@ -346,27 +346,18 @@ func (cmd *Cmd) statsTube() {
}
stats := &map[string]interface{}{
- "name": tube.name, // is the tube's name.
- "current-jobs-urgent": tube.statUrgent, // is the number of ready jobs with priority < 1024 in this tube.
- "current-jobs-ready": tube.statReady, // is the number of jobs in the ready
- // queue in this tube.
- "current-jobs-reserved": tube.statReserved, // is the number of jobs
- // reserved by all clients in
- // this tube.
- "current-jobs-delayed": tube.statDelayed, // is the number of delayed jobs
- // in this tube.
- "current-jobs-buried": tube.statBuried, // is the number of buried jobs in
- // this tube.
- "total-jobs": tube.statTotalJobs, // is the cumulative count of jobs
- // created in this tube in the current
- // beanstalkd process.
- "current-waiting": 0, // TODO: is the number of open connections that have
- // issued a reserve command while watching this tube
- // but not yet received a response.
- "pause": tube.pausedDuration(), // is the number of seconds the tube has been paused for.
- "cmd-delete": tube.statDeleted, // is the cumulative number of delete commands for this tube
- "cmd-pause-tube": tube.statPaused, // is the cumulative number of pause-tube commands for this tube.
- "pause-time-left": tube.pauseTimeLeft(), // is the number of seconds until the tube is un-paused.
+ "name": tube.name, // is the tube's name.
+ "current-jobs-urgent": tube.statUrgent, // is the number of ready jobs with priority < 1024 in this tube.
+ "current-jobs-ready": tube.statReady, // is the number of jobs in the ready queue in this tube.
+ "current-jobs-reserved": tube.statReserved, // is the number of jobs reserved by all clients in this tube.
+ "current-jobs-delayed": tube.statDelayed, // is the number of delayed jobs in this tube.
+ "current-jobs-buried": tube.statBuried, // is the number of buried jobs in this tube.
+ "total-jobs": tube.statTotalJobs, // is the cumulative count of jobs created in this tube in the current beanstalkd process.
+ "current-waiting": 0, // TODO: is the number of open connections that have issued a reserve command while watching this tube but not yet received a response.
+ "cmd-delete": tube.statDeleted, // is the cumulative number of delete commands for this tube
+ "cmd-pause-tube": tube.statPaused, // is the cumulative number of pause-tube commands for this tube.
+ "pause": tube.pausedDuration(), // is the number of seconds the tube has been paused for.
+ "pause-time-left": tube.pauseTimeLeft(), // is the number of seconds until the tube is un-paused.
}
yaml, err := goyaml.Marshal(stats)
@@ -379,7 +370,18 @@ func (cmd *Cmd) statsTube() {
cmd.respond(fmt.Sprintf("OK %d\r\n%s\r\n", len(yaml), yaml))
}
func (cmd *Cmd) touch() {
- cmd.respond(MSG_INTERNAL_ERROR)
+ cmd.assertNumberOfArguments(1)
+ jobId := JobId(cmd.getInt(0))
+
+ job, found := cmd.server.findJob(jobId)
+
+ if !found {
+ cmd.respond(MSG_NOT_FOUND)
+ return
+ }
+
+ job.touch()
+ cmd.respond(MSG_TOUCHED)
}
func (cmd *Cmd) use() {
View
134 gostalkc/gostalkc.go
@@ -14,17 +14,18 @@ import (
)
type Instance interface {
- Watch(tubeName string) error
- ListTubes() ([]string, error)
- ListTubeUsed() (string, error)
- ListTubesWatched() ([]string, error)
- Delete(jobId uint64) error
- Touch(jobId uint64) error
- StatsJob(jobId uint64) (map[string]interface{}, error)
- ReserveWithTimeout(int) (uint64, []byte, error)
- Put(uint32, uint64, uint64, []byte) (uint64, bool, error)
- Ignore(tubeName string) (uint64, error)
- Reserve() (uint64, []byte, error)
+ Delete(jobId uint64) (err error)
+ Ignore(tubeName string) (tubesLeft uint64, err error)
+ ListTubes() (tubeNames []string, err error)
+ ListTubesWatched() (tubeNames []string, err error)
+ ListTubeUsed() (tubeName string, err error)
+ Put(priority uint32, delay uint64, ttr uint64, jobData []byte) (jobId uint64, buried bool, err error)
+ Reserve() (jobId uint64, jobData []byte, err error)
+ ReserveWithTimeout(seconds int) (jobId uint64, jobData []byte, err error)
+ StatsJob(jobId uint64) (stats map[string]interface{}, err error)
+ StatsTube(tubeName string) (stats map[string]interface{}, err error)
+ Touch(jobId uint64) (err error)
+ Watch(tubeName string) (err error)
}
type instance struct {
@@ -34,18 +35,18 @@ type instance struct {
const (
BURIED = "BURIED"
- USING = "USING"
- OK = "OK"
DELETED = "DELETED"
DRAINING = "DRAINING"
- TIMED_OUT = "TIMED_OUT"
EXPECTED_CRLF = "EXPECTED_CRLF"
INSERTED = "INSERTED"
JOB_TOO_BIG = "JOB_TOO_BIG"
NOT_FOUND = "NOT_FOUND"
NOT_IGNORED = "NOT_IGNORED"
+ OK = "OK"
RESERVED = "RESERVED"
+ TIMED_OUT = "TIMED_OUT"
TOUCHED = "TOUCHED"
+ USING = "USING"
WATCHING = "WATCHING"
)
@@ -58,9 +59,10 @@ const (
msgPut = "put %d %d %d %d\r\n%s\r\n"
msgReserve = "reserve\r\n"
msgReserveWithTimeout = "reserve-with-timeout %d\r\n"
+ msgStatsJob = "stats-job %d\r\n"
+ msgStatsTube = "stats-tube %s\r\n"
msgTouch = "touch %d\r\n"
msgWatch = "watch %s\r\n"
- msgStatsJob = "stats-job %d\r\n"
)
var (
@@ -186,47 +188,23 @@ func (i *instance) Put(priority uint32, delay, ttr uint64, data []byte) (jobId u
return
}
-func (i *instance) Reserve() (jobId uint64, data []byte, err error) {
- i.write(msgReserve)
-
- line, err := i.readLine()
+func (i *instance) Reserve() (jobId uint64, jobData []byte, err error) {
+ words, err := i.wordsCmd(msgReserve)
if err != nil {
return
}
- words := strings.Split(line, " ")
-
switch words[0] {
case RESERVED:
- jobId, err = strconv.ParseUint(words[1], 10, 64)
- if err != nil {
- return
- }
-
- var dataLen uint64
- dataLen, err = strconv.ParseUint(words[2], 10, 64)
- if err != nil {
- return
- }
-
- data = make([]byte, dataLen+2)
- var n int
- n, err = i.readWriter.Read(data)
- if err != nil {
- return
- }
- if n != len(data) {
- err = Exception(fmt.Sprintf("read only %d bytes of %d", n, len(data)))
- return
- }
-
- data = data[:len(data)-2]
+ jobId, jobData, err = i.readJob(words[1:3])
+ default:
+ err = Exception(words[0])
}
return
}
-func (i *instance) ReserveWithTimeout(timeout int) (jobId uint64, data []byte, err error) {
+func (i *instance) ReserveWithTimeout(timeout int) (jobId uint64, jobData []byte, err error) {
words, err := i.wordsCmd(fmt.Sprintf(msgReserveWithTimeout, timeout))
if err != nil {
return
@@ -234,29 +212,7 @@ func (i *instance) ReserveWithTimeout(timeout int) (jobId uint64, data []byte, e
switch words[0] {
case RESERVED:
- jobId, err = strconv.ParseUint(words[1], 10, 64)
- if err != nil {
- return
- }
-
- var dataLen uint64
- dataLen, err = strconv.ParseUint(words[2], 10, 64)
- if err != nil {
- return
- }
-
- data = make([]byte, dataLen+2)
- var n int
- n, err = i.readWriter.Read(data)
- if err != nil {
- return
- }
- if n != len(data) {
- err = Exception(fmt.Sprintf("read only %d bytes of %d", n, len(data)))
- return
- }
-
- data = data[:len(data)-2]
+ jobId, jobData, err = i.readJob(words[1:len(words)])
default:
err = Exception(words[0])
}
@@ -265,18 +221,16 @@ func (i *instance) ReserveWithTimeout(timeout int) (jobId uint64, data []byte, e
}
func (i *instance) Touch(jobId uint64) (err error) {
- i.write(fmt.Sprintf(msgTouch, jobId))
-
- line, err := i.readLine()
+ words, err := i.wordsCmd(fmt.Sprintf(msgTouch, jobId))
if err != nil {
return
}
- switch line {
+ switch words[0] {
case TOUCHED:
return
- case NOT_FOUND:
- err = Exception(NOT_FOUND)
+ default:
+ err = Exception(words[0])
}
return
@@ -302,6 +256,11 @@ func (i *instance) StatsJob(jobId uint64) (stats map[string]interface{}, err err
return
}
+func (i *instance) StatsTube(tubeName string) (stats map[string]interface{}, err error) {
+ err = i.yamlCmd(fmt.Sprintf(msgStatsTube, tubeName), &stats)
+ return
+}
+
func (i *instance) write(line string) (err error) {
logger.Printf("i.write %#v\n", line)
n, err := i.readWriter.WriteString(line)
@@ -380,3 +339,30 @@ func (i *instance) yamlCmd(command string, dest interface{}) (err error) {
err = goyaml.Unmarshal(rawYaml[:len(rawYaml)-1], dest)
return err
}
+
+func (i *instance) readJob(args []string) (jobId uint64, jobData []byte, err error) {
+ jobId, err = strconv.ParseUint(args[0], 10, 64)
+ if err != nil {
+ return
+ }
+
+ var jobDataLen uint64
+ jobDataLen, err = strconv.ParseUint(args[1], 10, 64)
+ if err != nil {
+ return
+ }
+
+ jobData = make([]byte, jobDataLen+2)
+ var n int
+ n, err = i.readWriter.Read(jobData)
+ if err != nil {
+ return
+ }
+ if n != len(jobData) {
+ err = Exception(fmt.Sprintf("read only %d bytes of %d", n, len(jobData)))
+ return
+ }
+
+ jobData = jobData[:len(jobData)-2]
+ return
+}
View
24 gostalkc/gostalkc_test.go
@@ -79,18 +79,18 @@ func init() {
It("returns stats about a given tube", func() {
stats, err := i.StatsTube("testing")
Expect(err, ToBeNil)
- Expect(stats["id"], ToEqual, 1)
- Expect(stats["tube"], ToEqual, "default")
- Expect(stats["state"], ToEqual, "ready")
- Expect(stats["pri"], ToEqual, 42)
- Expect(stats["age"], ToNotBeNil) // TODO: do at least a rough delta compare
- Expect(stats["time-left"], ToEqual, 0)
- Expect(stats["file"], ToEqual, 0)
- Expect(stats["reserves"], ToEqual, 0)
- Expect(stats["releases"], ToEqual, 0)
- Expect(stats["timeouts"], ToEqual, 0)
- Expect(stats["buries"], ToEqual, 0)
- Expect(stats["kicks"], ToEqual, 0)
+ Expect(stats["name"], ToEqual, "testing")
+ Expect(stats["current-jobs-urgent"], ToEqual, 0)
+ Expect(stats["current-jobs-ready"], ToEqual, 0)
+ Expect(stats["current-jobs-reserved"], ToEqual, 0) // TODO: do at least a rough delta compare
+ Expect(stats["current-jobs-delayed"], ToEqual, 0)
+ Expect(stats["current-jobs-buried"], ToEqual, 0)
+ Expect(stats["total-jobs"], ToEqual, 0)
+ Expect(stats["current-waiting"], ToEqual, 0)
+ Expect(stats["cmd-delete"], ToEqual, 0)
+ Expect(stats["cmd-pause-tube"], ToEqual, 0)
+ Expect(stats["pause"], ToEqual, 0)
+ Expect(stats["pause-time-left"], ToEqual, 0)
})
})
View
5 job.go
@@ -14,6 +14,7 @@ const (
type JobHolder interface {
deleteJob(*Job)
+ touchJob(*Job)
}
type JobId uint64
@@ -77,3 +78,7 @@ func (job *Job) deleteFrom(server *Server) {
delete(server.jobs, job.id)
job.tube.jobDelete <- job
}
+
+func (job *Job) touch() {
+ job.tube.jobTouch <- job
+}
View
4 ready_jobs.go
@@ -36,3 +36,7 @@ func (jobs *readyJobs) putJob(job *Job) {
func (jobs *readyJobs) deleteJob(job *Job) {
jobs.Remove(job.index)
}
+
+func (jobs *readyJobs) touchJob(job *Job) {
+ // nothing to do here
+}
View
7 reserved_jobs.go
@@ -2,6 +2,7 @@ package gostalk
import (
"code.google.com/p/go-priority-queue/prio"
+ "time"
)
type reservedJobsItem Job
@@ -36,3 +37,9 @@ func (jobs *reservedJobs) putJob(job *Job) {
func (jobs *reservedJobs) deleteJob(job *Job) {
jobs.Remove(job.index)
}
+
+func (jobs *reservedJobs) touchJob(job *Job) {
+ jobs.Remove(job.index)
+ job.reserveEndsAt = time.Now().Add(job.timeToReserve)
+ jobs.Push((*reservedJobsItem)(job))
+}
View
38 tube.go
@@ -1,5 +1,9 @@
package gostalk
+import (
+ "time"
+)
+
type jobReserveRequest struct {
success chan *Job
cancel chan bool
@@ -15,12 +19,19 @@ type Tube struct {
jobDemand chan *jobReserveRequest
jobSupply chan *Job
jobDelete chan *Job
+ jobTouch chan *Job
+
+ pauseStartedAt time.Time
+ pauseEndsAt time.Time
- statUrgent int
- statReady int
- statReserved int
- statDelayed int
- statBuried int
+ statUrgent int
+ statReady int
+ statReserved int
+ statDelayed int
+ statBuried int
+ statTotalJobs int
+ statDeleted int
+ statPaused int
}
func newTube(name string) (tube *Tube) {
@@ -33,6 +44,7 @@ func newTube(name string) (tube *Tube) {
jobDemand: make(chan *jobReserveRequest),
jobSupply: make(chan *Job),
jobDelete: make(chan *Job),
+ jobTouch: make(chan *Job),
}
go tube.handleDemand()
@@ -48,6 +60,8 @@ func (tube *Tube) handleDemand() {
tube.delete(job)
case job := <-tube.jobSupply:
tube.put(job)
+ case job := <-tube.jobTouch:
+ tube.touch(job)
case request := <-tube.jobDemand:
select {
case request.success <- tube.reserve():
@@ -59,6 +73,8 @@ func (tube *Tube) handleDemand() {
select {
case job := <-tube.jobDelete:
tube.delete(job)
+ case job := <-tube.jobTouch:
+ tube.touch(job)
case job := <-tube.jobSupply:
tube.put(job)
}
@@ -95,3 +111,15 @@ func (tube *Tube) put(job *Job) {
func (tube *Tube) delete(job *Job) {
job.jobHolder.deleteJob(job)
}
+
+func (tube *Tube) touch(job *Job) {
+ job.jobHolder.touchJob(job)
+}
+
+func (tube *Tube) pauseTimeLeft() (seconds time.Duration) {
+ return
+}
+
+func (tube *Tube) pausedDuration() (seconds time.Duration) {
+ return
+}

0 comments on commit 22b57f0

Please sign in to comment.
Something went wrong with that request. Please try again.