
Setup Guide


Using Snowplow Snowflake Loader

Overview

Snowflake Loader consists of two independent applications:

  • Snowplow Snowflake Transformer - a Spark job responsible for transforming enriched events into a Snowflake-compatible format
  • Snowplow Snowflake Loader - a CLI application responsible for loading the transformed enriched events into a Snowflake DB

Both applications communicate through a DynamoDB table called the "processing manifest", which is used to maintain pipeline state. Both applications use the same self-describing JSON configuration file with schema com.snowplowanalytics.snowplow.storage/snowflake_config/jsonschema/1-0-1, which contains both transformer- and loader-specific properties.

The Snowflake Loader has been publicly available since version 0.3.1.

Configuration

The self-describing configuration JSON can contain the following properties:

  • name - Required human-readable configuration name, e.g. Snowflake config
  • id - Optional machine-readable configuration id, e.g. a UUID
  • auth - An object containing information about how Snowflake should authenticate itself to load transformed data from S3. Explained below
  • awsRegion - The AWS Region used by the Transformer to access S3 and DynamoDB
  • manifest - The AWS DynamoDB table name for the processing manifest. The table needs to be created manually (e.g. snowflake-event-manifest)
  • snowflakeRegion - The AWS Region used by Snowflake to access its endpoint. It is recommended to use the same region as awsRegion if possible
  • database - The Snowflake database name. The database must be created manually (e.g. acme-snowplow)
  • input - The S3 URL of the Snowplow enriched events archive (e.g. s3://com-acme-snowplow/archive/enriched/)
  • stage - An arbitrary name for the Snowflake stage. The stage is created automatically during the setup step (e.g. snowplow_stage)
  • stageUrl - The S3 URL for Transformer-processed data (e.g. s3://com-acme-snowplow/snowflake/stage/)
  • warehouse - The Snowflake warehouse name. The warehouse is created automatically during the setup step (e.g. snowplow_wh)
  • schema - The Snowflake DB schema. The schema is created automatically during the setup step (e.g. atomic)
  • account - The Snowflake account name. Must be obtained from Snowflake
  • username - The Snowflake username. Must be obtained from Snowflake
  • password - Explained below
  • maxError - An optional Snowflake setting; a table copy statement will skip an input file when the number of errors in it exceeds the specified number
  • jdbcHost - An optional host for the JDBC driver that takes priority over the automatically derived hosts
  • purpose - Always ENRICHED_EVENTS

The following authentication options (auth) are possible:

  • roleArn and sessionDuration - Temporary credentials. Recommended
  • accessKeyId and secretAccessKey - Static AWS credentials. Do not use this option when launching the loader as an EMR step

password can be passed in two forms:

  • As an ec2ParameterStore object. The password will be stored safely in the EC2 Parameter Store and not exposed. Recommended (see the sketch after this list)
  • As a plain string
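
If you go the Parameter Store route, the secret has to exist before the loader reads it. A minimal sketch using the AWS CLI; the parameter name matches the example configuration below and should be adjusted, along with the value, to your own setup:

$ aws ssm put-parameter \
    --name snowplow.snowflake.password \
    --type SecureString \
    --value 'YOUR-SNOWFLAKE-PASSWORD'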

The final configuration can look like the following:

{
  "schema": "iglu:com.snowplowanalytics.snowplow.storage/snowflake_config/jsonschema/1-0-1",
  "data": {
    "name": "Strawberry",
    "awsRegion": "us-east-1",
    "auth": {
        "roleArn": "arn:aws:iam::123456987010:role/SnowflakeLoadRole",
        "sessionDuration": 900
    },
    "manifest": "snowplow-snowflake-manifest",
    "snowflakeRegion": "us-west-1",
    "database": "snowplowdb",
    "input": "s3://archive/enriched/",
    "stage": "snowplow_stage",
    "stageUrl": "s3://archive/snowflake/transformed/",
    "warehouse": "wh",
    "schema": "atomic",
    "account": "acme",
    "username": "loader",
    "password": {
        "ec2ParameterStore": {
            "parameterName": "snowplow.snowflake.password"
        }
    },
    "maxError": 1,
    "jdbcHost": "acme.snowflakecomputing.com",
    "purpose": "ENRICHED_EVENTS"
  }
}

Both the loader and the transformer also require an Iglu Resolver configuration JSON. So far it is used only to validate the configuration JSON.

Both the configuration and the resolver can be passed as base64-encoded strings with the additional --base64 flag (see the encoding sketch below). The loader can also be invoked from a local machine with plain file paths, in which case --base64 needs to be omitted.
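
For reference, this is how the two files can be base64-encoded ahead of time, the same pattern used in the setup and Dataflow Runner examples further down (the environment variable names are purely illustrative):

$ export SNOWFLAKE_CONFIG=$(cat /path/to/self-describing-config.json | base64 -w 0)
$ export IGLU_RESOLVER=$(cat /path/to/resolver.json | base64 -w 0)

These values can then be passed to --config and --resolver together with the --base64 flag.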

Cross-batch deduplication

(Note that the feature described below is experimental as of version 0.4.0.)

Since version 0.4.0, Snowflake Transformer supports optional cross-batch deduplication that removes duplicate events across multiple loads using an "event manifest" DynamoDB table (not to be confused with the run manifest table). This table keeps the following information identifying events across multiple runs:

  • Event id - used to uniquely identify an event
  • Event fingerprint - used in conjunction with the event ID to uniquely identify duplicates
  • ETL timestamp - used to check if an event is being reprocessed due to a previous run being aborted
  • Time to live - a timestamp allowing DynamoDB to automatically clean up stale objects (set to etl_tstamp plus 180 days)

Cross-batch deduplication can be enabled by creating an additional config file with the following properties:

  • name - Required human-readable configuration name, e.g. Snowflake deduplication config
  • id - Required machine-readable configuration id, e.g. UUID
  • auth - An object containing information about the authentication used to read and write data to DynamoDB. Similar to the auth object in the main Snowflake config, this can use an accessKeyId/secretAccessKey pair or be set to null, in which case default credentials will be retrieved.
  • awsRegion - AWS Region used by Transformer to access DynamoDB
  • dynamodbTable - DynamoDB table used to store information about duplicate events
  • purpose - Always EVENTS_MANIFEST

An example of this auxiliary configuration is as follows:

{
  "schema": "iglu:com.snowplowanalytics.snowplow.storage/amazon_dynamodb_config/jsonschema/2-0-0",
  "data": {
    "name": "eventsManifest",
    "auth": {
      "accessKeyId": "fakeAccessKeyId",
      "secretAccessKey": "fakeSecretAccessKey"
    },
    "awsRegion": "us-east-1",
    "dynamodbTable": "acme-crossbatch-dedupe",
    "id": "ce6c3ff2-8a05-4b70-bbaa-830c163527da",
    "purpose": "EVENTS_MANIFEST"
  }
}

This configuration can be passed to Transformer using the optional --events-manifest flag, either as a file path or as a base64-encoded string if the --base64 flag is also set.

Setup

Setting up a load role

This section is only for users loading data into Snowflake using the roleArn/sessionDuration auth mechanism. With this mechanism, the Snowflake Loader sends an AssumeRole request to the AWS Security Token Service, which returns temporary credentials with a lifespan of sessionDuration seconds. These credentials are then passed along with the COPY INTO statement, allowing Snowflake to authenticate itself in your account. This is similar to what RDB Loader does when loading Redshift; the main difference is that the Snowflake Loader authenticates a third-party AWS account (belonging to Snowflake Computing) to read data from S3.

We highly recommend using this method instead of static credentials.

The first step is to create the necessary AWS IAM entities, restricted to read-only access to S3.

Create the following IAM policy, called SnowflakeLoadPolicy:

{
    "Version": "2012-10-17",
    "Statement": [
        {
            "Sid": "VisualEditor0",
            "Effect": "Allow",
            "Action": [
                "s3:GetAccelerateConfiguration",
                "s3:GetObject",
                "s3:GetObjectVersion"
            ],
            "Resource": [
                "arn:aws:s3:::YOUR-SNOWFLAKE-BUCKET/prefix/*"
            ]
        },
        {
            "Sid": "VisualEditor1",
            "Effect": "Allow",
            "Action": [
                "s3:ListBucket"
            ],
            "Resource": [
                "arn:aws:s3:::YOUR-SNOWFLAKE-BUCKET"
            ]
        }
    ]
}

This policy allows read-only access to your S3 bucket.
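
The policy can be created either in the IAM console or with the AWS CLI. A minimal sketch, assuming the JSON above has been saved as snowflake-load-policy.json:

$ aws iam create-policy \
    --policy-name SnowflakeLoadPolicy \
    --policy-document file://snowflake-load-policy.json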

Next, you need to create an IAM role that will provide credentials.

  1. IAM -> Roles -> Create role -> AWS service -> EC2
  2. Attach the just-created SnowflakeLoadPolicy
  3. Trust relationships -> Edit Trust relationship
  4. Insert the following document (replacing 123456789123 with your account id and EMR_EC2_DefaultRole with your EMR role) and save it:
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Principal": {
        "AWS": "arn:aws:iam::123456789123:role/EMR_EC2_DefaultRole"
      },
      "Action": "sts:AssumeRole"
    }
  ]
}
  5. Save the Role ARN as roleArn in your target configuration
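
If you prefer the AWS CLI over the console, the same role can be created as in the sketch below; the trust document above is assumed to be saved as trust-policy.json, and the role name SnowflakeLoadRole matches the roleArn in the sample configuration:

$ aws iam create-role \
    --role-name SnowflakeLoadRole \
    --assume-role-policy-document file://trust-policy.json

$ aws iam attach-role-policy \
    --role-name SnowflakeLoadRole \
    --policy-arn arn:aws:iam::123456789123:policy/SnowflakeLoadPolicy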

Setting up Snowflake

Snowflake Loader provides a quick setup action that automatically creates the following entities:

  • atomic database schema
  • atomic.events table to store enriched events
  • File format - an entity describing how Snowplow enriched data should be processed
  • External stage in the atomic schema - a reference to the S3 path where the Snowplow Snowflake Transformer writes its output. It can optionally contain AWS credentials
  • Virtual warehouse - Snowflake's computing entity; the smallest size (X-Small) by default

All of the above can safely keep their default settings. The warehouse can be scaled up manually later.

The two things you need to create manually are the Snowflake database and the DynamoDB table. Once the database is created, you can run setup (a sketch of creating the database follows below).
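
Creating the database can be done in the Snowflake web UI or, as a sketch, via the snowsql CLI; the account and database names come from the example configuration, and the user is a placeholder assumed to have the CREATE DATABASE privilege:

$ snowsql -a acme -u admin -q 'CREATE DATABASE IF NOT EXISTS snowplowdb'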

To run setup, use the setup CLI action of the Snowflake Loader:

$ aws s3 cp s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-loader-0.4.0.jar .

$ java -jar snowplow-snowflake-loader-0.4.0.jar \
    setup \
    --config /path/to/self-describing-config.json \
    --resolver /path/to/resolver.json

# OR

$ java -jar snowplow-snowflake-loader-0.4.0.jar \
    setup \
    --base64 \
    --config $(cat /path/to/self-describing-config.json | base64 -w 0) \
    --resolver $(cat /path/to/resolver.json | base64 -w 0)

Storing credentials in stage

Apart from using an AWS Role or static credentials, it is also possible to store credentials in the Snowflake stage.

Credentials can be added manually to an existing stage via the Snowflake Console (Databases -> YOUR DB -> Stages -> Edit YOUR STAGE), or during setup from a local machine if the stage doesn't exist yet. If you add credentials to the config and run setup, they will be added to the stage and can then be safely removed from the config.
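
As an illustration only (this statement is not taken from the loader itself), adding credentials to an already existing stage by hand comes down to a Snowflake ALTER STAGE statement, here wrapped in snowsql; the stage name matches the example configuration and the key values are placeholders:

$ snowsql -a acme -u admin -d snowplowdb -q \
    "ALTER STAGE atomic.snowplow_stage SET CREDENTIALS = (AWS_KEY_ID = 'xxx' AWS_SECRET_KEY = 'xxx')"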

DynamoDB

To use a DynamoDB table as the processing manifest, you need to create a table with a partition key named RunId of string type and fill the manifest property in the configuration with the newly created table name.
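
A sketch of creating the processing manifest table with the AWS CLI; the table name matches the example configuration and the throughput values are arbitrary:

$ aws dynamodb create-table \
    --region us-east-1 \
    --table-name snowplow-snowflake-manifest \
    --attribute-definitions AttributeName=RunId,AttributeType=S \
    --key-schema AttributeName=RunId,KeyType=HASH \
    --provisioned-throughput ReadCapacityUnits=5,WriteCapacityUnits=5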

Snowflake

Snowplow data in Snowflake is stored in a single fat table called atomic.events (the schema name can be changed; the table name cannot).

The initial atomic.events DDL for Snowflake can be found in atomic-def.sql.
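
Once events have been loaded, atomic.events can be queried like any other Snowflake table. A small illustrative query over canonical Snowplow fields via snowsql (account, user and database names come from the example configuration):

$ snowsql -a acme -u loader -d snowplowdb -q \
    "SELECT app_id, COUNT(*) FROM atomic.events GROUP BY app_id"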

Dataflow Runner

Dataflow Runner is used to run the Snowplow Snowflake Transformer Spark job on an EMR cluster. It can also run the loader.

The EMR cluster uses a default configuration: only ec2.keyName and logUri must be changed, everything else is optional. Edit and save the following as cluster.json:

{
   "schema":"iglu:com.snowplowanalytics.dataflowrunner/ClusterConfig/avro/1-1-0",
   "data":{
      "name":"dataflow-runner - snowflake transformer",
      "logUri":"s3://snowplow-snowflake-test/logs/",
      "region":"us-east-1",
      "credentials":{
         "accessKeyId":"env",
         "secretAccessKey":"env"
      },
      "roles":{
         "jobflow":"EMR_EC2_DefaultRole",
         "service":"EMR_DefaultRole"
      },
      "ec2":{
         "amiVersion":"5.9.0",
         "keyName":"key-name",
         "location":{
            "vpc":{
               "subnetId":null
            }
         },
         "instances":{
            "master":{
               "type":"m2.xlarge"
            },
            "core":{
               "type":"m2.xlarge",
               "count":1
            },
            "task":{
               "type":"m1.medium",
               "count":0,
               "bid":"0.015"
            }
         }
      },
      "tags":[ ],
      "bootstrapActionConfigs":[ ],
      "configurations":[
         {
            "classification":"core-site",
            "properties":{
               "Io.file.buffer.size":"65536"
            }
         },
         {
            "classification":"mapred-site",
            "properties":{
               "Mapreduce.user.classpath.first":"true"
            }
         },
         {
            "classification":"yarn-site",
            "properties":{
               "yarn.resourcemanager.am.max-attempts":"1"
            }
         },
         {
            "classification":"spark",
            "properties":{
               "maximizeResourceAllocation":"true"
            }
         }
      ],
      "applications":[ "Hadoop", "Spark" ]
   }
}

Edit and save the following as playbook.json:

{
   "schema":"iglu:com.snowplowanalytics.dataflowrunner/PlaybookConfig/avro/1-0-1",
   "data":{
      "region":"{{.awsRegion}}",
      "credentials":{
         "accessKeyId":"env",
         "secretAccessKey":"env"
      },
      "steps":[
         {
            "type":"CUSTOM_JAR",
            "name":"Snowflake Transformer",
            "actionOnFailure":"CANCEL_AND_WAIT",
            "jar":"command-runner.jar",
            "arguments":[
               "spark-submit",
               "--conf",
               "spark.hadoop.mapreduce.job.outputformat.class=com.snowplowanalytics.snowflake.transformer.S3OutputFormat",
               "--deploy-mode",
               "cluster",
               "--class",
               "com.snowplowanalytics.snowflake.transformer.Main",

               "s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-transformer-0.4.0.jar",

               "--config",
               "{{base64File "./config.json"}}",
               "--resolver",
               "{{base64File "./resolver.json"}}",
               "--events-manifest",
               "{{base64File "./events_manifest.json"}}"
            ]
         },

         {
            "type":"CUSTOM_JAR",
            "name":"Snowflake Loader",
            "actionOnFailure":"CANCEL_AND_WAIT",
            "jar":"s3://snowplow-hosted-assets/4-storage/snowflake-loader/snowplow-snowflake-loader-0.4.0.jar",
            "arguments":[
               "load",
               "--base64",
               "--config",
               "{{base64File "./config.json"}}",
               "--resolver",
               "{{base64File "./resolver.json"}}"
            ]
         }
      ],
      "tags":[ ]
   }
}

To run the above configuration, you can use the following command:

$ dataflow-runner run-transient --emr-config cluster.json --emr-playbook playbook.json

This will start both Transformer and Loader on EMR cluster.

Note that the loader can also be launched on a local machine, with plain file paths specified for --config and --resolver; you'll have to omit --base64 in that case (see the example below).
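
A sketch of such a local invocation, reusing the jar and file paths from the setup step:

$ java -jar snowplow-snowflake-loader-0.4.0.jar \
    load \
    --config /path/to/self-describing-config.json \
    --resolver /path/to/resolver.json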

Back-populating run manifest

In order to pre-populate the manifest with run ids that should never be loaded, you can use the backfill.py script.

The script requires Python 3, the Snowplow Python Analytics SDK 0.2.3+ and boto3:

$ pip install boto3 snowplow_analytics_sdk
$ wget https://raw.githubusercontent.com/snowplow-incubator/snowplow-snowflake-loader/release/0.4.0/backfill.py   # Won't actually be downloaded as repository is private

The script accepts 6 required arguments. Note startdate: this is the date since which (inclusive) the transformer should process run ids:

$ ./backfill.py \
    --startdate 2017-08-22-01-01-01 \
    --region $AWS_REGION \
    --manifest-table-name $DYNAMODB_MANIFEST \
    --enriched-archive $TRANSFORMER_INPUT \
    --aws-access-key-id=$AWS_ACCESS_KEY_ID \
    --aws-secret-access-key=$AWS_SECRET_KEY

Upgrading from 0.1.0

  • To update the Snowflake infrastructure, another setup must be launched - it will create a new file format, snowplow_enriched_json
  • Both the setup and load subcommands now accept a required --snowflake-region option

Upgrading from 0.2.0

  • The biggest change is that both the transformer and the loader now accept a common configuration file (--config) and an Iglu Resolver config (--resolver) instead of application-specific CLI options
  • Static credentials are now considered deprecated; use an AWS Role (or a Snowflake stage)
  • If you don't want to store credentials in the config and already have an existing stage, you'll need to add the credentials to the stage object
  • The Snowflake Transformer now depends on Spark 2.2.0 and therefore requires amiVersion in cluster.json to be set to 5.9.0

Upgrading from 0.3.0

  • Due to several columns in atomic.events being widened to support pseudonymization and MaxMind changes, the table schema in Snowflake needs to be migrated. To automatically update the relevant column definitions, run the loader's migrate subcommand, as shown below.
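
The same migration command as a copy-pastable block:

$ java -jar snowplow-snowflake-loader-0.4.0.jar \
    migrate \
    --loader-version 0.4.0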