Skip to content

Commit

Permalink
Make source manager available to connectors.
Browse files Browse the repository at this point in the history
We need for future changes to be able to access and build queries from
any connector when resolving the queries in a specific connector. To
make our life easier we pass the source manager through the Source to
the connector
  • Loading branch information
andresgutgon committed May 14, 2024
1 parent 827e9f7 commit 95cd163
Show file tree
Hide file tree
Showing 97 changed files with 1,103 additions and 1,305 deletions.
21 changes: 21 additions & 0 deletions .changeset/sour-ghosts-fry.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
---
"@latitude-data/clickhouse-connector": major
"@latitude-data/databricks-connector": major
"@latitude-data/postgresql-connector": major
"@latitude-data/snowflake-connector": major
"@latitude-data/bigquery-connector": major
"@latitude-data/athena-connector": major
"@latitude-data/duckdb-connector": major
"@latitude-data/sqlite-connector": major
"@latitude-data/mssql-connector": major
"@latitude-data/test-connector": major
"@latitude-data/mysql-connector": major
"@latitude-data/trino-connector": major
"@latitude-data/source-manager": major
"@latitude-data/cli": minor
---

Pass Source to connectors to get access to source details and also to source manager
This is a breaking change. Before connectors were receiving `rootPath` of their
queries and now this info is obta1ined from the source.

