Skip to content

Commit

Permalink
Write a SQL query in Parquet file
Browse files Browse the repository at this point in the history
We want to allow users to cache query results in a more permanent way, so
we are persisting them into parquet files. In this PR we introduce the
functionality for writing parquet files to the Source Manager. We also
introduce the concept of batched queries in our connectors. This means
they can now pull all the data from a query in a way that doesn't
exhaust the memory of the running machine, which can happen with huge
queries.
  • Loading branch information
andresgutgon committed May 24, 2024
1 parent aab4a4e commit 203e297
Show file tree
Hide file tree
Showing 39 changed files with 2,176 additions and 136 deletions.
7 changes: 7 additions & 0 deletions .changeset/proud-snakes-pull.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
"@latitude-data/postgresql-connector": minor
"@latitude-data/source-manager": minor
---

- Add the ability to run batched queries in the PostgreSQL connector.
- Allow source manager to write the result of a query into a parquet file
14 changes: 14 additions & 0 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ jobs:
matrix:
node-version: [18.x] # Specify node versions you want to test against

postgres:
image: postgres
env:
POSTGRES_USER: 'latitude'
POSTGRES_PASSWORD: 'secret'
POSTGRES_DB: postgresql_adapter_test
ports:
- '5436:5432'
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- name: Checkout repository
uses: actions/checkout@v3
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,5 @@ yarn-error.log*

sites/**/*
!sites/package.json

apps/server/scripts/dist/**/*
14 changes: 10 additions & 4 deletions apps/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,18 @@
"dev": "vite dev",
"build:server": "svelte-kit sync && vite build",
"build:shutdownScript": "mv build/index.js build/server.js && cp scripts/shutdownOnDemand.js build/index.js",
"build": "npm run build:server && npm run build:shutdownScript",
"build": "npm run build:scripts && npm run build:server && npm run build:shutdownScript",
"preview": "vite preview",
"prettier": "prettier --write src/**/*.ts src/**/*.svelte",
"test": "svelte-kit sync && vitest --run",
"test:watch": "vitest",
"lint": "eslint .",
"check": "svelte-kit sync && svelte-check --tsconfig ./tsconfig.json --output human-verbose",
"check:watch": "svelte-kit sync && svelte-check --tsconfig ./tsconfig.json --watch",
"query": "vite-node scripts/run_query/index.ts"
"query": "vite-node scripts/run_query/index.ts",
"materialize_queries:dev": "vite-node scripts/materialize_queries/index.ts",
"materialize_queries": "node scripts/dist/materialize_queries.js --debug",
"build:scripts": "rollup -c scripts.rollup.config.mjs"
},
"devDependencies": {
"@latitude-data/client": "workspace:*",
Expand All @@ -25,6 +28,7 @@
"@latitude-data/sql-compiler": "workspace:^",
"@latitude-data/sveltekit-autoimport": "^1.7.1",
"@latitude-data/test-connector": "workspace:^",
"@rollup/plugin-typescript": "^11.1.6",
"@rollup/pluginutils": "^5.1.0",
"@sveltejs/adapter-auto": "^3.0.0",
"@sveltejs/adapter-node": "^4.0.1",
Expand All @@ -38,11 +42,13 @@
"@types/uuid": "^9.0.8",
"autoprefixer": "^10.4.17",
"blessed": "^0.1.81",
"chokidar": "^3.5.3",
"jsdom": "^24.0.0",
"mock-fs": "^5.2.0",
"postcss": "^8.4.35",
"prettier": "^3.1.1",
"prettier-plugin-svelte": "^3.1.2",
"rollup": "^4.10.0",
"svelte": "^4.2.7",
"svelte-check": "^3.6.0",
"tailwindcss": "^3.4.1",
Expand All @@ -51,7 +57,6 @@
"uuid": "^9.0.1",
"vite": "^5.0.3",
"vite-node": "^1.3.1",
"chokidar": "^3.5.3",
"vitest": "^1.2.2"
},
"dependencies": {
Expand All @@ -62,6 +67,7 @@
"@latitude-data/source-manager": "workspace:^",
"@latitude-data/svelte": "workspace:*",
"cheerio": "1.0.0-rc.12",
"lodash-es": "^4.17.21"
"lodash-es": "^4.17.21",
"ora": "^8.0.1"
}
}
18 changes: 18 additions & 0 deletions apps/server/scripts.rollup.config.mjs
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
import typescript from '@rollup/plugin-typescript'

