Skip to content

Commit

Permalink
test #3736
Browse files Browse the repository at this point in the history
  • Loading branch information
BenFradet committed May 25, 2018
1 parent d04e830 commit 3d87f34
Show file tree
Hide file tree
Showing 2 changed files with 94 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
{
"schema": "iglu:com.snowplowanalytics.snowplow/ip_lookups/jsonschema/2-0-0",
"data": {
"name": "ip_lookups",
"vendor": "com.snowplowanalytics.snowplow",
"enabled": true,
"parameters": {
"geo": {
"database": "GeoLite2-City.mmdb",
"uri": "http://snowplow-hosted-assets.s3.amazonaws.com/third-party/maxmind"
}
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
/*
* Copyright (c) 2012-2018 Snowplow Analytics Ltd. All rights reserved.
*
* This program is licensed to you under the Apache License Version 2.0,
* and you may not use this file except in compliance with the Apache License Version 2.0.
* You may obtain a copy of the Apache License Version 2.0 at
* http://www.apache.org/licenses/LICENSE-2.0.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the Apache License Version 2.0 is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the Apache License Version 2.0 for the specific language governing permissions and
* limitations there under.
*/
package com.snowplowanalytics.snowplow.enrich.beam

import java.io.File
import java.net.URL
import java.nio.file.Paths

import scala.sys.process._

import com.spotify.scio.testing._
import org.apache.commons.codec.binary.Base64

class EnrichWithLocalFileSpec extends PipelineSpec {

val raw = Seq("CwBkAAAADTM3LjIyOC4yMjUuMzIKAMgAAAFjiJGp1QsA0gAAAAVVVEYtOAsA3AAAABJzc2MtMC4xMy4wLXN0ZG91dCQLASwAAAALY3VybC83LjUwLjMLAUAAAAAjL2NvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy90cDILAVQAAAFpeyJzY2hlbWEiOiJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9wYXlsb2FkX2RhdGEvanNvbnNjaGVtYS8xLTAtNCIsImRhdGEiOlt7InR2IjoidHJhY2tlcl92ZXJzaW9uIiwiZSI6InVlIiwicCI6IndlYiIsInVlX3ByIjoie1wic2NoZW1hXCI6XCJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy91bnN0cnVjdF9ldmVudC9qc29uc2NoZW1hLzEtMC0wXCIsXCJkYXRhXCI6e1wic2NoZW1hXCI6XCJpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9zY3JlZW5fdmlldy9qc29uc2NoZW1hLzEtMC0wXCIsXCJkYXRhXCI6e1wibmFtZVwiOlwiaGVsbG8gZnJvbSBTbm93cGxvd1wifX19In1dfQ8BXgsAAAAFAAAAO0hvc3Q6IGVjMi0zNC0yNDUtMzItNDcuZXUtd2VzdC0xLmNvbXB1dGUuYW1hem9uYXdzLmNvbToxMjM0AAAAF1VzZXItQWdlbnQ6IGN1cmwvNy41MC4zAAAAC0FjY2VwdDogKi8qAAAAG1RpbWVvdXQtQWNjZXNzOiA8ZnVuY3Rpb24xPgAAABBhcHBsaWNhdGlvbi9qc29uCwFoAAAAEGFwcGxpY2F0aW9uL2pzb24LAZAAAAAwZWMyLTM0LTI0NS0zMi00Ny5ldS13ZXN0LTEuY29tcHV0ZS5hbWF6b25hd3MuY29tCwGaAAAAJDEwZDk2YmM3LWU0MDAtNGIyOS04YTQxLTY5MTFhZDAwZWU5OAt6aQAAAEFpZ2x1OmNvbS5zbm93cGxvd2FuYWx5dGljcy5zbm93cGxvdy9Db2xsZWN0b3JQYXlsb2FkL3RocmlmdC8xLTAtMAA=")
val expected = List(
"web",
"2018-05-22 15:57:17.653",
"unstruct",
"tracker_version",
"ssc-0.13.0-stdout$",
s"beam-enrich-${generated.BuildInfo.version}",
"37.228.225.32",
"10d96bc7-e400-4b29-8a41-6911ad00ee98",
"IE",
"L",
"Dublin",
"D02",
"53.3331",
"-6.2489",
"Leinster",
"""{"schema":"iglu:com.snowplowanalytics.snowplow/unstruct_event/jsonschema/1-0-0","data":{"schema":"iglu:com.snowplowanalytics.snowplow/screen_view/jsonschema/1-0-0","data":{"name":"hello from Snowplow"}}}""",
"curl/7.50.3",
"com.snowplowanalytics.snowplow",
"screen_view",
"jsonschema",
"1-0-0"
)

"Enrich" should "enrich a unstruct event with geo ip information" in {
downloadLocalEnrichmentFile(
"http://snowplow-hosted-assets.s3.amazonaws.com/third-party/maxmind/GeoLite2-City.mmdb",
"./ip_geo"
)

JobTest[Enrich.type]
.args("--input=in", "--output=out", "--bad=bad",
"--resolver=" + Paths.get(getClass.getResource("/iglu_resolver.json").toURI()),
"--enrichments=" + Paths.get(getClass.getResource("/enrichments").toURI()))
.input(PubsubIO("in"), raw.map(Base64.decodeBase64))
.distCache(DistCacheIO("http://snowplow-hosted-assets.s3.amazonaws.com/third-party/maxmind/GeoLite2-City.mmdb"),
List(Right(Paths.get("./ip_geo"))))
.output(PubsubIO[String]("out"))(_ should satisfy { c: Iterable[String] =>
c.size == 1 && expected.forall(c.head.contains)
})
.output(PubsubIO[String]("bad"))(_ should beEmpty)
.run()

deleteLocalFile("./ip_geo")
}

private def downloadLocalEnrichmentFile(remoteLocation: String, localLocation: String): Unit =
new URL(remoteLocation).#>(new File(localLocation)).!!

private def deleteLocalFile(location: String): Unit = new File(location).delete

}

0 comments on commit 3d87f34

Please sign in to comment.