1 change: 0 additions & 1 deletion apps/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
"query": "vite-node scripts/run_query/index.ts"
},
"devDependencies": {
"@latitude-data/base-connector": "workspace:*",
"@latitude-data/test-connector": "workspace:^",
"@latitude-data/client": "workspace:*",
"@latitude-data/eslint-config": "workspace:*",
Expand Down
2 changes: 1 addition & 1 deletion apps/server/scripts/run_query/result_display.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import QueryResult from '@latitude-data/query_result'
import { screen, text, box, listtable, type Widgets } from 'blessed'
import { CompileError } from '@latitude-data/sql-compiler'
import { type CompiledQuery } from '@latitude-data/base-connector'
import { type CompiledQuery } from '@latitude-data/source-manager'

export default class QueryDisplay {
private static instance: QueryDisplay
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import computeRelativeQueryPath from './computeRelativeQueryPath'
import { QUERIES_DIR } from '$lib/server/sourceManager'
import { it, describe, expect } from 'vitest'
import { QUERIES_DIR } from '$lib/server/sourceManager'
import computeRelativeQueryPath from './computeRelativeQueryPath'

describe('computeRelativeQueryPath', () => {
it('should compute the relative query path correctly', () => {
Expand Down
106 changes: 61 additions & 45 deletions apps/server/src/lib/query_service/find_or_compute.test.ts
Original file line number Diff line number Diff line change
@@ -1,70 +1,58 @@
import TestConnector from '@latitude-data/test-connector'
import findOrCompute from './find_or_compute'
import fs from 'fs'
import mockFs from 'mock-fs'
import { QUERIES_DIR } from '$lib/server/sourceManager'
import { it, describe, beforeEach, afterEach, expect } from 'vitest'
import { vi } from 'vitest'
import { vi, it, describe, beforeEach, afterEach, expect } from 'vitest'
import { Source, SourceManager } from '@latitude-data/source-manager'
import TestConnector from '@latitude-data/test-connector'

import type { CompiledQuery, QueryRequest } from '@latitude-data/base-connector'
import { QUERIES_DIR } from '$lib/server/sourceManager'
import findOrCompute from './find_or_compute'

const runQuerySpy = vi.fn()
const connector = new TestConnector(QUERIES_DIR, {
onRunQuery: runQuerySpy,
const sourceManager = new SourceManager(QUERIES_DIR)
const source = new Source({
path: QUERIES_DIR,
schema: { type: 'test' },
sourceManager,
})

vi.mock('$lib/server/sourceManager', async (importOriginal) => {
return {
...((await importOriginal()) as Record<string, unknown>),
default: {
loadFromQuery: async () => {
return {
source: {
compileQuery: (request: QueryRequest) =>
connector.compileQuery(request),
runCompiledQuery: (compiledQuery: CompiledQuery) =>
connector.runCompiled(compiledQuery),
},
sourceFilePath: QUERIES_DIR,
}
},
},
}
const connector = new TestConnector({
source,
connectionParams: { fail: false },
})
source.setConnector(connector)

describe('Query cache', () => {
beforeEach(() => {
mockFs({
[QUERIES_DIR]: {
'source.yml': `
type: test
details:
host: localhost
port: 1234
`,
type: test
details:
host: localhost
port: 1234
`,
},
'/tmp/.latitude': {},
})
vi.resetAllMocks()
vi.restoreAllMocks()
})

afterEach(() => {
mockFs.restore()
runQuerySpy.mockClear()
})

it('Computes the same query only once', async () => {
fs.writeFileSync(`${QUERIES_DIR}/query.sql`, 'SELECT * FROM table')

const queryPath = 'query'
const queryParams = {}
const runQuerySpy = vi.spyOn(connector, 'runQuery')

// First call
await findOrCompute({ query: queryPath, queryParams, force: false })
await findOrCompute({ source, query: queryPath, queryParams, force: false })

expect(runQuerySpy).toHaveBeenCalledTimes(1)

// Second call
await findOrCompute({ query: queryPath, queryParams, force: false })
await findOrCompute({ source, query: queryPath, queryParams, force: false })
expect(runQuerySpy).toHaveBeenCalledTimes(1) // No additional calls
})

Expand All @@ -73,13 +61,14 @@ describe('Query cache', () => {

const queryPath = 'query'
const queryParams = {}
const runQuerySpy = vi.spyOn(connector, 'runQuery')

// First call
await findOrCompute({ query: queryPath, queryParams, force: false })
await findOrCompute({ source, query: queryPath, queryParams, force: false })
expect(runQuerySpy).toHaveBeenCalledTimes(1)

// Second call
await findOrCompute({ query: queryPath, queryParams, force: true })
await findOrCompute({ source, query: queryPath, queryParams, force: true })
expect(runQuerySpy).toHaveBeenCalledTimes(2) // Additional call
})

Expand All @@ -90,18 +79,39 @@ describe('Query cache', () => {
const queryPath1 = 'query1'
const queryPath2 = 'query2'
const queryParams = {}
const runQuerySpy = vi.spyOn(connector, 'runQuery')

// First call of query 1
await findOrCompute({ query: queryPath1, queryParams, force: false })
await findOrCompute({
source,
query: queryPath1,
queryParams,
force: false,
})
expect(runQuerySpy).toHaveBeenCalledTimes(1)

// First call of query 2
await findOrCompute({ query: queryPath2, queryParams, force: false })
await findOrCompute({
source,
query: queryPath2,
queryParams,
force: false,
})
expect(runQuerySpy).toHaveBeenCalledTimes(2) // Called again for query 2

// Second calls
await findOrCompute({ query: queryPath1, queryParams, force: false })
await findOrCompute({ query: queryPath2, queryParams, force: false })
await findOrCompute({
source,
query: queryPath1,
queryParams,
force: false,
})
await findOrCompute({
source,
query: queryPath2,
queryParams,
force: false,
})
expect(runQuerySpy).toHaveBeenCalledTimes(2) // No additional calls
})

Expand All @@ -111,9 +121,11 @@ describe('Query cache', () => {
const queryPath = 'query'
const queryParams1 = { foo: 'bar1' }
const queryParams2 = { foo: 'bar2' }
const runQuerySpy = vi.spyOn(connector, 'runQuery')

// First call with params 1
await findOrCompute({
source,
query: queryPath,
queryParams: queryParams1,
force: false,
Expand All @@ -122,6 +134,7 @@ describe('Query cache', () => {

// First call with params 2
await findOrCompute({
source,
query: queryPath,
queryParams: queryParams2,
force: false,
Expand All @@ -130,11 +143,13 @@ describe('Query cache', () => {

// Second calls
await findOrCompute({
source,
query: queryPath,
queryParams: queryParams1,
force: false,
})
await findOrCompute({
source,
query: queryPath,
queryParams: queryParams2,
force: false,
Expand All @@ -150,19 +165,20 @@ describe('Query cache', () => {

const queryPath = 'query'
const queryParams = { usedParam: 'bar', unusedParam: 'qux' }
const runQuerySpy = vi.spyOn(connector, 'runQuery')

// First call
await findOrCompute({ query: queryPath, queryParams, force: false })
await findOrCompute({ source, query: queryPath, queryParams, force: false })
expect(runQuerySpy).toHaveBeenCalledTimes(1)

// Changing the unused param
queryParams.unusedParam = 'qux2'
await findOrCompute({ query: queryPath, queryParams, force: false })
await findOrCompute({ source, query: queryPath, queryParams, force: false })
expect(runQuerySpy).toHaveBeenCalledTimes(1) // No additional calls

// Changing the used param
queryParams.usedParam = 'bar2'
await findOrCompute({ query: queryPath, queryParams, force: false })
await findOrCompute({ source, query: queryPath, queryParams, force: false })
expect(runQuerySpy).toHaveBeenCalledTimes(2) // Called again with different params
})
})
24 changes: 11 additions & 13 deletions apps/server/src/lib/query_service/find_or_compute.ts
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
import cache from './query_cache'
import sourceManager from '$lib/server/sourceManager'
import QueryResult from '@latitude-data/query_result'
import { CompiledQuery } from '@latitude-data/base-connector'
import { type CompiledQuery, type Source } from '@latitude-data/source-manager'
import cache from './query_cache'
import computeRelativeQueryPath from './computeRelativeQueryPath'

type Props = {
query: string
queryParams: Record<string, unknown>
force: boolean
}

export default async function findOrCompute({
source,
query,
queryParams,
force,
}: Props): Promise<{
}: {
source: Source
query: string
queryParams: Record<string, unknown>
force: boolean
}): Promise<{
queryResult: QueryResult
compiledQuery: CompiledQuery
}> {
const { source, sourceFilePath } = await sourceManager.loadFromQuery(query)
const compiledQuery = await source.compileQuery({
queryPath: computeRelativeQueryPath({
queryPath: query,
sourcePath: sourceFilePath,
sourcePath: source.path,
}),
params: queryParams,
})
Expand All @@ -43,7 +41,7 @@ export default async function findOrCompute({
queryResult = await compute()
} else {
queryResult =
cache.find(request, compiledQuery.config.ttl) || (await compute())
cache.find(request, compiledQuery.config?.ttl) || (await compute())
}

return {
Expand Down
2 changes: 1 addition & 1 deletion apps/server/src/lib/query_service/query_cache.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import CacheManager from '$lib/cache_manager'
import QueryResult from '@latitude-data/query_result'
import { QueryRequest } from '@latitude-data/base-connector'
import { QueryRequest } from '@latitude-data/source-manager'

class QueryCache {
private cache: CacheManager
Expand Down
2 changes: 1 addition & 1 deletion apps/server/src/lib/server/sourceManager.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import SourceManager from '@latitude-data/source-manager'
import { SourceManager } from '@latitude-data/source-manager'

export const QUERIES_DIR = 'static/.latitude/queries'
const sourceManager = new SourceManager(QUERIES_DIR)
Expand Down
7 changes: 5 additions & 2 deletions apps/server/src/routes/api/queries/[...query]/+server.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,18 @@
import handleError from '$lib/errors/handler'
import sourceManager from '$lib/server/sourceManager'
import findOrCompute from '$lib/query_service/find_or_compute'
import getQueryParams from './getQueryParams'

type Props = { params: { query?: string }; url: URL }

export async function GET({ params: args, url }: Props) {
const { query } = args
try {
const { params, force, download } = await getQueryParams(url)
const query = args.query ?? ''
const source = await sourceManager.loadFromQuery(query)
const { queryResult } = await findOrCompute({
query: query ?? '',
source,
query,
queryParams: params,
force,
})
Expand Down
Loading

0 comments on commit 95cd163

Please sign in to comment.