@@ -7,16 +7,13 @@ package dgraphimport
7
7
8
8
import (
9
9
"context"
10
- "errors"
11
10
"fmt"
12
- "io"
13
- "math"
14
11
"os"
15
12
"path/filepath"
16
13
17
14
"github.com/dgraph-io/badger/v4"
18
15
apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
19
- "github.com/dgraph-io/ristretto/v2/z "
16
+ "github.com/hypermodeinc/ dgraph/v25/worker "
20
17
21
18
"github.com/golang/glog"
22
19
"golang.org/x/sync/errgroup"
@@ -48,10 +45,12 @@ func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutD
48
45
}
49
46
50
47
// startPDirStream initiates a snapshot stream session with the Dgraph server.
51
- func startPDirStream (ctx context.Context , dc apiv2.DgraphClient ) (* apiv2.InitiatePDirStreamResponse , error ) {
48
+ func startPDirStream (ctx context.Context , dc apiv2.DgraphClient ) (* apiv2.UpdateExtSnapshotStreamingStateResponse , error ) {
52
49
glog .Info ("Initiating pdir stream" )
53
- req := & apiv2.InitiatePDirStreamRequest {}
54
- resp , err := dc .InitiatePDirStream (ctx , req )
50
+ req := & apiv2.UpdateExtSnapshotStreamingStateRequest {
51
+ Start : true ,
52
+ }
53
+ resp , err := dc .UpdateExtSnapshotStreamingState (ctx , req )
55
54
if err != nil {
56
55
glog .Errorf ("failed to initiate pdir stream: %v" , err )
57
56
return nil , fmt .Errorf ("failed to initiate pdir stream: %v" , err )
@@ -63,7 +62,7 @@ func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.Initiat
63
62
// sendPDir takes a p directory and a set of group IDs and streams the data from the
64
63
// p directory to the corresponding group IDs. It first scans the provided directory for
65
64
// subdirectories named with numeric group IDs.
66
- func sendPDir (ctx context.Context , dg apiv2.DgraphClient , baseDir string , groups []uint32 ) error {
65
+ func sendPDir (ctx context.Context , dc apiv2.DgraphClient , baseDir string , groups []uint32 ) error {
67
66
glog .Infof ("Starting to stream pdir from directory: %s" , baseDir )
68
67
69
68
errG , ctx := errgroup .WithContext (ctx )
@@ -74,31 +73,43 @@ func sendPDir(ctx context.Context, dg apiv2.DgraphClient, baseDir string, groups
74
73
if _ , err := os .Stat (pDir ); err != nil {
75
74
return fmt .Errorf ("p directory does not exist for group [%d]: [%s]" , group , pDir )
76
75
}
77
-
78
76
glog .Infof ("Streaming data for group [%d] from directory: [%s]" , group , pDir )
79
- if err := streamData (ctx , dg , pDir , group ); err != nil {
77
+ if err := streamData (ctx , dc , pDir , group ); err != nil {
80
78
glog .Errorf ("Failed to stream data for groups [%v] from directory: [%s]: %v" , group , pDir , err )
81
79
return err
82
80
}
83
81
84
82
return nil
85
83
})
86
84
}
87
- if err := errG .Wait (); err != nil {
88
- return err
85
+ if err1 := errG .Wait (); err1 != nil {
86
+ // If streaming failed for any group (for example, its p directory doesn't
87
+ // exist), streaming might be in progress to other groups. We disable drain mode
88
+ // to prevent interference and drop any streamed data to ensure a clean state.
89
+ req := & apiv2.UpdateExtSnapshotStreamingStateRequest {
90
+ Start : false ,
91
+ Finish : true ,
92
+ DropData : true ,
93
+ }
94
+ if _ , err := dc .UpdateExtSnapshotStreamingState (context .Background (), req ); err != nil {
95
+ return fmt .Errorf ("failed to stream data :%v failed to off drain mode: %v" , err1 , err )
96
+ }
97
+
98
+ glog .Info ("successfully disabled drain mode" )
99
+ return err1
89
100
}
90
101
91
- glog .Infof ("Completed streaming all pdirs" )
102
+ glog .Info ("Completed streaming all pdirs" )
92
103
return nil
93
104
}
94
105
95
106
// streamData handles the actual data streaming process for a single group.
96
107
// It opens the BadgerDB at the specified directory and streams all data to the server.
97
- func streamData (ctx context.Context , dg apiv2.DgraphClient , pdir string , groupId uint32 ) error {
108
+ func streamData (ctx context.Context , dc apiv2.DgraphClient , pdir string , groupId uint32 ) error {
98
109
glog .Infof ("Opening stream for group %d from directory %s" , groupId , pdir )
99
110
100
111
// Initialize stream with the server
101
- out , err := dg . StreamPDir (ctx )
112
+ out , err := dc . StreamExtSnapshot (ctx )
102
113
if err != nil {
103
114
return fmt .Errorf ("failed to start pdir stream for group %d: %w" , groupId , err )
104
115
}
@@ -118,41 +129,23 @@ func streamData(ctx context.Context, dg apiv2.DgraphClient, pdir string, groupId
118
129
119
130
// Send group ID as the first message in the stream
120
131
glog .Infof ("Sending group ID [%d] to server" , groupId )
121
- groupReq := & apiv2.StreamPDirRequest {GroupId : groupId }
132
+ groupReq := & apiv2.StreamExtSnapshotRequest {GroupId : groupId }
122
133
if err := out .Send (groupReq ); err != nil {
123
134
return fmt .Errorf ("failed to send group ID [%d]: %w" , groupId , err )
124
135
}
125
136
126
137
// Configure and start the BadgerDB stream
127
138
glog .Infof ("Starting BadgerDB stream for group [%d]" , groupId )
128
- stream := ps .NewStreamAt (math .MaxUint64 )
129
- stream .LogPrefix = fmt .Sprintf ("Sending P dir for group [%d]" , groupId )
130
- stream .KeyToList = nil
131
- stream .Send = func (buf * z.Buffer ) error {
132
- p := & apiv2.StreamPacket {Data : buf .Bytes ()}
133
- if err := out .Send (& apiv2.StreamPDirRequest {StreamPacket : p }); err != nil && ! errors .Is (err , io .EOF ) {
134
- return fmt .Errorf ("failed to send data chunk: %w" , err )
135
- }
136
- return nil
137
- }
138
-
139
- // Execute the stream process
140
- if err := stream .Orchestrate (ctx ); err != nil {
141
- return fmt .Errorf ("stream orchestration failed for group [%d]: %w" , groupId , err )
142
- }
143
-
144
- // Send the final 'done' signal to mark completion
145
- glog .Infof ("Sending completion signal for group [%d]" , groupId )
146
- done := & apiv2.StreamPacket {Done : true }
139
+ // if err := RunBadgerStream(ctx, ps, out, groupId); err != nil {
140
+ // return fmt.Errorf("badger stream failed for group [%d]: %w", groupId, err)
141
+ // }
147
142
148
- if err := out . Send ( & apiv2. StreamPDirRequest { StreamPacket : done } ); err != nil && ! errors . Is ( err , io . EOF ) {
149
- return fmt .Errorf ("failed to send 'done' signal for group [%d]: %w" , groupId , err )
143
+ if err := worker . RunBadgerStream ( ctx , ps , out , groupId ); err != nil {
144
+ return fmt .Errorf ("badger stream failed for group [%d]: %w" , groupId , err )
150
145
}
151
- // Wait for acknowledgment from the server
152
146
if _ , err := out .CloseAndRecv (); err != nil {
153
147
return fmt .Errorf ("failed to receive ACK for group [%d]: %w" , groupId , err )
154
148
}
155
149
glog .Infof ("Group [%d]: Received ACK " , groupId )
156
-
157
150
return nil
158
151
}
0 commit comments