/**
* @typedef {import('rollup').RollupOptions} RollupOptions
* @type {RollupOptions}
*/
// Bundles the materialization script (TS -> ESM) so production can run it
// with plain `node scripts/dist/materialize_queries.js` (see the
// `materialize_queries` script in package.json).
export default {
  input: 'scripts/materialize_queries/index.ts',
  output: {
    file: 'scripts/dist/materialize_queries.js',
    format: 'esm',
    // Emit a .map next to the bundle for readable stack traces.
    sourcemap: true,
  },
  plugins: [
    // Transpile with a dedicated tsconfig so the script output does not mix
    // with the SvelteKit app build.
    typescript({ tsconfig: './tsconfig.scripts.json', sourceMap: true }),
  ],
  // Left unbundled; resolved at runtime from node_modules / node builtins.
  external: ['fs', 'ora', '@latitude-data/source-manager'],
}
82 changes: 82 additions & 0 deletions apps/server/scripts/materialize_queries/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
import fs from 'fs'
import { DiskDriver, StorageDriver } from '@latitude-data/source-manager'
import sourceManager from '../../src/lib/server/sourceManager'
import ora from 'ora'

type CommandArgs = {
  debug: boolean // true when the debug flag was passed on the CLI
}

/**
 * Parses CLI flags for this script.
 *
 * Fix: the npm script runs this file with `--debug` (see
 * `materialize_queries` in package.json), but the previous check only
 * matched the bare word `debug` in the first position, so debug mode could
 * never be enabled from the npm script. Accept both spellings, anywhere in
 * the argument list.
 */
function getArgs(): CommandArgs {
  const args = process.argv.slice(2)
  const debug = args.includes('--debug') || args.includes('debug')

  return { debug }
}

/**
 * Ensures the on-disk materialization directory exists before parquet files
 * are written into it. Only applies when the storage driver is disk-backed;
 * other drivers are left to manage their own destinations.
 */
function ensureMaterializeDirExists(storage: StorageDriver) {
  if (!(storage instanceof DiskDriver)) return

  // `recursive: true` makes mkdirSync a no-op when the directory already
  // exists, so the previous existsSync() pre-check was redundant and a
  // time-of-check/time-of-use race.
  fs.mkdirSync(storage.basePath, { recursive: true })
}

// Rows pulled per batch from the connector; bounds memory use on huge queries.
const BATCH_SIZE = 4096

/**
 * Materializes queries into parquet files via the source manager's storage
 * driver, reporting progress through an `ora` terminal spinner.
 *
 * @param debug - When true, logs per-query status/timing tables to the
 *   console and streams memory usage into the spinner via `onDebug`.
 *
 * Always terminates the process: exit code 0 on success, 1 on any failure.
 */
async function materializeQueries(debug = false) {
  let startTime = 0
  if (debug) {
    startTime = performance.now()
  }
  const spinner = ora().start()
  // In debug mode, forward memory stats from the writer into the spinner.
  const defaultArgs = debug
    ? {
        onDebug: ({ memoryUsageInMb }: { memoryUsageInMb: string }) => {
          spinner.text = `Memory: ${memoryUsageInMb}`
        },
      }
    : {}
  try {
    // TODO: This is faked. Do the logic to get only materializable queries
    const allQueries = ['postgresql/query']
    const storage = sourceManager.materializeStorage

    ensureMaterializeDirExists(storage)

    for (const [index, query] of allQueries.entries()) {
      // e.g. "1 of 3 postgresql/query"
      const status = `${index + 1} of ${allQueries.length} ${query}`
      if (debug) {
        console.log(status)
      } else {
        spinner.text = status
      }
      // Streams the query result into a parquet file; returns its location.
      const url = await storage.writeParquet({
        ...defaultArgs,
        queryPath: query,
        params: {},
        batchSize: BATCH_SIZE,
      })

      if (debug) {
        console.table({ query, batchSize: BATCH_SIZE, url })
        // NOTE(review): startTime is set once before the loop, so this prints
        // cumulative elapsed time since script start, not per-query time —
        // confirm that is the intent. Seconds are fractional and unpadded.
        const endTime = performance.now()
        const min = Math.floor((endTime - startTime) / 60000)
        const seconds = ((endTime - startTime) % 60000) / 1000
        console.log(`Time: ${min}:${seconds} minutes`)
      }
    }

    spinner.stop()
    console.log('\nMaterialization complete 🎉')
    process.exit(0)
  } catch (e) {
    spinner.fail('Error materializing')
    console.error(e)
    process.exit(1)
  }
}

