O.G.R.E TOOLBOX
===============

General
-----------------------------------------------------------------------

Ogre is a suite of tools that facilitates Extract, Load and Transform of data in different formats in an
AWS S3 based data lake.

Ogre is open sourced by Widespace AB under Apache License, Version 2.0:
   https://opensource.org/licenses/Apache-2.0

From Widespace's point of view, the AWS S3 data lake is the company-wide repository where different applications publish
their data and make it available to others. The data lake is hosted in S3 buckets and there are some basic rules to
follow in order to publish data there:

 1. The data should be packed as records in AVRO container files.
 2. Time series data records should contain an id and timestamp in UTC.
 3. All records within a data file must belong to the same hour.
 4. Data files are uploaded and published into the data lake using the following S3 URL pattern:

    s3://<bucket>/<format>/<component>/<source>/<type>/d=<yyyy-MM-dd>/h=<hour>/<filename>

    Where:

    - <bucket>      The Data Lake bucket
    - <format>      Format of the files under the component (e.g. avro, json)
    - <component>   The application/component producing data (e.g. engine, rs, apls, inscreenservice)
    - <source>      The name of log/data (e.g. rl2, accesslogs, perflogs, inscreenlogs)
    - <type>        The log record type (e.g. delivery, impression, click, currentposition)
    - <yyyy-MM-dd>  Origination date of the file.
                    The year using 4 digits (e.g. 2015)
                    The month in year using 2 digits (i.e. 01, 02, ... , 11, 12)
                    The day of month using 2 digits (i.e. 01, 02, ... , 30, 31)
    - <hour>        The hour of day using 2 digits (e.g. 00, 01, ..., 22, 23)
    - <filename>    The data filename, with the ".avro" extension. As a guideline, start names with <minute><second>
                    so they are listed in chronological order for readability. Also add a random number to avoid
                    name conflicts when more than one node in a cluster uploads data files.

 5. Upload data files using an appropriate storage class to save costs. The S3 Infrequent Access class is the one to use
    in 99% of the cases.
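
The path rules above can be sketched in a few lines of Python. This is only an illustration, not Ogre code: the
function name lake_key, the "-" separator, the six-digit random suffix and the reuse of <format> as the file
extension are all assumptions made for the example.

```python
import random
from datetime import datetime, timezone


def lake_key(fmt, component, source, record_type, ts, suffix=None):
    """Build the object key (the part after s3://<bucket>/) for one data file.

    Hypothetical helper: the filename layout <minute><second>-<random>.<format>
    follows the guideline above, but the exact separator and digits are assumed.
    """
    ts = ts.astimezone(timezone.utc)  # rule 2: timestamps are in UTC
    if suffix is None:
        suffix = random.randrange(10**6)  # random part to avoid name conflicts
    filename = f"{ts:%M%S}-{suffix:06d}.{fmt}"
    return (
        f"{fmt}/{component}/{source}/{record_type}"
        f"/d={ts:%Y-%m-%d}/h={ts:%H}/{filename}"
    )


ts = datetime(2015, 10, 21, 7, 5, 9, tzinfo=timezone.utc)
# "my-bucket" is a placeholder bucket name
print("s3://my-bucket/" + lake_key("avro", "engine", "rl2", "click", ts, suffix=42))
```

Passing a fixed suffix makes the result reproducible; a real uploader would leave it unset so each node gets a
different name.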


Many options exist to join and process the data in the Data Lake:

 - Redshift (Ogre can load/sync data from lake into a custom Redshift)
 - Athena   (Athena can operate directly on the datalake avro files)
 - EMR      (can work with avro files in S3 natively)
 - Spark    (can work with avro files in S3 natively)
 - Kinesis  (Ogre can load/sync data from lake into custom Kinesis streams)


The Ogre tools are implemented as simple CLI tools. The following tools help make the data in the lake
available in Redshift, Kinesis etc.:

1. avro2json: Converts avro files to json. Nested lists and maps are also exploded into their own separate two-dimensional
           json files. Exploded maps and lists have foreign keys to the main data file for joining. This tool is mainly
           used to convert avro files to json for speedier imports into Redshift.
           The tool supports bulk conversion for a period or continuous conversion of new data as it is published in the
           lake.

2. data2redshift: Imports data into Redshift. Handles automatic AVRO/DDL schema changes and data partitioning
           seamlessly. The tool supports bulk import for a period or continuous import of new data as it is
           published in the lake.

3. avro2kinesis: Loads data into a custom Kinesis Stream. The tool supports bulk loading for a period or continuous
           loading of new data into the stream as it is published in the lake.

4. avro2s3: Copies data files from one S3 data lake location to another. This makes it possible to copy data from the old
           unstructured path into the new structured data lake path. More generally, data under any path can be copied
           from one location to another (e.g. another bucket).

5. db2avro: Runs arbitrary SQL scripts in a relational database (like MySQL or Redshift); the result is dynamically
           converted to AVRO and uploaded to S3. Supports dependency checks to make sure required data is in place
           before a script is executed. This tool is mainly used to dump dimension tables to the data lake, as well as
           to run ETL jobs that aggregate data in e.g. Redshift and dump the result in the data lake. An ETL chain can
           be built using this tool.

6. data2rds: Imports data into an RDS DB. Handles automatic AVRO/DDL schema changes and data partitioning
            seamlessly. The tool supports bulk import for a period or continuous import of new data as it is
            published in the lake.
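
To make the "explode" step of avro2json more concrete, here is a minimal Python sketch of the general idea:
nested lists and maps are moved out of the main record into child rows that carry a foreign key back to it.
The function name explode, the child naming scheme (<type>_<field>) and the column names index/key/value are
assumptions made for the example and may differ from what Ogre actually writes.

```python
import json


def explode(record, key_field, parent_type):
    """Split one record into a flat main row plus child rows per nested list/map.

    Hypothetical sketch: every child row repeats record[key_field] so it can be
    joined back to the main row.
    """
    main = {}
    children = {}  # child data set name -> list of rows
    for name, value in record.items():
        if isinstance(value, list):
            children[f"{parent_type}_{name}"] = [
                {key_field: record[key_field], "index": i, "value": item}
                for i, item in enumerate(value)
            ]
        elif isinstance(value, dict):
            children[f"{parent_type}_{name}"] = [
                {key_field: record[key_field], "key": k, "value": v}
                for k, v in value.items()
            ]
        else:
            main[name] = value
    return main, children


# Made-up sample record, for illustration only
record = {"deliveryId": "d-1", "timestamp": 1445411109,
          "tags": ["a", "b"], "params": {"w": 320}}
main, children = explode(record, "deliveryId", "delivery")
print(json.dumps(main))
for name, rows in children.items():
    for row in rows:
        print(name, json.dumps(row))
```

Each child data set is two-dimensional (no nesting left), which is what makes it straightforward to load into a
relational table.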


Installation
-----------------------------------------------------------------------

- Import the Athena and Redshift drivers into your local maven repository:

  mvn install:install-file -Dfile=lib/RedshiftJDBC41.jar -DgroupId=com.amazon.redshift -DartifactId=aws-redshift-jdbc41 -Dversion=1.1.13.1013 -Dpackaging=jar
  mvn install:install-file -Dfile=lib/AthenaJDBC41_2.0.9.jar -DgroupId=com.amazonaws.athena.jdbc -DartifactId=AthenaJDBC41 -Dversion=2.0.9 -Dpackaging=jar

- Build the ogre.jar and the ogre-x.x.x-bin.tar.gz package (output dir: target folder)

  mvn install

- Extract the ogre-x.x.x-bin.tar.gz file on the machine where Ogre is to be installed


avro2json
- - - - - - - - - - - - - - - - - -

Objective:

To convert and explode AVRO data files in preparation for import into Redshift.

Prerequisites:

- IAM user(s) with access to the S3 locations to read the avro files from and to write the converted json files to.

Command:

>  avro2json sync <int> <lookback> -threads <x> -types <x,y,z> -config <x>
>  avro2json load <from> <to>      -threads <x> -types <x,y,z> -config <x> -replace

Command args:

 - <int>             The interval in seconds to poll for new Avro files to convert.
 - <lookback>        The time in hours to search backwards for new files to sync

 - <from>            The start of the period to convert Avro files for, in format yyyy-MM-dd:HH
 - <to>              The end of the period to convert Avro files for, in format yyyy-MM-dd:HH

Command options:

 - threads           The number of threads to use for converting and uploading the files. Default: 1.
 - replace           Will replace already converted JSON files. Default: Files are not replaced
 - types             Comma separated list of types to convert. Default: All types defined in config file
 - config            The configuration file to use. Default: avro2json.conf


Configuration file:

    log4j.configuration = log4j.xml

    types               = delivery, impression, click
    type.*.include      = $.deliveryId,$.timestamp

    src.s3.accessKeyId  = xxxx
    src.s3.secretKey    = xxxx
    src.s3.rootPath     = s3://

    dst.s3.accessKeyId  = xxxx
    dst.s3.secretKey    = xxxx
    dst.s3.rootPath     = s3://
    dst.s3.storageClass = STANDARD_IA


Configuration parameters:

 - <log4j.configuration>      (optional)  To point out an external log4j.xml file to use

 - <types>                    (mandatory) To define the types to convert. This can be overridden by the -types CLI arg.

 - <type.[type].include>      (optional)  Comma separated list of json paths to Avro fields to include in extracted
                                          arrays/maps. The '[type]' should be replaced with the type/name of the
                                          array/map to include the fields in. '[type]' can include the glob wildcards
                                          * and ?. This is useful for adding important fields such as Redshift dist and
                                          sort keys.

 - <src.s3.accessKeyId>       (mandatory) AWS key with permissions to read avro files in <src.s3.rootPath>
 - <src.s3.secretKey>         (mandatory) AWS secret key
 - <src.s3.rootPath>          (mandatory) The S3 url to Avro root folder. I.e. the folder containing '<type>' folders.

 - <dst.s3.accessKeyId>       (mandatory) AWS key with permissions to read/write JSON files in <dst.s3.rootPath>
 - <dst.s3.secretKey>         (mandatory) AWS secret key
 - <dst.s3.rootPath>          (mandatory) The S3 url to JSON root folder. I.e. the folder containing '<type>' folders.
 - <dst.s3.storageClass>      (optional)  The S3 storage class to use for uploaded json data files. Available:
                                          STANDARD, STANDARD_IA, REDUCED_REDUNDANCY, GLACIER. Default: STANDARD_IA
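
As an illustration of how the glob wildcards in type.[type].include keys can select fields for a given
array/map name, here is a small Python sketch using the standard fnmatch module. The helper include_paths is
hypothetical, and merging the paths of all matching keys is an assumption; Ogre may resolve multiple matches
differently. Note that fnmatch also accepts [seq] patterns, which the description above does not mention.

```python
from fnmatch import fnmatchcase


def include_paths(config, name):
    """Collect json paths from every type.<pattern>.include key matching name.

    Hypothetical helper: merges the paths of all matching keys (an assumption).
    """
    prefix, suffix = "type.", ".include"
    paths = []
    for key, value in config.items():
        if key.startswith(prefix) and key.endswith(suffix):
            pattern = key[len(prefix):-len(suffix)]
            if fnmatchcase(name, pattern):  # * and ? behave as glob wildcards
                paths.extend(p.strip() for p in value.split(","))
    return paths


config = {
    "type.*.include": "$.deliveryId,$.timestamp",
    "type.click?.include": "$.clickId",  # made-up second rule for the example
}
print(include_paths(config, "impression"))
print(include_paths(config, "click2"))
```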


Examples:

>  avro2json sync 60 1 -threads 20

This will continuously scan S3 every minute for new Avro files. Ogre will look back one hour and new files are converted
and shipped to S3 in parallel by 20 threads.


>  avro2json load 2015-10-01:00 2015-10-21:23 -threads 20 -replace

This will bulk convert Avro files from 2015-10-01 00:00:00 to 2015-10-21 23:59:59 to JSON. Existing JSON files for
the period will be replaced.
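
The period in the load example spans whole hours, inclusive of the last one. A minimal Python sketch of how
such a <from>/<to> pair maps onto the d=<yyyy-MM-dd>/h=<hour> folders of the data lake is shown below; the
function name hours_in_period is an assumption made for the example and is not part of Ogre.

```python
from datetime import datetime, timedelta

PERIOD_FORMAT = "%Y-%m-%d:%H"  # Python equivalent of yyyy-MM-dd:HH


def hours_in_period(start, end):
    """Return every d=/h= partition between two yyyy-MM-dd:HH values, inclusive."""
    current = datetime.strptime(start, PERIOD_FORMAT)
    last = datetime.strptime(end, PERIOD_FORMAT)
    partitions = []
    while current <= last:
        partitions.append(f"d={current:%Y-%m-%d}/h={current:%H}")
        current += timedelta(hours=1)
    return partitions


parts = hours_in_period("2015-10-01:00", "2015-10-21:23")
print(len(parts), parts[0], parts[-1])
# 504 d=2015-10-01/h=00 d=2015-10-21/h=23
```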


data2redshift
- - - - - - - - - - - - - - - - - -

Ogre needs to bootstrap Redshift with some Ogre tables keeping track of import state before any files can be imported.
This is done using the "data2redshift init" command.

Running multiple Ogre pipelines that import data from different data sources into the same Redshift cluster is a valid
use case when different types of data are to be joined and analyzed together.

 +--------+
 |   S3   |   Ogre X
 | Data X | --------+
 |  JSON  |         |        +----------+
 +--------+         +------> |          |
                             | Redshift | <----- Analytic queries...
 +--------+         +------> |          |
 |   S3   |         |        +----------+
 | Data Y | --------+
 |  JSON  |   Ogre Y
 +--------+

E.g. Data X and Y could be the RL2 and Inscreen Logs, respectively, to join and do analytics on.

NOTE: Each Ogre process MUST upload its data into a dedicated Redshift schema, otherwise there will be conflicts and
deadlocks. I.e. Ogre processes CANNOT share the same Redshift schema.

Each Ogre process should have a dedicated src.s3.ddldir and src.s3.tmpdir, while sharing the same s3.avroroot and
s3.jsonroot is perfectly fine.
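
When several pipelines are configured, the dedicated-schema rule can be checked before anything is started.
The Python sketch below is only a suggestion and not part of Ogre: it assumes each configuration has already
been read into a dict keyed by the dst.redshift.* parameter names described further down, and the function
name shared_schemas and the sample values are made up.

```python
from collections import defaultdict


def shared_schemas(configs):
    """Return Redshift (host, database, schema) targets used by more than one config."""
    users = defaultdict(list)
    for name, conf in configs.items():
        target = (
            conf["dst.redshift.host"],
            conf["dst.redshift.database"],
            conf["dst.redshift.schema"],
        )
        users[target].append(name)
    return {target: names for target, names in users.items() if len(names) > 1}


# Placeholder values, for illustration only
configs = {
    "ogre-x.conf": {"dst.redshift.host": "rs.example", "dst.redshift.database": "dw",
                    "dst.redshift.schema": "rl2"},
    "ogre-y.conf": {"dst.redshift.host": "rs.example", "dst.redshift.database": "dw",
                    "dst.redshift.schema": "rl2"},
}
for target, names in shared_schemas(configs).items():
    print("Schema shared by several Ogre configs:", target, names)
```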


Objective:

To import data into Redshift.

Prerequisites:

- IAM user(s) with access to S3
- Redshift cluster
- S3 folder with the DDLs for the tables Ogre imports data into in Redshift
- S3 folder for temporary files

Command:

> data2redshift init -config <x>
> data2redshift sync <int> <lookback> -types <x,y,z> -config <x>
> data2redshift load <from> <to> -replace -types <x,y,z> -config <x>

Command args:

 - <int>             The interval in seconds to poll for new data files to import.
 - <lookback>        The time in hours to search backwards for new files to import

 - <from>            The start of the period to import data files for (in format yyyy-MM-dd:HH)
 - <to>              The end of the period to import data files for (in format yyyy-MM-dd:HH)

Command options:

 - replace           Will replace already imported data files. Default: Data is not replaced
 - types             Comma separated list of types to import. Default: All types defined in config file
 - config            The configuration file to use. Default: data2redshift.conf

Configuration file:

    log4j.configuration     = log4j.xml

    src.s3.accessKeyId      = xxx
    src.s3.secretKey        = xxx
    src.s3.rootdir          = s3://xxx
    src.s3.tmpdir           = s3://xxx
    src.s3.ddldir           = s3://xxx

    dst.redshift.host       = xxx
    dst.redshift.database   = xxx
    dst.redshift.schema     = xxx
    dst.redshift.user       = xxx
    dst.redshift.password   = xxx

    partitioning.xxx        = public.xxx:daily:30

Configuration parameters:

 - <log4j.configuration>    (optional)  To point out an external log4j.xml file to use

 - <src.s3.accessKeyId>     (mandatory) AWS key with permissions to read data files in <src.s3.rootdir>
 - <src.s3.secretKey>       (mandatory) AWS secret key
 - <src.s3.rootdir>         (mandatory) The S3 url to data root folder. I.e. the folder containing '<type>' folders.
 - <src.s3.tmpdir>          (mandatory) A temp dir on S3 for Ogre to write temporary files
 - <src.s3.ddldir>          (mandatory) The S3 folder containing the Redshift DDL files for the tables to import data into

 - <dst.redshift.host>      (mandatory) The Redshift host name
 - <dst.redshift.database>  (mandatory) The Redshift database to import data into
 - <dst.redshift.schema>    (mandatory) The schema where to import data. Every Ogre import must have a dedicated schema
 - <dst.redshift.user>      (mandatory) The user name
 - <dst.redshift.password>  (mandatory) The password

 - <partitioning.<type>>    (optional)  Whether to partition a type. Format: <view name>:<partition type>:<nbr of partitions>
                                        The <view name> is the union all view consolidating all partition tables,
                                        <partition type> is one of hourly, daily, weekly, monthly.
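
The partitioning value is a colon separated triple. As a minimal sketch of how such a value could be parsed and
validated (the names Partitioning and parse_partitioning are assumptions made for the example, not Ogre's own):

```python
from typing import NamedTuple

PARTITION_TYPES = ("hourly", "daily", "weekly", "monthly")


class Partitioning(NamedTuple):
    view_name: str       # the union all view over all partition tables
    partition_type: str  # one of PARTITION_TYPES
    partitions: int      # number of partitions


def parse_partitioning(value):
    """Parse '<view name>:<partition type>:<nbr of partitions>'."""
    view_name, partition_type, count = (part.strip() for part in value.split(":"))
    if partition_type not in PARTITION_TYPES:
        raise ValueError(f"Unknown partition type: {partition_type!r}")
    return Partitioning(view_name, partition_type, int(count))


print(parse_partitioning("public.xxx:daily:30"))
```

With the sample value from the configuration file above, this yields the view public.xxx, partition type daily
and 30 partitions.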

Examples:

>  data2redshift sync 60 1

This will continuously scan S3 every minute for new json data files. Ogre will look back one hour and new files are
imported into Redshift.
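
To illustrate what a lookback of one hour means in terms of data lake folders, the Python sketch below lists
the d=/h= partitions a single poll would cover. It is a guess at the semantics rather than Ogre's actual
logic: the function name lookback_partitions is made up, and including the current hour in addition to the
<lookback> previous hours is an assumption.

```python
from datetime import datetime, timedelta, timezone


def lookback_partitions(now, lookback_hours):
    """List d=/h= partitions from lookback_hours ago up to the current hour (UTC)."""
    current_hour = now.astimezone(timezone.utc).replace(minute=0, second=0, microsecond=0)
    hours = (current_hour - timedelta(hours=n) for n in range(lookback_hours, -1, -1))
    return [f"d={hour:%Y-%m-%d}/h={hour:%H}" for hour in hours]


now = datetime(2015, 10, 22, 0, 30, tzinfo=timezone.utc)
print(lookback_partitions(now, 1))
# ['d=2015-10-21/h=23', 'd=2015-10-22/h=00']
```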


>  data2redshift load 2015-10-01:00 2015-10-21:23 -replace

This will bulk import json data files from 2015-10-01 00:00:00 to 2015-10-21 23:59:59 into Redshift. Existing data for
the period will be replaced.


Ogre instance type considerations
-----------------------------------------------------------------------

The EC2 instance type to choose for Ogre depends on the task. Converting AVRO to gzipped JSON requires
an instance with relatively high network capacity as well as CPU. For long batches of jsonload, c4.2xlarge is
recommended.

For jsonsync, a suitable instance type would be an m3.medium.

For dbload and dbsync a t2.medium is fine.


Surveillance and alarming
-----------------------------------------------------------------------

Set up CloudWatch alarms for Ogre instance health.

Set up Ogre application SNS alarming by editing the log4j.xml file and enabling the SnsAsyncAppender:

    <appender name="sns" class="com.ws.ogre.utils.SnsAsyncAppender">
        <param name="threshold" value="WARN"/>
        <param name="topicArn" value="arn:aws:sns:eu-west-1:xxx"/>
        <param name="awsKeyId" value="xxx"/>
        <param name="awsSecretKey" value="xxx"/>

        <layout class="org.apache.log4j.PatternLayout">
            <param name="ConversionPattern" value="%p %t %c - %m%n"/>
        </layout>
    </appender>

    <root>
        <appender-ref ref="stdout"/>
        <appender-ref ref="sns"/>
    </root>

This will send application alarms as SNS notifications.
