Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#14: Lazy cloning and caching of subdomains #16

Merged
merged 1 commit into from
Jul 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 9 additions & 7 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -35,14 +35,7 @@
"@11ty/eleventy-plugin-syntaxhighlight": "^5.0.0",
"@flydotio/dockerfile": "^0.3.3",
"@m-ld/io-web-build": "^0.2.0-0",
"@types/cryptr": "^4.0.1",
"@types/jest": "^29.2.4",
"@types/jsonwebtoken": "^9.0.2",
"@types/restify": "^8.5.5",
"@types/restify-errors": "^4.3.4",
"@types/supertest": "^2.0.12",
"@types/tmp": "^0.2.3",
"@types/yargs": "^17.0.13",
"jest": "^29.3.1",
"jest-mock-extended": "^3.0.1",
"memory-level": "^1.0.0",
Expand All @@ -52,9 +45,17 @@
"typescript": "^4.9.3"
},
"dependencies": {
"@types/cryptr": "^4.0.1",
"@m-ld/io-web-runtime": "^0.2.3",
"@types/jsonwebtoken": "^9.0.2",
"@m-ld/m-ld": "^0.10.0-edge.5",
"@types/lru-cache": "^5.1.1",
"@types/nodemailer": "^6.4.7",
"@types/restify": "^8.5.5",
"@types/restify-errors": "^4.3.4",
"@types/supertest": "^2.0.12",
"@types/tmp": "^0.2.3",
"@types/yargs": "^17.0.13",
"ably": "^1.2.33",
"abstract-level": "^1.0.3",
"classic-level": "^1.2.0",
Expand All @@ -66,6 +67,7 @@
"jsonwebtoken": "^9.0.1",
"liquidjs": "^10.8.4",
"loglevel": "^1.8.1",
"lru-cache": "^6.0.0",
"nodemailer": "^6.9.0",
"reflect-metadata": "^0.1.13",
"restify": "^11.0.0",
Expand Down
166 changes: 73 additions & 93 deletions src/server/Gateway.ts
Original file line number Diff line number Diff line change
@@ -1,40 +1,42 @@
import {
GraphSubject, MeldClone, MeldConfig, MeldReadState, MeldUpdate, propertyValue, Reference, uuid
MeldClone, MeldConfig, MeldReadState, MeldUpdate, propertyValue, Reference, uuid
} from '@m-ld/m-ld';
import {
AccountOwnedId, BaseGateway, BaseGatewayConfig, CloneFactory, Env, GatewayPrincipal, KeyStore
} from '../lib/index.js';
import { gatewayContext, Iri, UserKey } from '../data/index.js';
import LOG from 'loglevel';
import { access, rm, writeFile } from 'fs/promises';
import { finalize, Subscription } from 'rxjs';
import { Subscription } from 'rxjs';
import { BadRequestError, ConflictError, UnauthorizedError } from '../http/errors.js';
import { GatewayConfig } from './index.js';
import { Bite, Consumable } from 'rx-flowable';
import { Account, AccountContext, RemotesAuthType, SubdomainNaming } from './Account.js';
import { accountHasSubdomain } from './statements.js';
import { SubdomainClone } from './SubdomainClone.js';
import { randomInt } from 'crypto';
import jsonwebtoken, { JwtPayload } from 'jsonwebtoken';
import Cryptr from 'cryptr';
import { Subdomain, SubdomainSpec } from '../data/Subdomain.js';
import { SubdomainCache } from './SubdomainCache';

export type Who = { acc: Account, keyid: string };
export interface Who {
acc: Account,
keyid: string
}

export class Gateway extends BaseGateway implements AccountContext {
public readonly me: GatewayPrincipal;
public /*readonly*/ domain: MeldClone;
public readonly config: GatewayConfig;

private readonly subdomains: { [name: string]: SubdomainClone } = {};
private readonly subs: Subscription = new Subscription();

// noinspection JSUnusedGlobalSymbols keyStore is in account context
constructor(
private readonly env: Env,
config: GatewayConfig,
private readonly cloneFactory: CloneFactory,
readonly keyStore: KeyStore
readonly keyStore: KeyStore,
private readonly subdomainCache: SubdomainCache
) {
super(config['@domain']!);
LOG.debug('Gateway domain is', this.domainName);
Expand All @@ -53,55 +55,23 @@ export class Gateway extends BaseGateway implements AccountContext {
name: this.rootAccountName, keyids: [this.me.authKey.keyid]
}).toJSON()
);
// Enliven all subdomains and connectors already in the domain
await new Promise(resolve => {
this.subs.add(this.domain.read(
state => this.initDomain(state).then(resolve),
(update, state) => this.onUpdateDomain(update, state).then()
));
});
this.subs.add(this.domain.follow(update =>
this.onUpdateSubdomains(update).then()));
return this;
}

get rootAccountName() {
return this.me.authKey.appId.toLowerCase();
}