// Script entry point: read CLI flags and kick off materialization.
const { debug } = getArgs()

materializeQueries(debug)
25 changes: 10 additions & 15 deletions apps/server/src/lib/server/sourceManager.test.ts
Original file line number Diff line number Diff line change
@@ -1,11 +1,6 @@
import mockFs from 'mock-fs'
import { vi, describe, it, expect } from 'vitest'
import {
SourceManager,
STORAGE_TYPES,
type StorageType,
buildStorageDriver,
} from '@latitude-data/source-manager'
import { SourceManager, DiskDriver } from '@latitude-data/source-manager'
import { BASE_STATIC_PATH, MATERIALIZE_DIR, QUERIES_DIR } from '$lib/constants'
import { buildSourceManager } from '$lib/server/sourceManager'

Expand Down Expand Up @@ -34,10 +29,10 @@ describe('buildSourceManager', () => {

buildSourceManager()
expect(SourceManager).toHaveBeenCalledWith(QUERIES_DIR, {
materializeStorage: buildStorageDriver({
type: STORAGE_TYPES.disk as StorageType,
materialize: {
Klass: DiskDriver,
config: { path: MATERIALIZE_DIR },
}),
},
})
})

Expand All @@ -57,10 +52,10 @@ describe('buildSourceManager', () => {

buildSourceManager()
expect(SourceManager).toHaveBeenCalledWith(QUERIES_DIR, {
materializeStorage: buildStorageDriver({
type: STORAGE_TYPES.disk as StorageType,
materialize: {
Klass: DiskDriver,
config: { path: MATERIALIZE_DIR },
}),
},
})
})

Expand All @@ -72,10 +67,10 @@ describe('buildSourceManager', () => {
})
buildSourceManager()
expect(SourceManager).toHaveBeenCalledWith(QUERIES_DIR, {
materializeStorage: buildStorageDriver({
type: STORAGE_TYPES.disk as StorageType,
materialize: {
Klass: DiskDriver,
config: { path: MATERIALIZE_DIR },
}),
},
})
})
})
30 changes: 17 additions & 13 deletions apps/server/src/lib/server/sourceManager.ts
Original file line number Diff line number Diff line change
@@ -1,30 +1,33 @@
import fs from 'fs'
import { APP_CONFIG_PATH, MATERIALIZE_DIR, QUERIES_DIR } from '$lib/constants'
import { APP_CONFIG_PATH, MATERIALIZE_DIR, QUERIES_DIR } from '../constants'
import {
SourceManager,
StorageConfig,
STORAGE_TYPES,
buildStorageDriver,
type StorageType,
StorageType,
getDriverKlass,
StorageConfig,
} from '@latitude-data/source-manager'

const DEFAULT_STORAGE_CONFIG = {
type: STORAGE_TYPES.disk as StorageType,
config: { path: MATERIALIZE_DIR },
}
} as StorageConfig<StorageType>

