Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: allow specific snapshot chunk ratio #212

Merged
merged 2 commits into from
Oct 19, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions packages/network/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ export type SyncWorkerConfig = {
fetchSystemCalls?: boolean;
cacheInterval?: number;
cacheAgeThreshold?: number;
snapshotNumChunks?: number;
};

export enum ContractSchemaValue {
Expand Down
8 changes: 6 additions & 2 deletions packages/network/src/workers/SyncWorker.ts
Original file line number Diff line number Diff line change
Expand Up @@ -184,8 +184,12 @@ export class SyncWorker<C extends Components> implements DoWork<SyncWorkerConfig

if (syncFromSnapshot) {
this.setLoadingState({ state: SyncState.INITIAL, msg: "Fetching initial state from snapshot", percentage: 0 });
initialState = await fetchSnapshotChunked(snapshotClient, worldContract.address, decode, (percentage: number) =>
this.setLoadingState({ percentage })
initialState = await fetchSnapshotChunked(
snapshotClient,
worldContract.address,
decode,
config.snapshotNumChunks,
(percentage: number) => this.setLoadingState({ percentage })
);
} else {
this.setLoadingState({ state: SyncState.INITIAL, msg: "Fetching initial state from cache", percentage: 0 });
Expand Down
8 changes: 5 additions & 3 deletions packages/network/src/workers/syncUtils.ts
Original file line number Diff line number Diff line change
Expand Up @@ -110,16 +110,18 @@ export async function fetchSnapshotChunked(
snapshotClient: ECSStateSnapshotServiceClient,
worldAddress: string,
decode: ReturnType<typeof createDecode>,
setPercentage: (percentage: number) => void
numChunks = 10,
setPercentage?: (percentage: number) => void
): Promise<CacheStore> {
const cacheStore = createCacheStore();
const chunkPercentage = 100 / numChunks;

try {
const response = snapshotClient.getStateLatestStream({ worldAddress });
const response = snapshotClient.getStateLatestStream({ worldAddress, chunkPercentage });
let i = 0;
for await (const responseChunk of response) {
await reduceFetchedState(responseChunk, cacheStore, decode);
setPercentage((i++ / 10) * 100);
setPercentage && setPercentage((i++ / numChunks) * 100);
}
} catch (e) {
console.error(e);
Expand Down
2 changes: 2 additions & 0 deletions packages/services/cmd/ecs-snapshot/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ var (
initialSyncBlockBatchSize = flag.Int64("initial-sync-block-batch-size", 10, "Number of blocks to fetch data for when performing an initial sync")
initialSyncBlockBatchSyncTimeout = flag.Int64("initial-sync-block-batch-sync-timeout", 100, "Time in milliseconds to wait between calls to fetch batched log data when performing an initial sync")
initialSyncSnapshotInterval = flag.Int64("initial-sync-snapshot-interval", 5000, "Block number interval for how often to take intermediary snapshots when performing an initial sync")
defaultSnapshotChunkPercentage = flag.Int("default-snapshot-chunk-percentage", 10, "Default percentage for RPCs that request a snapshot in chunks. Default to 10, i.e. 10 percent chunks")
metricsPort = flag.Int("metrics-port", 6060, "Prometheus metrics http handler port. Defaults to port 6060")
)

Expand All @@ -68,6 +69,7 @@ func main() {
InitialSyncBlockBatchSize: *initialSyncBlockBatchSize,
InitialSyncBlockBatchSyncTimeout: time.Duration(*initialSyncBlockBatchSyncTimeout) * time.Millisecond,
InitialSyncSnapshotInterval: *initialSyncSnapshotInterval,
DefaultSnapshotChunkPercentage: *defaultSnapshotChunkPercentage,
}

// Parse world addresses to listen to.
Expand Down
15 changes: 10 additions & 5 deletions packages/services/pkg/grpc/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,16 +50,21 @@ func (server *ecsSnapshotServer) GetStateLatest(ctx context.Context, in *pb.ECSS
}

// GetStateLatestStream is a gRPC endpoint that returns the latest snapshot, if any, for a given
// WorldAddress provided via ECSStateRequestLatest. The snapshot is sent chunked via a stream over
// WorldAddress provided via ECSStateRequestLatestStream. The snapshot is sent chunked via a stream over
// the wire.
func (server *ecsSnapshotServer) GetStateLatestStream(in *pb.ECSStateRequestLatest, stream pb.ECSStateSnapshotService_GetStateLatestStreamServer) error {
func (server *ecsSnapshotServer) GetStateLatestStream(in *pb.ECSStateRequestLatestStream, stream pb.ECSStateSnapshotService_GetStateLatestStreamServer) error {
if !snapshot.IsSnaphotAvailableLatest(in.WorldAddress) {
return fmt.Errorf("no snapshot")
}
latestSnapshot := snapshot.RawReadStateSnapshotLatest(in.WorldAddress)
// Respond in fraction chunks, e.g. in chunks of 10%.
chunkFraction := 10
latestSnapshotChunked := snapshot.ChunkRawStateSnapshot(latestSnapshot, chunkFraction)

// Respond in fraction chunks. If request has specified a chunk percentage, use that value.
chunkPercentage := server.config.DefaultSnapshotChunkPercentage
if in.ChunkPercentage != nil {
chunkPercentage = int(*in.ChunkPercentage)
}

latestSnapshotChunked := snapshot.ChunkRawStateSnapshot(latestSnapshot, chunkPercentage)

for _, snapshotChunk := range latestSnapshotChunked {
stream.Send(&pb.ECSStateReply{
Expand Down
2 changes: 2 additions & 0 deletions packages/services/pkg/snapshot/core.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ type SnapshotServerConfig struct {
// snapshots when performing an initial sync. This is useful in case the snapshot service
// disconnects or fails while perfoming a lengthy initial sync.
InitialSyncSnapshotInterval int64
// Default to use when chunking snapshot to send snapshot in chunks over the wire.
DefaultSnapshotChunkPercentage int
}

// Start starts the process of processing data from an Ethereum client, reducing the ECS state, and
Expand Down
8 changes: 7 additions & 1 deletion packages/services/proto/ecs-snapshot.proto
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ service ECSStateSnapshotService {
rpc GetStateLatest (ECSStateRequestLatest) returns (ECSStateReply) {}

// Requests the latest ECS state in stream format, which will chunk the state.
rpc GetStateLatestStream (ECSStateRequestLatest) returns (stream ECSStateReply) {}
rpc GetStateLatestStream (ECSStateRequestLatestStream) returns (stream ECSStateReply) {}

// Requests the latest block number based on the latest ECS state.
rpc GetStateBlockLatest (ECSStateBlockRequestLatest) returns (ECSStateBlockReply) {}
Expand All @@ -46,6 +46,12 @@ message ECSStateRequestLatest {
string worldAddress = 1;
}

// The request message for the latest chunked ECS state.
message ECSStateRequestLatestStream {
string worldAddress = 1;
optional uint32 chunkPercentage = 2;
}

// The request message for the latest block based on latest ECS state.
message ECSStateBlockRequestLatest {
string worldAddress = 1;
Expand Down
Loading