initDomain(state: MeldReadState) {
// Subdomains are the range of the 'subdomain' Account property
return this.readAsync(state.read({
'@select': '?d', '@where': { 'subdomain': '?d' }
}).consume, ({ value, next }) => {
this.subdomainAdded(state, this.ownedRefAsId(<Reference>value['?d'])).finally(next);
});
}

onUpdateDomain(update: MeldUpdate, state: MeldReadState) {
return this.onUpdateSubdomains(update, state);
}

onUpdateSubdomains(update: MeldUpdate, state: MeldReadState) {
onUpdateSubdomains(update: MeldUpdate) {
// Watch for subdomains appearing and disappearing
// noinspection JSCheckFunctionSignatures
return Promise.all([
...update['@delete'].map(subject => Promise.all(
return Promise.all(
update['@delete'].map(subject => Promise.all(
propertyValue(subject, 'subdomain', Array, Reference).map(tsRef =>
this.subdomainRemoved(this.ownedRefAsId(tsRef))))),
...update['@insert'].map(subject => Promise.all(
propertyValue(subject, 'subdomain', Array, Reference).map(tsRef =>
this.subdomainAdded(state, this.ownedRefAsId(tsRef)))))
]);
}

/**
* Hoop-jumping to ensure that an asynchronous read does not throw an
* unhandled exception if the gateway is closed too soon.
*/
readAsync(results: Consumable<GraphSubject>, sub: (value: Bite<GraphSubject>) => void) {
return new Promise<void>(resolve => {
// noinspection JSCheckFunctionSignatures
this.subs.add(results.pipe(finalize(resolve)).subscribe(sub));
});
this.subdomainRemoved(this.ownedRefAsId(tsRef)))))
);
}

// TODO: implement this in timeld
Expand Down Expand Up @@ -137,30 +107,17 @@ export class Gateway extends BaseGateway implements AccountContext {
return this.env.readyPath('data', 'domain', id.account, id.name);
}

async subdomainAdded(state: MeldReadState, id: AccountOwnedId) {
if (!(id.toDomain() in this.subdomains)) {
try {
const src = await state.get(id.toIri());
LOG.debug('Loading declared subdomain', id);
await this.cloneSubdomain(Subdomain.fromJSON(src));
LOG.info('Loaded declared subdomain', id);
} catch (e) {
// If the clone fails that's fine, we'll try again if it's asked for
LOG.warn('Failed to load declared subdomain', id, e);
}
}
}

