/
incr.go
118 lines (95 loc) · 2.72 KB
/
incr.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
package spsync
import (
"context"
"fmt"
"strings"
"time"
"github.com/koltyakov/gosip/api"
)
// incrSyncSession runs one incremental synchronization session for the list
// identified by o.State.EntID.
//
// It takes the list's current change token as the upper bound of the session
// and calls incrSyncPaged repeatedly until a page reports zero changes. After
// every non-empty page the returned change token is stored in o.State and
// handed to o.Persist. On success the page token, fail counter and sync stage
// are reset, SyncDate is set to the moment the session started, ChangeToken is
// set to the upper bound token, and the state is persisted once more.
//
// The returned *State is always o.State. On error it is returned together with
// the error, holding whatever the last successfully processed page left in it.
func incrSyncSession(ctx context.Context, o *Options) (*State, error) {
	// Captured before any request so SyncDate reflects the session start
	syncDate := time.Now()

	o.Events.IncrSyncStarted(o.State.EntID)

	ent := o.SP.Web().GetList(o.State.EntID)

	tillToken, err := ent.Changes().GetCurrentToken()
	if err != nil {
		return o.State, err
	}

	for {
		// Stop between pages as soon as the caller cancels the session
		if err := ctx.Err(); err != nil {
			return o.State, err
		}

		changeToken, changes, err := incrSyncPaged(ctx, ent, tillToken, o)
		if err != nil {
			return o.State, err
		}
		if changes == 0 {
			// An empty page comes with an empty token; leave the state as is
			// so a blank change token is never stored or persisted
			break
		}

		o.State.ChangeToken = changeToken
		o.Persist(o.State)
	}

	// Success completion state update
	o.State.PageToken = ""
	o.State.Fails = 0
	o.State.SyncDate = syncDate
	o.State.ChangeToken = tillToken
	o.State.SyncStage = ""

	o.Events.IncrSyncFinished(o.State.EntID)
	o.Persist(o.State)

	return o.State, nil
}
// incrSyncPaged processes a single page of the Change API response.
//
// It requests up to 100 item changes between o.State.ChangeToken and endToken,
// splits them into deletions and everything else, fetches the non-deleted
// items by ID and passes them to o.Upsert, then passes the deleted IDs to
// o.Delete.
//
// Returns the change token of the last change in the page and the number of
// changes in the page. An empty page yields ("", 0, nil). When the changes
// request, the items request, o.Upsert or o.Delete fails, the current
// o.State.ChangeToken is returned together with the error.
func incrSyncPaged(ctx context.Context, l *api.List, endToken string, o *Options) (string, int, error) {
	// Change type value that marks a deleted object
	const deleteChangeType = 3

	o.Events.IncrSyncRequest(o.State.EntID, o.State.ChangeToken, endToken)

	// Default 100 items per page is used for getting changes
	// the page size increase is not recommended as change API returns only IDs
	// and when IDs are used to construct requests to get specific items
	changes, err := l.Changes().Top(100).GetChanges(&api.ChangeQuery{
		ChangeTokenStart: o.State.ChangeToken,
		ChangeTokenEnd:   endToken,
		Item:             true,
		Restore:          true,
		Add:              true,
		DeleteObject:     true,
		Update:           true,
		SystemUpdate:     true,
	})
	if err != nil {
		// A failed request must not be mistaken for "no more changes",
		// otherwise the caller would finish the session and skip them
		return o.State.ChangeToken, 0, err
	}

	data := changes.Data()
	changesCnt := len(data)
	if changesCnt == 0 {
		return "", 0, nil
	}

	// Split the page into upserted and deleted item IDs in one pass
	var upsertIds, deleteIds []int
	for _, ch := range data {
		if ch.ChangeType == deleteChangeType {
			deleteIds = append(deleteIds, ch.ItemID)
		} else {
			upsertIds = append(upsertIds, ch.ItemID)
		}
	}

	// Upserted
	if len(upsertIds) > 0 {
		query := l.Items().Conf(api.HeadersPresets.Minimalmetadata).Top(len(upsertIds))
		query = appendOData(query, o.Ent)

		filters := make([]string, 0, len(upsertIds))
		for _, id := range upsertIds {
			filters = append(filters, fmt.Sprintf("Id eq %d", id))
		}

		items, err := query.Filter(strings.Join(filters, " or ")).Get()
		if err != nil {
			return o.State.ChangeToken, changesCnt, err
		}

		if err := o.Upsert(ctx, itemsToUpsert(items)); err != nil {
			return o.State.ChangeToken, changesCnt, err
		}
	}

	// Deleted; skipped when the page has no deletions, same as upserts
	if len(deleteIds) > 0 {
		if err := o.Delete(ctx, deleteIds); err != nil {
			return o.State.ChangeToken, changesCnt, err
		}
	}

	return data[changesCnt-1].ChangeToken.StringValue, changesCnt, nil
}