diff --git a/.github/ISSUE_TEMPLATE.md b/.github/ISSUE_TEMPLATE.md new file mode 100644 index 0000000..218d3f5 --- /dev/null +++ b/.github/ISSUE_TEMPLATE.md @@ -0,0 +1,23 @@ + + +**Version**: + +**Expected behavior**: + +**Actual behavior**: + +**Steps to reproduce**: + + + 1. + 2. + diff --git a/.github/PULL_REQUEST_TEMPLATE.md b/.github/PULL_REQUEST_TEMPLATE.md new file mode 100644 index 0000000..a6f8f34 --- /dev/null +++ b/.github/PULL_REQUEST_TEMPLATE.md @@ -0,0 +1,12 @@ + + diff --git a/.github/check_tag.sh b/.github/check_tag.sh new file mode 100755 index 0000000..a6146a0 --- /dev/null +++ b/.github/check_tag.sh @@ -0,0 +1,14 @@ +#!/bin/bash + +set -e + +tag=$1 + +project_version=$(sbt version -Dsbt.log.noformat=true | perl -ne 'print "$1\n" if /info.*(\d+\.\d+\.\d+[^\r\n]*)/' | tail -n 1 | tr -d '\n') + +if [[ "${tag}" = "${project_version}" ]]; then + echo "Tag version (${tag}) matches project version (${project_version}). Deploying!" +else + echo "Tag version (${tag}) doesn't match version in scala project (${project_version}). Aborting!" + exit 1 +fi \ No newline at end of file diff --git a/.github/schemas/me.chuwy/pg-test/jsonschema/1-0-0 b/.github/schemas/me.chuwy/pg-test/jsonschema/1-0-0 new file mode 100644 index 0000000..a918c2b --- /dev/null +++ b/.github/schemas/me.chuwy/pg-test/jsonschema/1-0-0 @@ -0,0 +1,22 @@ +{ + "self": { + "vendor": "me.chuwy", + "name": "pg-test", + "format": "jsonschema", + "version": "1-0-0" + }, + "properties": { + "requiredString": { "type": "string" }, + "requiredUnion": { "type": ["string", "boolean"] }, + "nested": { + "properties": { + "a": { "type": "number" }, + "b": {} + }, + "required": ["a"] + }, + "someArray": { "type": "array" }, + "id": { "type": "string", "format": "uuid" } + }, + "required": ["requiredString", "requiredUnion"] +} diff --git a/.github/schemas/me.chuwy/pg-test/jsonschema/1-0-1 b/.github/schemas/me.chuwy/pg-test/jsonschema/1-0-1 new file mode 100644 index 0000000..f54d021 --- /dev/null +++ b/.github/schemas/me.chuwy/pg-test/jsonschema/1-0-1 @@ -0,0 +1,24 @@ +{ + "self": { + "vendor": "me.chuwy", + "name": "pg-test", + "format": "jsonschema", + "version": "1-0-1" + }, + "properties": { + "requiredString": { "type": "string" }, + "requiredUnion": { "type": ["string", "boolean"] }, + "nested": { + "properties": { + "a": { "type": "number" }, + "b": {}, + "c": { "type": ["integer", "null"] } + }, + "required": ["a"] + }, + "someArray": { "type": "array" }, + "id": { "type": "string", "format": "uuid" }, + "someDate": { "type": "string", "format": "date-time" } + }, + "required": ["requiredString", "requiredUnion"] +} diff --git a/.github/schemas/me.chuwy/pg-test/jsonschema/1-0-2 b/.github/schemas/me.chuwy/pg-test/jsonschema/1-0-2 new file mode 100644 index 0000000..a2511ef --- /dev/null +++ b/.github/schemas/me.chuwy/pg-test/jsonschema/1-0-2 @@ -0,0 +1,25 @@ +{ + "self": { + "vendor": "me.chuwy", + "name": "pg-test", + "format": "jsonschema", + "version": "1-0-2" + }, + "properties": { + "requiredString": { "type": "string" }, + "requiredUnion": { "type": ["string", "boolean"] }, + "nested": { + "properties": { + "a": { "type": "number" }, + "b": {}, + "c": { "type": ["integer", "null"] } + }, + "required": ["a"] + }, + "someArray": { "type": "array" }, + "id": { "type": "string", "format": "uuid" }, + "someDate": { "type": "string", "format": "date-time" }, + "bigInt": { "type": "integer", "maximum": 100000000000000 } + }, + "required": ["requiredString", "requiredUnion"] +} diff --git a/.github/server.conf 
b/.github/server.conf new file mode 100644 index 0000000..0f6caf3 --- /dev/null +++ b/.github/server.conf @@ -0,0 +1,16 @@ +# Dummy Iglu Server configuration to assist in testing + +repo-server { + interface = "0.0.0.0" + port = 8080 + idleTimeout = 5 + threadPool = { + type = "global" + } +} + +database { + type = "dummy" +} + +debug = true \ No newline at end of file diff --git a/.github/start_environment.sh b/.github/start_environment.sh new file mode 100755 index 0000000..672604d --- /dev/null +++ b/.github/start_environment.sh @@ -0,0 +1,42 @@ +#!/bin/sh + +set -e + +if [ -z ${GITHUB_WORKSPACE+x} ]; then + echo "GITHUB_WORKSPACE is unset"; + exit 1 +fi + +IGLUCTL_ZIP="igluctl_0.7.2_rc1.zip" +IGLUCTL_URI="http://dl.bintray.com/snowplow/snowplow-generic/$IGLUCTL_ZIP" +IGLUCENTRAL_PATH="$GITHUB_WORKSPACE/iglu-central" +SCHEMAS_PATH="$IGLUCENTRAL_PATH/schemas/" +TEST_SCHEMAS="$GITHUB_WORKSPACE/.github/schemas/" +POSTGRES_PASSWORD=mysecretpassword + +git clone https://github.com/snowplow/iglu-central.git $IGLUCENTRAL_PATH + +docker run \ + -p 8080:8080 \ + -v $GITHUB_WORKSPACE/.github:/iglu \ + --rm -d \ + snowplow-docker-registry.bintray.io/snowplow/iglu-server:0.6.1 \ + --config /iglu/server.conf + +echo "Waiting for Iglu Server..." +sleep 5 + +wget $IGLUCTL_URI +unzip -j $IGLUCTL_ZIP + +./igluctl static push \ + $SCHEMAS_PATH \ + http://localhost:8080/ \ + 48b267d7-cd2b-4f22-bae4-0f002008b5ad \ + --public + +./igluctl static push \ + $TEST_SCHEMAS \ + http://localhost:8080/ \ + 48b267d7-cd2b-4f22-bae4-0f002008b5ad \ + --public diff --git a/.github/workflows/snyk.yml b/.github/workflows/snyk.yml new file mode 100644 index 0000000..356e980 --- /dev/null +++ b/.github/workflows/snyk.yml @@ -0,0 +1,20 @@ +name: Snyk + +on: + push: + branches: [ master ] + +jobs: + security: + runs-on: ubuntu-latest + + steps: + - uses: actions/checkout@v2 + + - name: Run Snyk to check for vulnerabilities + uses: snyk/actions/scala@master + with: + command: monitor + args: --project-name=snowplow-postgres-loader + env: + SNYK_TOKEN: ${{ secrets.SNYK_TOKEN }} diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..c474d2e --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,56 @@ +name: Test + +on: [push, pull_request] + +jobs: + test: + runs-on: ubuntu-latest + + services: + postgres: + image: postgres + ports: + - 5432:5432 + env: + # See src/test/scala/com/snowplowanalytics/snowplow/postgres/loader/Database.scala + POSTGRES_USER: postgres + POSTGRES_PASSWORD: mysecretpassword + POSTGRES_DB: snowplow + POSTGRES_PORT: 5432 + options: --health-cmd pg_isready --health-interval 10s --health-timeout 5s --health-retries 5 + + steps: + - uses: actions/checkout@v2 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: 1.8 + - name: Prepare test environment + run: $GITHUB_WORKSPACE/.github/start_environment.sh + - name: Run tests + run: sbt clean coverage test + - name: Submit coveralls data + if: ${{ always() }} + run: sbt coverageReport coveralls + env: + COVERALLS_REPO_TOKEN: ${{ secrets.COVERALLS_REPO_TOKEN }} + + deploy: + needs: test + if: startsWith(github.ref, 'refs/tags/') + runs-on: ubuntu-latest + steps: + - uses: actions/checkout@v2 + - name: Set up JDK 1.8 + uses: actions/setup-java@v1 + with: + java-version: 1.8 + - name: Compare SBT version with git tag + run: .github/check_tag.sh ${GITHUB_REF##*/} + - name: Docker login + run: docker login -u $DOCKER_USERNAME -p $DOCKER_PASSWORD + env: + DOCKER_USERNAME: ${{ 
secrets.DOCKER_USERNAME }} + DOCKER_PASSWORD: ${{ secrets.DOCKER_PASSWORD }} + - name: Build and publish Docker image + run: sbt docker:publish diff --git a/.gitignore b/.gitignore index 43f3f27..cf79df4 100644 --- a/.gitignore +++ b/.gitignore @@ -1,93 +1,4 @@ -# Covers JetBrains IDEs: IntelliJ, RubyMine, PhpStorm, AppCode, PyCharm, CLion, Android Studio, WebStorm and Rider -# Reference: https://intellij-support.jetbrains.com/hc/en-us/articles/206544839 - -# .idea/ - -# User-specific stuff -.idea/**/workspace.xml -.idea/**/tasks.xml -.idea/**/usage.statistics.xml -.idea/**/dictionaries -.idea/**/shelf - -# Generated files -.idea/**/contentModel.xml - -# Sensitive or high-churn files -.idea/**/dataSources/ -.idea/**/dataSources.ids -.idea/**/dataSources.local.xml -.idea/**/sqlDataSources.xml -.idea/**/dynamic.xml -.idea/**/uiDesigner.xml -.idea/**/dbnavigator.xml - -# Gradle -.idea/**/gradle.xml -.idea/**/libraries - -# Gradle and Maven with auto-import -# When using Gradle or Maven with auto-import, you should exclude module files, -# since they will be recreated, and may cause churn. Uncomment if using -# auto-import. -# .idea/artifacts -# .idea/compiler.xml -# .idea/jarRepositories.xml -# .idea/modules.xml -# .idea/*.iml -# .idea/modules -# *.iml -# *.ipr - -# CMake -cmake-build-*/ - -# Mongo Explorer plugin -.idea/**/mongoSettings.xml - -# File-based project format -*.iws - -# IntelliJ -out/ - -# mpeltonen/sbt-idea plugin -.idea_modules/ - -# JIRA plugin -atlassian-ide-plugin.xml - -# Cursive Clojure plugin -.idea/replstate.xml - -# Crashlytics plugin (for Android Studio and IntelliJ) -com_crashlytics_export_strings.xml -crashlytics.properties -crashlytics-build.properties -fabric.properties - -# Editor-based Rest Client -.idea/httpRequests - -# Android studio 3.1+ serialized cache file -.idea/caches/build_file_checksums.ser - -# Misc -*.class -*.log target/ project/target/ -project/boot/ -dist/ -boot/ -logs/ -tmp/ -projectFilesBackup/ -.history -.DS_STORE -.cache -.settings -.project -.classpath -version.properties -RUNNING_PID + +.travis/igluctl* diff --git a/.scalafix.conf b/.scalafix.conf new file mode 100644 index 0000000..c6e4e6d --- /dev/null +++ b/.scalafix.conf @@ -0,0 +1,17 @@ +rules = [ + DisableSyntax +] +DisableSyntax.noUniversalEquality = true +DisableSyntax.noVars = true +DisableSyntax.noNulls = true +DisableSyntax.noReturns = true +DisableSyntax.noWhileLoops = true +DisableSyntax.noAsInstanceOf = true +DisableSyntax.noIsInstanceOf = true +DisableSyntax.noXml = true +DisableSyntax.noDefaultArgs = true +DisableSyntax.noFinalVal = true +DisableSyntax.noFinalize = true +DisableSyntax.noValPatterns = true +DisableSyntax.noUniversalEquality = true +// DisableSyntax.noThrows = true diff --git a/.scalafmt.conf b/.scalafmt.conf new file mode 100644 index 0000000..2c944fb --- /dev/null +++ b/.scalafmt.conf @@ -0,0 +1,22 @@ +version = 2.5.2 +style = default +maxColumn = 140 +optIn.breakChainOnFirstMethodDot = false +assumeStandardLibraryStripMargin = true +align = none +align.openParenCallSite = true +align.openParenDefnSite = true +danglingParentheses = true +verticalMultiline.newlineAfterOpenParen = true +newlines.afterCurlyLambda = preserve +continuationIndent.defnSite = 2 +rewrite.rules = [ + AsciiSortImports, + AvoidInfix, + PreferCurlyFors, + RedundantBraces, + RedundantParens, + SortModifiers +] +project.git = true +includeNoParensInSelectChains = true diff --git a/.travis.yml b/.travis.yml new file mode 100644 index 0000000..711e2b0 --- /dev/null +++ b/.travis.yml @@ -0,0 
+1,31 @@ +language: scala +dist: trusty +scala: + - 2.13.2 +cache: + directories: + - $HOME/.ivy2 + - $HOME/.coursier + - $HOME/.sbt +jdk: + - oraclejdk8 +services: + - docker +script: + - sbt test +before_install: + - bash ./.travis/checkTag.sh $TRAVIS_TAG + - docker login -u $DOCKER_USERNAME -p $DOCKER_PASSWORD + - sbt publishLocal +deploy: + provider: script + script: ./.travis/deploy.sh $TRAVIS_TAG + skip_cleanup: true + on: + tags: true +env: + global: + # DOCKER_USERNAME + - secure: encrypted_username + # DOCKER_PASSWORD + - secure: encrypted_password diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md new file mode 100644 index 0000000..0da24f4 --- /dev/null +++ b/CONTRIBUTING.md @@ -0,0 +1,104 @@ +# Contributor guide + +Snowplow is maintained by the pipeline team at Snowplow Analytics and improved on by external contributors, to whom we are +extremely grateful. + +## Getting in touch + +### Community support requests + +First and foremost, please do not log an issue if you are asking for support; all of our community support requests go through +our Discourse forum: https://discourse.snowplowanalytics.com/. + +Posting your problem there ensures more people will see it, and you should get support faster than by creating a new issue on +GitHub. Please do create a new issue on GitHub if you think you've found a bug though! + +### Gitter + +If you want to discuss already created issues, potential bugs, new features you would like to work on, or any kind of developer +chat, you can head over to our [Gitter room](https://gitter.im/snowplow/snowplow). + +## Roadmap visibility + +Being an open source company, we consider transparency very important, which is why we try to share as much as possible regarding +what we will be working on next so that you can: + +- see how your contributions fit into our roadmap +- help us design new features +- share your opinions on the technical direction of the Snowplow pipeline + +You can peek into what the pipeline team is working on by looking at +[the open GitHub projects](https://github.com/snowplow/snowplow/projects). + +For insights into what we will be working on next, you can look at +[the RFC category in our Discourse](https://discourse.snowplowanalytics.com/c/roadmap/rfcs). + +## Repository structure + +The `snowplow/snowplow` project is split into different Scala projects: + +- [`2-collectors/scala-stream-collector`](https://github.com/snowplow/snowplow/tree/master/2-collectors/scala-stream-collector) +which contains the code to collect events as HTTP requests and output raw events to a streaming platform (Kafka, Kinesis, +NSQ or PubSub) +- [`3-enrich/scala-common-enrich`](https://github.com/snowplow/snowplow/tree/master/3-enrich/scala-common-enrich), a +library common to all the enrichers listed below, which turns the raw events outputted by a collector into validated and +enriched events +- [`3-enrich/stream-enrich`](https://github.com/snowplow/snowplow/tree/master/3-enrich/stream-enrich), the pipeline which +turns a stream of raw events into a stream of validated and enriched events and pushes them to a streaming platform (Kafka, +Kinesis, NSQ or PubSub) + +All of these projects can be built and tested with [SBT](https://www.scala-sbt.org/). + +## Issues + +### Creating an issue + +The project contains an issue template which should help guide you through the process. However, please keep in mind +that support requests should go to our Discourse forum: https://discourse.snowplowanalytics.com/ and not GitHub issues.
+ +It's also a good idea to log an issue before starting to work on a pull request, so that you can discuss it with the maintainers. + +### Working on an issue + +If you see an issue you would like to work on, please let us know in the issue! That will help us in terms of scheduling and +not doubling the amount of work. + +If you don't know where to start contributing, you can look at +[the issues labeled `good first issue`](https://github.com/snowplow/snowplow/labels/good%20first%20issue). + +## Pull requests + +These are a few guidelines to keep in mind when opening pull requests; there is a GitHub template that reiterates most of the +points described here. + +### Commit hygiene + +We keep a strict 1-to-1 correspondence between commits and issues; as such, our commit messages are formatted in the following +fashion: + +`Component: add issue description (closes #1234)` + +for example: + +`Scala Common Enrich: add Vero adapter (closes #1234)` + +### Writing tests + +Whenever necessary, it's good practice to add the corresponding unit tests to whichever feature you are working on. + +### Feedback cycle + +Reviews should happen fairly quickly during weekdays. If you feel your pull request has been forgotten, please ping one +or more maintainers in the pull request. + +### Getting your pull request merged + +If your pull request is fairly chunky, there might be a non-trivial delay between the moment the pull request is approved and +the moment it gets merged. This is because your pull request will have been scheduled for a specific milestone which might or +might not be actively worked on by a maintainer at the moment. + +### Contributor license agreement + +We require outside contributors to sign a Contributor license agreement (or CLA) before we can merge their pull requests. +You can find more information on the topic in [the dedicated wiki page](https://github.com/snowplow/snowplow/wiki/CLA). +The @snowplowcla bot will guide you through the process. diff --git a/README.md b/README.md new file mode 100644 index 0000000..055e520 --- /dev/null +++ b/README.md @@ -0,0 +1,48 @@ +[![License][license-image]][license] +[![Coverage Status][coveralls-image]][coveralls] +[![Test][test-image]][test] +[![Release][release-image]][release] + +# Snowplow Postgres Loader + +## Quickstart + +Assuming [Docker][docker] is installed: + +1. Add your own `config.json` (specify connection and stream details) +2. Add your own `resolver.json` (all schemas must be on [Iglu Server][iglu-server]) +3. Run the Docker image: + +```bash +$ docker run --rm -v $PWD/config:/snowplow/config snowplow-postgres-loader \ + --resolver /snowplow/config/resolver.json \ + --config /snowplow/config/config.json +``` + +## Copyright and License + +The Snowplow Postgres Loader is copyright 2020 Snowplow Analytics Ltd. + +Licensed under the **[Apache License, Version 2.0][license]** (the "License"); +you may not use this software except in compliance with the License. + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License.
+ +[docker]: https://www.docker.com/ +[iglu-server]: https://github.com/snowplow-incubator/iglu-server + +[release-image]: http://img.shields.io/badge/release-0.1.0-blue.svg?style=flat +[release]: https://github.com/snowplow/pgloader/releases + +[test]: https://github.com/snowplow/enrich/actions?query=workflow%3ATest +[test-image]: https://github.com/snowplow/enrich/workflows/Test/badge.svg + +[license]: http://www.apache.org/licenses/LICENSE-2.0 +[license-image]: http://img.shields.io/badge/license-Apache--2-blue.svg?style=flat + +[coveralls]: https://coveralls.io/github/snowplow/enrich?branch=master +[coveralls-image]: https://coveralls.io/repos/github/snowplow/enrich/badge.svg?branch=master diff --git a/build.sbt b/build.sbt new file mode 100644 index 0000000..9b13859 --- /dev/null +++ b/build.sbt @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ + +lazy val root = project + .in(file(".")) + .enablePlugins(JavaAppPackaging, DockerPlugin, BuildInfoPlugin) + .settings(BuildSettings.projectSettings) + .settings(BuildSettings.dockerSettings) + .settings(BuildSettings.buildInfoSettings) + .settings(BuildSettings.scoverageSettings) + .settings( + resolvers += Dependencies.SnowplowBintray, + libraryDependencies ++= Seq( + Dependencies.logger, + Dependencies.postgres, + Dependencies.commons, + Dependencies.catsEffect, + Dependencies.decline, + Dependencies.circe, + Dependencies.circeGeneric, + Dependencies.circeExtras, + Dependencies.circeParser, + Dependencies.circeLiteral, + Dependencies.doobie, + Dependencies.doobiePg, + Dependencies.doobiePgCirce, + Dependencies.doobieHikari, + Dependencies.fs2Aws, + Dependencies.fs2PubSub, + Dependencies.analyticsSdk, + Dependencies.badRows, + Dependencies.schemaDdl, + Dependencies.specs2 + ) + ) + +addCompilerPlugin("com.olegpy" %% "better-monadic-for" % "0.3.1") diff --git a/config/config.json b/config/config.json new file mode 100644 index 0000000..6c179de --- /dev/null +++ b/config/config.json @@ -0,0 +1,25 @@ +{ + "schema": "iglu:com.snowplowanalytics.snowplow.storage/postgresql_config/jsonschema/3-0-0", + "data": { + "name": "Acme Ltd. 
Snowplow Postgres", + "id": "5c5e4353-4eeb-43da-98f8-2de6dc7fa947", + "source": { + "kinesis": { + "appName": "acme-postgres-loader", + "streamName": "enriched-events", + "region": "eu-central-1", + "initialPosition": "TRIM_HORIZON" + } + }, + + "host": "localhost", + "database": "snowplow", + "port": 5432, + "username": "postgres", + "password": "mysecretpassword", + "schema": "atomic", + + "sslMode": "REQUIRE", + "purpose": "ENRICHED_EVENTS" + } +} diff --git a/config/resolver.json b/config/resolver.json new file mode 100644 index 0000000..e4a3ac3 --- /dev/null +++ b/config/resolver.json @@ -0,0 +1,34 @@ +{ + "schema": "iglu:com.snowplowanalytics.iglu/resolver-config/jsonschema/1-0-2", + "data": { + "cacheSize": 500, + "cacheTtl": 600, + "repositories": [ + { + "name": "Iglu Central", + "priority": 1, + "vendorPrefixes": [ + "com.snowplowanalytics" + ], + "connection": { + "http": { + "uri": "http://iglucentral.com" + } + } + }, + + { + "name": "Iglu Central - Mirror 01", + "priority": 1, + "vendorPrefixes": [ + "com.snowplowanalytics" + ], + "connection": { + "http": { + "uri": "http://mirror01.iglucentral.com" + } + } + } + ] + } +} diff --git a/project/BuildSettings.scala b/project/BuildSettings.scala new file mode 100644 index 0000000..0832e4c --- /dev/null +++ b/project/BuildSettings.scala @@ -0,0 +1,61 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ + +// sbt +import sbt._ +import Keys._ + +import sbtbuildinfo.BuildInfoKey +import sbtbuildinfo.BuildInfoKeys._ + +import com.typesafe.sbt.SbtNativePackager.autoImport._ +import com.typesafe.sbt.packager.linux.LinuxPlugin.autoImport._ +import com.typesafe.sbt.packager.docker.DockerPlugin.autoImport._ + +import scoverage.ScoverageKeys._ + +object BuildSettings { + lazy val projectSettings = Seq( + organization := "com.snowplowanalytics", + name := "snowplow-postgres-loader", + version := "0.1.0-rc1", + scalaVersion := "2.13.2", + description := "Loading Snowplow enriched data into PostgreSQL in real-time", + parallelExecution in Test := false + ) + + lazy val buildInfoSettings = Seq( + buildInfoKeys := Seq[BuildInfoKey](name, version), + buildInfoPackage := "com.snowplowanalytics.snowplow.postgres.loader.generated" + ) + + /** Docker image settings */ + lazy val dockerSettings = Seq( + maintainer in Docker := "Snowplow Analytics Ltd. 
", + dockerBaseImage := "snowplow-docker-registry.bintray.io/snowplow/base-debian:0.1.0", + daemonUser in Docker := "snowplow", + dockerUpdateLatest := true, + + daemonUserUid in Docker := None, + defaultLinuxInstallLocation in Docker := "/home/snowplow", // must be home directory of daemonUser + ) + + lazy val scoverageSettings = Seq( + coverageMinimum := 50, + coverageFailOnMinimum := false, + coverageExcludedPackages := "^target/.*", + (test in Test) := { + (coverageReport dependsOn (test in Test)).value + } + ) +} diff --git a/project/Dependencies.scala b/project/Dependencies.scala new file mode 100644 index 0000000..e23de8b --- /dev/null +++ b/project/Dependencies.scala @@ -0,0 +1,77 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ + +import sbt._ + +object Dependencies { + + lazy val SnowplowBintray = "Snowplow Bintray" at "https://snowplow.bintray.com/snowplow-maven" + + object V { + // Java + val slf4j = "1.7.30" + val postgres = "42.2.14" + val commons = "1.13" + + // Scala third-party + val decline = "1.2.0" + val catsEffect = "2.1.2" + val circe = "0.13.0" + val fs2Aws = "2.28.42" + val fs2PubSub = "0.15.0" + val doobie = "0.9.0" + val fs2 = "2.4.2" + + val analyticsSdk = "2.0.1" + val badRows = "2.0.0" + val schemaDdl = "0.11.0" + + // Testing + val specs2 = "4.9.4" + val scalaCheck = "1.14.3" + } + + // Java + val logger = "org.slf4j" % "slf4j-simple" % V.slf4j + + // Snyk warnings + val postgres = "org.postgresql" % "postgresql" % V.postgres + val commons = "commons-codec" % "commons-codec" % V.commons + + // Scala third-party + val decline = "com.monovore" %% "decline" % V.decline + val catsEffect = "org.typelevel" %% "cats-effect" % V.catsEffect + val fs2 = "co.fs2" %% "fs2-core" % V.fs2 + val fs2Io = "co.fs2" %% "fs2-io" % V.fs2 + val circe = "io.circe" %% "circe-core" % V.circe + val circeGeneric = "io.circe" %% "circe-generic" % V.circe + val circeExtras = "io.circe" %% "circe-generic-extras" % V.circe + val circeParser = "io.circe" %% "circe-parser" % V.circe + val circeLiteral = "io.circe" %% "circe-literal" % V.circe + val fs2Aws = "io.laserdisc" %% "fs2-aws" % V.fs2Aws + val fs2PubSub = "com.permutive" %% "fs2-google-pubsub-grpc" % V.fs2PubSub + val doobie = "org.tpolecat" %% "doobie-core" % V.doobie + val doobiePg = "org.tpolecat" %% "doobie-postgres" % V.doobie + val doobiePgCirce = "org.tpolecat" %% "doobie-postgres-circe" % V.doobie + val doobieHikari = "org.tpolecat" %% "doobie-hikari" % V.doobie + + // Scala first-party + val analyticsSdk = "com.snowplowanalytics" %% "snowplow-scala-analytics-sdk" % V.analyticsSdk + val badRows = "com.snowplowanalytics" %% "snowplow-badrows" % V.badRows + val schemaDdl = "com.snowplowanalytics" %% "schema-ddl" % V.schemaDdl + + // Testing + val specs2 = "org.specs2" %% "specs2-core" % V.specs2 % Test + val scalaCheck = "org.scalacheck" %% "scalacheck" % V.scalaCheck % Test + 
+} diff --git a/project/build.properties b/project/build.properties new file mode 100644 index 0000000..654fe70 --- /dev/null +++ b/project/build.properties @@ -0,0 +1 @@ +sbt.version=1.3.12 diff --git a/project/plugins.sbt b/project/plugins.sbt new file mode 100644 index 0000000..c881427 --- /dev/null +++ b/project/plugins.sbt @@ -0,0 +1,9 @@ +logLevel := Level.Warn + +addSbtPlugin("org.scalameta" % "sbt-scalafmt" % "2.4.0") +addSbtPlugin("org.scoverage" % "sbt-scoverage" % "1.6.1") +addSbtPlugin("com.typesafe.sbt" % "sbt-native-packager" % "1.7.3") +addSbtPlugin("io.github.davidgregory084" % "sbt-tpolecat" % "0.1.11") +addSbtPlugin("ch.epfl.scala" % "sbt-scalafix" % "0.9.15") +addSbtPlugin("com.eed3si9n" % "sbt-buildinfo" % "0.9.0") +addSbtPlugin("org.scoverage" % "sbt-coveralls" % "1.2.7") diff --git a/src/main/resources/iglu-client-embedded/schemas/com.snowplowanalytics.snowplow.storage/postgresql_config/jsonschema/3-0-0 b/src/main/resources/iglu-client-embedded/schemas/com.snowplowanalytics.snowplow.storage/postgresql_config/jsonschema/3-0-0 new file mode 100644 index 0000000..5cb1e8c --- /dev/null +++ b/src/main/resources/iglu-client-embedded/schemas/com.snowplowanalytics.snowplow.storage/postgresql_config/jsonschema/3-0-0 @@ -0,0 +1,218 @@ +{ + "$schema": "http://iglucentral.com/schemas/com.snowplowanalytics.self-desc/schema/jsonschema/1-0-0#", + "description": "Snowplow PostgreSQL storage configuration", + "self": { + "vendor": "com.snowplowanalytics.snowplow.storage", + "name": "postgresql_config", + "format": "jsonschema", + "version": "3-0-0" + }, + "type": "object", + "properties": { + "name": { + "description": "Human-readable storage target name, used only for logging", + "type": "string", + "maxLength": 255 + }, + "id": { + "description": "Machine-readable unique identificator", + "type": "string", + "format": "uuid" + }, + "source": { + "type": "object", + "oneOf": [ + { + "properties": { + "kinesis": { + "properties": { + "appName": { + "description": "Kinesis app name", + "type": "string" + }, + "streamName": { + "description": "Kinesis stream", + "type": "string" + }, + "region": { + "description": "AWS Region", + "type": "string" + }, + "initialPosition": { + "description": "Initial position in the Kinesis stream", + "oneOf": [ + { + "enum": ["TRIM_HORIZON", "LATEST"] + }, + { + "type": "object", + "properties": { + "AT_TIMESTAMP": { + "properties": { + "timestamp": { + "description": "Timestamp to load data from, e.g. 
2020-06-06T00:00:00Z", + "type": "string", + "format": "date-time" + } + }, + "additionalProperties": false, + "required": ["timestamp"] + } + }, + "additionalProperties": false, + "required": ["AT_TIMESTAMP"] + } + ] + } + }, + "required": ["appName", "streamName", "region", "initialPosition"] + } + } + } + ] + }, + "host": { + "description": "PostgreSQL host ('localhost' if an SSH tunnel is enabled)", + "type": "string", + "anyOf": [ + { "format": "hostname" }, + { "format": "ipv4" }, + { "format": "ipv6" } + ] + }, + "database": { + "description": "PostgreSQL database name", + "type": "string", + "minLength": 1, + "maxLength": 64 + }, + "port": { + "description": "PostgreSQL database port", + "type": "integer", + "minimum": 1, + "maximum": 65535 + }, + "username": { + "description": "PostgreSQL user used to load data", + "type": "string", + "maxLength": 64 + }, + "password": { + "description": "PostgreSQL password, either plain-text or encrypted key for EC2 Parameter Storage", + "type": ["string", "object"], + "properties": { + "ec2ParameterStore": { + "description": "EC2 Parameter Storage configuration", + "type": "object", + "properties": { + "parameterName": { + "description": "EC2 Parameter with encrypted password", + "type": "string" + } + }, + "required": ["parameterName"] + } + }, + "required": ["ec2ParameterStore"] + }, + "schema": { + "description": "PostgreSQL database schema (e.g. 'atomic')", + "type": "string", + "maxLength": 64 + }, + "sshTunnel": { + "description": "Optional SSH Tunnel configuration", + "type": ["object", "null"], + "properties": { + "bastion": { + "description": "Bastion host configuration", + "type": "object", + "properties": { + "host": { + "description": "Bastion SSH host", + "type": "string", + "anyOf": [ + { "format": "hostname" }, + { "format": "ipv4" }, + { "format": "ipv6" } + ] + }, + "port": { + "description": "Bastion SSH port", + "type": "integer", + "minimum": 1, + "maximum": 65535 + }, + "user": { + "description": "SSH user", + "type": "string" + }, + "passphrase": { + "description": "Plain-text SSH user's passphrase", + "type": ["string", "null"], + "maxLength": 2048 + }, + "key": { + "description": "SSH-key stored in EC2 Parameter Storage", + "type": ["object", "null"], + "properties": { + "ec2ParameterStore": { + "type": "object", + "properties": { + "parameterName": { + "type": "string", + "maxLength": 2048 + } + }, + "required": ["parameterName"] + } + }, + "required": ["ec2ParameterStore"] + } + }, + "required": ["host", "port", "user", "passphrase", "key"] + }, + "destination": { + "description": "Database socket inside private network", + "type": "object", + "properties": { + "host": { + "description": "PostgreSQL host inside private network (root-level host should be changed to 'localhost')", + "type": "string", + "anyOf": [ + { "format": "hostname" }, + { "format": "ipv4" }, + { "format": "ipv6" } + ] + }, + "port": { + "description": "PostgreSQL port inside private network (root-level port should be changed to be identical to 'localPort')", + "type": "integer", + "minimum": 1, + "maximum": 65535 + } + }, + "required": ["host", "port"] + }, + "localPort": { + "description": "Arbitrary port on the node running the Loader (should be identical to root-level 'port')", + "type": "integer", + "minimum": 1, + "maximum": 65535 + } + }, + "required": ["bastion", "destination", "localPort"] + }, + "sslMode": { + "description": "JDBC sslMode", + "type": "string", + "enum": ["DISABLE", "REQUIRE", "VERIFY_CA", "VERIFY_FULL"] + }, + "purpose": { +
"description": "Kind of data stored in this instance", + "enum": ["ENRICHED_EVENTS"] + } + }, + "additionalProperties": false, + "required": ["name", "id", "host", "database", "port", "username", "password", "schema", "sslMode", "purpose"] +} \ No newline at end of file diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala new file mode 100644 index 0000000..bf30261 --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/Main.scala @@ -0,0 +1,51 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader + +import cats.effect.{ExitCode, IO, IOApp} +import cats.implicits._ + +import doobie.util.log.LogHandler + +import com.snowplowanalytics.snowplow.postgres.loader.config.Cli +import com.snowplowanalytics.snowplow.postgres.loader.config.LoaderConfig.Purpose +import com.snowplowanalytics.snowplow.postgres.loader.storage.utils +import com.snowplowanalytics.snowplow.postgres.loader.streaming.{sink, source} + +object Main extends IOApp { + + def run(args: List[String]): IO[ExitCode] = + Cli.parse[IO](args).value.flatMap { + case Right(Cli(postgres, iglu, debug)) => + val logger = if (debug) LogHandler.jdkLogHandler else LogHandler.nop + resources.initialize[IO](postgres, logger, iglu).use { + case (blocker, xa, state) => + source.getSource[IO](blocker, postgres.purpose, postgres.source) match { + case Right(dataStream) => + for { + _ <- postgres.purpose match { + case Purpose.Enriched => utils.prepare[IO](postgres.schema, xa, logger) + case Purpose.SelfDescribing => IO.unit + } + goodSink = sink.goodSink[IO](xa, logger, postgres.schema, state, iglu) + _ <- dataStream.observeEither(sink.badSink[IO], goodSink).compile.drain + } yield ExitCode.Success + case Left(error) => + IO.delay(System.err.println(s"Source initialization error\n${error.getMessage}")).as(ExitCode.Error) + } + } + + case Left(error) => + IO.delay(System.err.println(s"Configuration initialization failure\n$error")).as(ExitCode.Error) + } +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/config/Base64Encoded.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/config/Base64Encoded.scala new file mode 100644 index 0000000..fd119b4 --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/config/Base64Encoded.scala @@ -0,0 +1,47 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. 
+ * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader.config + +import java.util.Base64 + +import cats.syntax.either._ +import cats.data.ValidatedNel + +import io.circe.Json +import io.circe.parser.{ parse => jsonParse } + +import com.monovore.decline.Argument + +/** Base64-encoded JSON */ +case class Base64Encoded(json: Json) extends AnyVal + +object Base64Encoded { + def parse(string: String): Either[String, Base64Encoded] = + Either + .catchOnly[IllegalArgumentException](Base64.getDecoder.decode(string)) + .map(bytes => new String(bytes)) + .flatMap(str => jsonParse(str)) + .leftMap(e => s"Cannot parse ${string} as Base64-encoded JSON: ${e.getMessage}") + .map(json => Base64Encoded(json)) + + implicit def base64EncodedDeclineArg: Argument[Base64Encoded] = + new Argument[Base64Encoded] { + def read(string: String): ValidatedNel[String, Base64Encoded] = + Base64Encoded.parse(string).toValidatedNel + + def defaultMetavar: String = "base64" + } + +} + + diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/config/Cli.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/config/Cli.scala new file mode 100644 index 0000000..f2dba6a --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/config/Cli.scala @@ -0,0 +1,120 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.postgres.loader.config + +import java.nio.file.{InvalidPathException, Files, Path, Paths} +import java.util.Base64 + +import cats.data.{EitherT, ValidatedNel} +import cats.implicits._ + +import cats.effect.{Sync, Clock} + +import io.circe.Json +import io.circe.syntax._ +import io.circe.parser.{parse => jsonParse} + +import com.snowplowanalytics.iglu.core.SelfDescribingData +import com.snowplowanalytics.iglu.core.circe.implicits._ + +import com.snowplowanalytics.iglu.client.Client + +import com.snowplowanalytics.snowplow.badrows.Processor +import com.snowplowanalytics.snowplow.postgres.loader.generated.BuildInfo + +import com.monovore.decline._ + +case class Cli[F[_]](postgres: LoaderConfig, iglu: Client[F, Json], debug: Boolean) + +object Cli { + + val processor = Processor(BuildInfo.name, BuildInfo.version) + + /** Parse list of arguments, validate against schema and initialize */ + def parse[F[_]: Sync: Clock](args: List[String]): EitherT[F, String, Cli[F]] = + command.parse(args) match { + case Left(help) => EitherT.leftT[F, Cli[F]](help.show) + case Right(rawConfig) => fromRawConfig(rawConfig) + } + + private def fromRawConfig[F[_]: Sync: Clock](rawConfig: RawConfig): EitherT[F, String, Cli[F]] = { + for { + resolverJson <- PathOrJson.load(rawConfig.resolver) + igluClient <- Client.parseDefault[F](resolverJson).leftMap(_.show) + configJson <- PathOrJson.load(rawConfig.config) + configData <- SelfDescribingData.parse(configJson).leftMap(e => s"Configuration JSON is not self-describing, ${e.message(configJson.noSpaces)}").toEitherT[F] + _ <- igluClient.check(configData).leftMap(e => s"Iglu validation failed with following error\n: ${e.asJson.spaces2}") + pgConfig <- configData.data.as[LoaderConfig].toEitherT[F].leftMap(e => s"Error while decoding configuration JSON, ${e.show}") + } yield Cli(pgConfig, igluClient, rawConfig.debug) + } + + /** Config files for Loader can be passed either as FS path + * or as base64-encoded JSON (if `--base64` is provided) */ + type PathOrJson = Either[Path, Json] + + object PathOrJson { + def parse(string: String, encoded: Boolean): ValidatedNel[String, PathOrJson] = { + val result = if (encoded) + Either + .catchOnly[IllegalArgumentException](new String(Base64.getDecoder.decode(string))) + .leftMap(_.getMessage) + .flatMap(s => jsonParse(s).leftMap(_.show)) + .map(_.asRight) + else Either.catchOnly[InvalidPathException](Paths.get(string).asLeft).leftMap(_.getMessage) + result + .leftMap(error => s"Cannot parse as ${if (encoded) "base64-encoded JSON" else "FS path"}: $error") + .toValidatedNel + } + + def load[F[_]: Sync](value: PathOrJson): EitherT[F, String, Json] = + value match { + case Right(json) => + EitherT.rightT[F, String](json) + case Left(path) => + Either + .catchNonFatal(new String(Files.readAllBytes(path))) + .leftMap(e => s"Cannot read the file path: $e") + .flatMap(s => jsonParse(s).leftMap(_.show)) + .toEitherT[F] + } + } + + val resolver = Opts.option[String]( + long = "resolver", + help = "Iglu Resolver JSON config, FS path or base64-encoded" + ) + + val config = Opts.option[String]( + long = "config", + help = "Self-describing JSON configuration" + ) + + val base64 = Opts.flag( + long = "base64", + help = "Configuration passed as Base64-encoded string, not as file path" + ).orFalse + + val debug = Opts.flag( + long = "debug", + help = "Show verbose SQL logging" + ).orFalse + + /** Temporary, pure config */ + private case class RawConfig(config: PathOrJson, resolver: PathOrJson, debug: 
Boolean) + + private val command: Command[RawConfig] = + Command[(String, String, Boolean, Boolean)](BuildInfo.name, BuildInfo.version)((config, resolver, base64, debug).tupled) + .mapValidated { case (cfg, res, enc, deb) => + (PathOrJson.parse(cfg, enc), PathOrJson.parse(res, enc), deb.validNel).mapN(RawConfig.apply) + } +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/config/LoaderConfig.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/config/LoaderConfig.scala new file mode 100644 index 0000000..370b82a --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/config/LoaderConfig.scala @@ -0,0 +1,126 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader.config + +import java.util.{UUID, Date} +import java.time.Instant + +import scala.jdk.CollectionConverters._ + +import cats.syntax.either._ + +import io.circe.Decoder +import io.circe.generic.semiauto.deriveDecoder +import io.circe.generic.extras.Configuration +import io.circe.generic.extras.semiauto.deriveConfiguredDecoder + +import com.snowplowanalytics.snowplow.postgres.loader.config.LoaderConfig.{JdbcUri, Source, Purpose} + +import software.amazon.awssdk.regions.Region +import software.amazon.kinesis.common.InitialPositionInStream + +case class LoaderConfig(name: String, + id: UUID, + source: Source, + host: String, + port: Int, + database: String, + username: String, + password: String, // TODO: can be EC2 store + sslMode: String, + schema: String, + purpose: Purpose) { + def getJdbc: JdbcUri = + JdbcUri(host, port, database) +} + +object LoaderConfig { + + sealed trait Purpose extends Product with Serializable + object Purpose { + case object Enriched extends Purpose + case object SelfDescribing extends Purpose + + implicit def ioCirceConfigPurposeDecoder: Decoder[Purpose] = + Decoder.decodeString.emap { + case "ENRICHED_EVENTS" => Enriched.asRight + case "JSON" => SelfDescribing.asRight + case other => s"$other is not supported purpose, choose from ENRICHED_EVENTS and JSON".asLeft + } + } + + implicit val awsRegionDecoder: Decoder[Region] = + Decoder.decodeString.emap { s => + val allRegions = Region.regions().asScala.toSet.map((r: Region) => r.id()) + if (allRegions.contains(s)) Region.of(s).asRight + else s"Region $s is unknown, choose from [${allRegions.mkString(", ")}]".asLeft + } + + sealed trait InitPosition { + /** Turn it into fs2-aws-compatible structure */ + def unwrap: Either[InitialPositionInStream, Date] = this match { + case InitPosition.Latest => InitialPositionInStream.LATEST.asLeft + case InitPosition.TrimHorizon => InitialPositionInStream.TRIM_HORIZON.asLeft + case InitPosition.AtTimestamp(date) => Date.from(date).asRight + } + } + object InitPosition { + case object Latest extends InitPosition + case object TrimHorizon extends 
InitPosition + case class AtTimestamp(timestamp: Instant) extends InitPosition + + implicit val ioCirceInitPositionDecoder: Decoder[InitPosition] = + Decoder.decodeJson.emap { json => + json.asString match { + case Some("TRIM_HORIZON") => TrimHorizon.asRight + case Some("LATEST") => Latest.asRight + case Some(other) => + s"Initial position $other is unknown. Choose from LATEST and TRIM_HORIZON. AT_TIMESTAMP must provide the timestamp".asLeft + case None => + val result = for { + root <- json.asObject.map(_.toMap) + atTimestamp <- root.get("AT_TIMESTAMP") + atTimestampObj <- atTimestamp.asObject.map(_.toMap) + timestampStr <- atTimestampObj.get("timestamp") + timestamp <- timestampStr.as[Instant].toOption + } yield AtTimestamp(timestamp) + result match { + case Some(atTimestamp) => atTimestamp.asRight + case None => "Initial position can be either LATEST or TRIM_HORIZON string or AT_TIMESTAMP object (e.g. 2020-06-03T00:00:00Z)".asLeft + } + } + } + } + + sealed trait Source extends Product with Serializable + object Source { + + case class Kinesis(appName: String, streamName: String, region: Region, initialPosition: InitPosition) extends Source + case class PubSub(projectId: String, subscriptionId: String) extends Source + + implicit val config: Configuration = + Configuration.default.withSnakeCaseConstructorNames + + implicit def ioCirceConfigSourceDecoder: Decoder[Source] = + deriveConfiguredDecoder[Source] + } + + case class JdbcUri(host: String, port: Int, database: String) { + override def toString = + s"jdbc:postgresql://$host:$port/$database" + } + + implicit def ioCirceConfigDecoder: Decoder[LoaderConfig] = + deriveDecoder[LoaderConfig] + +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/package.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/package.scala new file mode 100644 index 0000000..147d919 --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/package.scala @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under.
+ */ +package com.snowplowanalytics.snowplow.postgres + +import cats.Eq + +import com.snowplowanalytics.iglu.core.SchemaKey + +import com.snowplowanalytics.iglu.schemaddl.jsonschema.{Pointer, JsonSchemaProperty} +import com.snowplowanalytics.iglu.schemaddl.jsonschema.Pointer.{SchemaProperty, Cursor} +import com.snowplowanalytics.iglu.schemaddl.jsonschema.properties.CommonProperties.Type + +package object loader { + implicit val typeEq: Eq[Type] = Eq.fromUniversalEquals[Type] + implicit val schemaPropertyEq: Eq[SchemaProperty] = Eq.fromUniversalEquals[SchemaProperty] + implicit val jsonSchemaPropertyEq: Eq[JsonSchemaProperty] = Eq.fromUniversalEquals[JsonSchemaProperty] + implicit val cursorEq: Eq[Cursor] = Eq.fromUniversalEquals[Cursor] + implicit val pointerEq: Eq[Pointer] = Eq.fromUniversalEquals[Pointer] + implicit val schemaKeyEq: Eq[SchemaKey] = Eq.fromUniversalEquals[SchemaKey] +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/resources.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/resources.scala new file mode 100644 index 0000000..de7c6ee --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/resources.scala @@ -0,0 +1,53 @@ +package com.snowplowanalytics.snowplow.postgres.loader + +import cats.implicits._ + +import cats.effect.concurrent.Ref +import cats.effect.{ContextShift, Async, Blocker, Clock, Resource, Sync} + +import doobie.hikari._ +import doobie.util.ExecutionContexts +import doobie.util.log.LogHandler +import doobie.util.transactor.Transactor + +import io.circe.Json + +import com.snowplowanalytics.iglu.client.Client + +import com.snowplowanalytics.snowplow.postgres.loader.config.LoaderConfig +import com.snowplowanalytics.snowplow.postgres.loader.config.LoaderConfig.JdbcUri +import com.snowplowanalytics.snowplow.postgres.loader.storage.PgState + +object resources { + + /** Initialise Blocking Thread Pool, Connection Pool and DB state */ + def initialize[F[_]: Async: Clock: ContextShift](postgres: LoaderConfig, + logger: LogHandler, + iglu: Client[F, Json]) = + for { + blocker <- Blocker[F] + xa <- resources.getTransactor[F](postgres.getJdbc, postgres.username, postgres.password, blocker) + + initState = storage.PgState.init[F](xa, logger, iglu.resolver, postgres.schema).value.flatMap { + case Left(error) => + val exception = new RuntimeException(s"Couldn't initialise the state: $error") + Sync[F].raiseError[Ref[F, PgState]](exception) + case Right((issues, state)) => + issues.traverse(issue => Sync[F].delay(println(issue))).as(state) + } + state <- Resource.liftF(initState) + } yield (blocker, xa, state) + + /** Get a HikariCP transactor */ + def getTransactor[F[_]: Async: ContextShift](jdbcUri: JdbcUri, user: String, password: String, be: Blocker): Resource[F, HikariTransactor[F]] = + for { + ce <- ExecutionContexts.fixedThreadPool[F](32) + xa <- HikariTransactor.newHikariTransactor[F]("org.postgresql.Driver", jdbcUri.toString, user, password, ce, be) + } yield xa + + /** Get default single-threaded transactor (use for tests only) */ + def getTransactorDefault[F[_]: Async: ContextShift](jdbcUri: JdbcUri, username: String, password: String): Transactor[F] = + Transactor.fromDriverManager[F]( + "org.postgresql.Driver", jdbcUri.toString, username, password + ) +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/Entity.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/Entity.scala new file mode 100644 index 0000000..cdf37fb ---
/dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/Entity.scala @@ -0,0 +1,30 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader.shredding + +import com.snowplowanalytics.iglu.core.SchemaKey + +import com.snowplowanalytics.snowplow.postgres.loader.shredding.Entity.Column + +case class Entity(tableName: String, origin: SchemaKey, columns: List[Column]) + +object Entity { + /** + * Table cell with value and meta info + * @param name Postgres column name + * @param dataType Postgres data type + * @param value ready-to-be-inserted value + */ + case class Column(name: String, dataType: Type, value: Value) + +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/Type.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/Type.scala new file mode 100644 index 0000000..497f121 --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/Type.scala @@ -0,0 +1,246 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.postgres.loader.shredding + +import cats.implicits._ + +import io.circe.Json + +import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema +import com.snowplowanalytics.iglu.schemaddl.jsonschema.properties.CommonProperties.{Type => SType} +import com.snowplowanalytics.iglu.schemaddl.jsonschema.properties.NumberProperty.{MultipleOf, Maximum} +import com.snowplowanalytics.iglu.schemaddl.jsonschema.properties.StringProperty.{Format, MaxLength, MinLength} + +import com.snowplowanalytics.snowplow.postgres.loader._ + +sealed trait Type { + def ddl: String = this match { + case Type.Char(size) => s"CHAR($size)" + case Type.Varchar(size) => s"VARCHAR($size)" + case Type.Uuid => "UUID" + case Type.Timestamp => "TIMESTAMP" + case Type.Date => "DATE" + case Type.Integer => "INTEGER" + case Type.BigInt => "BIGINT" + case Type.Double => "DOUBLE PRECISION" + case Type.Bool => "BOOLEAN" + case Type.Jsonb => "JSONB" + } +} + +object Type { + + case class Char(size: Int) extends Type + case class Varchar(size: Int) extends Type + case object Uuid extends Type + case object Timestamp extends Type + case object Date extends Type + case object Integer extends Type + case object BigInt extends Type + case object Double extends Type + case object Bool extends Type + case object Jsonb extends Type + + + type DataTypeSuggestion = (Schema, String) => Option[Type] + + // For complex enums Suggest VARCHAR with length of longest element + val complexEnumSuggestion: DataTypeSuggestion = (properties, _) => + properties.enum match { + case Some(enums) if isComplexEnum(enums.value) => + val longest = excludeNull(enums.value).map(_.noSpaces.length).maximumOption.getOrElse(16) + Some(Type.Varchar(longest)) + case _ => None + } + + val productSuggestion: DataTypeSuggestion = (properties, _) => + properties.`type` match { + case Some(t: SType.Union) if t.isUnion => + Some(Type.Jsonb) + case Some(t: SType) if t === (SType.Array: SType) => + Some(Type.Jsonb) + case Some(SType.Union(types)) if types.contains(SType.Array) => + Some(Type.Jsonb) + case _ => None + } + + val timestampSuggestion: DataTypeSuggestion = (properties, _) => + (properties.`type`, properties.format) match { + case (Some(types), Some(Format.DateTimeFormat)) if types.possiblyWithNull(SType.String) => + Some(Type.Timestamp) + case _ => None + } + + val dateSuggestion: DataTypeSuggestion = (properties, _) => + (properties.`type`, properties.format) match { + case (Some(types), Some(Format.DateFormat)) if types.possiblyWithNull(SType.String) => + Some(Type.Date) + case _ => None + } + + val arraySuggestion: DataTypeSuggestion = (properties, _) => + properties.`type` match { + case Some(types) if types.possiblyWithNull(SType.Array) => + Some(Type.Varchar(4096)) + case _ => None + } + + val numberSuggestion: DataTypeSuggestion = (properties, _) => + (properties.`type`, properties.multipleOf) match { + case (Some(types), Some(MultipleOf.NumberMultipleOf(m))) if types.possiblyWithNull(SType.Number) && m === BigDecimal(1, 2) => + Some(Type.Double) + case (Some(types), _) if types.possiblyWithNull(SType.Number) => + Some(Type.Double) + case (Some(types: SType.Union), _) if (types.value - SType.Null) === Set(SType.Integer, SType.Number) => + Some(Type.Double) + case _ => + None + } + + // TODO: add more sizes + val integerSuggestion: DataTypeSuggestion = (properties, _) => { + (properties.`type`, properties.maximum, properties.enum, properties.multipleOf) match { + case (Some(types), Some(maximum), _, _) if 
types.possiblyWithNull(SType.Integer) => + if (isBigInt(maximum)) Type.BigInt.some + else Type.Integer.some + case (Some(types), None, _, _) if types.possiblyWithNull(SType.Integer) => + Type.BigInt.some + // Contains only enum + case (types, _, Some(_), _) if types.isEmpty || types.get.possiblyWithNull(SType.Integer) => + Type.Integer.some + case (Some(types), _, _, _) if types.possiblyWithNull(SType.Integer) => + Type.Integer.some + case (_, _, _, Some(MultipleOf.IntegerMultipleOf(_))) => + Type.Integer.some + case _ => None + } + } + + val charSuggestion: DataTypeSuggestion = (properties, _) => { + (properties.`type`, properties.minLength, properties.maxLength) match { + case (Some(types), Some(MinLength(min)), Some(MaxLength(max))) + if min === max && types.possiblyWithNull(SType.String) => + Some(Type.Char(min.toInt)) + case _ => None + } + } + + val booleanSuggestion: DataTypeSuggestion = (properties, _) => { + properties.`type` match { + case Some(types) if types.possiblyWithNull(SType.Boolean) => Some(Type.Bool) + case _ => None + } + } + + val uuidSuggestion: DataTypeSuggestion = (properties, _) => { + (properties.`type`, properties.format) match { + case (Some(types), Some(Format.UuidFormat)) if types.possiblyWithNull(SType.String) => + Some(Type.Uuid) + case _ => None + } + } + + val varcharSuggestion: DataTypeSuggestion = (properties, _) => { + (properties.`type`, properties.maxLength, properties.enum, properties.format) match { + case (Some(types), Some(maxLength), _, _) if types.possiblyWithNull(SType.String) => + Some(Type.Varchar(maxLength.value.toInt)) + case (_, _, Some(enum), _) => + enum.value.map(jsonLength).maximumOption match { + case Some(maxLength) if enum.value.lengthCompare(1) === 0 => + Some(Type.Varchar(maxLength)) + case Some(maxLength) => + Some(Type.Varchar(maxLength)) + case None => None + } + case _ => None + } + } + + + val dataTypeSuggestions: List[DataTypeSuggestion] = List( + complexEnumSuggestion, + productSuggestion, + timestampSuggestion, + dateSuggestion, + arraySuggestion, + integerSuggestion, + numberSuggestion, + booleanSuggestion, + charSuggestion, + uuidSuggestion, + varcharSuggestion + ) + + def getDataType(properties: Schema, + varcharSize: Int, + columnName: String, + suggestions: List[DataTypeSuggestion]): Type = { + + suggestions match { + case Nil => Type.Varchar(4096) // Generic + case suggestion :: tail => suggestion(properties, columnName) match { + case Some(format) => format + case None => getDataType(properties, varcharSize, columnName, tail) + } + } + } + + private def jsonLength(json: Json): Int = + json.fold(0, b => b.toString.length, _ => json.noSpaces.length, _.length, _ => json.noSpaces.length, _ => json.noSpaces.length) + + /** + * Get set of types or enum as string excluding null + * + * @param types comma-separated types + * @return set of strings + */ + private def excludeNull(types: List[Json]): List[Json] = + types.filterNot(_.isNull) + + /** + * Check enum contains some different types + * (string and number or number and boolean) + */ + private def isComplexEnum(enum: List[Json]) = { + // Predicates + def isNumeric(s: Json) = s.isNumber + def isNonNumeric(s: Json) = !isNumeric(s) + def isBoolean(s: Json) = s.isBoolean + + val nonNullEnum = excludeNull(enum) + somePredicates(nonNullEnum, List(isNumeric _, isNonNumeric _, isBoolean _), 2) + } + + def isBigInt(long: Maximum): Boolean = + long match { + case Maximum.IntegerMaximum(bigInt) => bigInt > 2147483647L + case _ => false + } + + /** + * Check at least some 
`quantity` of `predicates` are true on `instances` + * + * @param instances list of instances to check on + * @param predicates list of predicates to check + * @param quantity required quantity + */ + private def somePredicates(instances: List[Json], predicates: List[Json => Boolean], quantity: Int): Boolean = + if (quantity === 0) true + else + predicates match { + case Nil => false + case h :: tail if instances.exists(h) => somePredicates(instances, tail, quantity - 1) + case _ :: tail => somePredicates(instances, tail, quantity) + } +} + diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/Value.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/Value.scala new file mode 100644 index 0000000..7bae7d6 --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/Value.scala @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader.shredding + +import java.sql.{Timestamp => JTimestamp} +import java.util.UUID +import java.time.Instant + +import io.circe.Json + +import doobie.syntax.string._ +import doobie.implicits.javasql._ +import doobie.postgres.implicits._ +import doobie.postgres.circe.jsonb.implicits._ +import doobie.util.fragment.Fragment + +sealed trait Value { + def fragment: Fragment = this match { + case Value.Uuid(value) => fr"$value" + case Value.Char(value) => fr"$value" + case Value.Varchar(value) => fr"$value" + case Value.Timestamp(value) => fr"$value" + case Value.Integer(value) => fr"$value" + case Value.BigInt(value) => fr"$value" + case Value.Double(value) => fr"$value" + case Value.Bool(value) => fr"$value" + case Value.Jsonb(value) => fr"$value" + } +} + +object Value { + case class Uuid(value: UUID) extends Value + case class Char(value: String) extends Value + case class Varchar(value: String) extends Value + case class Timestamp(value: JTimestamp) extends Value + case class Integer(value: Int) extends Value + case class BigInt(value: Long) extends Value + case class Double(value: scala.Double) extends Value + case class Bool(value: Boolean) extends Value + case class Jsonb(value: Json) extends Value + + object Timestamp { + def apply(instant: Instant): Timestamp = Timestamp(JTimestamp.from(instant)) + } +} + diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/schema.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/schema.scala new file mode 100644 index 0000000..8d017df --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/schema.scala @@ -0,0 +1,67 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. 
+ * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader.shredding + +import cats.Monad +import cats.data.EitherT + +import cats.effect.Clock + +import io.circe.Json + +import com.snowplowanalytics.iglu.client.{ClientError, Resolver} +import com.snowplowanalytics.iglu.client.resolver.registries.RegistryLookup +import com.snowplowanalytics.iglu.core.{SchemaList, SchemaCriterion, SelfDescribingSchema, SchemaKey, SchemaMap} + + +import com.snowplowanalytics.iglu.schemaddl.{IgluSchema, Properties} +import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema +import com.snowplowanalytics.iglu.schemaddl.jsonschema.properties.CommonProperties.{ Type => SType } +import com.snowplowanalytics.iglu.schemaddl.jsonschema.circe.implicits._ +import com.snowplowanalytics.iglu.schemaddl.migrations.{FlatSchema, SchemaList => DdlSchemaList} + +import com.snowplowanalytics.snowplow.badrows.FailureDetails + +/** Generic schema functionality, related to JSON schema (Iglu) transformations */ +object schema { + + def fetch[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F]) + (key: SchemaKey): EitherT[F, FailureDetails.LoaderIgluError, IgluSchema] = + for { + json <- EitherT(resolver.lookupSchema(key)).leftMap(error => FailureDetails.LoaderIgluError.IgluError(key, error): FailureDetails.LoaderIgluError) + schema <- EitherT.fromEither[F](Schema.parse(json).toRight(buildFailure(json, key))) + } yield SelfDescribingSchema(SchemaMap(key), schema) + + def buildFailure(json: Json, key: SchemaKey): FailureDetails.LoaderIgluError = + FailureDetails.LoaderIgluError.InvalidSchema(key, s"JSON ${json.noSpaces} cannot be parsed as JSON Schema"): FailureDetails.LoaderIgluError + + + def getOrdered[F[_]: Monad: RegistryLookup: Clock](resolver: Resolver[F]) + (vendor: String, name: String, model: Int): EitherT[F, FailureDetails.LoaderIgluError, Properties] = { + + val criterion = SchemaCriterion(vendor, name, "jsonschema", Some(model), None, None) + val schemaList = resolver.listSchemas(vendor, name, model) + for { + schemaList <- EitherT[F, ClientError.ResolutionError, SchemaList](schemaList).leftMap(error => FailureDetails.LoaderIgluError.SchemaListNotFound(criterion, error)) + ordered <- DdlSchemaList.fromSchemaList(schemaList, fetch(resolver)) + properties = FlatSchema.extractProperties(ordered) + } yield properties + } + + def canBeNull(schema: Schema): Boolean = + schema.enum.exists(_.value.exists(_.isNull)) || (schema.`type` match { + case Some(SType.Union(types)) => types.contains(SType.Null) + case Some(t) => t == SType.Null + case None => false + }) +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/transform.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/transform.scala new file mode 100644 index 0000000..666ada3 --- /dev/null +++ 
b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/shredding/transform.scala @@ -0,0 +1,288 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader.shredding + +import java.time.Instant +import java.time.format.DateTimeParseException +import java.util.UUID +import java.sql.Timestamp + +import cats.data.{EitherT, EitherNel, NonEmptyList} +import cats.implicits._ + +import cats.effect.{Sync, Clock} + +import io.circe.{JsonNumber, Json, ACursor} + +import com.snowplowanalytics.iglu.core._ + +import com.snowplowanalytics.iglu.client.Client + +import com.snowplowanalytics.iglu.schemaddl.jsonschema.Pointer.SchemaPointer +import com.snowplowanalytics.iglu.schemaddl.{Properties, StringUtils} +import com.snowplowanalytics.iglu.schemaddl.jsonschema.{Pointer, Schema} +import com.snowplowanalytics.iglu.schemaddl.migrations.FlatSchema + +import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.badrows.{FailureDetails, BadRow, Failure, Payload} +import com.snowplowanalytics.snowplow.postgres.loader.config.Cli +import com.snowplowanalytics.snowplow.postgres.loader.shredding.Entity.Column + +object transform { + val Atomic = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)) + + /** Transform the whole `Event` (canonical and JSONs) into list of independent entities ready to be inserted */ + def shredEvent[F[_]: Sync: Clock](client: Client[F, Json], event: Event): EitherT[F, BadRow, List[Entity]] = { + val entities = event.contexts.data ++ event.derived_contexts.data ++ event.unstruct_event.data.toList + val wholeEvent = entities + .parTraverse(shredJson(client)) + .value + .map { shreddedOrError => + (shreddedOrError, shredAtomic(Map())(event)).mapN { + (shreddedEntities, atomic) => atomic :: shreddedEntities.map(addMetadata(event.event_id, event.collector_tstamp)) + } + } + EitherT(wholeEvent).leftMap[BadRow](buildBadRow(event)) + } + + def addMetadata(eventId: UUID, tstamp: Instant)(entity: Entity): Entity = { + val metaColumns = List( + Column("schema_vendor", Type.Varchar(128), Value.Varchar(entity.origin.vendor)), + Column("schema_name", Type.Varchar(128), Value.Varchar(entity.origin.name)), + Column("schema_format", Type.Varchar(16), Value.Varchar(entity.origin.format)), + Column("schema_version", Type.Varchar(8), Value.Varchar(entity.origin.version.asString)), + Column("root_id", Type.Uuid, Value.Uuid(eventId)), + Column("root_tstamp", Type.Timestamp, Value.Timestamp(tstamp)), + ) + + entity.copy(columns = metaColumns ++ entity.columns) + } + + /** Remove all properties which are roots for other properties, + * Otherwise table will have structure of [nested, nested.a, nested.b], + * where we need just [nested.a, nested.b] + */ + def removeRoots(props: Properties): Properties = { + val pointers = 
props.map(_._1).toSet + props.filterNot { case (pointer, _) => + pointer.value.isEmpty || { + val problem = pointers.exists { p => pointer.isParentOf(p) && p != pointer } + problem + } + } + } + + /** Transform JSON into [[Entity]] */ + def shredJson[F[_]: Sync: Clock](client: Client[F, Json]) + (data: SelfDescribingData[Json]): EitherT[F, NonEmptyList[FailureDetails.LoaderIgluError], Entity] = { + val key = data.schema + schema.getOrdered(client.resolver)(key.vendor, key.name, key.version.model) + .leftMap { error => NonEmptyList.of(error) } + .subflatMap { properties => + val shredded = getNameTypeVal(properties)(data.data) + .parTraverse { case (columnName, pgType, value) => + cast(value, pgType).toEitherNel.map { value => + value.map { v => Entity.Column(columnName, pgType, v) } + } + } + + shredded + .leftMap { errors => errors.map { error => + FailureDetails.LoaderIgluError.WrongType(data.schema, Json.Null, error) // TODO + } } + .map { cols => + val columns = cols.collect { case Some(c) => c } + val tableName = data.schema match { + case Atomic => "events" + case other => StringUtils.getTableName(SchemaMap(other)) + } + Entity(tableName, data.schema, columns) + } + } + } + + /** Transform only canonical part of `Event` (128 non-JSON fields) into `ShreddedEntity` */ + def shredAtomic(lengths: Map[String, Int])(event: Event): EitherNel[FailureDetails.LoaderIgluError, Entity] = { + def tranformDate(col: String)(s: String): Either[FailureDetails.LoaderIgluError, Entity.Column] = + Either + .catchOnly[DateTimeParseException](Instant.parse(s)) + .map { parsed => Entity.Column(col, Type.Timestamp, Value.Timestamp(parsed)) } + .leftMap { _ => FailureDetails.LoaderIgluError.WrongType(Atomic, Json.fromString(s), "date-time") } + + def transformUuid(col: String)(s: String): Either[FailureDetails.LoaderIgluError, Entity.Column] = + Either.catchOnly[IllegalArgumentException](UUID.fromString(s)) + .map { parsed => Entity.Column(col, Type.Uuid, Value.Uuid(parsed)) } + .leftMap { _ => FailureDetails.LoaderIgluError.WrongType(Atomic, Json.fromString(s), "uuid") } + + def transformBool(col: String)(b: Boolean): Entity.Column = + if (b) Entity.Column(col, Type.Bool, Value.Bool(true)) + else Entity.Column(col, Type.Bool, Value.Bool(false)) + + def truncate(col: String)(value: String): Entity.Column = + lengths.get(col) match { + case Some(len) => + Entity.Column(col, Type.Varchar(len), Value.Varchar(value.take(len))) + case None => + Entity.Column(col, Type.Varchar(1024), Value.Varchar(value.take(1024))) + } + + def transformNumber(col: String)(num: JsonNumber): Entity.Column = + num.toInt match { + case Some(int) => Entity.Column(col, Type.Integer, Value.Integer(int)) + case None => Entity.Column(col, Type.Double, Value.Double(num.toDouble)) + } + + def castError(expected: String)(value: Json) = + FailureDetails.LoaderIgluError.WrongType(Atomic, value, expected).asLeft[Option[Entity.Column]] + + val data = event.ordered.parTraverse { + case ("contexts" | "derived_contexts" | "unstruct_event", _) => + none.asRight.toEitherNel + case (key @ ("event_id" | "domain_sessionid"), Some(value)) => + val error = castError("uuid") _ + value.fold( + none.asRight.toEitherNel, + b => error(Json.fromBoolean(b)).toEitherNel, + n => error(Json.fromJsonNumber(n)).toEitherNel, + s => transformUuid(key)(s).map(_.some).toEitherNel, + a => error(Json.arr(a: _*)).toEitherNel, + o => error(Json.fromJsonObject(o)).toEitherNel + ) + case (key, Some(value)) if key.endsWith("_tstamp") => + val error = castError("date-time") _ + 
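+ // Each canonical *_tstamp field is expected to hold an ISO-8601 string: non-string JSON values and unparseable strings are reported as "date-time" WrongType errors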
value.fold( + none.asRight.toEitherNel, + b => error(Json.fromBoolean(b)).toEitherNel, + n => error(Json.fromJsonNumber(n)).toEitherNel, + s => tranformDate(key)(s).map(_.some).toEitherNel, + a => error(Json.arr(a: _*)).toEitherNel, + o => error(Json.fromJsonObject(o)).toEitherNel + ) + case (key, Some(value)) => + value.fold( + none.asRight.toEitherNel, + b => transformBool(key)(b).some.asRight.toEitherNel, + n => transformNumber(key)(n).some.asRight.toEitherNel, + s => truncate(key)(s).some.asRight.toEitherNel, + _ => none.asRight.toEitherNel, + _ => none.asRight.toEitherNel + ) + case (_, None) => none.asRight.toEitherNel + } + data.map(_.unite).map { columns => Entity("events", Atomic, columns) } + } + + def cast(json: Option[Json], dataType: Type): Either[String, Option[Value]] = { + val error = s"Invalid type ${dataType.ddl} for value $json".asLeft[Option[Value]] + json match { + case Some(j) => + dataType match { + case Type.Uuid => + j.asString match { + case Some(s) => Value.Uuid(UUID.fromString(s)).some.asRight // TODO + case None => error + } + case Type.Varchar(_) => + val result = j.asString match { + case Some(s) => s + case None => j.noSpaces + } + Value.Varchar(result).some.asRight[String] + case Type.Bool => + j.asBoolean match { + case Some(b) => Value.Bool(b).some.asRight + case None => error + } + case Type.Char(len) => + j.asString match { + case Some(s) if s.length === len => Value.Char(s).some.asRight + case Some(_) => error + case None => error + } + case Type.Integer => + j.asNumber.flatMap(_.toInt) match { + case Some(int) => Value.Integer(int).some.asRight + case None => error + } + case Type.BigInt => + j.asNumber.flatMap(_.toLong) match { + case Some(long) => Value.BigInt(long).some.asRight + case None => error + } + case Type.Double => + j.asNumber.map(_.toDouble) match { + case Some(int) => Value.Double(int).some.asRight + case None => error + } + case Type.Jsonb => + Value.Jsonb(j).some.asRight + case Type.Date => + error // TODO + case Type.Timestamp => + j.asString match { + case Some(s) => + Either.catchOnly[DateTimeParseException](Instant.parse(s)).leftMap(_.getMessage).map { instant => + Value.Timestamp(Timestamp.from(instant)).some + } + case None => error + } + } + case None => none.asRight + } + } + + def getPath(pointer: Pointer.JsonPointer, json: Json): Option[Json] = { + def go(cursor: List[Pointer.Cursor], data: ACursor): Option[Json] = + cursor match { + case Nil => + data.focus + case Pointer.Cursor.DownField(field) :: t => + go(t, data.downField(field)) + case Pointer.Cursor.At(i) :: t => + go(t, data.downN(i)) + case Pointer.Cursor.DownProperty(_) :: _ => + throw new IllegalStateException(s"Iglu Schema DDL tried to use invalid pointer ${pointer.show} for payload ${json.noSpaces}") + } + + go(pointer.get, json.hcursor) + } + + /** + * Transform Schema properties into information that can be transformed into DDL columns + * It's very important to implement it and [[getNameTypeVal]] using same logic as + * former is an implementation for DDL, while latter is implementation for data shredding + * @return list of JSON Pointer, column name, inferred DB type, nullability + */ + def getNameType(properties: Properties): List[(SchemaPointer, String, Type, Boolean)] = + removeRoots(properties).map { case (pointer, s: Schema) => + val columnName: String = FlatSchema.getName(pointer) + val pgType = Type.getDataType(s, 4096, columnName, Type.dataTypeSuggestions) + (pointer, columnName, pgType, schema.canBeNull(s)) + } + + /** + * Extract JSON Paths from an 
actual JSON data + * It's very important to implement [[getNameType]] and this function using same logic as + * former is an implementation for DDL, while latter is implementation for data shredding + * @return list column name, inferred DB type, value + */ + def getNameTypeVal(properties: Properties)(data: Json) = + getNameType(properties).map { case (pointer, columnName, dataType, _) => + val value = getPath(pointer.forData, data) + (columnName, dataType, value) + } + + private def buildBadRow(event: Event)(errors: NonEmptyList[FailureDetails.LoaderIgluError]) = + BadRow.LoaderIgluError(Cli.processor, Failure.LoaderIgluErrors(errors), Payload.LoaderPayload(event)) + +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/CommentIssue.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/CommentIssue.scala new file mode 100644 index 0000000..14cd3ec --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/CommentIssue.scala @@ -0,0 +1,37 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader.storage + +import cats.{Eq, Show} + +import com.snowplowanalytics.iglu.core.ParseError + +/** Error with table comment, preventing from [[PgState]] initialisation */ +sealed trait CommentIssue extends Product with Serializable + +object CommentIssue { + /** Table missing a comment */ + case class Missing(table: String) extends CommentIssue + /** Comment is not an Iglu URI */ + case class Invalid(table: String, comment: String, error: ParseError) extends CommentIssue + + implicit val commentIssueShow: Show[CommentIssue] = Show.show { + case Missing(table) => + s"Iglu comment is missing in table $table; The table will be ignored" + case Invalid(table, comment, error) => + s"Comment on table $table ($comment) is not valid Iglu URI (${error.code})" + } + + implicit val commentIssueEq: Eq[CommentIssue] = Eq.fromUniversalEquals[CommentIssue] +} + diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/PgState.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/PgState.scala new file mode 100644 index 0000000..e0ab3dc --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/PgState.scala @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. 
+ * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader.storage + +import cats.data.EitherT + +import cats.implicits._ +import cats.effect.concurrent.Ref +import cats.effect.{Sync, Clock} + +import doobie.implicits._ +import doobie.util.log.LogHandler +import doobie.util.transactor.Transactor + +import com.snowplowanalytics.iglu.core.SchemaKey + +import com.snowplowanalytics.iglu.client.resolver.Resolver + +import com.snowplowanalytics.iglu.schemaddl.ModelGroup +import com.snowplowanalytics.iglu.schemaddl.migrations.SchemaList + +import com.snowplowanalytics.snowplow.badrows.FailureDetails.LoaderIgluError +import com.snowplowanalytics.snowplow.postgres.loader._ +import com.snowplowanalytics.snowplow.postgres.loader.storage.PgState.TableState +import com.snowplowanalytics.snowplow.postgres.loader.shredding.transform.Atomic +import com.snowplowanalytics.snowplow.postgres.loader.shredding.schema + +/** + * State of the DB schema, where every `ModelGroup` (read "table") + * is associated with list of schemas. Each of these schemas is reflected + * in the structure of the table. If `SchemaKey` matches the `ModelGroup`, + * but not associated with it - the table is outdated. After table has been + * migrated to reflect the newest schema - state need to be updated up to + * that schema + */ +case class PgState(tables: Map[ModelGroup, SchemaList]) { + /** + * Check if `SchemaKey` is known to the state + * @param entity `SchemaKey` taken from table comment + * @return one of three possible tables states + */ + private[loader] def check(entity: SchemaKey): TableState = { + val group = (entity.vendor, entity.name, entity.version.model) + + group match { + case (Atomic.vendor, Atomic.name, Atomic.version.model) => + TableState.Match + case _ => tables.get(group) match { + case Some(SchemaList.Full(schemas)) => + if (schemas.toList.map(_.self.schemaKey).contains(entity)) TableState.Match else TableState.Outdated + case Some(SchemaList.Single(schema)) => + if (schema.self.schemaKey === entity) TableState.Match else TableState.Outdated + case None => + TableState.Missing + } + } + } + + /** Add a whole `SchemaList` to the state (replace if it exists) */ + def put(list: SchemaList): PgState = { + val entity = list.latest.schemaKey + val modelGroup = (entity.vendor, entity.name, entity.version.model) + PgState(tables ++ Map(modelGroup -> list)) + } +} + +object PgState { + /** + * Initialize internal mutable state by traversing all table comments to get their latest version + * For every schema URI, the whole list will be fetched to keep ordering consistent + * All newer versions (present on registry, but not reflected on table) will be dropped + * + * @param xa DB transactor + * @param logger doobie logger + * @param resolver Iglu Resolver attached to Iglu Server + * @param pgSchema database schema + * @return a list of potential schema issues (not fatal errors, to be logged) and + * an actual mutable reference with the state + */ + def init[F[_]: Sync: Clock](xa: Transactor[F], logger: LogHandler, resolver: Resolver[F], pgSchema: String) = + EitherT.liftF(query.getComments(pgSchema, logger).transact(xa)).flatMap { comments => + val initial = 
PgState(Map.empty) + val (issues, keys) = comments.separate + val availableSchemas = keys.traverse { key => + EitherT(resolver.listSchemas(key.vendor, key.name, key.version.model)) + .leftMap { resolutionError => LoaderIgluError.IgluError(key, resolutionError) } + .flatMap { schemaKeyList => SchemaList.fromSchemaList(schemaKeyList, schema.fetch(resolver)) } + .map { list => list.until(key) match { + case Some(updatedList) => updatedList + case None => throw new IllegalStateException(s"SchemaList $list doesn't match vendor of ${key.toSchemaUri}") + } } + } + availableSchemas + .map { list => list.foldLeft(initial) { (acc, cur) => acc.put(cur) } } + .flatMap { state => EitherT.liftF[F, LoaderIgluError, Ref[F, PgState]](Ref.of[F, PgState](state)) } + .map { state => (issues.filterNot { issue => issue === CommentIssue.Missing("events") }, state) } + } + + private[loader] sealed trait TableState extends Product with Serializable + private[loader] object TableState { + case object Match extends TableState + case object Outdated extends TableState + case object Missing extends TableState + } +} + diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/ddl.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/ddl.scala new file mode 100644 index 0000000..6cf6c2a --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/ddl.scala @@ -0,0 +1,100 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.postgres.loader.storage + +import cats.data.EitherT +import cats.implicits._ + +import cats.effect.{Sync, Clock} +import cats.effect.concurrent.Ref + +import doobie.{ConnectionIO, LogHandler} +import doobie.implicits._ +import doobie.util.fragment.Fragment + +import com.snowplowanalytics.iglu.core.SchemaCriterion + +import com.snowplowanalytics.iglu.client.Resolver + +import com.snowplowanalytics.iglu.schemaddl.migrations.{SchemaList => DdlSchemaList} + +import com.snowplowanalytics.snowplow.badrows.FailureDetails + +import com.snowplowanalytics.snowplow.postgres.loader.shredding.Entity +import com.snowplowanalytics.snowplow.postgres.loader.shredding.schema.fetch +import com.snowplowanalytics.snowplow.postgres.loader.streaming.IgluErrors +import com.snowplowanalytics.snowplow.postgres.loader.streaming.sink.Insert + + +object ddl { + + /** Function that can produce DDL, based on `DdlSchemaList` */ + type Generator = DdlSchemaList => Fragment + + def createTable[F[_]: Sync: Clock](resolver: Resolver[F], + state: Ref[F, PgState], + logger: LogHandler, + schema: String, + entity: Entity, + meta: Boolean): EitherT[F, IgluErrors, Insert] = { + val generator: Generator = schemaList => sql.createTable(schema, entity, schemaList, meta) + manage(resolver, state, logger, schema, entity, generator) + } + + // TODO: tables need to be updated in a transaction, because the situation where one node tries to mutate a table + // after its state has already been updated is entirely predictable + def alterTable[F[_]: Sync: Clock](resolver: Resolver[F], state: Ref[F, PgState], logger: LogHandler, schema: String, entity: Entity): EitherT[F, IgluErrors, Insert] = { + val generator: Generator = schemaList => sql.migrateTable(schema, entity, schemaList) + manage(resolver, state, logger, schema, entity, generator) + } + + def createEventsTable(schema: String, logger: LogHandler): ConnectionIO[Unit] = + definitions.atomicSql(schema).update(logger).run.void + + /** + * Perform some DB management: create or mutate the table according to the current + * schema state (where state is all known versions on the Iglu registry). + * First, check the current state of the schema on the registry and validate it. + * Then, create an actual update action using `generator` and comment on the table + * with the latest schema from the schema list retrieved from the registry. + * At last, update the internal mutable state.
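+ *
+ * A rough usage sketch (assumes a `resolver`, a mutable `state` and a shredded `entity` are already in scope,
+ * uses cats-effect `IO` for `F`, and treats "atomic" as an example database schema name):
+ * {{{
+ *   val generator: Generator = list => sql.createTable("atomic", entity, list, meta = true)
+ *   val action: EitherT[IO, IgluErrors, Insert] =
+ *     manage[IO](resolver, state, LogHandler.nop, "atomic", entity, generator)
+ * }}}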
+ * + * Note that it doesn't actually perform a DB action (no `Transactor`) + * + * @param resolver Iglu Resolver tied to Iglu Server (it needs schema list endpoint) + * @param state internal mutable state, containing all known schemas + * @param logger doobie logger + * @param schema database schema + * @param entity an actual shredded entity that we manage tables for + * @param generator a function generating SQL from `DdlSchemaList` + * @return an action that is either failure because of Iglu subsystem + * or doobie IO + */ + def manage[F[_]: Sync: Clock](resolver: Resolver[F], + state: Ref[F, PgState], + logger: LogHandler, + schema: String, + entity: Entity, + generator: Generator): EitherT[F, IgluErrors, Insert] = { + val group = (entity.origin.vendor, entity.origin.name, entity.origin.version.model) + val criterion = SchemaCriterion(entity.origin.vendor, entity.origin.name, "jsonschema", entity.origin.version.model) + val (vendor, name, model) = group + + EitherT(resolver.listSchemas(vendor, name, model)) + .leftMap(error => IgluErrors.of(FailureDetails.LoaderIgluError.SchemaListNotFound(criterion, error))) + .flatMap(list => DdlSchemaList.fromSchemaList(list, fetch[F](resolver)).map(l => l -> generator(l)).leftMap(IgluErrors.of)) + .map { case (list, statement) => (list, statement.update(logger).run.void) } + .map { case (list, statement) => (list, statement *> sql.commentTable(logger, schema, entity.tableName, list.latest)) } + .flatMap { case (list, insert) => EitherT.liftF[F, IgluErrors, Unit](state.update(_.put(list))).as(insert) } + } +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/definitions.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/definitions.scala new file mode 100644 index 0000000..5c186eb --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/definitions.scala @@ -0,0 +1,212 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.postgres.loader.storage + +import doobie.Fragment +import doobie.implicits._ + +import com.snowplowanalytics.snowplow.postgres.loader.shredding.Type + +object definitions { + + /** Columns prepended to every shredded type table */ + val metaColumns: List[(String, Type, Boolean)] = List( + ("schema_vendor", Type.Varchar(128), true), + ("schema_name", Type.Varchar(128), true), + ("schema_format", Type.Varchar(128), true), + ("schema_version", Type.Varchar(128), true), + ("root_id", Type.Uuid, true), + ("root_tstamp", Type.Timestamp, true), + ) + + val atomicColumns: List[(String, Type, Boolean)] = List( + // App + ("app_id", Type.Varchar(255), false), + ("platform", Type.Varchar(255), false), + // Date/time + ("etl_tstamp", Type.Timestamp, false), + ("collector_tstamp", Type.Timestamp, true), + ("dvce_created_tstamp", Type.Timestamp, false), + // Date/time + ("event", Type.Varchar(128), false), + ("event_id", Type.Uuid, true), + ("txn_id", Type.Integer, false), + // Versioning + ("name_tracker", Type.Varchar(128), false), + ("v_tracker", Type.Varchar(100), false), + ("v_collector", Type.Varchar(100), true), + ("v_etl", Type.Varchar(100), true), + // User and visit + ("user_id", Type.Varchar(255), false), + ("user_ipaddress", Type.Varchar(45), false), + ("user_fingerprint", Type.Varchar(50), false), + ("domain_userid", Type.Varchar(36), false), + ("domain_sessionidx", Type.Integer, false), + ("network_userid", Type.Varchar(38), false), + // Location + ("geo_country", Type.Char(2), false), + ("geo_region", Type.Char(3), false), + ("geo_city", Type.Varchar(75), false), + ("geo_zipcode", Type.Varchar(15), false), + ("geo_latitude", Type.Double, false), + ("geo_longitude", Type.Double, false), + ("geo_region_name", Type.Varchar(100), false), + // IP lookups + ("ip_isp", Type.Varchar(100), false), + ("ip_organization", Type.Varchar(100), false), + ("ip_domain", Type.Varchar(100), false), + ("ip_netspeed", Type.Varchar(100), false), + // Page + ("page_url", Type.Varchar(4096), false), + ("page_title", Type.Varchar(2000), false), + ("page_referrer", Type.Varchar(4096), false), + // Page URL components + ("page_urlscheme", Type.Varchar(16), false), + ("page_urlhost", Type.Varchar(255), false), + ("page_urlport", Type.Integer, false), + ("page_urlpath", Type.Varchar(3000), false), + ("page_urlquery", Type.Varchar(6000), false), + ("page_urlfragment", Type.Varchar(3000), false), + // Referrer URL components + ("refr_urlscheme", Type.Varchar(16), false), + ("refr_urlhost", Type.Varchar(255), false), + ("refr_urlport", Type.Integer, false), + ("refr_urlpath", Type.Varchar(6000), false), + ("refr_urlquery", Type.Varchar(6000), false), + ("refr_urlfragment", Type.Varchar(3000), false), + // Referrer details + ("refr_medium", Type.Varchar(25), false), + ("refr_source", Type.Varchar(50), false), + ("refr_term", Type.Varchar(255), false), + // Marketing + ("mkt_medium", Type.Varchar(255), false), + ("mkt_source", Type.Varchar(255), false), + ("mkt_term", Type.Varchar(255), false), + ("mkt_content", Type.Varchar(500), false), + ("mkt_campaign", Type.Varchar(255), false), + // Custom structured event + ("se_category", Type.Varchar(1000), false), + ("se_action", Type.Varchar(1000), false), + ("se_label", Type.Varchar(1000), false), + ("se_property", Type.Varchar(1000), false), + ("se_value", Type.Double, false), + // Ecommerce + ("tr_orderid", Type.Varchar(255), false), + ("tr_affiliation", Type.Varchar(255), false), + ("tr_total", Type.Double, false), + ("tr_tax", 
Type.Double, false), + ("tr_shipping", Type.Double, false), + ("tr_city", Type.Varchar(255), false), + ("tr_state", Type.Varchar(255), false), + ("tr_country", Type.Varchar(255), false), + ("ti_orderid", Type.Varchar(255), false), + ("ti_sku", Type.Varchar(255), false), + ("ti_name", Type.Varchar(255), false), + ("ti_category", Type.Varchar(255), false), + ("ti_price", Type.Double, false), + ("ti_quantity", Type.Integer, false), + // Page ping + ("pp_xoffset_min", Type.Integer, false), + ("pp_xoffset_max", Type.Integer, false), + ("pp_yoffset_min", Type.Integer, false), + ("pp_yoffset_max", Type.Integer, false), + // User Agent + ("useragent", Type.Varchar(1000), false), + // Browser + ("br_name", Type.Varchar(50), false), + ("br_family", Type.Varchar(50), false), + ("br_version", Type.Varchar(50), false), + ("br_type", Type.Varchar(50), false), + ("br_renderengine", Type.Varchar(50), false), + ("br_lang", Type.Varchar(255), false), + ("br_features_pdf", Type.Bool, false), + ("br_features_flash", Type.Bool, false), + ("br_features_java", Type.Bool, false), + ("br_features_director", Type.Bool, false), + ("br_features_quicktime", Type.Bool, false), + ("br_features_realplayer", Type.Bool, false), + ("br_features_windowsmedia", Type.Bool, false), + ("br_features_gears", Type.Bool, false), + ("br_features_silverlight", Type.Bool, false), + ("br_cookies", Type.Bool, false), + ("br_colordepth", Type.Varchar(12), false), + ("br_viewwidth", Type.Integer, false), + ("br_viewheight", Type.Integer, false), + // Operating System + ("os_name", Type.Varchar(50), false), + ("os_family", Type.Varchar(50), false), + ("os_manufacturer", Type.Varchar(50), false), + ("os_timezone", Type.Varchar(50), false), + // Device/Hardware + ("dvce_type", Type.Varchar(50), false), + ("dvce_ismobile", Type.Bool, false), + ("dvce_screenwidth", Type.Integer, false), + ("dvce_screenheight", Type.Integer, false), + // Document + ("doc_charset", Type.Varchar(128), false), + ("doc_width", Type.Integer, false), + ("doc_height", Type.Integer, false), + // Currency + ("tr_currency", Type.Char(3), false), + ("tr_total_base", Type.Double, false), + ("tr_tax_base", Type.Double, false), + ("tr_shipping_base", Type.Double, false), + ("ti_currency", Type.Char(3), false), + ("ti_price_base", Type.Double, false), + ("base_currency", Type.Char(3), false), + // Geolocation + ("geo_timezone", Type.Varchar(64), false), + // Click ID + ("mkt_clickid", Type.Varchar(128), false), + ("mkt_network", Type.Varchar(64), false), + // ETL tags + ("etl_tags", Type.Varchar(500), false), + // Time event was sent + ("dvce_sent_tstamp", Type.Timestamp, false), + // Referer + ("refr_domain_userid", Type.Varchar(36), false), + ("refr_dvce_tstamp", Type.Timestamp, false), + // Session ID + ("domain_sessionid", Type.Uuid, false), + // Derived Type.Timestamp + ("derived_tstamp", Type.Timestamp, false), + // Event schema + ("event_vendor", Type.Varchar(1000), false), + ("event_name", Type.Varchar(1000), false), + ("event_format", Type.Varchar(128), false), + ("event_version", Type.Varchar(128), false), + // Event fingerprint + ("event_fingerprint", Type.Varchar(128), false), + // True Type.Timestamp + ("true_tstamp", Type.Timestamp, false) + ) + + def atomicSql(schema: String) = { + val columns = atomicColumns.map { + case (n, t, true) => Fragment.const(s"$n ${t.ddl} NOT NULL") + case (n, t, false) => Fragment.const(s"$n ${t.ddl}") + }.foldLeft(Fragment.empty) { (acc, cur) => + val separator = if (acc == Fragment.empty) Fragment.const("\n") else 
Fragment.const(",\n") + acc ++ separator ++ cur + } + + val schemaFr = Fragment.const0(schema) + + fr"""CREATE TABLE $schemaFr.events ($columns) WITH (OIDS=FALSE)""" + } + + def columnToString(columnName: String, dataType: Type, nullable: Boolean) = { + val notNull = if (nullable) "NULL" else "NOT NULL" + s""""$columnName" ${dataType.ddl} $notNull""" + } +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/query.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/query.scala new file mode 100644 index 0000000..856fb45 --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/query.scala @@ -0,0 +1,56 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader.storage + +import cats.syntax.traverse._ +import cats.syntax.either._ +import cats.instances.list._ + +import doobie.ConnectionIO +import doobie.implicits._ +import doobie.util.log.LogHandler + +import com.snowplowanalytics.iglu.core.SchemaKey + +/** Functions to query the storage for state and metadata */ +object query { + + def tableExists(schema: String, name: String, logger: LogHandler): ConnectionIO[Boolean] = + fr"SELECT EXISTS (SELECT FROM information_schema.tables WHERE table_name = $name AND table_schema = $schema);" + .queryWithLogHandler[Boolean](logger) + .unique + + def listTables(schema: String): ConnectionIO[List[String]] = + fr"SELECT tablename FROM pg_catalog.pg_tables WHERE schemaname = $schema".query[String].to[List] + + def getComment(schema: String, logger: LogHandler)(tableName: String): ConnectionIO[Either[CommentIssue, SchemaKey]] = + (fr"""SELECT obj_description(oid) FROM pg_class WHERE relkind = 'r' AND relnamespace = ( + SELECT oid + FROM pg_catalog.pg_namespace + WHERE nspname = $schema + ) AND relname = $tableName""") + .queryWithLogHandler[Option[String]](logger) // It can be NULL, thus query[String].option will fail + .unique + .map { + case Some(comment) => + SchemaKey.fromUri(comment) match { + case Right(key) => key.asRight + case Left(error) => CommentIssue.Invalid(tableName, comment, error).asLeft + } + case None => + CommentIssue.Missing(tableName).asLeft + } + + def getComments(schema: String, logger: LogHandler): ConnectionIO[List[Either[CommentIssue, SchemaKey]]] = + listTables(schema).flatMap(_.traverse(getComment(schema, logger))) +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/sql.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/sql.scala new file mode 100644 index 0000000..009332c --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/sql.scala @@ -0,0 +1,121 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. 
+ * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader.storage + +import cats.syntax.functor._ + +import doobie.Fragment +import doobie.free.connection.ConnectionIO +import doobie.implicits._ +import doobie.util.log.LogHandler + +import com.snowplowanalytics.iglu.core.SchemaMap + +import com.snowplowanalytics.iglu.schemaddl.StringUtils.getTableName +import com.snowplowanalytics.iglu.schemaddl.jsonschema.{Pointer, Schema} +import com.snowplowanalytics.iglu.schemaddl.migrations.{FlatSchema, Migration, SchemaList} + +import com.snowplowanalytics.snowplow.postgres.loader.shredding +import com.snowplowanalytics.snowplow.postgres.loader.shredding.{Type, Entity, transform} +import com.snowplowanalytics.snowplow.postgres.loader.storage.definitions.metaColumns + + +object sql { + + val DefaultVarcharSize = 4096 + + /** + * Generate the `CREATE TABLE` DDL statement + * @param schema database schema + * @param entity shredded entity + * @param schemaList state of the + * @param meta whether meta columns should be prepended + * @return pure SQL expression with `CREATE TABLE` statement + */ + def createTable(schema: String, entity: shredding.Entity, schemaList: SchemaList, meta: Boolean): Fragment = { + val subschemas = FlatSchema.extractProperties(schemaList) + + // Columns derived from schema (no metadata) + val entityColumns = transform.getNameType(subschemas).map { + case (_, columnName, dataType, nullability) => + definitions.columnToString(columnName, dataType, nullability) + } + + val columns = (if (meta) metaColumns.map((definitions.columnToString _).tupled) else Nil) ++ entityColumns + val table = s"$schema.${entity.tableName}" + + Fragment.const(s"CREATE TABLE $table (\n${columns.mkString(",\n")}\n)") + } + + def commentTable(logger: LogHandler, schema: String, tableName: String, schemaKey: SchemaMap): ConnectionIO[Unit] = { + val uri = schemaKey.schemaKey.toSchemaUri + val table = s"$schema.$tableName" + Fragment.const(s"COMMENT ON TABLE $table IS '$uri'") + .update(logger) + .run + .void + } + + + def migrateTable(schema: String, entity: Entity, schemaList: SchemaList) = + schemaList match { + case s: SchemaList.Full => + val migrationList = s.extractSegments.map(Migration.fromSegment) + migrationList.find(_.from == entity.origin.version) match { + case Some(migration) => + val schemaMap = SchemaMap(migration.vendor, migration.name, "jsonschema", migration.to) + val tableName = getTableName(schemaMap) // e.g. 
com_acme_event_1 + val tableNameFull = s"$schema.$tableName" + + if (migration.diff.added.nonEmpty) { + val columns = migration.diff.added.map { + case (pointer, schema) => + buildColumn(DefaultVarcharSize, (pointer, schema)) + } + + val columnFragments = columns.foldLeft(Fragment.empty) { (acc, cur) => + val separator = if (acc == Fragment.empty) Fragment.const("\n") else Fragment.const(",\n") + acc ++ separator ++ cur.toFragment + } + + Fragment.const0(s"""ALTER TABLE $tableNameFull $columnFragments""") + } else Fragment.empty + case None => + Fragment.empty // TODO: This should be a warning + } + case _: SchemaList.Single => + Fragment.empty // TODO: This should be a warning + } + + /** + * Generate single ALTER TABLE statement for some new property + * + * @param varcharSize default size for VARCHAR + * @param pair pair of property name and its Schema properties like + * length, maximum, etc + * @return DDL statement altering single column in table + */ + def buildColumn(varcharSize: Int, pair: (Pointer.SchemaPointer, Schema)): Column = + pair match { + case (pointer, properties) => + val columnName = FlatSchema.getName(pointer) + val dataType = Type.getDataType(properties, varcharSize, columnName, Type.dataTypeSuggestions) + Column(columnName, dataType, shredding.schema.canBeNull(properties)) + } + + case class Column(name: String, dataType: Type, nullable: Boolean) { + /** "column_name VARCHAR(128) NOT NULL" */ + def toFragment: Fragment = + Fragment.const0(s"$name ${dataType.ddl} ${if (nullable) "NULL" else "NOT NULL"}") + } +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/utils.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/utils.scala new file mode 100644 index 0000000..22b4ded --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/utils.scala @@ -0,0 +1,40 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.postgres.loader.storage + +import cats.Monad +import cats.implicits._ + +import cats.effect.Sync + +import doobie.ConnectionIO +import doobie.implicits._ +import doobie.util.transactor.Transactor +import doobie.util.log.LogHandler + +import com.snowplowanalytics.snowplow.postgres.loader.storage.query.tableExists + +object utils { + + def prepareEventsTable(schema: String, logger: LogHandler): ConnectionIO[Boolean] = { + val create = ddl.createEventsTable(schema, logger).as(false) + val exists = Monad[ConnectionIO].pure(true) + Monad[ConnectionIO].ifM(tableExists(schema, "events", logger))(exists, create) + } + + def prepare[F[_]: Sync](schema: String, xa: Transactor[F], logger: LogHandler): F[Unit] = + prepareEventsTable(schema, logger).transact(xa).flatMap { + case true => Sync[F].delay(println(s"$schema.events table already exists")) + case false => Sync[F].delay(println(s"$schema.events table created")) + } +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/streaming/package.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/streaming/package.scala new file mode 100644 index 0000000..2bfc198 --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/streaming/package.scala @@ -0,0 +1,27 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader + +import cats.data.NonEmptyList + +import com.snowplowanalytics.snowplow.badrows.FailureDetails + +package object streaming { + + type IgluErrors = NonEmptyList[FailureDetails.LoaderIgluError] + + object IgluErrors { + def of(error: FailureDetails.LoaderIgluError): NonEmptyList[FailureDetails.LoaderIgluError] = + NonEmptyList.of(error) + } +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/streaming/sink.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/streaming/sink.scala new file mode 100644 index 0000000..74f1805 --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/streaming/sink.scala @@ -0,0 +1,124 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.postgres.loader.streaming + +import cats.Monad +import cats.data.EitherT +import cats.implicits._ + +import cats.effect.{Sync, Clock} +import cats.effect.concurrent.Ref + +import fs2.Pipe + +import doobie._ +import doobie.implicits._ +import doobie.util.transactor.Transactor + +import io.circe.Json + +import com.snowplowanalytics.iglu.core.circe.implicits._ + +import com.snowplowanalytics.iglu.client.{Resolver, Client} + +import com.snowplowanalytics.snowplow.badrows.{BadRow, Failure, Payload} +import com.snowplowanalytics.snowplow.postgres.loader.config.Cli.processor +import com.snowplowanalytics.snowplow.postgres.loader.storage.{PgState, ddl} +import com.snowplowanalytics.snowplow.postgres.loader.shredding.{Entity, transform} +import com.snowplowanalytics.snowplow.postgres.loader.streaming.source.{Data, BadData} + +object sink { + + type Insert = ConnectionIO[Unit] + + def insertStatement(logger: LogHandler, schema: String, row: Entity): Insert = { + val length = row.columns.length + + val columns = Fragment.const0(row.columns.map(c => s"""\"${c.name}\"""").mkString(",")) + + val table = Fragment.const0(s"$schema.${row.tableName}") + val values = row.columns.zipWithIndex.foldLeft(fr0"") { + case (acc, (cur, i)) if i < length - 1 => acc ++ cur.value.fragment ++ fr0"," + case (acc, (cur, _)) => acc ++ cur.value.fragment + } + + fr"""INSERT INTO $table ($columns) VALUES ($values)""".update(logger).run.void + } + + def badSink[F[_]: Sync]: Pipe[F, BadData, Unit] = + _.evalMap { + case BadData.BadEnriched(row) => Sync[F].delay(println(row.compact)) + case BadData.BadJson(payload, error) => Sync[F].delay(println(s"Cannot parse $payload. $error")) + } + + def goodSink[F[_]: Sync: Clock](xa: Transactor[F], + logger: LogHandler, + schema: String, + state: Ref[F, PgState], + client: Client[F, Json]): Pipe[F, Data, Unit] = + _.evalMap { payload => + val addMeta = payload.snowplow + val result = for { + entities <- payload match { + case Data.Snowplow(event) => transform.shredEvent[F](client, event).leftMap(bad => BadData.BadEnriched(bad)) + case Data.SelfDescribing(json) => transform.shredJson(client)(json).map(entity => List(entity)).leftMap(errors => BadData.BadJson(json.normalize.noSpaces, errors.toString)) + } + insert <- insertData(client.resolver, logger, schema, state, entities, addMeta).leftMap { errors => + payload match { + case Data.Snowplow(event) => + val badRow = BadRow.LoaderIgluError(processor, Failure.LoaderIgluErrors(errors), Payload.LoaderPayload(event)) + BadData.BadEnriched(badRow) + case Data.SelfDescribing(json) => + BadData.BadJson(json.normalize.noSpaces, s"Cannot insert: $errors") + + } + } + } yield insert + + result.value.flatMap { + case Right(insert) => insert.transact[F](xa) + case Left(badRow) => Sync[F].delay(println(badRow)) + } + } + + /** + * Prepare tables for incoming data if necessary and insert the data + * Tables will be updated/created if info is missing in `state` + * @param resolver resolver to fetch missing schemas + * @param state current state of the Postgres schema + * @param event all shredded enitites from a single event, ready to be inserted + */ + def insertData[F[_]: Sync: Clock](resolver: Resolver[F], + logger: LogHandler, + schema: String, + state: Ref[F, PgState], + event: List[Entity], + meta: Boolean): EitherT[F, IgluErrors, Insert] = { + val inserts = event.parTraverse { entity => + val tableMutation = EitherT.liftF[F, IgluErrors, PgState](state.get).flatMap { pgState => + 
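+ // Decide how to prepare the table from what the internal state knows about this entity:
+ // a known (matching) schema needs no DDL, a missing table is created (with meta columns when requested),
+ // and an outdated table is migrated before the insert statement runs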
pgState.check(entity.origin) match { + case PgState.TableState.Match => + EitherT.rightT[F, IgluErrors](Monad[ConnectionIO].unit) + case PgState.TableState.Missing => + ddl.createTable[F](resolver, state, logger, schema, entity, meta) + case PgState.TableState.Outdated => + ddl.alterTable[F](resolver, state, logger, schema, entity) + } + } + + tableMutation.map(mut => mut *> insertStatement(logger, schema, entity)) + } + + inserts.map(_.sequence_) + } +} diff --git a/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/streaming/source.scala b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/streaming/source.scala new file mode 100644 index 0000000..e4a769d --- /dev/null +++ b/src/main/scala/com/snowplowanalytics/snowplow/postgres/loader/streaming/source.scala @@ -0,0 +1,151 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader.streaming + +import java.util.Base64 +import java.nio.charset.StandardCharsets + +import cats.implicits._ + +import cats.effect.{ContextShift, ConcurrentEffect, Blocker, Sync} + +import fs2.Stream +import fs2.aws.kinesis.{CommittableRecord, KinesisConsumerSettings} +import fs2.aws.kinesis.consumer.readFromKinesisStream + +import com.permutive.pubsub.consumer.grpc.{PubsubGoogleConsumer, PubsubGoogleConsumerConfig} +import io.circe.Json +import io.circe.parser.{parse => parseCirce} + +import com.snowplowanalytics.iglu.core.SelfDescribingData +import com.snowplowanalytics.iglu.core.circe.implicits._ + +import com.snowplowanalytics.snowplow.analytics.scalasdk.Event +import com.snowplowanalytics.snowplow.analytics.scalasdk.ParsingError.NotTSV +import com.snowplowanalytics.snowplow.badrows.{BadRow, Payload} +import com.snowplowanalytics.snowplow.postgres.loader.config.{LoaderConfig, Cli} +import com.snowplowanalytics.snowplow.postgres.loader.config.LoaderConfig.Purpose + +import com.google.pubsub.v1.PubsubMessage +import com.permutive.pubsub.consumer.Model.{Subscription, ProjectId} +import com.permutive.pubsub.consumer.decoder.MessageDecoder + +object source { + + /** + * Acquire a stream of parsed payloads + * + * @param blocker thread pool for pulling events (used only in PubSub) + * @param purpose kind of data, enriched or plain JSONs + * @param config source configuration + * @return either error or stream of parsed payloads + */ + def getSource[F[_]: ConcurrentEffect: ContextShift](blocker: Blocker, + purpose: LoaderConfig.Purpose, + config: LoaderConfig.Source) = + config match { + case LoaderConfig.Source.Kinesis(appName, streamName, region, position) => + KinesisConsumerSettings.apply(streamName, appName, region, initialPositionInStream = position.unwrap) match { + case Right(settings) => + source.readData[F](settings, purpose).asRight + case Left(error) => + error.asLeft + } + case LoaderConfig.Source.PubSub(projectId, subscriptionId) => 
+ implicit val decoder: MessageDecoder[Either[BadData, Data]] = pubsubDataDecoder(purpose) + val project = ProjectId(projectId) + val subscription = Subscription(subscriptionId) + val pubsubConfig = PubsubGoogleConsumerConfig[F](onFailedTerminate = pubsubOnFailedTerminate[F]) + PubsubGoogleConsumer.subscribeAndAck[F, Either[BadData, Data]](blocker, project, subscription, pubsubErrorHandler[F], pubsubConfig).asRight + } + + /** Stream data from AWS Kinesis */ + def readData[F[_]: ConcurrentEffect: ContextShift](settings: KinesisConsumerSettings, kind: Purpose): Stream[F, Either[BadData, Data]] = + readFromKinesisStream[F](settings).map(processRecord(kind)) + + /** + * Parse Kinesis record into a valid Loader's record, either enriched event or self-describing JSON, + * depending on purpose of the Loader + */ + def processRecord(kind: Purpose)(record: CommittableRecord): Either[BadData, Data] = { + val string = try { + StandardCharsets.UTF_8.decode(record.record.data()).toString.asRight[BadData] + } catch { + case _: IllegalArgumentException => + val payload = StandardCharsets.UTF_8.decode(Base64.getEncoder.encode(record.record.data())).toString + kind match { + case Purpose.Enriched => + val badRow = BadRow.LoaderParsingError(Cli.processor, NotTSV, Payload.RawPayload(payload)) + BadData.BadEnriched(badRow).asLeft + case Purpose.SelfDescribing => + BadData.BadJson(payload, "Cannot deserialize self-describing JSON from Kinesis record").asLeft + } + } + + string.flatMap { payload => + kind match { + case Purpose.Enriched => + parseEventString(payload).map(Data.Snowplow.apply) + case Purpose.SelfDescribing => + parseJson(payload).map(Data.SelfDescribing.apply) + } + } + } + + def parseEventString(s: String): Either[BadData, Event] = + Event.parse(s).toEither.leftMap { error => + val badRow = BadRow.LoaderParsingError(Cli.processor, error, Payload.RawPayload(s)) + BadData.BadEnriched(badRow) + } + + def parseJson(s: String): Either[BadData, SelfDescribingData[Json]] = + parseCirce(s) + .leftMap(_.show) + .flatMap(json => SelfDescribingData.parse[Json](json).leftMap(_.message(json.noSpaces))) + .leftMap(error => BadData.BadJson(s, error)) + + /** Kind of data flowing through the Loader */ + sealed trait Data extends Product with Serializable { + def snowplow: Boolean = this match { + case _: Data.Snowplow => true + case _: Data.SelfDescribing => false + } + } + object Data { + case class Snowplow(data: Event) extends Data + case class SelfDescribing(data: SelfDescribingData[Json]) extends Data + } + + sealed trait BadData extends Throwable with Product with Serializable + object BadData { + case class BadEnriched(data: BadRow) extends BadData + case class BadJson(payload: String, error: String) extends BadData + } + + def pubsubDataDecoder(purpose: Purpose): MessageDecoder[Either[BadData, Data]] = + purpose match { + case Purpose.Enriched => + (message: Array[Byte]) => parseEventString(new String(message)).map(Data.Snowplow.apply).asRight + case Purpose.SelfDescribing => + (message: Array[Byte]) => parseJson(new String(message)).map(Data.SelfDescribing.apply).asRight + } + + def pubsubErrorHandler[F[_]: Sync](message: PubsubMessage, error: Throwable, ack: F[Unit], nack: F[Unit]): F[Unit] = { + val _ = error + val _ = nack + Sync[F].delay(println(s"Couldn't handle ${message.getData.toStringUtf8}")) *> ack + } + + def pubsubOnFailedTerminate[F[_]: Sync](error: Throwable): F[Unit] = + Sync[F].delay(println(s"Cannot terminate pubsub consumer properly\n${error.getMessage}")) +} diff --git 
a/src/test/scala/com/snowplowanalytics/snowplow/postgres/loader/Database.scala b/src/test/scala/com/snowplowanalytics/snowplow/postgres/loader/Database.scala new file mode 100644 index 0000000..78d6a37 --- /dev/null +++ b/src/test/scala/com/snowplowanalytics/snowplow/postgres/loader/Database.scala @@ -0,0 +1,96 @@ +package com.snowplowanalytics.snowplow.postgres.loader + +import java.net.URI +import java.util.UUID + +import cats.data.EitherT + +import com.snowplowanalytics.snowplow.postgres.loader.storage.utils + +import org.specs2.mutable.Specification +import org.specs2.specification.BeforeAfterEach +import cats.implicits._ + +import cats.effect.{IO, Clock, ContextShift} + +import doobie._ +import doobie.implicits._ +import doobie.postgres.implicits._ + +import io.circe.Json + +import com.snowplowanalytics.iglu.client.Client +import com.snowplowanalytics.iglu.client.resolver.Resolver +import com.snowplowanalytics.iglu.client.resolver.registries.Registry +import com.snowplowanalytics.iglu.client.resolver.registries.Registry.{HttpConnection, Config, Http} +import com.snowplowanalytics.iglu.client.validator.CirceValidator + +import com.snowplowanalytics.snowplow.badrows.FailureDetails +import com.snowplowanalytics.snowplow.postgres.loader.config.LoaderConfig.JdbcUri + +trait Database extends Specification with BeforeAfterEach { + import Database._ + + implicit val ioClock: Clock[IO] = Clock.create[IO] + + def before = + (dropAll *> utils.prepare[IO](Schema, xa, logger)).unsafeRunSync() + + def after = + dropAll.unsafeRunSync() + + sequential + +} + +object Database { + + val Schema = "public" + + val logger: LogHandler = LogHandler.nop + implicit val CS: ContextShift[IO] = IO.contextShift(concurrent.ExecutionContext.global) + + val jdbcUri = JdbcUri("localhost", 5432, "snowplow") + val registry = Http(Config("localhost registry", 1, Nil), HttpConnection(URI.create("http://localhost:8080/api/"), None)) + val igluClient = Client[IO, Json](Resolver(List(Registry.IgluCentral, registry), None), CirceValidator) + val xa: Transactor[IO] = resources.getTransactorDefault[IO](jdbcUri, "postgres", "mysecretpassword") + + case class ColumnInfo(columnName: String, + columnDefault: Option[String], + isNullable: Boolean, + dataType: String, + characterMaximumLength: Option[Int]) + + def query: IO[List[UUID]] = + fr"SELECT event_id FROM events".query[UUID].to[List].transact(xa) + + def count(table: String): IO[Int] = + (fr"SELECT count(*) FROM " ++ Fragment.const(table)).query[Int].unique.transact(xa) + + def describeTable(tableName: String) = + sql"""SELECT column_name::VARCHAR, + column_default::VARCHAR, + is_nullable::BOOLEAN, + data_type::VARCHAR, + character_maximum_length::INTEGER + FROM information_schema.columns + WHERE table_name = $tableName""" + .query[(String, Option[String], Boolean, String, Option[Int])] + .map(ColumnInfo.tupled) + .to[List] + .transact(xa) + + def dropAll: IO[Unit] = { + val schemaFr = Fragment.const(Schema) + List( + fr"DROP SCHEMA $schemaFr CASCADE;", + fr"CREATE SCHEMA $schemaFr;", + fr"GRANT ALL ON SCHEMA public TO postgres;", + fr"GRANT ALL ON SCHEMA public TO $schemaFr;" + ).map(_.update.run).traverse_(_.transact(xa).void) + } + + implicit class ActionOps[A](io: IO[A]) { + def action = EitherT.liftF[IO, FailureDetails.LoaderIgluError, A](io) + } +} diff --git a/src/test/scala/com/snowplowanalytics/snowplow/postgres/loader/queryspec.scala b/src/test/scala/com/snowplowanalytics/snowplow/postgres/loader/queryspec.scala new file mode 100644 index 0000000..3a656c2 --- 
/dev/null +++ b/src/test/scala/com/snowplowanalytics/snowplow/postgres/loader/queryspec.scala @@ -0,0 +1,59 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader + +import doobie.implicits._ + +import com.snowplowanalytics.snowplow.postgres.loader.storage.query + +class queryspec extends Database { + "listTables" should { + "return single events table (after prepare executed)" >> { + val expected = List("events") + val result = query.listTables("public").transact(Database.xa).unsafeRunSync() + + result must beEqualTo(expected) + } + + "return no tables (prepare executed only for 'public')" >> { + val expected = List() + val result = query.listTables("empty").transact(Database.xa).unsafeRunSync() + + result must beEqualTo(expected) + } + } + + "tableExists" should { + "return false if table does not exist" >> { + val expected = false + val result = query.tableExists("empty", "non-existent", Database.logger).transact(Database.xa).unsafeRunSync() + + result must beEqualTo(expected) + } + + "return true if table exists (created by Database.before)" >> { + val expected = true + val result = query.tableExists(Database.Schema, "events", Database.logger).transact(Database.xa).unsafeRunSync() + + result must beEqualTo(expected) + } + } + + "getComments" should { + "not fail if schema does not exist" >> { + val expected = List() + val result = query.getComments("empty", Database.logger).transact(Database.xa).unsafeRunSync() + result must beEqualTo(expected) + } + } +} diff --git a/src/test/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/PgStateSpec.scala b/src/test/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/PgStateSpec.scala new file mode 100644 index 0000000..d9ff965 --- /dev/null +++ b/src/test/scala/com/snowplowanalytics/snowplow/postgres/loader/storage/PgStateSpec.scala @@ -0,0 +1,87 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0. + * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. 
+ */ +package com.snowplowanalytics.snowplow.postgres.loader.storage + +import cats.data.EitherT + +import cats.effect.IO + +import com.snowplowanalytics.iglu.core.{SchemaKey, SelfDescribingSchema, SchemaVer, SchemaMap, SchemaList => CoreSchemaList} + +import com.snowplowanalytics.iglu.schemaddl.IgluSchema +import com.snowplowanalytics.iglu.schemaddl.jsonschema.Schema +import com.snowplowanalytics.iglu.schemaddl.migrations.SchemaList + +import com.snowplowanalytics.snowplow.postgres.loader.Database +import com.snowplowanalytics.snowplow.postgres.loader.storage.PgState.TableState + +class PgStateSpec extends Database { + "init" should { + "initialize an empty state if no tables exist" >> { + val state = PgState.init(Database.xa, Database.logger, Database.igluClient.resolver, "empty") + val result = state + .semiflatMap { case (issues, ref) => ref.get.map { state => (issues, state) } } + .value + .unsafeRunSync() + val expected = (List(), PgState(Map())) + result must beRight(expected) + } + } + + "check" should { + "confirm table exists with the same key as in state" >> { + val key = SchemaKey("com.acme", "event", "jsonschema", SchemaVer.Full(1,0,0)) + val schemaList = PgStateSpec.buildSchemaList(List(key)) + + val init = Map(("com.acme", "event", 1) -> schemaList) + val state = PgState(init) + state.check(key) must beEqualTo(TableState.Match) + } + + "claim table is outdated for 1-0-1 key if only 1-0-0 is known" >> { + val key = SchemaKey("com.acme", "event", "jsonschema", SchemaVer.Full(1,0,0)) + val schemaList = PgStateSpec.buildSchemaList(List(key)) + + val init = Map(("com.acme", "event", 1) -> schemaList) + val state = PgState(init) + state.check(SchemaKey("com.acme", "event", "jsonschema", SchemaVer.Full(1,0,1))) must beEqualTo(TableState.Outdated) + } + + "claim table is missing for bumped model" >> { + val key = SchemaKey("com.acme", "event", "jsonschema", SchemaVer.Full(1,0,0)) + val schemaList = PgStateSpec.buildSchemaList(List(key)) + + val init = Map(("com.acme", "event", 1) -> schemaList) + val state = PgState(init) + state.check(SchemaKey("com.acme", "event", "jsonschema", SchemaVer.Full(2,0,0))) must beEqualTo(TableState.Missing) + } + + "always assume events table exists" >> { + val atomic = SchemaKey("com.snowplowanalytics.snowplow", "atomic", "jsonschema", SchemaVer.Full(1,0,0)) + val state = PgState(Map()) + state.check(atomic) must beEqualTo(TableState.Match) + } + } +} + +object PgStateSpec { + + val fetch: SchemaKey => EitherT[IO, String, IgluSchema] = + key => EitherT.pure[IO, String](SelfDescribingSchema(SchemaMap(key), Schema.empty)) + + /** Bypass the `SchemaList` construction boilerplate */ + def buildSchemaList(keys: List[SchemaKey]): SchemaList = { + val coreSchemaList = CoreSchemaList.parseUnsafe(keys) + SchemaList.fromSchemaList(coreSchemaList, fetch).value.unsafeRunSync().getOrElse(throw new IllegalStateException) + } +} diff --git a/src/test/scala/com/snowplowanalytics/snowplow/postgres/loader/streaming/sinkspec.scala b/src/test/scala/com/snowplowanalytics/snowplow/postgres/loader/streaming/sinkspec.scala new file mode 100644 index 0000000..00c2c10 --- /dev/null +++ b/src/test/scala/com/snowplowanalytics/snowplow/postgres/loader/streaming/sinkspec.scala @@ -0,0 +1,123 @@ +/* + * Copyright (c) 2020 Snowplow Analytics Ltd. All rights reserved. + * + * This program is licensed to you under the Apache License Version 2.0, + * and you may not use this file except in compliance with the Apache License Version 2.0.
+ * You may obtain a copy of the Apache License Version 2.0 at http://www.apache.org/licenses/LICENSE-2.0. + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the Apache License Version 2.0 is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the Apache License Version 2.0 for the specific language governing permissions and limitations there under. + */ +package com.snowplowanalytics.snowplow.postgres.loader.streaming + +import java.util.UUID + +import cats.effect.concurrent.Ref +import cats.effect.IO + +import fs2.Stream + +import io.circe.Json +import io.circe.literal._ + +import com.snowplowanalytics.iglu.core.SelfDescribingData +import com.snowplowanalytics.iglu.core.circe.implicits._ + +import com.snowplowanalytics.snowplow.analytics.scalasdk.Event + +import com.snowplowanalytics.snowplow.postgres.loader.Database +import com.snowplowanalytics.snowplow.postgres.loader.storage.PgState +import com.snowplowanalytics.snowplow.postgres.loader.streaming.source.Data + +class sinkspec extends Database { + import Database._ + + "insertData" should { + "return Right with no entities (nothing to insert)" >> { + val action = for { + state <- Ref.of[IO, PgState](PgState(Map())) + result <- sink.insertData[IO](igluClient.resolver, logger, Schema, state, List.empty, true).value + } yield result + + action.unsafeRunSync() must beRight + } + } + + "goodSink" should { + "sink a single good event" >> { + val line = "snowplowweb\tweb\t2018-12-18 15:07:17.970\t2016-03-29 07:28:18.611\t2016-03-29 07:28:18.634\tpage_view\t11cdec7b-4cbd-4aa4-a4c9-3874ab9663d4\t\tsnplow6\tjs-2.6.0\tssc-0.6.0-kinesis\tspark-1.16.0-common-0.35.0\t34df2c48bc170c87befb441732a94196\t372d1f2983860eefd262b58e6592dfbc\t80546dc70f4a91f1283c4b6247e31bcf\t26e6412a2421eb923d9d40258ca9ca69\t1\t3a12e8b8e3e91a4d092b833d583c7e30\tDK\t82\tOdder\t8300\t42.0001\t42.003\tCentral Jutland\tTDC Danmark\tTDC Danmark\t\t\thttp://snowplowanalytics.com/documentation/recipes/catalog-analytics/market-basket-analysis-identifying-products-that-sell-well-together.html\tMarket basket analysis - identifying products and content that go well together – 
Snowplow\thttp://snowplowanalytics.com/analytics/catalog-analytics/market-basket-analysis-identifying-products-that-sell-well-together.html\thttp\tsnowplowanalytics.com\t80\t/documentation/recipes/catalog-analytics/market-basket-analysis-identifying-products-that-sell-well-together.html\t\t\thttp\tsnowplowanalytics.com\t80\t/analytics/catalog-analytics/market-basket-analysis-identifying-products-that-sell-well-together.html\t\t\tinternal\t\t\t\t\t\t\t\t{\"schema\":\"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0\",\"data\":[{\"schema\":\"iglu:com.snowplowanalytics.snowplow/web_page/jsonschema/1-0-0\",\"data\":{\"id\":\"05862d26-0dde-4d7a-a494-fc9aae283d23\"}},{\"schema\":\"iglu:org.schema/WebPage/jsonschema/1-0-0\",\"data\":{\"genre\":\"documentation\",\"inLanguage\":\"en-US\"}},{\"schema\":\"iglu:org.w3/PerformanceTiming/jsonschema/1-0-0\",\"data\":{\"navigationStart\":1459236496534,\"unloadEventStart\":1459236496838,\"unloadEventEnd\":1459236496838,\"redirectStart\":0,\"redirectEnd\":0,\"fetchStart\":1459236496534,\"domainLookupStart\":1459236496534,\"domainLookupEnd\":1459236496534,\"connectStart\":1459236496534,\"connectEnd\":1459236496534,\"secureConnectionStart\":0,\"requestStart\":1459236496580,\"responseStart\":1459236496834,\"responseEnd\":1459236496844,\"domLoading\":1459236496853,\"domInteractive\":1459236497780,\"domContentLoadedEventStart\":1459236497780,\"domContentLoadedEventEnd\":1459236498038,\"domComplete\":0,\"loadEventStart\":0,\"loadEventEnd\":0,\"chromeFirstPaint\":1459236498203}}]}\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\tMozilla/5.0 (Windows NT 6.1) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/49.0.2623.87 Safari/537.36\tChrome 49\tChrome\t49.0.2623.87\tBrowser\tWEBKIT\ten-US\t1\t1\t0\t0\t0\t0\t0\t0\t0\t1\t24\t1920\t1075\tWindows 7\tWindows\tMicrosoft Corporation\tEurope/Berlin\tComputer\t0\t1920\t1200\tUTF-8\t1903\t11214\t\t\t\t\t\t\t\tEurope/Copenhagen\t\t\t\t2016-03-29 07:28:18.636\t\t\t{\"schema\":\"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-1\",\"data\":[{\"schema\":\"iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0\",\"data\":{\"useragentFamily\":\"Chrome\",\"useragentMajor\":\"49\",\"useragentMinor\":\"0\",\"useragentPatch\":\"2623\",\"useragentVersion\":\"Chrome 49.0.2623\",\"osFamily\":\"Windows\",\"osMajor\":\"7\",\"osMinor\":null,\"osPatch\":null,\"osPatchMinor\":null,\"osVersion\":\"Windows 7\",\"deviceFamily\":\"Other\"}}]}\t88c23330-ac4d-4c82-8a18-aa83c1e0c163\t2016-03-29 07:28:18.609\tcom.snowplowanalytics.snowplow\tpage_view\tjsonschema\t1-0-0\tcab5ba164038f31d8e10befc4eb199df\t" + val event = Event.parse(line).getOrElse(throw new RuntimeException("Event is invalid")) + val stream = Stream.emit[IO, Data](Data.Snowplow(event)) + + val action = for { + (issues, state) <- PgState.init[IO](xa, logger, igluClient.resolver, Schema) + _ <- stream.through(sink.goodSink(xa, logger, Schema, state, igluClient)).compile.drain.action + eventIds <- query.action + uaParserCtxs <- count("com_snowplowanalytics_snowplow_ua_parser_context_1").action + } yield (issues, eventIds, uaParserCtxs) + + val result = action.value.unsafeRunSync() + val ExpectedEventId = UUID.fromString("11cdec7b-4cbd-4aa4-a4c9-3874ab9663d4") + result must beRight.like { + case (Nil, List(ExpectedEventId), 1) => ok + case (issues, ids, ctxs) => ko(s"Unexpected result. 
Issues: $issues; Event ids: $ids; Contexts: $ctxs") + } + } + + "sink a single self-describing JSON" >> { + val row = json"""{"schema":"iglu:com.getvero/bounced/jsonschema/1-0-0","data":{"bounce_type":"one"}}""" + val json = SelfDescribingData.parse(row).getOrElse(throw new RuntimeException("Invalid SelfDescribingData")) + val stream = Stream.emit[IO, Data](Data.SelfDescribing(json)) + + val action = for { + (issues, state) <- PgState.init[IO](xa, logger, igluClient.resolver, Schema) + _ <- stream.through(sink.goodSink(xa, logger, Schema, state, igluClient)).compile.drain.action + eventIds <- query.action + rows <- count("com_getvero_bounced_1").action + } yield (issues, eventIds, rows) + + val result = action.value.unsafeRunSync() + result must beRight.like { + case (Nil, Nil, 1) => ok + case (issues, ids, ctxs) => ko(s"Unexpected result. Issues: ${issues.mkString(", ")}; Event ids: ${ids.mkString(", ")}; Contexts: $ctxs") + } + } + + "sink several self-describing JSONs with migrations" >> { + val rows = List( + json"""{"schema":"iglu:me.chuwy/pg-test/jsonschema/1-0-0","data":{"requiredString":"one","requiredUnion":false,"nested":{"a": 1}}}""", + json"""{"schema":"iglu:me.chuwy/pg-test/jsonschema/1-0-1","data":{"requiredString":"two", "requiredUnion": false, "nested": {"a": 2}, "someArray": [2,"two",{}]}}""", + json"""{"schema":"iglu:me.chuwy/pg-test/jsonschema/1-0-2","data":{"requiredString":"three","requiredUnion":"three","nested":{"a": 3},"bigInt": 3}}""" + ).map(SelfDescribingData.parse[Json]).map(_.getOrElse(throw new RuntimeException("Invalid SelfDescribingData"))).map(Data.SelfDescribing.apply) + + val stream = Stream.emits[IO, Data](rows) + + val ExpectedColumnInfo = List( + ColumnInfo("required_string", None, false, "character varying", Some(4096)), + ColumnInfo("required_union", None, false, "jsonb", None), + ColumnInfo("id", None, true, "uuid", None), + ColumnInfo("nested.a", None, true, "double precision", None), + ColumnInfo("nested.b", None, true, "character varying", Some(4096)), + ColumnInfo("some_array", None, true, "jsonb", None), + ColumnInfo("nested.c", None, true, "bigint", None), + ColumnInfo("some_date", None, true, "timestamp without time zone", None), + ColumnInfo("big_int", None, true, "bigint", None), + ) + + val action = for { + (issues, state) <- PgState.init[IO](xa, logger, igluClient.resolver, Schema) + _ <- stream.through(sink.goodSink(xa, logger, Schema, state, igluClient)).compile.drain.action + rows <- count("me_chuwy_pg_test_1").action + table <- describeTable("me_chuwy_pg_test_1").action + } yield (issues, rows, table) + + val result = action.value.unsafeRunSync() + result must beRight.like { + case (Nil, 3, ExpectedColumnInfo) => ok + case (issues, ctxs, cols) => ko(s"Unexpected result. Issues: $issues; Number of rows: $ctxs; Columns ${cols}") + } + } + } +}