Skip to content

Commit

Permalink
Merge cc28985 into 23301d6
Browse files Browse the repository at this point in the history
  • Loading branch information
ycbydd committed Jul 30, 2017
2 parents 23301d6 + cc28985 commit ecb2472
Show file tree
Hide file tree
Showing 3 changed files with 72 additions and 3 deletions.
18 changes: 18 additions & 0 deletions core/model/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,11 @@ func CreateSnapshot(snapshot *Snapshot) error {
return err
}

func DeleteSnapshot(snapshot *Snapshot) error {
err := store.Delete(snapshot)
return err
}

func GetSnapshotList(from, size int, taskId string) (int, []Snapshot, error) {
var snapshots []Snapshot
queryO := store.Query{Sort: "create_time desc", From: from, Size: size}
Expand All @@ -105,6 +110,19 @@ func GetSnapshotList(from, size int, taskId string) (int, []Snapshot, error) {
return result.Total, snapshots, err
}

func GetSnapshotAllList(taskId string) (int, []Snapshot, error) {
var snapshots []Snapshot
queryO := store.Query{Sort: "create_time desc"}
if len(taskId) > 0 {
queryO.Conds = store.And(store.Eq("task_id", taskId))
}
err, result := store.Search(&snapshots, &queryO)
if err != nil {
log.Error(err)
}
return result.Total, snapshots, err
}

func GetSnapshot(id string) (Snapshot, error) {
snapshot := Snapshot{}
err := store.GetBy("id", id, &snapshot)
Expand Down
3 changes: 3 additions & 0 deletions gopa.yml
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,9 @@ modules:
parameters:
compress_enabled: true
bucket: "Snapshot"
snapshottime_toless: "1440,720,360,180,90,45,20,10,1"
snapshottime_tomore: "1,10,20,30,60,90,180,360,720,1440,2880,10080,21600"
snapshot_maxnum: 10
- joint: index
enabled: false
end:
Expand Down
54 changes: 51 additions & 3 deletions modules/crawler/pipe/save_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@ import (
"github.com/infinitbyte/gopa/core/store"
"github.com/infinitbyte/gopa/modules/config"
"time"
"strconv"
"strings"
)

const SaveSnapshotToDB JointKey = "save_snapshot_db"
Expand All @@ -35,6 +37,13 @@ type SaveSnapshotToDBJoint struct {

const compressEnabled ParaKey = "compress_enabled"
const bucket ParaKey = "bucket"
const snapshottimeToless ParaKey = "snapshottime_toless"
const snapshottimeTomore ParaKey = "snapshottime_tomore"
const snapshotMaxnum ParaKey = "snapshot_maxnum"

//minutes
var arrTimeToLess []int
var arrTimeToMore []int

func (this SaveSnapshotToDBJoint) Name() string {
return string(SaveSnapshotToDB)
Expand All @@ -44,6 +53,25 @@ func (this SaveSnapshotToDBJoint) Process(c *Context) error {
task := c.MustGet(CONTEXT_CRAWLER_TASK).(*model.Task)
snapshot := c.MustGet(CONTEXT_CRAWLER_SNAPSHOT).(*model.Snapshot)

//init snapshottimeToless
arrTimeToLessStr := strings.Split(this.MustGetString(snapshottimeToless),",")
arrTimeToLess = make([]int,len(arrTimeToLessStr),len(arrTimeToLessStr))
for i := 0; i < len(arrTimeToLessStr); i++ {
m,error := strconv.Atoi(arrTimeToLessStr[i])
if error == nil {
arrTimeToLess[i] = m
}
}
//init snapshottimeTomore
arrTimeToMoreStr := strings.Split(this.MustGetString(snapshottimeTomore),",")
arrTimeToMore = make([]int,len(arrTimeToMoreStr),len(arrTimeToMoreStr))
for i := 0; i < len(arrTimeToMoreStr); i++ {
m,error := strconv.Atoi(arrTimeToMoreStr[i])
if error == nil {
arrTimeToMore[i] = m
}
}

//update task's snapshot, detect duplicated snapshot
if snapshot != nil {

Expand Down Expand Up @@ -107,15 +135,35 @@ func (this SaveSnapshotToDBJoint) Process(c *Context) error {

model.CreateSnapshot(snapshot)

//delete old snapshot
//get current snapshot list and total num
snapshotTotal, snapshotsList, err := model.GetSnapshotAllList(task.ID)
if err == nil {
//get max snapshot num
maxSnapshotNum := this.MustGetInt64(snapshotMaxnum)
//if more than max snapshot num,delete old snapshot
if int64(snapshotTotal) > maxSnapshotNum {
mustDeleteNum := int64(snapshotTotal) - maxSnapshotNum
for i := 0; i < len(snapshotsList); i++ {
if i > 0 && i < len(snapshotsList)-1 && mustDeleteNum > 0 {
model.DeleteSnapshot(&snapshotsList[i])
store.DeleteValue(this.MustGetString(bucket),[]byte(snapshotsList[i].ID),snapshotsList[i].Payload)
mustDeleteNum -= 1
}
}
}
}


stats.IncrementBy("domain.stats", domain+"."+config.STATS_STORAGE_FILE_SIZE, int64(len(snapshot.Payload)))
stats.Increment("domain.stats", domain+"."+config.STATS_STORAGE_FILE_COUNT)



return nil
}

//minutes
var arrTimeToLess = [11]int{24 * 60, 12 * 60, 6 * 60, 3 * 60, 90, 45, 20, 10, 5, 2, 1}
var arrTimeToMore = [15]int{1, 2, 5, 10, 20, 30, 60, 90, 3 * 60, 6 * 60, 12 * 60, 24 * 60, 2 * 24 * 60, 7 * 24 * 60, 15 * 24 * 60}


func GetNextCheckTimeMinutes(fetchSuccess bool, tLastCheckTime time.Time, tNextCheckTime time.Time) int {
timeIntervalLast := GetTimeInterval(tLastCheckTime, tNextCheckTime)
Expand Down

0 comments on commit ecb2472

Please sign in to comment.