Skip to content

Commit

Permalink
Add Azure support
Browse files Browse the repository at this point in the history
In this commit, we introduce necessary changes and assets to make it possible to run RDB Loader with Azure services. These are the changes:

* Introduce new transformer-kafka asset that will be able to read events from Kafka topic and writes transformed events to Azure Blob Storage
* Make necessary changes on the Loader module to read shredding complete messages from Kafka module. Also, loader needs to interact with blob storage for folder monitoring feature. We've made necessary changes on the Loader module to make it possible to interact with Azure Blob Storage as well.
  • Loading branch information
pondzix authored and spenes committed Jul 19, 2023
1 parent 07a2923 commit 82bedf6
Show file tree
Hide file tree
Showing 54 changed files with 2,623 additions and 238 deletions.
5 changes: 4 additions & 1 deletion .github/workflows/ci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@ jobs:
- databricksLoader
- transformerKinesis
- transformerPubsub
- transformerKafka
steps:
- name: Checkout Github
uses: actions/checkout@v2
Expand Down Expand Up @@ -164,7 +165,8 @@ jobs:
'project databricksLoader; assembly' \
'project transformerBatch; assembly' \
'project transformerKinesis; assembly' \
'project transformerPubsub; assembly'
'project transformerPubsub; assembly' \
'project transformerKafka; assembly'
- name: Get current version
id: ver
run: echo "::set-output name=project_version::${GITHUB_REF#refs/tags/}"
Expand All @@ -182,5 +184,6 @@ jobs:
modules/transformer-batch/target/scala-2.12/snowplow-transformer-batch-${{ steps.ver.outputs.project_version }}.jar
modules/transformer-kinesis/target/scala-2.12/snowplow-transformer-kinesis-${{ steps.ver.outputs.project_version }}.jar
modules/transformer-pubsub/target/scala-2.12/snowplow-transformer-pubsub-${{ steps.ver.outputs.project_version }}.jar
modules/transformer-kafka/target/scala-2.12/snowplow-transformer-kafka-${{ steps.ver.outputs.project_version }}.jar
env:
GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }}
35 changes: 34 additions & 1 deletion build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ lazy val root = project.in(file("."))
.aggregate(
aws,
gcp,
azure,
common,
commonTransformerStream,
loader,
Expand All @@ -38,6 +39,7 @@ lazy val root = project.in(file("."))
transformerBatch,
transformerKinesis,
transformerPubsub,
transformerKafka
)

lazy val common: Project = project
Expand All @@ -60,6 +62,13 @@ lazy val gcp: Project = project
.dependsOn(common % "compile->compile;test->test")
.enablePlugins(BuildInfoPlugin)

lazy val azure: Project = project
.in(file("modules/azure"))
.settings(BuildSettings.azureBuildSettings)
.settings(libraryDependencies ++= Dependencies.azureDependencies)
.dependsOn(common % "compile->compile;test->test")
.enablePlugins(BuildInfoPlugin)

lazy val commonTransformerStream = project
.in(file("modules/common-transformer-stream"))
.settings(BuildSettings.commonStreamTransformerBuildSettings)
Expand All @@ -74,7 +83,11 @@ lazy val loader = project
.settings(BuildSettings.loaderBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.loaderDependencies)
.dependsOn(aws % "compile->compile;test->test;runtime->runtime", gcp % "compile->compile;test->test")
.dependsOn(
aws % "compile->compile;test->test;runtime->runtime",
gcp % "compile->compile;test->test",
azure % "compile->compile;test->test"
)

lazy val redshiftLoader = project
.in(file("modules/redshift-loader"))
Expand Down Expand Up @@ -169,3 +182,23 @@ lazy val transformerPubsubDistroless = project
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", gcp % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)

lazy val transformerKafka = project
.in(file("modules/transformer-kafka"))
.settings(BuildSettings.transformerKafkaBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.transformerKafkaDependencies)
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", azure % "compile->compile;test->test;runtime->runtime")
.enablePlugins(JavaAppPackaging, SnowplowDockerPlugin, BuildInfoPlugin)


