Skip to content

Commit 2bf5aa4

Browse files
Add support for HA and multishard functionality in import APIs
1 parent 5cd5bff commit 2bf5aa4

File tree

13 files changed

+1299
-797
lines changed

13 files changed

+1299
-797
lines changed

dgraph/cmd/dgraphimport/import_client.go

Lines changed: 81 additions & 41 deletions
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ func newClient(endpoint string, opts grpc.DialOption) (apiv2.DgraphClient, error
3030
return nil, fmt.Errorf("failed to connect to endpoint [%s]: %w", endpoint, err)
3131
}
3232

33-
glog.Infof("Successfully connected to Dgraph endpoint: %s", endpoint)
33+
glog.Infof("[import] Successfully connected to Dgraph endpoint: %s", endpoint)
3434
return apiv2.NewDgraphClient(conn), nil
3535
}
3636

@@ -39,120 +39,160 @@ func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutD
3939
if err != nil {
4040
return err
4141
}
42-
resp, err := startPDirStream(ctx, dg)
42+
resp, err := initiateSnapshotStream(ctx, dg)
4343
if err != nil {
4444
return err
4545
}
4646

47-
return sendPDir(ctx, dg, bulkOutDir, resp.Groups)
47+
return streamSnapshot(ctx, dg, bulkOutDir, resp.Groups)
4848
}
4949

50-
// startPDirStream initiates a snapshot stream session with the Dgraph server.
51-
func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.InitiatePDirStreamResponse, error) {
52-
glog.Info("Initiating pdir stream")
53-
req := &apiv2.InitiatePDirStreamRequest{}
54-
resp, err := dc.InitiatePDirStream(ctx, req)
50+
// initiateSnapshotStream initiates a snapshot stream session with the Dgraph server.
51+
func initiateSnapshotStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.UpdateExtSnapshotStreamingStateResponse, error) {
52+
glog.Info("[import] Initiating external snapshot stream")
53+
req := &apiv2.UpdateExtSnapshotStreamingStateRequest{
54+
Start: true,
55+
}
56+
resp, err := dc.UpdateExtSnapshotStreamingState(ctx, req)
5557
if err != nil {
56-
glog.Errorf("failed to initiate pdir stream: %v", err)
57-
return nil, fmt.Errorf("failed to initiate pdir stream: %v", err)
58+
glog.Errorf("[import] failed to initiate external snapshot stream: %v", err)
59+
return nil, fmt.Errorf("failed to initiate external snapshot stream: %v", err)
5860
}
59-
glog.Info("Pdir stream initiated successfully")
61+
glog.Info("[import] External snapshot stream initiated successfully")
6062
return resp, nil
6163
}
6264