async subdomainRemoved(id: AccountOwnedId) {
try {
await this.subdomains[id.toDomain()]?.close();
// Not relying on cache dispose, we want to wait for the close
await this.subdomainCache.peek(id.toDomain())?.close();
const path = await this.getDataPath(id);
// Remove the persistent data
await rm(path, { recursive: true, force: true });
// Write the tombstone file to prevent re-creation
await writeFile(`${path}.rip`, '');
// TODO: Remove all channel permissions
delete this.subdomains[id.toDomain()];
this.subdomainCache.del(id.toDomain());
LOG.info('Removed declared subdomain', id);
} catch (e) {
LOG.warn('Error removing declared subdomain', id, e);
Expand Down Expand Up @@ -223,7 +180,7 @@ export class Gateway extends BaseGateway implements AccountContext {
const { sub, email } =
jsonwebtoken.verify(jwt, this.me.authKey.secret) as JwtPayload;
if (!AccountOwnedId.isComponentId(sub))
throw new BadRequestError
throw new BadRequestError;
return { user: sub, email };
}

Expand All @@ -243,50 +200,70 @@ export class Gateway extends BaseGateway implements AccountContext {
who?: Who
): Promise<Partial<MeldConfig>> {
const id = this.ownedId(spec);
const sdDomain = id.toDomain();
const remotesAuth: RemotesAuthType[] = [];
if (naming === 'any') {
// Use m-ld write locking to guard against API race conditions
await this.domain.write(async state => {
// Do we already have a clone of this subdomain?
let sdc = this.subdomains[sdDomain];
if (sdc == null) {
// Use m-ld write locking to guard against API race conditions
await this.domain.write(async state => {
if (naming === 'any') {
// Does this subdomain already exist in its account?
const src = await state.get(id.toRelativeIri()); // Genesis if null
let sdClone = this.subdomainCache.get(id.toDomain());
if (sdClone == null) {
// Check that this subdomain has not existed before
if (await this.tsTombstoneExists(id))
if (src == null && await this.tsTombstoneExists(id))
throw new ConflictError('Cannot re-use domain name');
sdc = await this.cloneSubdomain(spec, true);
sdClone = await this.cloneSubdomain(spec, src == null);
// Ensure that the clone is online to avoid race with the client
await sdc.clone.status.becomes({ online: true });
await sdClone.clone.status.becomes({ online: true });
// Ensure the subdomain is in the domain
state = await state.write(accountHasSubdomain(sdc));
if (sdc.useSignatures && who != null) {
state = await state.write({
'@id': id.account, subdomain: sdClone.toJSON()
});
if (sdClone.useSignatures && who != null) {
// Ensure that the user account is in the subdomain for signing
const userKey = await who.acc.key(state, who.keyid);
await this.writePrincipalToSubdomain(
sdc, who.acc.name, 'Account', userKey);
sdClone, who.acc.name, 'Account', userKey);
}
remotesAuth.push(...await Account.getDetails(
state, spec.account, 'remotesAuth'));
} else if (spec.useSignatures != null && sdc.useSignatures !== spec.useSignatures) {
this.subdomainCache.set(id.toDomain(), sdClone);
} else if (src != null &&
spec.useSignatures != null &&
Subdomain.fromJSON(src).useSignatures !== spec.useSignatures) {
throw new ConflictError('Cannot change use of signatures after creation');
}
});
} else {
}
remotesAuth.push(...await Account.getDetails(
this.domain, spec.account, 'remotesAuth'));
}
state, spec.account, 'remotesAuth'));
});
// Return the config required for a new clone, using some of our config
return Object.assign({
'@domain': sdDomain, genesis: naming !== 'any'
'@domain': this.ownedId(spec).toDomain(), genesis: naming !== 'any'
}, await this.cloneFactory.reusableConfig(this.config, remotesAuth, who));
}

async getSubdomain(id: AccountOwnedId) {
if (this.hasClonedSubdomain(id))
return this.subdomains[id.toDomain()];
getSubdomain(id: AccountOwnedId) {
return new Promise<SubdomainClone | undefined>((resolve, reject) => {
// Use a read lock to prevent concurrent cache manipulation
this.domain.read(async state => {
try { // First check the cache
const inCache = this.subdomainCache.get(id.toDomain());
if (inCache == null) {
const src = await state.get(id.toRelativeIri());
if (src != null) {
LOG.debug('Cloning declared subdomain', id);
const sdc = await this.cloneSubdomain(Subdomain.fromJSON(src), false);
this.subdomainCache.set(id.toDomain(), sdc);
return resolve(sdc);
}
}
return resolve(inCache);
} catch (e) {
reject(e);
}
});
});
}

async cloneSubdomain(spec: SubdomainSpec, genesis = false): Promise<SubdomainClone> {
private async cloneSubdomain(spec: SubdomainSpec, genesis = false): Promise<SubdomainClone> {
const id = this.ownedId(spec);
const config = Object.assign(Env.mergeConfig<BaseGatewayConfig>(this.config, {
'@id': uuid(), '@domain': id.toDomain(), '@context': false
Expand All @@ -302,19 +279,22 @@ export class Gateway extends BaseGateway implements AccountContext {
// Add our machine identity and key to the subdomain for signing
await this.writePrincipalToSubdomain(sdc, '/', 'Gateway', this.me.userKey);
}
return this.subdomains[id.toDomain()] = sdc;
// We could put sdc in the cache here, but prefer to leave that to the
// caller so that cache calls are co-located.
return sdc;
}

/** @internal for tests */
hasClonedSubdomain(id: AccountOwnedId) {
return id.toDomain() in this.subdomains;
return this.subdomainCache.has(id.toDomain());
}

close() {
async close() {
this.subs.unsubscribe();
// Close the gateway domain
return Promise.all([
await Promise.all([
this.domain?.close(),
...Object.values(this.subdomains).map(d => d.close())
this.subdomainCache.clear()
]);
}
}
Expand Down
23 changes: 23 additions & 0 deletions src/server/SubdomainCache.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
import LRUCache from 'lru-cache';
import { SubdomainClone } from './SubdomainClone';
import LOG from 'loglevel';
import { GatewayConfig } from './index';

/**
* All mutating access to the cache should be serialised with m-ld states
*/
export class SubdomainCache extends LRUCache<string, SubdomainClone> {
constructor(config: GatewayConfig) {
super({
max: config.subdomainCacheSize ?? 100,
dispose(_name: string, sdc: SubdomainClone) {
sdc.close().catch(err => LOG.warn(err));
},
});
}

async clear() {
await Promise.all(this.values().map(sdc => sdc.close()));
super.reset();
}
}
11 changes: 8 additions & 3 deletions src/server/SubdomainClone.ts
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ export class SubdomainClone extends Subdomain {
private readonly queueStore: AbstractSublevel<BackendLevel, unknown, string, any>;
private _state: CloneState;
private events = new EventEmitter;
private _closing = false;

constructor(
spec: SubdomainSpec,
Expand Down Expand Up @@ -153,7 +154,8 @@ export class SubdomainClone extends Subdomain {
else
this._clone.write(state =>
this.doAndStayLocked({ state, lock: 'write' },
() => doWrite(state))).catch(reject);
() => doWrite(state))
).catch(reject);
});
}

Expand All @@ -166,7 +168,10 @@ export class SubdomainClone extends Subdomain {
this.events.emit('lockRelease');
}

close() {
return this._clone.close();
async close() {
if (!this._closing) {
this._closing = true;
await this._clone.close();
}
}
}
Loading