Skip to content

Commit 2572f94

Browse files
Add support for HA and multishard functionality in import APIs
1 parent 08fbcfc commit 2572f94

File tree

8 files changed

+576
-296
lines changed

8 files changed

+576
-296
lines changed

dgraph/cmd/dgraphimport/import_client.go

Lines changed: 6 additions & 27 deletions
Original file line numberDiff line numberDiff line change
@@ -7,16 +7,13 @@ package dgraphimport
77

88
import (
99
"context"
10-
"errors"
1110
"fmt"
12-
"io"
13-
"math"
1411
"os"
1512
"path/filepath"
1613

1714
"github.com/dgraph-io/badger/v4"
1815
apiv25 "github.com/dgraph-io/dgo/v250/protos/api.v25"
19-
"github.com/dgraph-io/ristretto/v2/z"
16+
"github.com/hypermodeinc/dgraph/v25/worker"
2017

2118
"github.com/golang/glog"
2219
"golang.org/x/sync/errgroup"
@@ -125,34 +122,16 @@ func streamData(ctx context.Context, dg apiv25.DgraphClient, pdir string, groupI
125122

126123
// Configure and start the BadgerDB stream
127124
glog.Infof("Starting BadgerDB stream for group [%d]", groupId)
128-
stream := ps.NewStreamAt(math.MaxUint64)
129-
stream.LogPrefix = fmt.Sprintf("Sending P dir for group [%d]", groupId)
130-
stream.KeyToList = nil
131-
stream.Send = func(buf *z.Buffer) error {
132-
p := &apiv25.StreamPacket{Data: buf.Bytes()}
133-
if err := out.Send(&apiv25.StreamPDirRequest{StreamPacket: p}); err != nil && !errors.Is(err, io.EOF) {
134-
return fmt.Errorf("failed to send data chunk: %w", err)
135-
}
136-
return nil
137-
}
138-
139-
// Execute the stream process
140-
if err := stream.Orchestrate(ctx); err != nil {
141-
return fmt.Errorf("stream orchestration failed for group [%d]: %w", groupId, err)
142-
}
143-
144-
// Send the final 'done' signal to mark completion
145-
glog.Infof("Sending completion signal for group [%d]", groupId)
146-
done := &apiv25.StreamPacket{Done: true}
125+
// if err := RunBadgerStream(ctx, ps, out, groupId); err != nil {
126+
// return fmt.Errorf("badger stream failed for group [%d]: %w", groupId, err)
127+
// }
147128

148-
if err := out.Send(&apiv25.StreamPDirRequest{StreamPacket: done}); err != nil && !errors.Is(err, io.EOF) {
149-
return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
129+
if err := worker.RunBadgerStream(ctx, ps, out, groupId); err != nil {
130+
return fmt.Errorf("badger stream failed for group [%d]: %w", groupId, err)
150131
}
151-
// Wait for acknowledgment from the server
152132
if _, err := out.CloseAndRecv(); err != nil {
153133
return fmt.Errorf("failed to receive ACK for group [%d]: %w", groupId, err)
154134
}
155135
glog.Infof("Group [%d]: Received ACK ", groupId)
156-
157136
return nil
158137
}

dgraph/cmd/dgraphimport/import_test.go

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"encoding/json"
1212
"path/filepath"
1313
"testing"
14+
"time"
1415

1516
"github.com/hypermodeinc/dgraph/v25/dgraphapi"
1617
"github.com/hypermodeinc/dgraph/v25/dgraphtest"
@@ -96,9 +97,9 @@ func TestImportApis(t *testing.T) {
9697
targetAlphas int
9798
replicasFactor int
9899
}{
99-
{"SingleGroupSingleAlpha", 1, 1, 1},
100-
{"TwoGroupsSingleAlpha", 2, 2, 1},
101-
{"ThreeGroupsSingleAlpha", 3, 3, 1},
100+
{"SingleGroupSingleAlpha", 1, 3, 3},
101+
{"TwoGroupsSingleAlpha", 2, 6, 3},
102+
{"ThreeGroupsSingleAlpha", 3, 9, 3},
102103
}
103104

104105
for _, tt := range tests {
@@ -123,6 +124,7 @@ func runImportTest(t *testing.T, bulkAlphas, targetAlphas, replicasFactor int) {
123124
require.NoError(t, Import(context.Background(), url,
124125
grpc.WithTransportCredentials(insecure.NewCredentials()), outDir))
125126

127+
time.Sleep(time.Minute * 3)
126128
verifyImportResults(t, gc)
127129
}
128130

edgraph/server.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1829,11 +1829,6 @@ func (s *ServerV25) StreamPDir(stream apiv25.Dgraph_StreamPDirServer) error {
18291829
return err
18301830
}
18311831

1832-
drainMode := &pb.DrainModeRequest{State: false}
1833-
if _, err := worker.ProposeDrain(stream.Context(), drainMode); err != nil {
1834-
return err
1835-
}
1836-
18371832
return nil
18381833
}
18391834

protos/pb.proto

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -320,6 +320,7 @@ message Proposal {
320320
// Skipping 15 as it is used for uint64 key in master and might be needed later here.
321321
uint64 start_ts = 16;
322322
DrainModeRequest drainmode = 17;
323+
ReqPDirStreamRequest reqpdirstream = 18;
323324
}
324325

325326
message CDCState {
@@ -598,6 +599,7 @@ service Worker {
598599
rpc TaskStatus(TaskStatusRequest) returns (TaskStatusResponse) {}
599600
rpc ApplyDrainmode(DrainModeRequest) returns (Status) {}
600601
rpc InternalStreamPDir(stream api.v25.StreamPDirRequest) returns (api.v25.StreamPDirResponse) {}
602+
rpc ReqPDirStream(ReqPDirStreamRequest) returns (stream api.v25.StreamPDirRequest) {}
601603
}
602604

603605
message DrainModeRequest {
@@ -795,4 +797,8 @@ message TaskStatusResponse {
795797
uint64 task_meta = 1;
796798
}
797799

800+
message ReqPDirStreamRequest {
801+
string addr=2 ;
802+
}
803+
798804
// vim: expandtab sw=2 ts=2

protos/pb/pb.pb.go

Lines changed: 337 additions & 255 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

protos/pb/pb_grpc.pb.go

Lines changed: 64 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

worker/draft.go

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -86,6 +86,8 @@ func (id op) String() string {
8686
return "opBackup"
8787
case opPredMove:
8888
return "opPredMove"
89+
case opStreamPDir:
90+
return "opStreamPDir"
8991
default:
9092
return "opUnknown"
9193
}
@@ -98,6 +100,7 @@ const (
98100
opRestore
99101
opBackup
100102
opPredMove
103+
opStreamPDir
101104
)
102105

103106
// startTask is used for the tasks that do not require tracking of timestamp.
@@ -693,6 +696,44 @@ func (n *node) applyCommitted(proposal *pb.Proposal, key uint64) error {
693696
x.UpdateDrainingMode(proposal.Drainmode.State)
694697
return nil
695698

699+
case proposal.Reqpdirstream != nil:
700+
defer x.UpdateDrainingMode(false)
701+
702+
glog.Info("[import] Got proposal to get latest from leader P dir")
703+
if proposal.Reqpdirstream.Addr == n.MyAddr {
704+
glog.Info("[import] already have latest p dir skipping")
705+
return nil
706+
}
707+
708+
var err error
709+
var closer *z.Closer
710+
closer, err = n.startTask(opStreamPDir)
711+
if err != nil {
712+
return errors.Wrapf(err, "cannot start stream p dir task")
713+
}
714+
defer closer.Done()
715+
716+
pl, err := conn.GetPools().Get(proposal.Reqpdirstream.Addr)
717+
if err != nil {
718+
glog.Error("[import] unable to set connection pool")
719+
return err
720+
}
721+
722+
if pl == nil {
723+
glog.Error("[import] connection is broken")
724+
return fmt.Errorf("connection is broken")
725+
}
726+
727+
con := pl.Get()
728+
c := pb.NewWorkerClient(con)
729+
r := &pb.ReqPDirStreamRequest{Addr: n.MyAddr}
730+
stream, err := c.ReqPDirStream(ctx, r)
731+
if err != nil {
732+
glog.Error("[import] Error streaming P dir")
733+
return err
734+
}
735+
return FlushData(stream)
736+
696737
case proposal.Snapshot != nil:
697738
existing, err := n.Store.Snapshot()
698739
if err != nil {

0 commit comments

Comments
 (0)