Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: v2 endpoint for pruned snapshot that returns entities as raw bytes #215

Merged
merged 11 commits into from
Oct 28, 2022
48 changes: 40 additions & 8 deletions packages/network/src/workers/syncUtils.ts
Original file line number Diff line number Diff line change
@@ -1,13 +1,14 @@
import { JsonRpcProvider } from "@ethersproject/providers";
import { EntityID, ComponentValue, Components } from "@latticexyz/recs";
import { to256BitString, awaitPromise, range } from "@latticexyz/utils";
import { to256BitString, awaitPromise, range, Uint8ArrayToHexString } from "@latticexyz/utils";
import { BytesLike, Contract, BigNumber } from "ethers";
import { Observable, map, concatMap, of, from } from "rxjs";
import { createDecoder } from "../createDecoder";
import { createTopics } from "../createTopics";
import { fetchEventsInBlockRange } from "../networkUtils";
import {
ECSStateReply,
ECSStateReplyV2,
ECSStateSnapshotServiceClient,
ECSStateSnapshotServiceDefinition,
} from "@latticexyz/services/protobuf/ts/ecs-snapshot/ecs-snapshot";
Expand All @@ -23,7 +24,6 @@ import { CacheStore, createCacheStore, storeEvent, storeEvents } from "./CacheSt
import { abi as ComponentAbi } from "@latticexyz/solecs/abi/Component.json";
import { abi as WorldAbi } from "@latticexyz/solecs/abi/World.json";
import { Component, World } from "@latticexyz/solecs/types/ethers-contracts";
import { SyncState } from "./constants";
import {
ECSStreamBlockBundleReply,
ECSStreamServiceClient,
Expand Down Expand Up @@ -80,6 +80,7 @@ export async function getSnapshotBlockNumber(
* @param worldAddress Address of the World contract to get the snapshot for.
* @param decode Function to decode raw component values ({@link createDecode}).
* @returns Promise resolving with {@link CacheStore} containing the snapshot state.
* @deprecated this util will be removed in a future version, use fetchSnapshotChunked instead
*/
export async function fetchSnapshot(
snapshotClient: ECSStateSnapshotServiceClient,
Expand Down Expand Up @@ -115,20 +116,24 @@ export async function fetchSnapshotChunked(
pruneOptions?: { playerAddress: string; hashedComponentId: string }
): Promise<CacheStore> {
const cacheStore = createCacheStore();
const chunkPercentage = 100 / numChunks;
const chunkPercentage = Math.ceil(100 / numChunks);

try {
const response = pruneOptions
? snapshotClient.getStateLatestStreamPruned({
? snapshotClient.getStateLatestStreamPrunedV2({
worldAddress,
chunkPercentage,
pruneAddress: pruneOptions.playerAddress,
pruneComponentId: pruneOptions.hashedComponentId,
pruneAddress: pruneOptions?.playerAddress,
pruneComponentId: pruneOptions?.hashedComponentId,
})
: snapshotClient.getStateLatestStream({ worldAddress, chunkPercentage });
: snapshotClient.getStateLatestStreamV2({
worldAddress,
chunkPercentage,
});

let i = 0;
for await (const responseChunk of response) {
await reduceFetchedState(responseChunk, cacheStore, decode);
await reduceFetchedStateV2(responseChunk, cacheStore, decode);
setPercentage && setPercentage((i++ / numChunks) * 100);
}
} catch (e) {
Expand All @@ -145,6 +150,7 @@ export async function fetchSnapshotChunked(
* @param cacheStore {@link CacheStore} to store snapshot state into.
* @param decode Function to decode raw component values ({@link createDecode}).
* @returns Promise resolving once state is reduced into {@link CacheStore}.
* @deprecated this util will be removed in a future version, use reduceFetchedStateV2 instead
*/
export async function reduceFetchedState(
response: ECSStateReply,
Expand All @@ -161,6 +167,32 @@ export async function reduceFetchedState(
}
}

/**
* Reduces a snapshot response by storing corresponding ECS events into the cache store.
*
* @param response ECSStateReplyV2
* @param cacheStore {@link CacheStore} to store snapshot state into.
* @param decode Function to decode raw component values ({@link createDecode}).
* @returns Promise resolving once state is reduced into {@link CacheStore}.
*/
export async function reduceFetchedStateV2(
response: ECSStateReplyV2,
cacheStore: CacheStore,
decode: ReturnType<typeof createDecode>
): Promise<void> {
const { state, blockNumber, stateComponents, stateEntities } = response;
const stateEntitiesHex = stateEntities.map((e) => Uint8ArrayToHexString(e) as EntityID);
const stateComponentsHex = stateComponents.map((e) => to256BitString(e));

for (const { componentIdIdx, entityIdIdx, value: rawValue } of state) {
const component = stateComponentsHex[componentIdIdx];
const entity = stateEntitiesHex[entityIdIdx];
if (entity == undefined) console.log("invalid entity index", stateEntities.length, entityIdIdx);
const value = await decode(component, rawValue);
storeEvent(cacheStore, { type: NetworkEvents.NetworkComponentUpdate, component, entity, value, blockNumber });
}
}

/**
* Create a RxJS stream of {@link NetworkComponentUpdate}s by subscribing to a
* gRPC streaming service.
Expand Down
61 changes: 61 additions & 0 deletions packages/services/pkg/grpc/snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"fmt"
"latticexyz/mud/packages/services/pkg/snapshot"
"latticexyz/mud/packages/services/pkg/utils"
pb "latticexyz/mud/packages/services/protobuf/go/ecs-snapshot"
)

Expand Down Expand Up @@ -124,3 +125,63 @@ func (server *ecsSnapshotServer) GetStateBlockLatest(ctx context.Context, in *pb
func (server *ecsSnapshotServer) GetStateAtBlock(ctx context.Context, in *pb.ECSStateRequestAtBlock) (*pb.ECSStateReply, error) {
return nil, fmt.Errorf("not implemented")
}

//
// V2 endpoints
//

func (server *ecsSnapshotServer) GetStateLatestStreamV2(in *pb.ECSStateRequestLatestStream, stream pb.ECSStateSnapshotService_GetStateLatestStreamV2Server) error {
if !snapshot.IsSnaphotAvailableLatest(in.WorldAddress) {
return fmt.Errorf("no snapshot")
}
latestSnapshot := snapshot.RawReadStateSnapshotLatest(in.WorldAddress)

// Respond in fraction chunks. If request has specified a chunk percentage, use that value.
chunkPercentage := server.config.DefaultSnapshotChunkPercentage
if in.ChunkPercentage != nil {
chunkPercentage = int(*in.ChunkPercentage)
}

latestSnapshotChunked := snapshot.ChunkRawStateSnapshot(latestSnapshot, chunkPercentage)

for _, snapshotChunk := range latestSnapshotChunked {
stream.Send(&pb.ECSStateReplyV2{
State: snapshotChunk.State,
StateComponents: snapshotChunk.StateComponents,
StateEntities: utils.HexStringArrayToBytesArray(snapshotChunk.StateEntities),
StateHash: snapshotChunk.StateHash,
BlockNumber: snapshotChunk.EndBlockNumber,
})
}
return nil
}

func (server *ecsSnapshotServer) GetStateLatestStreamPrunedV2(request *pb.ECSStateRequestLatestStreamPruned, stream pb.ECSStateSnapshotService_GetStateLatestStreamPrunedV2Server) error {
if !snapshot.IsSnaphotAvailableLatest(request.WorldAddress) {
return fmt.Errorf("no snapshot")
}
if len(request.PruneAddress) == 0 {
return fmt.Errorf("address for which to prune for required")
}
latestSnapshot := snapshot.RawReadStateSnapshotLatest(request.WorldAddress)
latestSnapshotPruned := snapshot.PruneSnapshotOwnedByComponent(latestSnapshot, request.PruneAddress)

// Respond in fraction chunks. If request has specified a chunk percentage, use that value.
chunkPercentage := server.config.DefaultSnapshotChunkPercentage
if request.ChunkPercentage != nil {
chunkPercentage = int(*request.ChunkPercentage)
}

latestSnapshotChunked := snapshot.ChunkRawStateSnapshot(latestSnapshotPruned, chunkPercentage)

for _, snapshotChunk := range latestSnapshotChunked {
stream.Send(&pb.ECSStateReplyV2{
State: snapshotChunk.State,
StateComponents: snapshotChunk.StateComponents,
StateEntities: utils.HexStringArrayToBytesArray(snapshotChunk.StateEntities),
StateHash: snapshotChunk.StateHash,
BlockNumber: snapshotChunk.EndBlockNumber,
})
}
return nil
}
19 changes: 19 additions & 0 deletions packages/services/pkg/utils/general.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,27 @@
package utils

import (
"latticexyz/mud/packages/services/pkg/logger"

"github.com/ethereum/go-ethereum/common/hexutil"
"go.uber.org/zap"
)

func Min(a, b int) int {
if a < b {
return a
}
return b
}

func HexStringArrayToBytesArray(strArray []string) [][]byte {
bytesArray := [][]byte{}
for _, str := range strArray {
entityIdBigInt, err := hexutil.DecodeBig(str)
if err != nil {
logger.GetLogger().Error("can't parse entity ID", zap.String("string", str), zap.Error(err))
}
bytesArray = append(bytesArray, entityIdBigInt.Bytes())
}
return bytesArray
}
18 changes: 18 additions & 0 deletions packages/services/proto/ecs-snapshot.proto
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,19 @@ service ECSStateSnapshotService {
// Requests the latest ECS state in stream format, which will chunk the state.
rpc GetStateLatestStream (ECSStateRequestLatestStream) returns (stream ECSStateReply) {}

// Requests the latest ECS state in stream format, which will chunk the state.
//
// V2 version optimized to return entities as raw bytes.
rpc GetStateLatestStreamV2 (ECSStateRequestLatestStream) returns (stream ECSStateReplyV2) {}

// Requests the latest ECS state, with aditional pruning.
rpc GetStateLatestStreamPruned (ECSStateRequestLatestStreamPruned) returns (stream ECSStateReply) {}

// Requests the latest ECS state, with aditional pruning.
//
// V2 version optimized to return entities as raw bytes.
rpc GetStateLatestStreamPrunedV2 (ECSStateRequestLatestStreamPruned) returns (stream ECSStateReplyV2) {}

// Requests the latest block number based on the latest ECS state.
rpc GetStateBlockLatest (ECSStateBlockRequestLatest) returns (ECSStateBlockReply) {}

Expand Down Expand Up @@ -87,6 +97,14 @@ message ECSStateReply {
uint32 blockNumber = 5;
}

message ECSStateReplyV2 {
repeated ECSState state = 1;
repeated string stateComponents = 2;
repeated bytes stateEntities = 3;
string stateHash = 4;
uint32 blockNumber = 5;
}

message ECSStateBlockReply {
uint32 blockNumber = 1;
}
Loading