Skip to content

Commit

Permalink
Add IndexProvider to connection provider
Browse files Browse the repository at this point in the history
  • Loading branch information
MXPOL committed Mar 19, 2024
1 parent d9c3452 commit 2815e73
Show file tree
Hide file tree
Showing 7 changed files with 197 additions and 57 deletions.
9 changes: 3 additions & 6 deletions apps/velo-external-db/test/drivers/index_api_rest_matchers.ts
Original file line number Diff line number Diff line change
@@ -1,9 +1,6 @@
import { indexSpi } from '@wix-velo/velo-external-db-core'
const { IndexFieldOrder, IndexStatus } = indexSpi

const responseWith = (matcher: any) => expect.objectContaining({ data: matcher })


const indexWith = (index: indexSpi.Index, extraProps: Partial<indexSpi.Index>) => ({
...index,
fields: index.fields.map(field => ({
Expand All @@ -14,7 +11,7 @@ const indexWith = (index: indexSpi.Index, extraProps: Partial<indexSpi.Index>) =
...extraProps
})

export const failedIndexCreationResponse = (index: indexSpi.Index) => expect.objectContaining({
export const failedIndexCreationResponse = (index: indexSpi.Index) => ({
index: indexWith(index, { status: IndexStatus.FAILED })
})

Expand All @@ -41,9 +38,9 @@ export const toHaveDefaultIndex = () => ({
})


export const createIndexResponseWith = (index: indexSpi.Index) => responseWith(({ index: indexWith(index, { status: IndexStatus.BUILDING }) }))
export const createIndexResponseWith = (index: indexSpi.Index) => ({ index: indexWith(index, { status: IndexStatus.BUILDING }) })

export const removeIndexResponse = () => responseWith(({}))
export const removeIndexResponse = () => (({}))

export const listIndexResponseWithFailedIndex = (index: indexSpi.Index) => {
return expect.arrayContaining([indexWith(index, { status: IndexStatus.FAILED })])
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ const axiosInstance = axios.create({

export const givenIndexes = async(collectionName: string, indexes: indexSpi.Index[], auth: any) => {
for (const index of indexes) {
await axiosInstance.post('/indexes/create', { dataCollectionId: collectionName, index } as indexSpi.CreateIndexRequest, auth)
await axiosInstance.post('/indexes/create', { collectionId: collectionName, index } as indexSpi.CreateIndexRequest, auth)
}
await Promise.all(indexes.map(index => indexCreated(collectionName, index.name, auth)))
}
Expand All @@ -20,5 +20,11 @@ const indexCreated = async(collectionName: string, indexName: string, auth: any)
})
}

export const createIndexFor = async(collectionName: string, index: indexSpi.Index, auth: any) => await
axiosInstance.post('/indexes/create', { collectionId: collectionName, index } as indexSpi.CreateIndexRequest, auth).then(res => res.data)

export const removeIndexFor = async(collectionName: string, indexName: string, auth: any) => await
axiosInstance.post('/indexes/remove', { collectionId: collectionName, indexName } as indexSpi.RemoveIndexRequest, auth).then(res => res.data)

export const retrieveIndexesFor = async(collectionName: string, auth: any) => await
axiosInstance.post('/indexes/list', { dataCollectionId: collectionName }, { transformRequest: auth.transformRequest }).then(res => res.data)
axiosInstance.post('/indexes/list', { collectionId: collectionName } as indexSpi.ListIndexesRequest, { transformRequest: auth.transformRequest }).then(res => res.data)
19 changes: 5 additions & 14 deletions apps/velo-external-db/test/e2e/app_index.e2e.spec.ts
Original file line number Diff line number Diff line change
Expand Up @@ -44,10 +44,7 @@ describe(`Velo External DB Index API: ${currentDbImplementationName()}`, () => {
await schema.givenCollection(ctx.collectionName, [ctx.column], authOwner)

// in-progress
await expect(axiosServer.post('/indexes/create', {
dataCollectionId: ctx.collectionName,
index: ctx.index
}, authOwner)).resolves.toEqual(matchers.createIndexResponseWith(ctx.index))
await expect(index.createIndexFor(ctx.collectionName, ctx.index, authOwner)).resolves.toEqual(matchers.createIndexResponseWith(ctx.index))

// active
await eventually(async() =>
Expand All @@ -68,22 +65,16 @@ describe(`Velo External DB Index API: ${currentDbImplementationName()}`, () => {

test('creation of index with invalid column should return the index with status failed', async() => {
await schema.givenCollection(ctx.collectionName, [ctx.column], authOwner)

await eventually(async() => await expect(axiosServer.post('/indexes/create', {
dataCollectionId: ctx.collectionName,
index: ctx.invalidIndex
}, authOwner).then(res => res.data)).resolves.toEqual(matchers.failedIndexCreationResponse(ctx.invalidIndex)))

await eventually(async() => await expect(index.createIndexFor(ctx.collectionName, ctx.invalidIndex, authOwner)).resolves.toEqual(matchers.failedIndexCreationResponse(ctx.invalidIndex)))
})

test('remove', async() => {
await schema.givenCollection(ctx.collectionName, [ctx.column], authOwner)
await index.givenIndexes(ctx.collectionName, [ctx.index], authOwner)

await expect(axiosServer.post('/indexes/remove', {
dataCollectionId: ctx.collectionName,
indexName: ctx.index.name
}, authOwner)).resolves.toEqual(matchers.removeIndexResponse()).catch()

await expect( index.removeIndexFor(ctx.collectionName, ctx.index.name, authOwner)).resolves.toEqual(matchers.removeIndexResponse())

await expect(index.retrieveIndexesFor(ctx.collectionName, authOwner)).resolves.not.toEqual(matchers.listIndexResponseWith([ctx.index]))
})

Expand Down
3 changes: 3 additions & 0 deletions libs/external-db-postgres/src/connection_provider.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ import FilterParser from './sql_filter_transformer'
import DatabaseOperations from './postgres_operations'
import { PostgresConfig, postgresPoolOptions } from './types'
import { ILogger } from '@wix-velo/external-db-logger'
import IndexProvider from './postgres_index_provider'

types.setTypeParser(builtins.NUMERIC, val => parseFloat(val))

Expand All @@ -32,12 +33,14 @@ export default (cfg: PostgresConfig, _poolOptions: postgresPoolOptions, logger?:
const databaseOperations = new DatabaseOperations(pool)
const dataProvider = new DataProvider(pool, filterParser, logger)
const schemaProvider = new SchemaProvider(pool, logger)
const indexProvider = new IndexProvider(pool, logger)

return {
dataProvider,
schemaProvider,
databaseOperations,
connection: pool,
indexProvider,
cleanup: async() => pool.end(() => {})
}
}
Expand Down
164 changes: 164 additions & 0 deletions libs/external-db-postgres/src/postgres_index_provider.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,164 @@
import { Pool } from 'pg'
import { escapeIdentifier } from './postgres_utils'
import { errors } from '@wix-velo/velo-external-db-commons'
import { DomainIndex, IIndexProvider, DomainIndexStatus } from '@wix-velo/velo-external-db-types'
import { ILogger } from '@wix-velo/external-db-logger'


export default class IndexProvider implements IIndexProvider {
pool: Pool
logger?: ILogger

constructor(pool: any, logger?: ILogger) {
this.pool = pool
this.logger = logger
}

async list(collectionName: string): Promise<DomainIndex[]> {
const activeIndexes = await this.getActiveIndexesFor(collectionName)
const inProgressIndexes = await this.getInProgressIndexesFor(collectionName)
const indexes = { ...inProgressIndexes, ...activeIndexes }
return Object.values(indexes)
}

async create(collectionName: string, index: DomainIndex): Promise<DomainIndex> {
const unique = index.isUnique ? 'UNIQUE' : ''

const columnsToIndex = await Promise.all(index.columns.map(async(col: string) => {
return {
name: col,
partialString: await this.partialStringFor(col, collectionName)
}
}))

const createIndexPromise = this.pool.query(`CREATE ${unique} INDEX ${escapeIdentifier(index.name)} ON ${escapeIdentifier(collectionName)} (${columnsToIndex.map((col: { name: string, partialString: string }) => `${escapeIdentifier(col.name)}${col.partialString}`)})`)

const status = await this.returnStatusAfterXSeconds(1, createIndexPromise, index)

return { ...index, status }
}

async remove(collectionName: string, indexName: string): Promise<void> {
await this.pool.query(`DROP INDEX ${escapeIdentifier(indexName)}`)
.catch(e => { throw this.translateErrorCodes(e) })
}


private async getActiveIndexesFor(collectionName: string): Promise<{ [x: string]: DomainIndex }> {
const { rows } = await this.pool.query('SELECT * FROM pg_indexes WHERE schemaname = current_schema() AND tablename = $1', [collectionName])
.catch(err => { throw this.translateErrorCodes(err) })


const indexs: { [x: string]: DomainIndex } = {}

// postgres return the following properties for each index:
type IndexRow = {
// Table name
tablename: string
// Index name
indexname: string
// Index creation command
indexdef: string
}

rows.forEach((r: IndexRow) => {
if (!indexs[r.indexname]) {
indexs[r.indexname] = {
name: r.indexname,
columns: [],
isUnique: r.indexdef.includes('UNIQUE'),
caseInsensitive: false,
order: 'ASC',
status: DomainIndexStatus.ACTIVE
}
}
// TODO: extract this column extraction to a function
indexs[r.indexname].columns.push(r.indexdef.split('ON')[1].split('(')[1].split(')')[0])
})

return indexs
}

private async getInProgressIndexesFor(_collectionName: string): Promise<{ [x: string]: DomainIndex }> {
// const query = `
// SELECT
// now()::TIME(0) AS "Current Time",
// a.query,
// p.phase,
// round(p.blocks_done / p.blocks_total::numeric * 100, 2) AS "% Done",
// p.blocks_total,
// p.blocks_done,
// p.tuples_total,
// p.tuples_done,
// ai.schemaname,
// ai.relname AS tablename,
// ai.indexrelname AS "Index Name"
// FROM pg_stat_progress_create_index p
// JOIN pg_stat_activity a ON p.pid = a.pid
// LEFT JOIN pg_stat_all_indexes ai ON ai.relid = p.relid AND ai.indexrelid = p.index_relid
// WHERE ai.relname = $1;
// `
// const query2 = `
// SELECT *
// FROM pg_stat_activity
// WHERE state LIKE '%vacuum%' OR state LIKE '%autovacuum%';
// `


// const { rows } = await this.pool.query(query2)
// .catch(err => { throw this.translateErrorCodes(err) })
// console.dir({
// rows
// }, { depth: null })
return {}
// const databaseName = this.pool.config.connectionConfig.database
// const inProgressIndexes = await this.query('SELECT * FROM information_schema.processlist WHERE db = ? AND info LIKE \'CREATE%INDEX%\'', [databaseName])
// const domainIndexesForCollection = inProgressIndexes.map((r: any) => this.extractIndexFromQueryForCollection(collectionName, r.INFO)).filter(Boolean) as DomainIndex[]
// return domainIndexesForCollection.reduce((acc, index) => {
// acc[index.name] = index
// return acc
// }, {} as { [x: string]: DomainIndex })
}

private async returnStatusAfterXSeconds(x: number, promise: Promise<any>, _index: DomainIndex): Promise<DomainIndexStatus> {
return new Promise((resolve, _reject) => {
promise.catch((e: any) => {
this.logger?.error('failed to create index', this.translateErrorCodes(e))
resolve(DomainIndexStatus.FAILED)
})

setTimeout(() => {
resolve(DomainIndexStatus.BUILDING)
}, x * 1000)
})
}

private translateErrorCodes(e: any) {
switch (e.code) {
case '42P07':
return new errors.IndexAlreadyExists(`Index already exists: ${e.sqlMessage}`)
case '42703':
return new errors.FieldDoesNotExist(`Field does not exist: ${e.sqlMessage}`)
default:
console.log(e)
return new errors.UnrecognizedError(`Error while creating index: ${e} ${e.code}`)
}
}

private async partialStringFor(_col: string, _collectionName: string) {
return ''
// const typeResp = await this.query('SELECT DATA_TYPE FROM information_schema.COLUMNS WHERE TABLE_NAME = ? AND COLUMN_NAME = ?', [collectionName, col]).catch(_e => [])
// const type = typeResp[0]?.DATA_TYPE

// if (this.isTextType(type)) {
// const lengthResp = await this.query('SELECT CHARACTER_MAXIMUM_LENGTH FROM information_schema.COLUMNS WHERE TABLE_NAME = ? AND COLUMN_NAME = ?', [collectionName, col])
// const length = lengthResp[0].CHARACTER_MAXIMUM_LENGTH
// if (length) {
// return length > 767 ? '(767)' : `(${length})` // 767 is the max length for a text index
// }
// }
// return ''
}


}
6 changes: 3 additions & 3 deletions libs/velo-external-db-core/src/router.ts
Original file line number Diff line number Diff line change
Expand Up @@ -337,7 +337,7 @@ export const createRouter = () => {

router.post('/v3/indexes/list', async(req, res, next) => {
try {
const { dataCollectionId: collectionId } = req.body as ListIndexesRequest
const { collectionId } = req.body as ListIndexesRequest
const indexes = await indexService.list(collectionId)
res.json(indexes)
} catch (e) {
Expand All @@ -347,7 +347,7 @@ export const createRouter = () => {

router.post('/v3/indexes/create', async(req, res, next) => {
try {
const { dataCollectionId: collectionId, index } = req.body as CreateIndexRequest
const { collectionId, index } = req.body as CreateIndexRequest
const createdIndex = await indexService.create(collectionId, index)
res.json({
index: createdIndex
Expand All @@ -359,7 +359,7 @@ export const createRouter = () => {

router.post('/v3/indexes/remove', async(req, res, next) => {
try {
const { dataCollectionId: collectionId, indexName } = req.body as RemoveIndexRequest
const { collectionId, indexName } = req.body as RemoveIndexRequest
await indexService.remove(collectionId, indexName)
res.json({})
} catch (e) {
Expand Down
43 changes: 11 additions & 32 deletions libs/velo-external-db-core/src/spi-model/indexing.ts
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ export interface Index {
}

export enum IndexStatus {
UNKNOWN = 0,
BUILDING = 1,
ACTIVE = 2,
DROPPING = 3,
DROPPED = 4,
FAILED = 5,
INVALID = 6
UNKNOWN = 'UNKNOWN',
BUILDING = 'BUILDING',
ACTIVE = 'ACTIVE',
DROPPING = 'DROPPING',
DROPPED = 'DROPPED',
FAILED = 'FAILED',
INVALID = 'INVALID'
}

export interface IndexField {
Expand All @@ -45,50 +45,29 @@ export enum IndexFieldOrder {

interface ApplicationError {
code: string,
description: string,
errorMessage: string,
data: any
}

export abstract class IndexingService {
abstract List(req: ListIndexesRequest): Promise<ListIndexesResponse> //stream of Indexes
abstract Create(req: CreateIndexRequest): Promise<CreateIndexResponse>
abstract Remove(req: RemoveIndexRequest): Promise<RemoveIndexResponse>
}

export interface ListIndexesRequest {
// collection to list indexes from
dataCollectionId: string;
// optional namespace assigned to collection/installation
namespace?: string;
// slower read but consistent with recent updates
consistentRead: boolean;
collectionId: string;
}

export interface ListIndexesResponse {
// stream of Indexes
index: Index[];
}

export interface CreateIndexRequest {
// collection to list indexes from
dataCollectionId: string;
// optional namespace assigned to collection/installation
namespace?: string;
// index definition
collectionId: string;
index: Index;
}

export interface CreateIndexResponse {
// created index and it's status
index: Index;
}

export interface RemoveIndexRequest {
// collection to delete index from
dataCollectionId: string;
// optional namespace assigned to collection/installation
namespace?: string;
// index name
collectionId: string;
indexName: string;
}

Expand Down

0 comments on commit 2815e73

Please sign in to comment.