Skip to content
This repository has been archived by the owner on Apr 13, 2021. It is now read-only.

Commit

Permalink
feat: remote working against hub + test
Browse files Browse the repository at this point in the history
Signed-off-by: Carson Farmer <carson.farmer@gmail.com>
  • Loading branch information
carsonfarmer committed Oct 22, 2020
1 parent a6b42d6 commit b631450
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 74 deletions.
18 changes: 8 additions & 10 deletions src/remote/db.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import { personSchema, shouldHaveThrown } from "../utils/spec.utils";
import ThreadID from "@textile/threads-id";

const databaseName = "remote-db";
const serviceHost = "http://localhost:6007";

describe("remote + db", function () {
const privateKey = PrivateKey.fromRandom();
Expand Down Expand Up @@ -36,6 +37,8 @@ describe("remote + db", function () {
it("should be able to push local schemas to remote on push", async function () {
this.timeout(30000);
db = new Database(databaseName);
// db.collectionConfig({ name: "Person", schema: personSchema });
db.remote.set({ serviceHost });
// We always need to authorize first...
const token = await db.remote.authorize(privateKey);
// Now open the db with NO collections
Expand Down Expand Up @@ -70,11 +73,12 @@ describe("remote + db", function () {
});
// Set our thread id and auth token directly
// This is just syntactic sugar over more direct setting
db.remote.set({ id, token });
db.remote.set({ serviceHost, id, token });
// Now let's open the db again
// This internally updates the db version, because the collection set is different
// We should already be authorized because we saved our token from before
await db.open(2); // Version 2
id = await db.remote.initialize();
// Now finally, we push said changes
await db.remote.push("Person");

Expand Down Expand Up @@ -111,22 +115,16 @@ describe("remote + db", function () {
await createChanges();
// Now for the remote stuff
// We always need to authorize first...
db.remote.set({ serviceHost });
await db.remote.authorize(privateKey);
// Do we have a remote table yet? Let's just push and see!
try {
await db.remote.push("Person");
} catch (err) {
expect(err).to.equal(Errors.ThreadIDError);
// Opps, I didn't create the remote one yet, let's initialize
try {
// Use id from before, or if this is a fresh test, create a new one
// Here's a demo of what to do if initialize throws with a thread already exists error
await db.remote.initialize(id);
} catch (err) {
if (err === Errors.ThreadExists) {
db.remote.set({ id }); // Just set the id direcly and move on!
}
}
// Use id from before, or if this is a fresh test, create a new one
await db.remote.initialize(id);
}
await db.remote.push("Person");

Expand Down
27 changes: 14 additions & 13 deletions src/remote/grpc.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,8 +3,8 @@ import type { ThreadID } from "@textile/threads-id";
import { WebsocketTransport } from "@textile/grpc-transport";
import * as pb from "@textile/threads-client-grpc/threads_pb";
import * as api from "@textile/threads-client-grpc/threads_pb_service";
import { Client } from "@textile/threads-client";
import { Context } from "@textile/context";
import { Client, CollectionConfig } from "@textile/threads-client";
import { Context, defaultHost } from "@textile/context";
import { Identity } from "@textile/crypto";

export interface GrpcConfig {
Expand All @@ -13,13 +13,11 @@ export interface GrpcConfig {
debug: boolean;
metadata: grpc.Metadata;
}
// Create local alias
type CollectionConfig = pb.CollectionConfig.AsObject;

export { CollectionConfig };

export const defaults: GrpcConfig = {
// TODO: Should localhost be the default? Probably not.
serviceHost: "http://127.0.0.1:6007",
serviceHost: defaultHost,
transport: WebsocketTransport(),
debug: false,
metadata: new grpc.Metadata(),
Expand All @@ -34,13 +32,16 @@ export function createClient(opts: Partial<GrpcConfig> = {}): api.APIClient {
}

export function createDbClient(config: Partial<GrpcConfig> = {}): Client {
// Get token auth information
const [auth] = config.metadata?.get("authorization") ?? [];
// Create a new remote client instance
// TODO: This is not be the best way to do this...
let context = new Context(config.serviceHost);
// TODO: This is messy!
if (auth) context = context.withToken(auth.slice(7));
// Pull in any existing headers that may have already been set
const json: Record<string, string[]> = {};
config.metadata?.forEach((key, values) => (json[key] = values));
const context =
Object.keys(json).length > 0
? // eslint-disable-next-line @typescript-eslint/no-explicit-any
Context.fromJSON(json, config.serviceHost)
: new Context(config.serviceHost);
const client = new Client(context);
return client;
}
Expand Down Expand Up @@ -141,7 +142,7 @@ export async function getTokenChallenge(
export async function newDB(
name: string,
threadID: ThreadID,
collections: pb.CollectionConfig.AsObject[],
collections: CollectionConfig[],
config?: Partial<GrpcConfig>
): Promise<string> {
const opts = { ...defaults, ...config };
Expand All @@ -155,7 +156,7 @@ export async function newDB(
config.setName(collection.name);
config.setSchema(collection.schema);
const indexesList: pb.Index[] = [];
for (const index of collection.indexesList) {
for (const index of collection.indexes ?? []) {
const idx = new pb.Index();
idx.setPath(index.path);
idx.setUnique(index.unique);
Expand Down
163 changes: 114 additions & 49 deletions src/remote/index.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
import type { Dexie } from "dexie";
import { Context } from "@textile/context";
import { KeyInfo, UserAuth } from "@textile/security";
import jsonpatch from "fast-json-patch";
import {
GrpcConfig,
Expand Down Expand Up @@ -33,6 +35,7 @@ export const Errors = {
export const ThreadIDName = "thread-id";

const encoder = new TextEncoder();
const decoder = new TextDecoder();

/**
* Config specifies the configuration options for remote sync.
Expand Down Expand Up @@ -81,6 +84,74 @@ export class Remote {
this.id = id;
}

/**
* Create a new gRPC client instance from a supplied user auth object.
* Assumes all default gRPC settlings. For customization options, use a context object directly.
* The callback method will automatically refresh expiring credentials.
* @param auth The user auth object or an async callback that returns a user auth object.
* @example
* ```typescript
* import {UserAuth, Client} from '@textile/threads'
*
* function create (auth: UserAuth) {
* return Client.withUserAuth(auth)
* }
* ```
* @example
* ```typescript
* import {UserAuth, Client} from '@textile/threads'
*
* function setCallback (callback: () => Promise<UserAuth>) {
* return Client.withUserAuth(callback)
* }
* ```
*/
async setUserAuth(
auth: UserAuth | (() => Promise<UserAuth>)
): Promise<Remote> {
const init =
typeof auth === "object"
? Context.fromUserAuth(auth, this.config.serviceHost)
: Context.fromUserAuthCallback(auth, this.config.serviceHost);
// Pull in any existing headers that may have already been set
const json: Record<string, string[]> = {};
this.config.metadata?.forEach((key, values) => (json[key] = values));
const metadata = await Context.fromJSON({
...json,
...init.toJSON(),
}).toMetadata();
this.config.metadata = metadata;
return this;
}

/**
* Create a new gRPC client instance from a supplied key and secret
* @param key The KeyInfo object containing {key: string, secret: string, type: 0}. 0 === User Group Key, 1 === Account Key
* @param host The remote gRPC host to connect with. Should be left as default.
* @param debug Whether to run in debug mode. Defaults to false.
* @example
* ```typescript
* import {KeyInfo, Client} from '@textile/threads'
*
* async function create (keyInfo: KeyInfo) {
* return await Client.withKeyInfo(keyInfo)
* }
* ```
*/
async setKeyInfo(key: KeyInfo): Promise<Remote> {
const init = await new Context(this.config.serviceHost).withKeyInfo(key);
// Pull in any existing headers that may have already been set
const json: Record<string, string[]> = {};
this.config.metadata?.forEach((key, values) => (json[key] = values));
const metadata = await Context.fromJSON({
...json,
...init.toJSON(),
}).toMetadata();
this.config.metadata = metadata;
return this;
return this;
}

/**
* Set the remote configuration options.
* @param config The configuration options to use. All are optional.
Expand Down Expand Up @@ -186,7 +257,7 @@ export class Remote {
if (callback === undefined) {
throw new Error("Callback required for public key challenge");
}
token = await getTokenChallenge(identity, callback);
token = await getTokenChallenge(identity, callback, this.config);
} else {
token = await getToken(identity, this.config);
}
Expand Down Expand Up @@ -256,10 +327,22 @@ export class Remote {
idString = await newDB(this.storage.name, threadID, schemas, this.config);
} catch (err) {
if (err.toString().includes("db already exists")) {
throw Errors.ThreadExists;
idString = threadID.toString();
// If it already exists, maybe we just need to create/update the schemas?
const client = createDbClient(this.config);
for (const schema of schemas) {
schema.schema = JSON.parse(decoder.decode(schema.schema));
try {
await client.newCollection(threadID, schema);
} catch (err) {
if (!err.message.includes("collection already registered"))
throw err;
}
}
} else {
// Otherwise, just throw it
throw err;
}
// Otherwise, just throw it
throw err;
}

// Otherwise throw a generic remote error :(
Expand All @@ -284,68 +367,46 @@ export class Remote {

// Blast thru provided collection names...
for (const collectionName of collections) {
// Check that table exists locally...
const table = this.storage.table(collectionName);
// Check that table exists remotely...
try {
// Just check, we don't need to keep the schema info around for now
await client.getCollectionInfo(threadID, collectionName);
// Assume our local schema matches the remote one...
// But we might throw on this later!
} catch (err) {
if (err.toString().includes("collection not found")) {
// We need to create it on the remote, maybe we haven't pushed yet?
// So we grab our local schema, which defaults to open if not specified
const schema = await table.getSchema();
// And then we create a new remote collection to match it
await client.newCollection(threadID, collectionName, schema);
} else throw err;
}
// FIXME: We aren't getting information about failed transactions from the go service
// But some fixes on the go side should make it useful again
// * https://github.com/textileio/go-threads/pull/437
// * https://github.com/textileio/go-threads/pull/436#discussion_r489016111
// Filter changes by collection
const filtered = localChanges.where("name").equals(collectionName);
if ((await filtered.count()) < 1) {
return; // Early out if no changes
}
// For each change, create transaction item and switch on type
// FIXME: There are more idiomatic ways to do this, but dexie seems to
// swallow any errors thrown inside an async function within a transaction
// so we do most of this outside a transaction and just delete the changes
// if the overall remote transaction is successful.
const trans = client.writeTransaction(threadID, collectionName);
try {
// See above, we need to actually materialize the array it seems?
const changes = await filtered.toArray();
await trans.start();
let count = 0;
for (const obj of changes) {
switch (obj.type) {
case "put": {
// FIXME: We can't save known objects, and all objects are unknown the first time
// we push to remote?!
// FIXME: https://github.com/textileio/go-threads/issues/440
// TODO: https://github.com/textileio/go-threads/pull/450
try {
// await trans.save([obj.after]);
await client.save(threadID, collectionName, [obj.after]);
// FIXME: Workaround: we check first, and if error, try to create
await trans.verify([obj.after]);
await trans.save([obj.after]);
break;
} catch (err) {
// TODO: Should we enable this on the go end?
// can't save unkown instance
// sic "unkown"
if (!err.message.includes("validation failed")) {
console.error(err);
break;
}
// Pass thru to add
console.error(err);
}
}
case "add": {
// await trans.create([obj.after]);
await client.create(threadID, collectionName, [obj.after]);
await trans.create([obj.after]);
break;
}
case "delete": {
try {
// await trans.delete([obj.key]);
await client.delete(threadID, collectionName, [obj.key]);
await trans.delete([obj.key]);
} catch (err) {
// TODO: Should this actually be ok on the go end?
// instance not found
console.error(err);
// TODO: https://github.com/textileio/go-threads/pull/450
console.error(err); // We'll ignore this though
break;
}
}
Expand All @@ -359,10 +420,10 @@ export class Remote {
// Won't know why we made it this far, so just use a generic error
if (count !== deleted) throw Errors.ChangeError;
// We can safely end the transaction
// await trans.end();
await trans.end();
} catch (err) {
// In theory, err will be due to remote transaction calls... abort!
// await trans.abort();
await trans.discard();
throw err; // Rethrow for external consumers
}
// TODO: Maybe return updated hash of head update.
Expand All @@ -382,8 +443,9 @@ export class Remote {
if (!(await changes.count())) return;
// Get a reference to stash table for storing changes
const stash = this.storage.table(StashTableName);
const array = await changes.toArray();
// Move change set to stash table, useful for rebasing later
await stash.bulkPut(await changes.toArray());
await stash.bulkPut(array);
// Clear out local changes
return changes.clear();
}
Expand Down Expand Up @@ -414,6 +476,9 @@ export class Remote {
const stash = tx.table<Change, string>(StashTableName);
const table = tx.table(collectionName);
const filtered = stash.where("name").equals(collectionName);
if ((await filtered.count()) < 1) {
return; // Early out if no changes
}
// TODO: This can be optimized big time!
for (const obj of await filtered.toArray()) {
if (obj.type === "delete") {
Expand Down Expand Up @@ -457,10 +522,10 @@ export class Remote {
// Blast thru provided collection names...
// TODO: Yes, I know this is all extremely sub-optimal!
for (const collectionName of collections) {
const { instancesList } = await client.find(threadID, collectionName, {});
const instances = await client.find(threadID, collectionName, {});
const table = this.storage.table(collectionName);
// Remote is our source of thruth, we completely overwrite anything local that is different
const keys = await table.bulkPut(instancesList, { allKeys: true });
const keys = await table.bulkPut(instances, { allKeys: true });
// Now we also need to drop anything locally that wasn't in our remote
await table.filter((obj) => !keys.includes(obj._id)).delete();
}
Expand Down
4 changes: 2 additions & 2 deletions src/remote/remote.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -275,8 +275,8 @@ describe("remote", function () {
const total = await dogs.count();
expect(total).to.equal(2);
const q = new Where("age").gt(0);
const { instancesList } = await client.find(threadID, "dogs", q);
expect(instancesList).to.have.lengthOf(2);
const instances = await client.find(threadID, "dogs", q);
expect(instances).to.have.lengthOf(2);
});

it("should pull changes from remote and automatically update local db", async function () {
Expand Down

0 comments on commit b631450

Please sign in to comment.