Skip to content

Commit

Permalink
Schema DDL: add support for ZSTD encoding (close #237)
Browse files Browse the repository at this point in the history
  • Loading branch information
miike authored and oguzhanunlu committed Jan 25, 2018
1 parent cb1a2b3 commit bc7d7aa
Show file tree
Hide file tree
Showing 5 changed files with 24 additions and 18 deletions.
Expand Up @@ -69,3 +69,5 @@ case object RunLengthEncoding extends CompressionEncodingValue { def toDdl = "RU
case object Text255Encoding extends CompressionEncodingValue { def toDdl = "TEXT255" }

case object Text32KEncoding extends CompressionEncodingValue { def toDdl = "TEXT32K" }

/** Compression encoding value whose DDL representation is the literal `ZSTD`. */
case object ZstdEncoding extends CompressionEncodingValue {
  def toDdl = "ZSTD"
}
Expand Up @@ -85,19 +85,19 @@ object DdlGenerator {

// Columns with data taken from self-describing schema
// NOTE(review): this is a diff hunk whose +/- markers were lost in rendering.
// The four RunLengthEncoding rows appear to be the removed (pre-commit) side and
// the four ZstdEncoding rows their replacements; only one set of four belongs in
// the real file - confirm against the commit before copying this list.
private[redshift] val selfDescSchemaColumns = List(
// pre-commit rows (presumably removed):
Column("schema_vendor", RedshiftVarchar(128), Set(CompressionEncoding(RunLengthEncoding)), Set(Nullability(NotNull))),
Column("schema_name", RedshiftVarchar(128), Set(CompressionEncoding(RunLengthEncoding)), Set(Nullability(NotNull))),
Column("schema_format", RedshiftVarchar(128), Set(CompressionEncoding(RunLengthEncoding)), Set(Nullability(NotNull))),
Column("schema_version", RedshiftVarchar(128), Set(CompressionEncoding(RunLengthEncoding)), Set(Nullability(NotNull)))
// post-commit rows (presumably added):
Column("schema_vendor", RedshiftVarchar(128), Set(CompressionEncoding(ZstdEncoding)), Set(Nullability(NotNull))),
Column("schema_name", RedshiftVarchar(128), Set(CompressionEncoding(ZstdEncoding)), Set(Nullability(NotNull))),
Column("schema_format", RedshiftVarchar(128), Set(CompressionEncoding(ZstdEncoding)), Set(Nullability(NotNull))),
Column("schema_version", RedshiftVarchar(128), Set(CompressionEncoding(ZstdEncoding)), Set(Nullability(NotNull)))
)

// Snowplow-specific columns
// NOTE(review): this is a diff hunk whose +/- markers were lost in rendering.
// The root_id row looks like unchanged context; the next four rows (LzoEncoding /
// RunLengthEncoding) appear to be the removed side and the four ZstdEncoding rows
// their replacements - confirm against the commit before copying this list.
private[redshift] val parentageColumns = List(
Column("root_id", RedshiftChar(36), Set(CompressionEncoding(RawEncoding)), Set(Nullability(NotNull))),
// pre-commit rows (presumably removed):
Column("root_tstamp", RedshiftTimestamp, Set(CompressionEncoding(LzoEncoding)), Set(Nullability(NotNull))),
Column("ref_root", RedshiftVarchar(255), Set(CompressionEncoding(RunLengthEncoding)), Set(Nullability(NotNull))),
Column("ref_tree", RedshiftVarchar(1500), Set(CompressionEncoding(RunLengthEncoding)), Set(Nullability(NotNull))),
Column("ref_parent", RedshiftVarchar(255), Set(CompressionEncoding(RunLengthEncoding)), Set(Nullability(NotNull)))
// post-commit rows (presumably added):
Column("root_tstamp", RedshiftTimestamp, Set(CompressionEncoding(ZstdEncoding)), Set(Nullability(NotNull))),
Column("ref_root", RedshiftVarchar(255), Set(CompressionEncoding(ZstdEncoding)), Set(Nullability(NotNull))),
Column("ref_tree", RedshiftVarchar(1500), Set(CompressionEncoding(ZstdEncoding)), Set(Nullability(NotNull))),
Column("ref_parent", RedshiftVarchar(255), Set(CompressionEncoding(ZstdEncoding)), Set(Nullability(NotNull)))
)


Expand Down Expand Up @@ -196,7 +196,7 @@ object DdlGenerator {
)

// List of compression encoding suggestions
val encodingSuggestions: List[EncodingSuggestion] = List(lzoSuggestion)
// lzoSuggestion is listed ahead of zstdSuggestion; the element order is kept as-is
val encodingSuggestions: List[EncodingSuggestion] =
  lzoSuggestion :: zstdSuggestion :: Nil


/**
Expand Down Expand Up @@ -231,7 +231,7 @@ object DdlGenerator {
* Takes each suggestion out of ``compressionEncodingSuggestions`` and
* decides whether the current properties satisfy it, then returns the
* compression encoding.
* If nothing suggested LZO Encoding returned as default
* If no suggestion applies, ZSTD encoding is returned as the default
*
* @param properties is a string we need to recognize
* @param dataType redshift data type for current column
Expand All @@ -247,7 +247,7 @@ object DdlGenerator {
: CompressionEncoding = {

suggestions match {
case Nil => CompressionEncoding(LzoEncoding) // LZO is default for user-generated
case Nil => CompressionEncoding(ZstdEncoding) // ZSTD is default for user-generated
case suggestion :: tail => suggestion(properties, dataType, columnName) match {
case Some(encoding) => CompressionEncoding(encoding)
case None => getEncoding(properties, dataType, columnName, tail)
Expand Down
Expand Up @@ -29,5 +29,9 @@ object EncodeSuggestions {
case _ => None
}


/**
 * Suggests `ZstdEncoding` when the column's data type is a `RedshiftVarchar`
 * (whatever its argument) and suggests nothing for any other data type.
 * The schema properties and the column name are not consulted.
 */
val zstdSuggestion: EncodingSuggestion = (_, dataType, _) =>
  Some(dataType).collect { case RedshiftVarchar(_) => ZstdEncoding }
}
Expand Up @@ -46,8 +46,8 @@ class DdlGeneratorSpec extends Specification { def is = s2"""
DdlGenerator.selfDescSchemaColumns ++
DdlGenerator.parentageColumns ++
List(
Column("foo",RedshiftVarchar(30),Set(CompressionEncoding(LzoEncoding)),Set(Nullability(NotNull))),
Column("bar",RedshiftVarchar(5),Set(CompressionEncoding(LzoEncoding)),Set())
Column("foo",RedshiftVarchar(30),Set(CompressionEncoding(ZstdEncoding)),Set(Nullability(NotNull))),
Column("bar",RedshiftVarchar(5),Set(CompressionEncoding(ZstdEncoding)),Set())
),
Set(ForeignKeyTable(NonEmptyList("root_id"),RefTable("atomic.events",Some("event_id")))),
Set(Diststyle(Key), DistKeyTable("root_id"),SortKeyTable(None,NonEmptyList("root_tstamp")))
Expand All @@ -74,7 +74,7 @@ class DdlGeneratorSpec extends Specification { def is = s2"""
DdlGenerator.parentageColumns ++
List(
Column("foo",RedshiftBoolean,Set(CompressionEncoding(RunLengthEncoding)),Set(Nullability(NotNull))),
Column("bar",RedshiftVarchar(5),Set(CompressionEncoding(LzoEncoding)),Set()),
Column("bar",RedshiftVarchar(5),Set(CompressionEncoding(ZstdEncoding)),Set()),
Column("baz",RedshiftBoolean,Set(CompressionEncoding(RunLengthEncoding)),Set())
),
Set(ForeignKeyTable(NonEmptyList("root_id"),RefTable("atomic.events",Some("event_id")))),
Expand Down
Expand Up @@ -51,7 +51,7 @@ class MigrationGeneratorSpec extends Specification { def is = s2"""
|BEGIN TRANSACTION;
|
| ALTER TABLE atomic.com_acme_launch_missles_1
| ADD COLUMN "status" VARCHAR(4096) ENCODE LZO;
| ADD COLUMN "status" VARCHAR(4096) ENCODE ZSTD;
|
| COMMENT ON TABLE atomic.com_acme_launch_missles_1 IS 'iglu:com.acme/launch_missles/jsonschema/1-0-1';
|
Expand Down Expand Up @@ -108,9 +108,9 @@ class MigrationGeneratorSpec extends Specification { def is = s2"""
|BEGIN TRANSACTION;
|
| ALTER TABLE atomic.com_acme_launch_missles_1
| ADD COLUMN "status" VARCHAR(4096) ENCODE LZO;
| ADD COLUMN "status" VARCHAR(4096) ENCODE ZSTD;
| ALTER TABLE atomic.com_acme_launch_missles_1
| ADD COLUMN "launch_time" TIMESTAMP ENCODE LZO;
| ADD COLUMN "launch_time" TIMESTAMP ENCODE ZSTD;
| ALTER TABLE atomic.com_acme_launch_missles_1
| ADD COLUMN "latitude" DOUBLE PRECISION ENCODE RAW;
| ALTER TABLE atomic.com_acme_launch_missles_1
Expand Down

0 comments on commit bc7d7aa

Please sign in to comment.