/
full.go
135 lines (109 loc) · 2.76 KB
/
full.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
package spsync
import (
"context"
"net/url"
"time"
"github.com/koltyakov/gosip/api"
)
// fullSyncSession runs a full synchronization session for the list identified
// by o.State.EntID and returns the resulting state.
//
// A session has two stages tracked in o.State.SyncStage:
//   - "Upsert" (or empty): list items are read page by page and handed to the
//     upsert flow; the page token is persisted after every page so that an
//     interrupted session can be continued from the last processed page.
//   - "Delete": IDs missing from the list are detected and handed to the
//     delete flow.
//
// On success the paging/stage fields are reset, o.State.SyncMode is switched to
// Incr and the state is persisted. On any error the current (partially updated)
// state is returned together with the error, so the caller can resume later.
func fullSyncSession(ctx context.Context, o *Options) (*State, error) {
	// A session is blank (brand new) only when no page has been processed yet
	// AND it is not being resumed in the "Delete" stage. A session interrupted
	// during deletions has an empty page token as well, but it must keep the
	// change token and start time captured when it originally started;
	// otherwise changes made in between would be skipped by the next
	// incremental sync.
	isBlankSync := o.State.PageToken == "" && o.State.SyncStage != "Delete"
	o.Events.FullSyncStarted(o.State.EntID, isBlankSync)

	l := o.SP.Web().GetList(o.State.EntID)

	var syncStart time.Time
	if isBlankSync {
		// Capture the change token and the timestamp before reading any items.
		syncStart = time.Now()
		token, err := l.Changes().GetCurrentToken()
		if err != nil {
			return o.State, err
		}
		o.State.ChangeToken = token
		o.State.SyncMode = Full
		// Keep the start time in the state so that it is persisted with the
		// first page and a continued session can pick it up.
		o.State.SyncDate = syncStart
	} else {
		// Continued session: reuse the values saved when the session started.
		syncStart = o.State.SyncDate
	}

	// Upsert stage: process pages until no next page token is returned.
	if o.State.SyncStage == "Upsert" || o.State.SyncStage == "" {
		for {
			// Stop between pages when the caller cancelled the session; the
			// persisted page token allows continuing later.
			if err := ctx.Err(); err != nil {
				return o.State, err
			}
			pageToken, err := fullSyncUpsert(ctx, l, o)
			if err != nil {
				return o.State, err
			}
			o.State.PageToken = pageToken
			o.Persist(o.State)
			if pageToken == "" {
				break
			}
		}
		o.State.SyncStage = "Delete"
		o.Persist(o.State)
	}

	// Delete stage.
	if o.State.SyncStage == "Delete" {
		if err := fullSyncDelete(ctx, l, o); err != nil {
			return o.State, err
		}
	}

	// Successful completion: reset session fields and switch to incremental mode.
	o.State.PageToken = ""
	o.State.Fails = 0
	o.State.SyncDate = syncStart
	o.State.SyncStage = ""
	o.State.SyncMode = Incr
	o.Events.FullSyncFinished(o.State.EntID, isBlankSync)
	o.Persist(o.State)

	return o.State, nil
}
// fullSyncUpsert requests a single page of list items and passes it to o.Upsert.
//
// The page size is o.Ent.Top when it is positive, defaultPageSize otherwise.
// When o.State.PageToken is set, the request continues from that token.
// The returned string is the "$skiptoken" query value of the next page URL
// (empty when the response carries none). On any error the page token currently
// stored in the state is returned together with the error.
func fullSyncUpsert(ctx context.Context, l *api.List, o *Options) (string, error) {
	pageSize := o.Ent.Top
	if pageSize <= 0 {
		pageSize = defaultPageSize
	}

	req := l.Items().Conf(api.HeadersPresets.Minimalmetadata).Top(pageSize)
	if resumeFrom := o.State.PageToken; resumeFrom != "" {
		req = req.Skip(resumeFrom)
	}
	req = appendOData(req, o.Ent)

	o.Events.FullSyncRequest(o.State.EntID, req.ToURL())

	page, err := req.Get()
	if err != nil {
		return o.State.PageToken, err
	}

	if err := o.Upsert(ctx, itemsToUpsert(page)); err != nil {
		return o.State.PageToken, err
	}

	// The continuation token is carried in the query string of the next page URL.
	next, err := url.Parse(page.NextPageURL())
	if err != nil {
		return o.State.PageToken, err
	}

	return next.Query().Get("$skiptoken"), nil
}
// fullSyncDelete detects deleted list items and passes their IDs to o.Delete.
//
// It reads the IDs of all items in the list (in pages of 5000) and treats every
// positive ID below the highest existing ID that is not present in the list as
// deleted. IDs above the highest existing ID cannot be detected this way.
// o.Delete is called even when no missing IDs were found.
func fullSyncDelete(ctx context.Context, l *api.List, o *Options) error {
	items, err := l.Items().Conf(api.HeadersPresets.Minimalmetadata).Select("Id").Top(5000).GetAll()
	if err != nil {
		return err
	}

	present := make([]int, 0, len(items))
	for _, item := range items {
		present = append(present, item.Data().ID)
	}

	if err := o.Delete(ctx, fullSyncMissingIDs(present)); err != nil {
		return err
	}
	return nil
}

// fullSyncMissingIDs returns, in ascending order, every ID in the range
// [1, max(present)) that does not occur in present.
//
// The result does not depend on the order of present, and duplicates or
// non-positive values are tolerated, so unexpected input can neither cause an
// endless loop nor report an existing ID as missing.
func fullSyncMissingIDs(present []int) []int {
	seen := make(map[int]struct{}, len(present))
	maxID := 0
	for _, id := range present {
		if id <= 0 {
			continue
		}
		seen[id] = struct{}{}
		if id > maxID {
			maxID = id
		}
	}

	var missing []int
	for id := 1; id < maxID; id++ {
		if _, ok := seen[id]; !ok {
			missing = append(missing, id)
		}
	}
	return missing
}