Skip to content

Commit

Permalink
Revert "Separate api for getting tenant gitManager. (microsoft#14290)"
Browse files Browse the repository at this point in the history
This reverts commit d368f26.
  • Loading branch information
tylerbutler committed Mar 29, 2023
1 parent 0d7687f commit 00693c6
Show file tree
Hide file tree
Showing 9 changed files with 186 additions and 239 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -97,10 +97,9 @@ export class DeliLambdaFactory extends EventEmitter implements IPartitionLambdaF
let gitManager: IGitManager;
let document: IDocument;

try {
// Lookup the last sequence number stored
// TODO - is this storage specific to the orderer in place? Or can I generalize the output context?
document = await this.collection.findOne({ documentId, tenantId });
try {
const tenant = await this.tenantManager.getTenant(tenantId, documentId);
gitManager = tenant.gitManager;

// Check if the document was deleted prior.
if (!isDocumentValid(document)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -99,8 +99,9 @@ export class ScribeLambdaFactory extends EventEmitter implements IPartitionLambd
this.serviceConfiguration,
);

try {
document = await this.documentCollection.findOne({ documentId, tenantId });
try {
const tenant = await this.tenantManager.getTenant(tenantId, documentId);
gitManager = tenant.gitManager;

if (!isDocumentValid(document)) {
// Document sessions can be joined (via Alfred) after a document is functionally deleted.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -73,33 +73,24 @@ describe("Routerlicious", () => {
messageFactory = new MessageFactory(testDocumentId, testClientId, testTenantId);
kafkaMessageFactory = new KafkaMessageFactory();

const testData = [
{
documentId: testDocumentId,
tenantId: testTenantId,
sequenceNumber: 0,
logOffset: undefined,
},
];
const dbFactory = new TestDbFactory(_.cloneDeep({ documents: testData }));
testMongoManager = new MongoManager(dbFactory);
const database = await testMongoManager.getDatabase();
testDocumentCollection = database.collection("documents");
testMessageCollection = new TestCollection([]);
testKafka = new TestKafka();
testProducer = testKafka.createProducer();
testTenantManager = new TestTenantManager();
testGitManager = (await testTenantManager.getTenantGitManager(
testTenantId,
testDocumentId,
)) as GitManager;
const createTreeEntry: ICreateTreeEntry[] = [];
const requestBody: ICreateTreeParams = {
tree: createTreeEntry,
};
tree = await testGitManager.createGitTree(requestBody);
testGitManager.addTree(tree);
const testDeltaManager = new TestDeltaManager();
const testData = [{ documentId: testDocumentId, tenantId: testTenantId, sequenceNumber: 0, logOffset: undefined }];
const dbFactory = new TestDbFactory(_.cloneDeep({ documents: testData }));
testMongoManager = new MongoManager(dbFactory);
const database = await testMongoManager.getDatabase();
testDocumentCollection = database.collection("documents");
testMessageCollection = new TestCollection([]);
testKafka = new TestKafka();
testProducer = testKafka.createProducer();
testTenantManager = new TestTenantManager();
const tenant = await testTenantManager.getTenant(testTenantId, testDocumentId);
testGitManager = tenant.gitManager as GitManager;
const createTreeEntry: ICreateTreeEntry[] = [];
const requestBody: ICreateTreeParams = {
tree: createTreeEntry,
};
tree = await testGitManager.createGitTree(requestBody);
testGitManager.addTree(tree);
const testDeltaManager = new TestDeltaManager();

let factory = new ScribeLambdaFactory(
testMongoManager,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,14 +79,14 @@ export class DeltaService implements IDeltaService {
return dbDeltas.map((delta) => delta.operation);
}

public async getDeltasFromSummaryAndStorage(
collectionName: string,
tenantId: string,
documentId: string,
from?: number,
to?: number,
) {
const gitManager = await this.tenantManager.getTenantGitManager(tenantId, documentId);
public async getDeltasFromSummaryAndStorage(
collectionName: string,
tenantId: string,
documentId: string,
from?: number,
to?: number) {
const tenant = await this.tenantManager.getTenant(tenantId, documentId);
const gitManager = tenant.gitManager;

const existingRef = await gitManager.getRef(encodeURIComponent(documentId));
if (!existingRef) {
Expand Down
13 changes: 4 additions & 9 deletions server/routerlicious/packages/services-core/src/tenant.ts
Original file line number Diff line number Diff line change
Expand Up @@ -86,15 +86,10 @@ export interface ITenantManager {
*/
getTenant(tenantId: string, documentId: string): Promise<ITenant>;

/**
* Retrieves GitManager instance for the given tenant
*/
getTenantGitManager(tenantId: string, documentId: string): Promise<IGitManager>;

/**
* Verifies that the given auth token is valid. A rejected promise indicates an invalid token.
*/
verifyToken(tenantId: string, token: string): Promise<void>;
/**
* Verifies that the given auth token is valid. A rejected promise indicates an invalid token.
*/
verifyToken(tenantId: string, token: string): Promise<void>;

/**
* Retrieves the key for the given tenant. This is a privileged op and should be used with care.
Expand Down
162 changes: 75 additions & 87 deletions server/routerlicious/packages/services-shared/src/storage.ts
Original file line number Diff line number Diff line change
Expand Up @@ -116,20 +116,21 @@ export class DocumentStorage implements IDocumentStorage {
};
}

public async createDocument(
tenantId: string,
documentId: string,
appTree: ISummaryTree,
sequenceNumber: number,
term: number,
initialHash: string,
ordererUrl: string,
historianUrl: string,
deltaStreamUrl: string,
values: [string, ICommittedProposal][],
enableDiscovery: boolean = false,
): Promise<IDocumentDetails> {
const gitManager = await this.tenantManager.getTenantGitManager(tenantId, documentId);
public async createDocument(
tenantId: string,
documentId: string,
appTree: ISummaryTree,
sequenceNumber: number,
term: number,
initialHash: string,
ordererUrl: string,
historianUrl: string,
deltaStreamUrl: string,
values: [string, ICommittedProposal][],
enableDiscovery: boolean = false,
): Promise<IDocumentDetails> {
const tenant = await this.tenantManager.getTenant(tenantId, documentId);
const gitManager = tenant.gitManager;

const lumberjackProperties = {
[BaseTelemetryProperties.tenantId]: tenantId,
Expand Down Expand Up @@ -278,41 +279,31 @@ export class DocumentStorage implements IDocumentStorage {
};
}

public async getVersions(
tenantId: string,
documentId: string,
count: number,
): Promise<ICommitDetails[]> {
const gitManager = await this.tenantManager.getTenantGitManager(tenantId, documentId);
public async getVersions(tenantId: string, documentId: string, count: number): Promise<ICommitDetails[]> {
const tenant = await this.tenantManager.getTenant(tenantId, documentId);
const gitManager = tenant.gitManager;

return gitManager.getCommits(documentId, count);
}

public async getVersion(tenantId: string, documentId: string, sha: string): Promise<ICommit> {
const gitManager = await this.tenantManager.getTenantGitManager(tenantId, documentId);
public async getVersion(tenantId: string, documentId: string, sha: string): Promise<ICommit> {
const tenant = await this.tenantManager.getTenant(tenantId, documentId);
const gitManager = tenant.gitManager;

return gitManager.getCommit(sha);
}

public async getFullTree(
tenantId: string,
documentId: string,
): Promise<{ cache: IGitCache; code: string }> {
const gitManager = await this.tenantManager.getTenantGitManager(tenantId, documentId);
const versions = await gitManager.getCommits(documentId, 1);
if (versions.length === 0) {
return {
cache: {
blobs: [],
commits: [],
refs: { [documentId]: null as unknown as string },
trees: [],
},
code: null as unknown as string,
};
}
public async getFullTree(tenantId: string, documentId: string): Promise<{ cache: IGitCache; code: string; }> {
const tenant = await this.tenantManager.getTenant(tenantId, documentId);
const versions = await tenant.gitManager.getCommits(documentId, 1);
if (versions.length === 0) {
return {
cache: { blobs: [], commits: [], refs: { [documentId]: (null as unknown) as string }, trees: [] },
code: (null as unknown) as string,
};
}

const fullTree = await gitManager.getFullTree(versions[0].sha);
const fullTree = await tenant.gitManager.getFullTree(versions[0].sha);

let code: string = null as unknown as string;
if (fullTree.quorumValues) {
Expand Down Expand Up @@ -412,51 +403,48 @@ export class DocumentStorage implements IDocumentStorage {
}
}

private async readFromSummary(tenantId: string, documentId: string): Promise<boolean> {
const gitManager = await this.tenantManager.getTenantGitManager(tenantId, documentId);
const existingRef = await gitManager.getRef(encodeURIComponent(documentId));
if (existingRef) {
// Fetch ops from logTail and insert into deltas collection.
// TODO: Make the rest endpoint handle this case.
const opsContent = await gitManager.getContent(
existingRef.object.sha,
".logTail/logTail",
);
const ops = JSON.parse(
Buffer.from(
opsContent.content,
Buffer.isEncoding(opsContent.encoding) ? opsContent.encoding : undefined,
).toString(),
) as ISequencedDocumentMessage[];
const dbOps: ISequencedOperationMessage[] = ops.map((op: ISequencedDocumentMessage) => {
return {
documentId,
operation: op,
tenantId,
type: SequencedOperationType,
mongoTimestamp: new Date(op.timestamp),
};
});
const opsCollection = await this.databaseManager.getDeltaCollection(
tenantId,
documentId,
);
await opsCollection.insertMany(dbOps, false).catch(async (error) => {
// Duplicate key errors are ignored
if (error.code !== 11000) {
// Needs to be a full rejection here
return Promise.reject(error);
}
});
winston.info(`Inserted ${dbOps.length} ops into deltas DB`);
const lumberjackProperties = {
[BaseTelemetryProperties.tenantId]: tenantId,
[BaseTelemetryProperties.documentId]: documentId,
};
Lumberjack.info(`Inserted ${dbOps.length} ops into deltas DB`, lumberjackProperties);
return true;
} else {
return false;
}
}
private async readFromSummary(tenantId: string, documentId: string): Promise<boolean> {
const tenant = await this.tenantManager.getTenant(tenantId, documentId);
const gitManager = tenant.gitManager;
const existingRef = await gitManager.getRef(encodeURIComponent(documentId));
if (existingRef) {
// Fetch ops from logTail and insert into deltas collection.
// TODO: Make the rest endpoint handle this case.
const opsContent = await gitManager.getContent(existingRef.object.sha, ".logTail/logTail");
const ops = JSON.parse(
Buffer.from(
opsContent.content,
Buffer.isEncoding(opsContent.encoding) ? opsContent.encoding : undefined,
).toString(),
) as ISequencedDocumentMessage[];
const dbOps: ISequencedOperationMessage[] = ops.map((op: ISequencedDocumentMessage) => {
return {
documentId,
operation: op,
tenantId,
type: SequencedOperationType,
mongoTimestamp: new Date(op.timestamp),
};
});
const opsCollection = await this.databaseManager.getDeltaCollection(tenantId, documentId);
await opsCollection
.insertMany(dbOps, false)
.catch(async (error) => {
// Duplicate key errors are ignored
if (error.code !== 11000) {
// Needs to be a full rejection here
return Promise.reject(error);
}
});
winston.info(`Inserted ${dbOps.length} ops into deltas DB`);
const lumberjackProperties = {
[BaseTelemetryProperties.tenantId]: tenantId,
[BaseTelemetryProperties.documentId]: documentId,
};
Lumberjack.info(`Inserted ${dbOps.length} ops into deltas DB`, lumberjackProperties);
return true;
} else {
return false;
}
}
}

0 comments on commit 00693c6

Please sign in to comment.