Merged
53 commits
843315e
initialize
chikamura Mar 18, 2024
ceb6964
fix document
chikamura Mar 24, 2024
947092c
fix masked connection pwd
chikamura Mar 24, 2024
5e3f024
refactoring
chikamura Mar 24, 2024
c9957c9
fix readme
chikamura Mar 24, 2024
6db7ebf
fix comment
chikamura Mar 24, 2024
bed7f75
add comment
chikamura Mar 25, 2024
eba331b
fix buildColumnType
chikamura Mar 25, 2024
d236d27
refactoring
chikamura Mar 25, 2024
47352c4
merge
chikamura Mar 25, 2024
39c7048
refactoring merge
chikamura Mar 25, 2024
e86c548
fix comment
chikamura Mar 25, 2024
e9d5dea
refactoring
chikamura Mar 25, 2024
8b5b4d3
fix null handling
chikamura Mar 25, 2024
94da2aa
fix example
chikamura Mar 31, 2024
c2d5ed0
refactoring
chikamura Apr 1, 2024
b52ad28
add test
chikamura Apr 1, 2024
7b85649
fix test
chikamura Apr 1, 2024
13f04ba
fix test
chikamura Apr 1, 2024
5098b4b
fix versioning
chikamura Apr 1, 2024
9af0b15
fix test
chikamura Apr 1, 2024
4404a24
fix test
chikamura Apr 1, 2024
7ddefa3
add test
chikamura Apr 1, 2024
ab0ce54
add staging_volume_name_prefix option
chikamura Apr 2, 2024
a119e1a
refactoring
chikamura Apr 2, 2024
c551124
add test
chikamura Apr 2, 2024
0e499b8
refactoring
chikamura Apr 6, 2024
af0b18e
fix test
chikamura Apr 6, 2024
84b453f
add test
chikamura Apr 6, 2024
285c84f
add test
chikamura Apr 6, 2024
082a09b
add test
chikamura Apr 6, 2024
8e8bbc8
fix document
chikamura Apr 6, 2024
613736e
fix build.gradle
chikamura Apr 6, 2024
be26044
refactoring
chikamura Apr 7, 2024
21e1fef
add merge rule test
chikamura Apr 7, 2024
95738e5
refactoring
chikamura Apr 7, 2024
83a17bf
add sql test
chikamura Apr 7, 2024
0af3500
fix typo
chikamura Apr 7, 2024
976df02
fix document
chikamura Apr 8, 2024
54bfd3b
fix document
chikamura Apr 16, 2024
6663b9f
fix document
chikamura May 5, 2024
e645aa5
add timestamp test
chikamura May 5, 2024
08b1943
add note to readme
chikamura May 7, 2024
78d17c1
add test
chikamura May 12, 2024
5a609bc
update github workflow
chikamura May 12, 2024
317fc7e
fix comment
chikamura May 20, 2024
d723346
test refactroring. merge into TestDatabricksOutputPluginByOutputType.…
chikamura Jun 2, 2024
0264a26
add timestamp_ntz warning comment
chikamura Jun 3, 2024
10bff5a
add test
chikamura Jun 3, 2024
64f4b4c
document fix
chikamura Jun 3, 2024
b77b2dc
add test
chikamura Jun 3, 2024
e1f1c5b
Update .github/workflows/main.yml
chikamura Jun 11, 2024
f72bc18
add new line to .gitignore
chikamura Jun 11, 2024
57 changes: 57 additions & 0 deletions .github/workflows/main.yml
@@ -0,0 +1,57 @@
name: main

on:
  push:
    branches:
      - 'main'
    tags:
      - '*'
  pull_request:
    branches:
      - 'main'
    types: [opened, synchronize]
  pull_request_target:
    branches:
      - 'main'
    types: [labeled]

jobs:
  test:
    name: test
    runs-on: ubuntu-latest
    if: >
      ${{
        github.event_name == 'pull_request' ||
        (github.event_name == 'pull_request_target' && contains(github.event.pull_request.labels.*.name, 'safe to test')) ||
        startsWith(github.ref, 'refs/tags/')
      }}
    steps:
      - uses: actions/checkout@v2
      - name: Set up JDK 1.8
        uses: actions/setup-java@v1
        with:
          java-version: 1.8
      - name: lint
        run: ./gradlew spotlessCheck
      - name: Test
        run: ./gradlew test
  build:
    name: Build + Publish
    runs-on: ubuntu-latest
    permissions:
      packages: write
      contents: read
    needs: [ test ]
    if: ${{ github.event_name == 'workflow_dispatch' || contains(github.ref, 'refs/tags/') }}
    steps:
      - uses: actions/checkout@v2
      - name: Set up Ruby 2.7
        uses: ruby/setup-ruby@v1
        with:
          ruby-version: 2.7
      - name: push gem
        uses: trocco-io/push-gem-to-gpr-action@v1
        with:
          language: java
          gem-path: "./build/gems/*.gem"
          github-token: "${{ secrets.GITHUB_TOKEN }}"
17 changes: 17 additions & 0 deletions .gitignore
@@ -0,0 +1,17 @@
*~
/pkg/
/tmp/
*.gemspec
.gradle/
/classpath/
build/
.idea
/.settings/
/.metadata/
.classpath
.project
config.yml
config.yaml
default_jdbc_driver/
/bin/
example/test.yml
Empty file added CHANGELOG.md
Empty file.
21 changes: 21 additions & 0 deletions LICENSE.txt
@@ -0,0 +1,21 @@

MIT License

Permission is hereby granted, free of charge, to any person obtaining
a copy of this software and associated documentation files (the
"Software"), to deal in the Software without restriction, including
without limitation the rights to use, copy, modify, merge, publish,
distribute, sublicense, and/or sell copies of the Software, and to
permit persons to whom the Software is furnished to do so, subject to
the following conditions:

The above copyright notice and this permission notice shall be
included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND,
EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF
MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND
NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE
LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION
OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION
WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.
78 changes: 77 additions & 1 deletion README.md
@@ -1 +1,77 @@
# embulk-output-databricks
# Databricks output plugin for Embulk

Databricks output plugin for Embulk loads records to Databricks Delta Table.

## Overview

* **Plugin type**: output
* **Load all or nothing**: depends on the mode; see below.
* **Resume supported**: depends on the mode; see below.

## Configuration

- **driver_path**: path to the jar file of the JDBC driver. If not set, [the bundled JDBC driver](https://docs.databricks.com/en/integrations/jdbc/index.html) will be used. (string, optional)
- **server_hostname**: The Databricks compute resource’s Server Hostname value, see [Compute settings for the Databricks JDBC Driver](https://docs.databricks.com/en/integrations/jdbc/compute.html). (string, required)
- **http_path**: The Databricks compute resource’s HTTP Path value, see [Compute settings for the Databricks JDBC Driver](https://docs.databricks.com/en/integrations/jdbc/compute.html). (string, required)
- **personal_access_token**: The Databricks personal access token, see [Authentication settings for the Databricks JDBC Driver](https://docs.databricks.com/en/integrations/jdbc/authentication.html#authentication-pat). (string, required)
- **catalog_name**: destination catalog name (string, required)
- **schema_name**: destination schema name (string, required)
- **table**: destination table name (string, required)
- **staging_volume_name_prefix**: temporarily created managed volume prefix (string, default: "embulk_output_databricks_")
- **delete_stage**: whether to delete a temporarily created managed volume after running embulk. (boolean, default: false)
- **delete_stage_on_error**: if delete_stage_on_error is false and delete_stage is true, do not delete temporarily created volumes in case of error. (boolean, default: false)
- **retry_limit**: max retry count for database operations (integer, default: 12). When the intermediate table to be created has already been created by another process, this plugin retries with another table name to avoid a collision.
- **retry_wait**: initial retry wait time in milliseconds (integer, default: 1000 (1 second))
- **max_retry_wait**: upper limit of retry wait, which will be doubled at every retry (integer, default: 1800000 (30 minutes))
- **mode**: "insert", "insert_direct", "truncate_insert", "replace" or "merge". See below. (string, required)
- **merge_keys**: key column names for merging records in merge mode (string array, required in merge mode if table doesn't have primary key)
- **merge_rule**: list of column assignments for updating existing records used in merge mode, for example `"foo" = T."foo" + S."foo"` (`T` means target table and `S` means source table). (string array, default: always overwrites with new values)
- **batch_size**: size of a single batch insert (integer, default: 16777216)
- **default_timezone**: If the input column type (embulk type) is timestamp, this plugin needs to format the timestamp into a SQL string. This default_timezone option is used to control the timezone. You can override the timezone for each column using the column_options option. (string, default: `UTC`)
- **column_options**: advanced: key-value pairs where the key is a column name and the value is the options for that column.
  - **type**: type of a column when this plugin creates new tables (e.g. `STRING`, `INTEGER NOT NULL UNIQUE`). This is used when this plugin creates intermediate tables (insert, truncate_insert and merge modes), when it creates the target table (insert_direct and replace modes), and when it creates a nonexistent target table automatically. (string, default: depends on input column type: `BIGINT` if input column type is long, `BOOLEAN` if boolean, `DOUBLE` if double, `STRING` if string, `TIMESTAMP` if timestamp, `STRING` if json. See the [available types](https://docs.databricks.com/en/sql/language-manual/sql-ref-datatypes.html), except binary, array, map, struct, timestamp_ntz and interval.)
  - **value_type**: This plugin converts the input column type (embulk type) into a database type to build a TSV file, which it puts to internal storage. This value_type option controls the type of the value in the TSV. (string, default: depends on the SQL type of the column. Available values are: `byte`, `short`, `int`, `long`, `double`, `float`, `boolean`, `string`, `nstring`, `date`, `time`, `timestamp`, `decimal`, `json`, `null`, `pass`)
  - **timestamp_format**: If the input column type (embulk type) is timestamp and value_type is `string` or `nstring`, this plugin needs to format the timestamp value into a string. This timestamp_format option is used to control the format of the timestamp. (string, default: `%Y-%m-%d %H:%M:%S.%6N`)
  - **timezone**: If the input column type (embulk type) is timestamp, this plugin needs to format the timestamp value into a SQL string. In this case, this timezone option is used to control the timezone. (string, value of default_timezone option is used by default)
- **before_load**: if set, this SQL will be executed before loading all records. In truncate_insert mode, the SQL will be executed after truncating. replace mode doesn't support this option.
- **after_load**: if set, this SQL will be executed after loading all records.
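
As an illustration of how the options above fit together, the following is a minimal sketch of an `out` section that overrides the retry settings and sets `column_options`. The column names (`description`, `t`) are borrowed from `example/config.yml`, and the retry values and the `Asia/Tokyo` timezone are assumptions chosen only for the example; adjust them to your own table.

```yaml
out:
  type: databricks
  server_hostname: <YOUR_DATABRICKS_SERVER_HOSTNAME>
  http_path: <YOUR_DATABRICKS_HTTP_PATH>
  personal_access_token: <YOUR_DATABRICKS_PERSONAL_ACCESS_TOKEN>
  catalog_name: <YOUR_DATABRICKS_CATALOG_NAME>
  schema_name: <YOUR_DATABRICKS_SCHEMA_NAME>
  table: <YOUR_DATABRICKS_TABLE>
  mode: insert
  # Retry up to 5 times. The wait starts at 1 second, is doubled at
  # every retry, and never exceeds max_retry_wait (60 seconds here).
  retry_limit: 5
  retry_wait: 1000
  max_retry_wait: 60000
  # Remove the temporarily created managed volume after the run.
  delete_stage: true
  default_timezone: UTC
  column_options:
    # Hypothetical columns: names taken from example/config.yml.
    description: { type: "STRING" }
    # Store the timestamp as a formatted string in a STRING column.
    t: { type: "STRING", value_type: string, timestamp_format: "%Y-%m-%d %H:%M:%S", timezone: "Asia/Tokyo" }
```

Options that are left out keep the defaults listed above.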

### Modes

* **insert**:
  * Behavior: This mode writes rows to some intermediate tables first. If all those tasks run correctly, runs `INSERT INTO <target_table> SELECT * FROM <intermediate_table_1> UNION ALL SELECT * FROM <intermediate_table_2> UNION ALL ...` query. If the target table doesn't exist, it is created automatically.
  * Transactional: Yes. This mode successfully writes all rows, or fails with writing zero rows.
  * Resumable: No.
* **insert_direct**:
  * Behavior: This mode inserts rows to the target table directly. If the target table doesn't exist, it is created automatically.
  * Transactional: No. If it fails, the target table could have some rows inserted.
  * Resumable: No.
* **truncate_insert**:
  * Behavior: Same as `insert` mode except that it truncates the target table right before the last `INSERT ...` query.
  * Transactional: Yes.
  * Resumable: No.
* **replace**:
  * Behavior: This mode writes rows to an intermediate table first. If all those tasks run correctly, drops the target table and renames the intermediate table to the target table name.
  * Transactional: Yes.
  * Resumable: No.
* **merge**:
  * Behavior: This mode writes rows to some intermediate tables first. If all those tasks run correctly, runs `MERGE INTO ... WHEN MATCHED THEN UPDATE ... WHEN NOT MATCHED THEN INSERT ...` query. Namely, if merge keys of a record in the intermediate tables already exist in the target table, the target record is updated by the intermediate record, otherwise the intermediate record is inserted. If the target table doesn't exist, it is created automatically.
  * Transactional: Yes.
  * Resumable: No.
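
A merge-mode configuration might look like the sketch below. The key column `id` is taken from `example/config.yml`, and the `merge_rule` entry reuses the `foo` example from the option description verbatim, so both names are placeholders rather than columns the plugin requires.

```yaml
out:
  type: databricks
  server_hostname: <YOUR_DATABRICKS_SERVER_HOSTNAME>
  http_path: <YOUR_DATABRICKS_HTTP_PATH>
  personal_access_token: <YOUR_DATABRICKS_PERSONAL_ACCESS_TOKEN>
  catalog_name: <YOUR_DATABRICKS_CATALOG_NAME>
  schema_name: <YOUR_DATABRICKS_SCHEMA_NAME>
  table: <YOUR_DATABRICKS_TABLE>
  mode: merge
  # Rows whose merge keys already exist in the target table are updated;
  # all other rows are inserted.
  merge_keys:
    - id
  # Optional. Without merge_rule, matched rows are always overwritten
  # with the new values. T is the target table, S is the source table.
  merge_rule:
    - '"foo" = T."foo" + S."foo"'
```

`merge_keys` can be omitted only when the target table has a primary key.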

## Note

This plugin does not support the TIMESTAMP_NTZ and INTERVAL types. If a target table contains columns of these types, embulk will raise a runtime error.
(This is because the official Databricks JDBC driver does not support the [TIMESTAMP_NTZ](https://docs.databricks.com/en/sql/language-manual/data-types/timestamp-ntz-type.html#notes) and [INTERVAL](https://docs.databricks.com/en/sql/language-manual/data-types/interval-type.html) types.)

This plugin converts empty string input to null output. If you want empty string output, you can use a string of two consecutive double quotes ("").

## Build

```
$ ./gradlew gem # -t to watch change of files and rebuild continuously
```

## TEST

$ EMBULK_OUTPUT_DATABRICKS_TEST_CONFIG="example/test.yml" ./gradlew test # Create example/test.yml based on example/test.yml.example
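
For reference, a filled-in `example/test.yml` might look like the sketch below. The keys come from `example/test.yml.example`; every value is a placeholder, and the two prefix values are hypothetical names chosen for this example. The catalog and schema must be created in advance.

```yaml
server_hostname: <YOUR_DATABRICKS_SERVER_HOSTNAME>
http_path: <YOUR_DATABRICKS_HTTP_PATH>
personal_access_token: <YOUR_DATABRICKS_PERSONAL_ACCESS_TOKEN>
catalog_name: <YOUR_DATABRICKS_CATALOG_NAME>
schema_name: <YOUR_DATABRICKS_SCHEMA_NAME>
# Hypothetical prefixes for the tables and staging volumes the tests create.
table_prefix: embulk_test_
staging_volume_name_prefix: embulk_test_volume_
```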
108 changes: 108 additions & 0 deletions build.gradle
@@ -0,0 +1,108 @@
plugins {
    id "java"
    id "maven-publish"
    id "org.embulk.embulk-plugins" version "0.4.2"
    id "com.palantir.git-version" version "0.12.3"
    id "com.diffplug.spotless" version "5.15.0"
    id "com.adarshr.test-logger" version "3.0.0"
}
repositories {
    mavenCentral()
}

group = "io.trocco"
description = "Dumps records to Databricks."

sourceCompatibility = 1.8
targetCompatibility = 1.8

version = {
    def vd = versionDetails()
    if (vd.commitDistance == 0 && vd.lastTag ==~ /^v[0-9]+\.[0-9]+\.[0-9]+(\.[a-zA-Z0-9]+)?/) {
        vd.lastTag.substring(1)
    } else {
        "0.0.0.${vd.gitHash}"
    }
}()

dependencies {
    compileOnly "org.embulk:embulk-api:0.10.31"
    compileOnly "org.embulk:embulk-spi:0.10.31"

    compile("org.embulk:embulk-util-config:0.3.0") {
        // They conflict with embulk-core. They are once excluded here,
        // and added explicitly with versions exactly the same with embulk-core:0.10.19.
        exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
        exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
        exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
        exclude group: "com.fasterxml.jackson.datatype", module: "jackson-datatype-jdk8"
        exclude group: "javax.validation", module: "validation-api"
    }

    // They are once excluded from transitive dependencies of other dependencies,
    // and added explicitly with versions exactly the same with embulk-core:0.10.19.
    compile "com.fasterxml.jackson.core:jackson-annotations:2.6.7"
    compile "com.fasterxml.jackson.core:jackson-core:2.6.7"
    compile "com.fasterxml.jackson.core:jackson-databind:2.6.7"
    compile "com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7"
    compile "javax.validation:validation-api:1.1.0.Final"

    compile("org.embulk:embulk-util-json:0.1.0") {
        exclude group: "org.msgpack", module: "msgpack-core" // Included in embulk-api.
    }
    testImplementation "junit:junit:4.+"
    testImplementation "org.embulk:embulk-junit4:0.10.31"
    testImplementation "org.embulk:embulk-core:0.10.31"
    testImplementation "org.embulk:embulk-core:0.10.31:tests"
    testImplementation "org.embulk:embulk-deps:0.10.31"
    testImplementation "org.embulk:embulk-input-file:0.10.31"
    testImplementation "org.embulk:embulk-parser-csv:0.10.31"

    compile "org.embulk:embulk-output-jdbc:0.10.5"
    compile "org.embulk:embulk-output-postgresql:0.10.5"
    compile 'com.databricks:databricks-jdbc:2.6.36'
    compile("com.databricks:databricks-sdk-java:0.20.0") {
        exclude group: "org.slf4j", module: "slf4j-api"
        exclude group: "com.fasterxml.jackson.core", module: "jackson-annotations"
        exclude group: "com.fasterxml.jackson.core", module: "jackson-core"
        exclude group: "com.fasterxml.jackson.core", module: "jackson-databind"
        exclude group: "com.fasterxml.jackson.datatype", module: "jackson-datatype-jdk8"
        exclude group: "javax.validation", module: "validation-api"
    }
}
embulkPlugin {
    mainClass = "org.embulk.output.DatabricksOutputPlugin"
    category = "output"
    type = "databricks"
}
// This Gradle plugin's POM dependency modification works for "maven-publish" tasks.
//
// Note that "uploadArchives" is no longer supported. It is deprecated in Gradle 6 to be removed in Gradle 7.
// https://github.com/gradle/gradle/issues/3003#issuecomment-495025844
publishing {
    publications {
        embulkPluginMaven(MavenPublication) {  // Publish it with "publishEmbulkPluginMavenPublicationToMavenRepository".
            from components.java  // Must be "components.java". The dependency modification works only for it.
        }
    }
    repositories {
        maven {
            url = "${project.buildDir}/mavenPublishLocal"
        }
    }
}
gem {
    from("LICENSE.txt")
    authors = [ "" ]
    email = [ "" ]
    summary = "Databricks output plugin for Embulk"
    homepage = "https://github.com/trocco-io/embulk-output-databricks"
    licenses = [ "MIT" ]
}
spotless {
    java {
        importOrder()
        removeUnusedImports()
        googleJavaFormat()
    }
}
27 changes: 27 additions & 0 deletions example/config.yml
@@ -0,0 +1,27 @@

in:
  type: file
  path_prefix: ./example/data.tsv
  parser:
    type: csv
    delimiter: "\t"
    skip_header_lines: 0
    null_string: ""
    columns:
      - { name: id, type: long }
      - { name: description, type: string }
      - { name: name, type: string }
      - { name: t, type: timestamp, format: "%Y-%m-%d %H:%M:%S %z"}
      - { name: payload, type: json}
    stop_on_invalid_record: true

out:
  type: databricks
  server_hostname: <YOUR_DATABRICKS_SERVER_HOSTNAME>
  http_path: <YOUR_DATABRICKS_HTTP_PATH>
  personal_access_token: <YOUR_DATABRICKS_PERSONAL_ACCESS_TOKEN>
  catalog_name: <YOUR_DATABRICKS_CATALOG_NAME>
  schema_name: <YOUR_DATABRICKS_SCHEMA_NAME>
  table: <YOUR_DATABRICKS_TABLE>
  mode: insert
  default_timezone: UTC
5 changes: 5 additions & 0 deletions example/data.tsv
@@ -0,0 +1,5 @@
0 c20ef94602 c212c89f91 2017-10-24 03:54:35 +0900 {"a":0,"b":"99"}
1 330a9fc33a e25b33b616 2017-10-22 19:53:31 +0900 {"a":1,"b":"a9"}
2 707b3b7588 90823c6a1f 2017-10-23 23:42:43 +0900 {"a":2,"b":"96"}
3 8d8288e66f 2017-10-22 06:12:13 +0900 {"a":3,"b":"86"}
4 c54d8b6481 e56a40571c 2017-10-23 04:59:16 +0900 {"a":4,"b":"d2"}
9 changes: 9 additions & 0 deletions example/test.yml.example
@@ -0,0 +1,9 @@
# The catalog_name and schema_name must be created in advance.

server_hostname:
http_path:
personal_access_token:
catalog_name:
schema_name:
table_prefix:
staging_volume_name_prefix:
39 changes: 39 additions & 0 deletions gradle/dependency-locks/embulkPluginRuntime.lockfile
@@ -0,0 +1,39 @@
# This is a Gradle generated file for dependency locking.
# Manual edits can break the build and are not advised.
# This file is expected to be part of source control.
com.databricks:databricks-jdbc:2.6.36
com.databricks:databricks-sdk-java:0.20.0
com.fasterxml.jackson.core:jackson-annotations:2.6.7
com.fasterxml.jackson.core:jackson-core:2.6.7
com.fasterxml.jackson.core:jackson-databind:2.6.7.5
com.fasterxml.jackson.datatype:jackson-datatype-jdk8:2.6.7
com.google.auth:google-auth-library-credentials:1.20.0
com.google.auth:google-auth-library-oauth2-http:1.20.0
com.google.auto.value:auto-value-annotations:1.10.4
com.google.code.findbugs:jsr305:3.0.2
com.google.code.gson:gson:2.10.1
com.google.errorprone:error_prone_annotations:2.18.0
com.google.guava:failureaccess:1.0.1
com.google.guava:guava:32.0.0-android
com.google.guava:listenablefuture:9999.0-empty-to-avoid-conflict-with-guava
com.google.http-client:google-http-client-gson:1.43.3
com.google.http-client:google-http-client:1.43.3
com.google.j2objc:j2objc-annotations:2.8
commons-codec:commons-codec:1.11
commons-io:commons-io:2.13.0
commons-logging:commons-logging:1.2
io.grpc:grpc-context:1.27.2
io.opencensus:opencensus-api:0.31.1
io.opencensus:opencensus-contrib-http-util:0.31.1
javax.validation:validation-api:1.1.0.Final
org.apache.httpcomponents:httpclient:4.5.14
org.apache.httpcomponents:httpcore:4.4.16
org.checkerframework:checker-qual:3.33.0
org.embulk:embulk-output-jdbc:0.10.5
org.embulk:embulk-output-postgresql:0.10.5
org.embulk:embulk-util-config:0.3.4
org.embulk:embulk-util-json:0.3.0
org.embulk:embulk-util-retryhelper:0.8.2
org.embulk:embulk-util-rubytime:0.3.3
org.embulk:embulk-util-timestamp:0.2.2
org.ini4j:ini4j:0.5.4
Binary file added gradle/wrapper/gradle-wrapper.jar
Binary file not shown.
5 changes: 5 additions & 0 deletions gradle/wrapper/gradle-wrapper.properties
@@ -0,0 +1,5 @@
distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.9.1-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists