Skip to content

Add support for HA and multishard functionality in import APIs #9406

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 2 commits into from
Jun 20, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
138 changes: 90 additions & 48 deletions dgraph/cmd/dgraphimport/import_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,144 +15,186 @@ import (
"path/filepath"

"github.com/dgraph-io/badger/v4"
"github.com/dgraph-io/dgo/v250"
apiv2 "github.com/dgraph-io/dgo/v250/protos/api.v2"
"github.com/dgraph-io/ristretto/v2/z"

"github.com/golang/glog"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc"
)

// newClient creates a new import client with the specified endpoint and gRPC options.
func newClient(endpoint string, opts grpc.DialOption) (apiv2.DgraphClient, error) {
conn, err := grpc.NewClient(endpoint, opts)
func newClient(connectionString string) (apiv2.DgraphClient, error) {
dg, err := dgo.Open(connectionString)
if err != nil {
return nil, fmt.Errorf("failed to connect to endpoint [%s]: %w", endpoint, err)
return nil, fmt.Errorf("failed to connect to endpoint [%s]: %w", connectionString, err)
}

glog.Infof("Successfully connected to Dgraph endpoint: %s", endpoint)
return apiv2.NewDgraphClient(conn), nil
glog.Infof("[import] Successfully connected to Dgraph endpoint: %s", connectionString)
return dg.GetAPIv2Client()[0], nil
}

func Import(ctx context.Context, endpoint string, opts grpc.DialOption, bulkOutDir string) error {
dg, err := newClient(endpoint, opts)
func Import(ctx context.Context, connectionString string, bulkOutDir string) error {
dg, err := newClient(connectionString)
if err != nil {
return err
}
resp, err := startPDirStream(ctx, dg)
resp, err := initiateSnapshotStream(ctx, dg)
if err != nil {
return err
}

return sendPDir(ctx, dg, bulkOutDir, resp.Groups)
return streamSnapshot(ctx, dg, bulkOutDir, resp.Groups)
}

// startPDirStream initiates a snapshot stream session with the Dgraph server.
func startPDirStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.InitiatePDirStreamResponse, error) {
glog.Info("Initiating pdir stream")
req := &apiv2.InitiatePDirStreamRequest{}
resp, err := dc.InitiatePDirStream(ctx, req)
// initiateSnapshotStream initiates a snapshot stream session with the Dgraph server.
func initiateSnapshotStream(ctx context.Context, dc apiv2.DgraphClient) (*apiv2.UpdateExtSnapshotStreamingStateResponse, error) {
glog.Info("[import] Initiating external snapshot stream")
req := &apiv2.UpdateExtSnapshotStreamingStateRequest{
Start: true,
}
resp, err := dc.UpdateExtSnapshotStreamingState(ctx, req)
if err != nil {
glog.Errorf("failed to initiate pdir stream: %v", err)
return nil, fmt.Errorf("failed to initiate pdir stream: %v", err)
glog.Errorf("[import] failed to initiate external snapshot stream: %v", err)
return nil, fmt.Errorf("failed to initiate external snapshot stream: %v", err)
}
glog.Info("Pdir stream initiated successfully")
glog.Info("[import] External snapshot stream initiated successfully")
return resp, nil
}

// sendPDir takes a p directory and a set of group IDs and streams the data from the
// streamSnapshot takes a p directory and a set of group IDs and streams the data from the
// p directory to the corresponding group IDs. It first scans the provided directory for
// subdirectories named with numeric group IDs.
func sendPDir(ctx context.Context, dg apiv2.DgraphClient, baseDir string, groups []uint32) error {
glog.Infof("Starting to stream pdir from directory: %s", baseDir)
func streamSnapshot(ctx context.Context, dc apiv2.DgraphClient, baseDir string, groups []uint32) error {
glog.Infof("[import] Starting to stream snapshot from directory: %s", baseDir)

errG, ctx := errgroup.WithContext(ctx)
errG, errGrpCtx := errgroup.WithContext(ctx)
for _, group := range groups {
group := group
errG.Go(func() error {
pDir := filepath.Join(baseDir, fmt.Sprintf("%d", group-1), "p")
if _, err := os.Stat(pDir); err != nil {
return fmt.Errorf("p directory does not exist for group [%d]: [%s]", group, pDir)
}

glog.Infof("Streaming data for group [%d] from directory: [%s]", group, pDir)
if err := streamData(ctx, dg, pDir, group); err != nil {
glog.Errorf("Failed to stream data for groups [%v] from directory: [%s]: %v", group, pDir, err)
glog.Infof("[import] Streaming data for group [%d] from directory: [%s]", group, pDir)
if err := streamSnapshotForGroup(errGrpCtx, dc, pDir, group); err != nil {
glog.Errorf("[import] Failed to stream data for group [%v] from directory: [%s]: %v", group, pDir, err)
return err
}

return nil
})
}

if err := errG.Wait(); err != nil {
glog.Errorf("[import] failed to stream external snapshot: %v", err)
// If errors occurs during streaming of the external snapshot, we drop all the data and
// go back to ensure a clean slate and the cluster remains in working state.
glog.Info("[import] dropping all the data and going back to clean slate")
req := &apiv2.UpdateExtSnapshotStreamingStateRequest{
Start: false,
Finish: true,
DropData: true,
}
if _, err := dc.UpdateExtSnapshotStreamingState(ctx, req); err != nil {
return fmt.Errorf("failed to turn off drain mode: %v", err)
}

glog.Info("[import] successfully disabled drain mode")
return err
}

glog.Infof("Completed streaming all pdirs")
glog.Info("[import] Completed streaming external snapshot")
req := &apiv2.UpdateExtSnapshotStreamingStateRequest{
Start: false,
Finish: true,
DropData: false,
}
if _, err := dc.UpdateExtSnapshotStreamingState(ctx, req); err != nil {
glog.Errorf("[import] failed to disable drain mode: %v", err)
return fmt.Errorf("failed to disable drain mode: %v", err)
}
glog.Info("[import] successfully disable drain mode")
return nil
}

// streamData handles the actual data streaming process for a single group.
// streamSnapshotForGroup handles the actual data streaming process for a single group.
// It opens the BadgerDB at the specified directory and streams all data to the server.
func streamData(ctx context.Context, dg apiv2.DgraphClient, pdir string, groupId uint32) error {
func streamSnapshotForGroup(ctx context.Context, dc apiv2.DgraphClient, pdir string, groupId uint32) error {
glog.Infof("Opening stream for group %d from directory %s", groupId, pdir)

// Initialize stream with the server
out, err := dg.StreamPDir(ctx)
out, err := dc.StreamExtSnapshot(ctx)
if err != nil {
return fmt.Errorf("failed to start pdir stream for group %d: %w", groupId, err)
return fmt.Errorf("failed to start external snapshot stream for group %d: %w", groupId, err)
}

defer func() {
if _, err := out.CloseAndRecv(); err != nil {
glog.Errorf("failed to close the stream for group [%v]: %v", groupId, err)
}

glog.Infof("[import] Group [%v]: Received ACK ", groupId)
}()

// Open the BadgerDB instance at the specified directory
opt := badger.DefaultOptions(pdir)
ps, err := badger.OpenManaged(opt)
if err != nil {
return fmt.Errorf("failed to open BadgerDB at [%s]: %w", pdir, err)
glog.Errorf("failed to open BadgerDB at [%s]: %v", pdir, err)
return fmt.Errorf("failed to open BadgerDB at [%v]: %v", pdir, err)
}

defer func() {
if err := ps.Close(); err != nil {
glog.Warningf("Error closing BadgerDB: %v", err)
glog.Warningf("[import] Error closing BadgerDB: %v", err)
}
}()

// Send group ID as the first message in the stream
glog.Infof("Sending group ID [%d] to server", groupId)
groupReq := &apiv2.StreamPDirRequest{GroupId: groupId}
glog.Infof("[import] Sending request for streaming external snapshot for group ID [%v]", groupId)
groupReq := &apiv2.StreamExtSnapshotRequest{GroupId: groupId}
if err := out.Send(groupReq); err != nil {
return fmt.Errorf("failed to send group ID [%d]: %w", groupId, err)
return fmt.Errorf("failed to send request for streaming external snapshot for group ID [%v] to the server: %w",
groupId, err)
}

// Configure and start the BadgerDB stream
glog.Infof("Starting BadgerDB stream for group [%d]", groupId)
glog.Infof("[import] Starting BadgerDB stream for group [%v]", groupId)

if err := streamBadger(ctx, ps, out, groupId); err != nil {
return fmt.Errorf("badger streaming failed for group [%v]: %v", groupId, err)
}

return nil
}

// streamBadger runs a BadgerDB stream to send key-value pairs to the specified group.
// It creates a new stream at the maximum sequence number and sends the data to the specified group.
// It also sends a final 'done' signal to mark completion.
func streamBadger(ctx context.Context, ps *badger.DB, out apiv2.Dgraph_StreamExtSnapshotClient, groupId uint32) error {
stream := ps.NewStreamAt(math.MaxUint64)
stream.LogPrefix = fmt.Sprintf("Sending P dir for group [%d]", groupId)
stream.LogPrefix = "[import] Sending external snapshot to group [" + fmt.Sprintf("%d", groupId) + "]"
stream.KeyToList = nil
stream.Send = func(buf *z.Buffer) error {
p := &apiv2.StreamPacket{Data: buf.Bytes()}
if err := out.Send(&apiv2.StreamPDirRequest{StreamPacket: p}); err != nil && !errors.Is(err, io.EOF) {
if err := out.Send(&apiv2.StreamExtSnapshotRequest{Pkt: p}); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("failed to send data chunk: %w", err)
}
return nil
}

// Execute the stream process
if err := stream.Orchestrate(ctx); err != nil {
return fmt.Errorf("stream orchestration failed for group [%d]: %w", groupId, err)
return fmt.Errorf("stream orchestration failed for group [%v]: %w, badger path: %s", groupId, err, ps.Opts().Dir)
}

// Send the final 'done' signal to mark completion
glog.Infof("Sending completion signal for group [%d]", groupId)
glog.Infof("[import] Sending completion signal for group [%d]", groupId)
done := &apiv2.StreamPacket{Done: true}

if err := out.Send(&apiv2.StreamPDirRequest{StreamPacket: done}); err != nil && !errors.Is(err, io.EOF) {
if err := out.Send(&apiv2.StreamExtSnapshotRequest{Pkt: done}); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("failed to send 'done' signal for group [%d]: %w", groupId, err)
}
// Wait for acknowledgment from the server
if _, err := out.CloseAndRecv(); err != nil {
return fmt.Errorf("failed to receive ACK for group [%d]: %w", groupId, err)
}
glog.Infof("Group [%d]: Received ACK ", groupId)

return nil
}
Loading