function loadStorageConfig(): StorageConfig<StorageType> {
function loadStorage(): StorageConfig<StorageType> {
if (!fs.existsSync(APP_CONFIG_PATH)) {
return DEFAULT_STORAGE_CONFIG
}

const file = fs.readFileSync(APP_CONFIG_PATH, 'utf8')
try {
const config = JSON.parse(file)
// We set the `path` to the default materialize directory
if (config.type == STORAGE_TYPES.disk) return DEFAULT_STORAGE_CONFIG
const materialize = config?.materializeStorage ?? {}

if (!materialize.type) return DEFAULT_STORAGE_CONFIG
const isDisk = config?.materializeStorage?.type == STORAGE_TYPES.disk
if (isDisk && !materialize.config) return DEFAULT_STORAGE_CONFIG

return config
return materialize as StorageConfig<StorageType>
} catch (e) {
return DEFAULT_STORAGE_CONFIG
}
Expand All @@ -36,10 +39,11 @@ function loadStorageConfig(): StorageConfig<StorageType> {
* We configure materialize storage driver based on the config.
*/
export function buildSourceManager() {
const config = loadStorageConfig()
const storageDriver = buildStorageDriver(config)
const driver = storageDriver || buildStorageDriver(DEFAULT_STORAGE_CONFIG)
return new SourceManager(QUERIES_DIR, { materializeStorage: driver })
const storage = loadStorage()
const driverKlass = getDriverKlass({ type: storage.type })
return new SourceManager(QUERIES_DIR, {
materialize: { Klass: driverKlass, config: storage.config },
})
}

const sourceManager = buildSourceManager()
Expand Down
17 changes: 17 additions & 0 deletions apps/server/tsconfig.scripts.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
{
"extends": "./tsconfig.json",
"compilerOptions": {
"module": "esnext",
"esModuleInterop": true,
"target": "ES2018",
"moduleResolution": "node",
"outDir": "./scripts/dist",
"rootDir": ".",
"baseUrl": "..",
"paths": {
"@/*": ["./packages/source_manager/src/*"]
}
},
"include": ["./scripts/**/*.ts"],
"exclude": ["node_modules", "**/*.svelte"]
}
5 changes: 3 additions & 2 deletions packages/connectors/databricks/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import {
ConnectorOptions,
} from '@latitude-data/source-manager'
import QueryResult, { DataType, Field } from '@latitude-data/query_result'
import { DBSQLClient } from '@databricks/sql'
import { DBSQLClient, DBSQLParameter } from '@databricks/sql'
import { ConnectionOptions } from '@databricks/sql/dist/contracts/IDBSQLClient'
import { TTypeDesc } from '@databricks/sql/thrift/TCLIService_types'

Expand Down Expand Up @@ -83,8 +83,9 @@ export default class DatabricksConnector extends BaseConnector<ConnectionParams>
const queryOperation = await session.executeStatement(compiledQuery.sql, {
namedParameters: compiledQuery.resolvedParams.reduce(
(acc, param, index) => {
return { ...acc, [`var_${index}`]: param.value }
return { ...acc, [`var_${index}`]: param.value as DBSQLParameter }
},
{} as Record<string, DBSQLParameter>,
),
})
const result = await queryOperation.fetchAll()
Expand Down
2 changes: 1 addition & 1 deletion packages/connectors/materialized/src/index.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ describe('materializedRef function', async () => {
},
})
expect(compiled.sql).toBe(
`SELECT * FROM (read_parquet('${MATERIALIZE_QUERIES_DIR}/c669ba7574cadcfd9527e449feeb6a3fe8c23e23d0fef0893d3011c85ac88624.parquet'))`,
`SELECT * FROM read_parquet('${MATERIALIZE_QUERIES_DIR}/c669ba7574cadcfd9527e449feeb6a3fe8c23e23d0fef0893d3011c85ac88624.parquet')`,
)
})

Expand Down
4 changes: 2 additions & 2 deletions packages/connectors/materialized/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -76,11 +76,11 @@ export default class MaterializedConnector extends DuckdbConnector {
const storage = await this.source.manager.materializeStorage
const materializeUrl = await storage.getUrl({
sql: compiledSubQuery.sql,
queryPath: refSource.path,
sourcePath: refSource.path,
queryName: `materializedRef('${referencedQuery}')`,
})

return `(read_parquet('${materializeUrl}'))`
return `read_parquet('${materializeUrl}')`
},
}
}
Expand Down
5 changes: 4 additions & 1 deletion packages/connectors/materialized/src/tests/helpers.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,10 @@ export async function buildMaterializedConnector({
const connParams = connectionParams ?? {}
const sourceSchema = sourceParams.schema ?? {}
const sourceManager = new SourceManager(queriesDir ?? QUERIES_DIR, {
materializeStorage: new DiskDriver({ path: MATERIALIZE_QUERIES_DIR }),
materialize: {
Klass: DiskDriver,
config: { path: MATERIALIZE_QUERIES_DIR },
},
})
const source = new Source({
path: sourceParams.path,
Expand Down
12 changes: 12 additions & 0 deletions packages/connectors/postgresql/devDb/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
# Local PostgreSQL for developing/testing the postgresql connector.
version: '3.1'
services:
  db:
    image: postgres
    restart: always
    environment:
      POSTGRES_USER: 'latitude'
      POSTGRES_PASSWORD: 'secret'
    ports:
      # Host port 5436 avoids clashing with a default local postgres on 5432
      # and matches the CI service mapping in .github/workflows/test.yml.
      - '5436:5432'
    volumes:
      # Seed script executed by the postgres entrypoint on first start.
      - ./init-db.sh:/docker-entrypoint-initdb.d/init-db.sh
Loading

0 comments on commit 203e297

Please sign in to comment.