63-
// sendPDir takes a p directory and a set of group IDs and streams the data from the
65+
// streamSnapshot takes a p directory and a set of group IDs and streams the data from the
6466
// p directory to the corresponding group IDs. It first scans the provided directory for
6567
// subdirectories named with numeric group IDs.
66-
func sendPDir(ctx context.Context, dg apiv2.DgraphClient, baseDir string, groups []uint32) error {
67-
glog.Infof("Starting to stream pdir from directory: %s", baseDir)
68+
func streamSnapshot(ctx context.Context, dc apiv2.DgraphClient, baseDir string, groups []uint32) error {
69+
glog.Infof("[import] Starting to stream snapshot from directory: %s", baseDir)
6870

69-
errG, ctx := errgroup.WithContext(ctx)
71+
errG, errGrpCtx := errgroup.WithContext(ctx)
7072
for _, group := range groups {
71-
group := group
7273
errG.Go(func() error {
7374
pDir := filepath.Join(baseDir, fmt.Sprintf("%d", group-1), "p")
7475
if _, err := os.Stat(pDir); err != nil {
7576
return fmt.Errorf("p directory does not exist for group [%d]: [%s]", group, pDir)
7677
}
77-
78-
glog.Infof("Streaming data for group [%d] from directory: [%s]", group, pDir)
79-
if err := streamData(ctx, dg, pDir, group); err != nil {
80-
glog.Errorf("Failed to stream data for groups [%v] from directory: [%s]: %v", group, pDir, err)
78+
glog.Infof("[import] Streaming data for group [%d] from directory: [%s]", group, pDir)
79+
if err := streamSnapshotForGroup(errGrpCtx, dc, pDir, group); err != nil {
80+
glog.Errorf("[import] Failed to stream data for group [%v] from directory: [%s]: %v", group, pDir, err)
8181
return err
8282
}
8383

8484
return nil
8585
})
8686
}
87+
8788
if err := errG.Wait(); err != nil {
89+
glog.Errorf("[import] failed to stream external snapshot: %v", err)
90+
// If errors occurs during streaming of the external snapshot, we drop all the data and
91+
// go back to ensure a clean slate and the cluster remains in working state.
92+
req := &apiv2.UpdateExtSnapshotStreamingStateRequest{
93+
Start: false,
94+
Finish: true,
95+
DropData: true,
96+
}
97+
if _, err := dc.UpdateExtSnapshotStreamingState(ctx, req); err != nil {
98+
return fmt.Errorf("failed to turn off drain mode: %v", err)
99+
}
100+
101+
glog.Info("[import] successfully disabled drain mode")
88102
return err
89103
}
90104

91-
glog.Infof("Completed streaming all pdirs")
105+
glog.Info("[import] Completed streaming external snapshot")
106+
req := &apiv2.UpdateExtSnapshotStreamingStateRequest{
107+
Start: false,
108+
Finish: true,
109+
DropData: false,
110+
}
111+
if _, err := dc.UpdateExtSnapshotStreamingState(ctx, req); err != nil {
112+
glog.Errorf("[import] failed to disable drain mode: %v", err)
113+
return fmt.Errorf("failed to disable drain mode: %v", err)
114+
}
115+
glog.Info("[import] successfully disable drain mode")
92116
return nil
93117
}
94118

95-
// streamData handles the actual data streaming process for a single group.
119+
// streamSnapshotForGroup handles the actual data streaming process for a single group.
96120
// It opens the BadgerDB at the specified directory and streams all data to the server.
97-
func streamData(ctx context.Context, dg apiv2.DgraphClient, pdir string, groupId uint32) error {
121+
func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir string, groupId uint32) error {
98122
glog.Infof("Opening stream for group %d from directory %s", groupId, pdir)
99123

100124
// Initialize stream with the server
101-
out, err := dg.StreamPDir(ctx)
125+
out, err := dc.StreamExtSnapshot(ctx)
102126
if err != nil {
103-
return fmt.Errorf("failed to start pdir stream for group %d: %w", groupId, err)
127+
return fmt.Errorf("failed to start external snapshot stream for group %d: %w", groupId, err)
104128
}
105129

130+
defer func() {
131+
if _, err := out.CloseAndRecv(); err != nil {
132+
glog.Errorf("[import] failed to receive ACK for group [%v]: %v", groupId, err)
133+
}
134+
135+
glog.Infof("Group [%v]: Received ACK ", groupId)
136+
}()
137+
106138
// Open the BadgerDB instance at the specified directory
107139
opt := badger.DefaultOptions(pdir)
108140
ps, err := badger.OpenManaged(opt)
109141
if err != nil {
110-
return fmt.Errorf("failed to open BadgerDB at [%s]: %w", pdir, err)
142+
glog.Errorf("failed to open BadgerDB at [%s]: %v", pdir, err)
143+
return fmt.Errorf("failed to open BadgerDB at [%v]: %v", pdir, err)
111144
}
112145

113146
defer func() {
114147
if err := ps.Close(); err != nil {
115-
glog.Warningf("Error closing BadgerDB: %v", err)
148+
glog.Warningf("[import] Error closing BadgerDB: %v", err)
116149
}
117150
}()
118151

119152
// Send group ID as the first message in the stream
120-
glog.Infof("Sending group ID [%d] to server", groupId)
121-
groupReq := &apiv2.StreamPDirRequest{GroupId: groupId}
153+
glog.Infof("[import] Sending group ID [%v] to server", groupId)
154+
groupReq := &apiv2.StreamExtSnapshotRequest{GroupId: groupId}
122155
if err := out.Send(groupReq); err != nil {
123-
return fmt.Errorf("failed to send group ID [%d]: %w", groupId, err)
156+
return fmt.Errorf("failed to send group ID [%v]: %v", groupId, err)
124157
}
125158

126159
// Configure and start the BadgerDB stream
127-
glog.Infof("Starting BadgerDB stream for group [%d]", groupId)
160+
glog.Infof("[import] Starting BadgerDB stream for group [%v]", groupId)
161+
162+
if err := streamBadger(ctx, ps, out, groupId); err != nil {
163+
return fmt.Errorf("badger streaming failed for group [%v]: %v", groupId, err)
164+
}
165+
166+
return nil
167+
}
168+
169+
// streamBadger runs a BadgerDB stream to send key-value pairs to the specified group.
170+
// It creates a new stream at the maximum sequence number and sends the data to the specified group.
171+
// It also sends a final 'done' signal to mark completion.
172+
func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExtSnapshotClient, groupId uint32) error {
128173
stream := ps.NewStreamAt(math.MaxUint64)
129-
stream.LogPrefix = fmt.Sprintf("Sending P dir for group [%d]", groupId)
174+
stream.LogPrefix = "[import] Sending external snapshot to group [" + fmt.Sprintf("%d", groupId) + "]"
130175
stream.KeyToList = nil
131176
stream.Send = func(buf *z.Buffer) error {
132177
p := &apiv2.StreamPacket{Data: buf.Bytes()}
133-
if err := out.Send(&apiv2.StreamPDirRequest{StreamPacket: p}); err != nil && !errors.Is(err, io.EOF) {
178+
if err := out.Send(&apiv2.StreamExtSnapshotRequest{Pkt: p}); err != nil && !errors.Is(err, io.EOF) {
134179
return fmt.Errorf("failed to send data chunk: %w", err)
135180
}
136181
return nil
137182
}
138183

139184
// Execute the stream process
140185
if err := stream.Orchestrate(ctx); err != nil {
141-
return fmt.Errorf("stream orchestration failed for group [%d]: %w", groupId, err)
186+
return fmt.Errorf("stream orchestration failed: %w", err)
142187
}
143188

144189
// Send the final 'done' signal to mark completion
145-
glog.Infof("Sending completion signal for group [%d]", groupId)
190+
glog.Infof("[import] Sending completion signal for group [%d]", groupId)
146191
done := &apiv2.StreamPacket{Done: true}
147192

148-
if err := out.Send(&apiv2.StreamPDirRequest{StreamPacket: done}); err != nil && !errors.Is(err, io.EOF) {
193+
if err := out.Send(&apiv2.StreamExtSnapshotRequest{Pkt: done}); err != nil && !errors.Is(err, io.EOF) {
149194
return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
150195
}
151-
// Wait for acknowledgment from the server
152-
if _, err := out.CloseAndRecv(); err != nil {
153-
return fmt.Errorf("failed to receive ACK for group [%d]: %w", groupId, err)
154-
}
155-
glog.Infof("Group [%d]: Received ACK ", groupId)
156196

157197
return nil
158198
}

0 commit comments

Comments
 (0)