lazy val transformerKafkaDistroless = project
.in(file("modules/distroless/transformer-kafka"))
.settings(sourceDirectory := (transformerKafka / sourceDirectory).value)
.settings(BuildSettings.transformerKafkaBuildSettings)
.settings(addCompilerPlugin(Dependencies.betterMonadicFor))
.settings(libraryDependencies ++= Dependencies.transformerKafkaDependencies)
.settings(excludeDependencies ++= Dependencies.commonStreamTransformerExclusions)
.dependsOn(commonTransformerStream % "compile->compile;test->test;runtime->runtime", azure % "compile->compile;test->test")
.enablePlugins(JavaAppPackaging, SnowplowDistrolessDockerPlugin, BuildInfoPlugin)
15 changes: 15 additions & 0 deletions config/loader/azure/databricks.config.minimal.hocon
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
{
"blobStorageEndpoint": "https://accountName.blob.core.windows.net/container-name"
"messageQueue": {
"type": "kafka"
"bootstrapServers": "localhost:9092"
"topicName": "loaderTopic"
},
"storage" : {
"host": "abc.cloud.databricks.com"
"password": "Supersecret1"
"schema": "atomic",
"port": 443,
"httpPath": "/databricks/http/path",
}
}
260 changes: 260 additions & 0 deletions config/loader/azure/databricks.config.reference.hocon
Original file line number Diff line number Diff line change
@@ -0,0 +1,260 @@
{

# Azure Blob Storage endpoint, should contain container with transformer's output
"blobStorageEndpoint": "https://accountName.blob.core.windows.net/container-name"

# Name of the Azure Key Vault where application secrets are stored.
# Required if secret store is used in storage.password field.
"azureVaultName": "azure-vault"

# Kafka topic used by Transformer and Loader to communicate
"messageQueue": {
"type": "kafka"
"bootstrapServers": "localhost:9092"
"topicName": "loaderTopic"
},

# Warehouse connection details
"storage" : {
# Hostname of Databricks cluster
"host": "abc.cloud.databricks.com",
# DB password
"password": {
# A password can be placed in Azure Key Vault or be a plain text
"secretStore": {
"parameterName": "snowplow.databricks.password"
}
},
# Optional. Override the Databricks default catalog, e.g. with a Unity catalog name.
"catalog": "hive_metastore",
# DB schema
"schema": "atomic",
# Database port
"port": 443,
# Http Path of Databricks cluster
"httpPath": "/databricks/http/path",
# User agent name for Databricks connection. Optional, default value "snowplow-rdbloader-oss"
"userAgent": "snowplow-rdbloader-oss"

# Optimize period per table, that will be used as predicate for the OPTIMIZE command.
"eventsOptimizePeriod": "2 days"

# Optional, default method is 'NoCreds'
# Specifies the auth method to use with 'COPY INTO' statement.
"loadAuthMethod": {
# With 'NoCreds', no credentials will be passed to 'COPY INTO' statement.
# Databricks cluster needs to have permission to access transformer
# output Azure Blob Storage container. More information can be found here:
# https://docs.databricks.com/storage/azure-storage.html
"type": "NoCreds"
}
#"loadAuthMethod": {
# # With 'TempCreds', temporary credentials will be created for every
# # load operation and these temporary credentials will be passed to
# # 'COPY INTO' statement. With this way, Databricks cluster doesn't need
# # permission to access to transformer output Azure Blob Storage container.
# # This access will be provided by temporary credentials.
# "type": "TempCreds"
#
# # If 'TempCreds' load auth method is used, this value will be used as a session duration
# # of temporary credentials used for loading data and folder monitoring.
# # Optional, default value "1 hour"
# "credentialsTtl": "1 hour"
#}
},

"schedules": {
# Periodic schedules to stop loading, e.g. for Databricks maintenance window
# Any amount of schedules is supported, but recommended to not overlap them
# The schedule works with machine's local timezone (and UTC is recommended)
"noOperation": [
{
# Human-readable name of the no-op window
"name": "Maintenance window",
# Cron expression with second granularity
"when": "0 0 12 * * ?",
# For how long the loader should be paused
"duration": "1 hour"
}
],
# Loader runs periodic OPTIMIZE statements to prevent growing number of files behind delta tables.
"optimizeEvents": "0 0 0 ? * *",
"optimizeManifest": "0 0 5 ? * *"
}

# Observability and reporting options
"monitoring": {
# Snowplow tracking (optional)
"snowplow": {
"appId": "databricks-loader",
"collector": "snplow.acme.com",
},

# An endpoint for alerts and infromational events
# Everything sent to snowplow collector (as properly formed self-describing events)
# will also be sent to the webhook as POST payloads with self-describing JSONs
"webhook": {
# An actual HTTP endpoint
"endpoint": "https://webhook.acme.com",
# Set of arbitrary key-value pairs attached to the payload
"tags": {
"pipeline": "production"
}
},

# Optional, for tracking runtime exceptions
"sentry": {
"dsn": "http://sentry.acme.com"
},

# Optional, configure how metrics are reported
"metrics": {
# Optional, send metrics to StatsD server
"statsd": {
"hostname": "localhost",
"port": 8125,
# Any key-value pairs to be tagged on every StatsD metric
"tags": {
"app": "rdb-loader"
}
# Optional, override the default metric prefix
# "prefix": "snowplow.rdbloader."
},

# Optional, print metrics on stdout (with slf4j)
"stdout": {
# Optional, override the default metric prefix
# "prefix": "snowplow.rdbloader."
}

# Optional, period for metrics emitted periodically
# Default value 5 minutes
# There is only one periodic metric at the moment.
# This metric is minimum_age_of_loaded_data.
# It specifies how old is the latest event in the warehouse.
"period": "5 minutes"
},

# Optional, configuration for periodic unloaded/corrupted folders checks
"folders": {
# Path where Loader could store auxiliary logs
# Loader should be able to write here, Databricks should be able to load from here
"staging": "https://accountName.blob.core.windows.net/staging/",
# How often to check
"period": "1 hour"
# Specifies since when folder monitoring will check
"since": "14 days"
# Specifies until when folder monitoring will check
"until": "7 days"
# Path to transformer archive (must be same as Transformer's `output.path`)
"transformerOutput": "https://accountName.blob.core.windows.net/transformed/"
# How many times the check can fail before generating an alarm instead of warning
"failBeforeAlarm": 3
},

# Periodic DB health-check, raising a warning if DB hasn't responded to `SELECT 1`
"healthCheck": {
# How often query a DB
"frequency": "20 minutes",
# How long to wait for a response
"timeout": "15 seconds"
}
},

# Immediate retries configuration
# Unlike retryQueue these retries happen immediately, without proceeding to another message
"retries": {
# Starting backoff period
"backoff": "30 seconds"
# A strategy to use when deciding on next backoff
"strategy": "EXPONENTIAL"
# How many attempts to make before sending the message into retry queue
# If missing - the loader will be retrying until cumulative bound
"attempts": 3,
# When backoff reaches this delay the Loader will stop retrying
# Missing cumulativeBound with missing attempts will force to retry inifintely
"cumulativeBound": "1 hour"
},

# Check the target destination to make sure it is ready.
# Retry the checking until target got ready and block the application in the meantime
"readyCheck": {
# Starting backoff period
"backoff": "15 seconds"
# A strategy to use when deciding on next backoff
"strategy": "CONSTANT"
# When backoff reaches this delay the Loader will stop retrying
"cumulativeBound": "10 minutes"
},

# Retries configuration for initilization block
# It will retry on all exceptions from there
"initRetries": {
# Starting backoff period
"backoff": "30 seconds"
# A strategy to use when deciding on next backoff
"strategy": "EXPONENTIAL"
# How many attempts to make before sending the message into retry queue
# If missing - the loader will be retrying until cumulative bound
"attempts": 3,
# When backoff reaches this delay the Loader will stop retrying
# Missing cumulativeBound with missing attempts will force to retry inifintely
"cumulativeBound": "1 hour"
},

# Additional backlog of recently failed folders that could be automatically retried
# Retry Queue saves a failed folder and then re-reads the info from shredding_complete S3 file
"retryQueue": {
# How often batch of failed folders should be pulled into a discovery queue
"period": "30 minutes",
# How many failures should be kept in memory
# After the limit is reached new failures are dropped
"size": 64,
# How many attempt to make for each folder
# After the limit is reached new failures are dropped
"maxAttempts": 3,
# Artificial pause after each failed folder being added to the queue
"interval": "5 seconds"
},

"timeouts": {
# How long loading (actual COPY statements) can take before considering Databricks unhealthy
# Without any progress (i.e. different subfolder) within this period, loader
# will abort the transaction
"loading": "45 minutes",

# How long non-loading steps (such as ALTER TABLE or metadata queries) can take
# before considering Databricks unhealthy
"nonLoading": "10 minutes"
}

# Optional. Configure telemetry
# All the fields are optional
"telemetry": {
# Set to true to disable telemetry
"disable": false
# Interval for the heartbeat event
"interval": 15 minutes
# HTTP method used to send the heartbeat event
"method": "POST"
# URI of the collector receiving the heartbeat event
"collectorUri": "collector-g.snowplowanalytics.com"
# Port of the collector receiving the heartbeat event
"collectorPort": 443
# Whether to use https or not
"secure": true
# Identifier intended to tie events together across modules,
# infrastructure and apps when used consistently
"userProvidedId": "my_pipeline"
# ID automatically generated upon running a modules deployment script
# Intended to identify each independent module, and the infrastructure it controls
"autoGeneratedId": "hfy67e5ydhtrd"
# Unique identifier for the VM instance
# Unique for each instance of the app running within a module
"instanceId": "665bhft5u6udjf"
# Name of the terraform module that deployed the app
"moduleName": "rdb-loader-ce"
# Version of the terraform module that deployed the app
"moduleVersion": "1.0.0"
}
}
21 changes: 21 additions & 0 deletions config/loader/azure/snowflake.config.minimal.hocon
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
{
"blobStorageEndpoint": "https://accountName.blob.core.windows.net/container-name"
"messageQueue": {
"type": "kafka"
"bootstrapServers": "localhost:9092"
"topicName": "loaderTopic"
},
"storage" : {
"type": "snowflake",

"snowflakeRegion": "us-west-2",
"username": "admin",
"password": "Supersecret1",
"account": "acme",
"warehouse": "wh",
"schema": "atomic",
"database": "snowplow",

"transformedStage": "snowplow_stage"
}
}
Loading

0 comments on commit 82bedf6

Please sign in to comment.