Skip to content

Commit

Permalink
Add test for pii enrichment
Browse files Browse the repository at this point in the history
  • Loading branch information
spenes committed Aug 24, 2021
1 parent df74d12 commit 2c4d9ab
Show file tree
Hide file tree
Showing 2 changed files with 46 additions and 0 deletions.
Expand Up @@ -71,11 +71,27 @@ object Database {
characterMaximumLength: Option[Int]
)

case class PiiFields(domainUserId: String,
networkUserId: String,
domainSessionId: String,
userIpAddress: String,
refrDomainUserId: String
)

def query: IO[List[UUID]] = {
val tablefr = Fragment.const0(EventsTableName)
fr"SELECT event_id FROM $tablefr".query[UUID].to[List].transact(xa)
}

def queryPiiFields: IO[List[PiiFields]] = {
val tablefr = Fragment.const0(EventsTableName)
fr"SELECT domain_userid, network_userid, domain_sessionid, user_ipaddress, refr_domain_userid FROM $tablefr"
.query[(String, String, String, String, String)]
.map(PiiFields.tupled)
.to[List]
.transact(xa)
}

def count(table: String): IO[Int] =
(fr"SELECT count(*) FROM " ++ Fragment.const(table)).query[Int].unique.transact(xa)

Expand Down
Expand Up @@ -62,6 +62,36 @@ class sinkspec extends Database {
}
}

"sink an event with pii enrichment" >> {
val TestPiiFields = PiiFields(
domainUserId = "3abb6677af34ac57c0ca5828fd94f9d886c26ce59a8ce60ecf6778079423dccff1d6f19cb655805d56098e6d38a1a710dee59523eed7511e5a9e4b8ccb3a4686",
networkUserId = "63e22ec2fbeebabf005e58fbfb0eee607c4aa417045a68a0cc63767b048e3559268d35e72f367d3b2dbd5dbddf12fc4397762ba149260b3795a0391713bddcd7",
domainSessionId = "2b59d179d9815994f687383a886ea34109889756efca5ab27318cc67ce2a21261d12fa6fee6b8c716f72214ead55ee0d789d6c35cff977d40ef5728ba9188a80",
userIpAddress = "db545c410fd0c8ede533d5b0666cd2798ba380bd25b655619cd5fd3a33a255569b3ccc319bfdef3322d8392d894d15c2e6aa2d53346e6ac54eaf5d627bfe6a9a",
refrDomainUserId = "29b3573989378848e91465abb8bb12aaad1c40f01ddba6ce5dce4de88d61d49621cd4272bc6f889cd469e9490040b412eb0a237cf2cd49c637da1d5de5903f3d"
)
val line =
"snowplow\tweb\t2021-08-20 15:47:20.975\t2021-08-20 15:47:20.811\t\tpage_view\te0370811-c78d-476e-a8cc-1df51e6c4298\t\t\tno-js-0.1.0\tssc-2.1.2-stdout$\ttry-snowplow-pipeline-0.0.0-c673c2d60db72c-SNAPSHOT-common-2.0.2\texample123\t" + TestPiiFields.userIpAddress + "\t\t" + TestPiiFields.domainUserId + "\t2\t" + TestPiiFields.networkUserId + "\t\t\t\t\t\t\t\t\t\t\t\thttp://example.com/snowplow/snowplow?_sp=305902ac-8d59-479c-ad4c-82d4a2e6bb9c\tRoot README\t\thttp\texample.com\t80\t/snowplow/snowplow\t_sp=305902ac-8d59-479c-ad4c-82d4a2e6bb9c\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\tcurl/7.64.1\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t\t" + TestPiiFields.refrDomainUserId + "\t\t{\"schema\":\"iglu:com.snowplowanalytics.snowplow/contexts/jsonschema/1-0-0\",\"data\":[{\"schema\":\"iglu:com.snowplowanalytics.snowplow/ua_parser_context/jsonschema/1-0-0\",\"data\":{\"useragentFamily\":\"curl\",\"useragentMajor\":\"7\",\"useragentMinor\":\"64\",\"useragentPatch\":\"1\",\"useragentVersion\":\"curl 7.64.1\",\"osFamily\":\"Other\",\"osMajor\":null,\"osMinor\":null,\"osPatch\":null,\"osPatchMinor\":null,\"osVersion\":\"Other\",\"deviceFamily\":\"Other\"}},{\"schema\":\"iglu:nl.basjes/yauaa_context/jsonschema/1-0-2\",\"data\":{\"deviceBrand\":\"Curl\",\"deviceName\":\"Curl\",\"operatingSystemVersionMajor\":\"??\",\"layoutEngineNameVersion\":\"curl 7.64.1\",\"operatingSystemNameVersion\":\"Cloud ??\",\"layoutEngineNameVersionMajor\":\"curl 7\",\"operatingSystemName\":\"Cloud\",\"agentVersionMajor\":\"7\",\"layoutEngineVersionMajor\":\"7\",\"deviceClass\":\"Robot\",\"agentNameVersionMajor\":\"Curl 7\",\"operatingSystemNameVersionMajor\":\"Cloud ??\",\"operatingSystemClass\":\"Cloud\",\"layoutEngineName\":\"curl\",\"agentName\":\"Curl\",\"agentVersion\":\"7.64.1\",\"layoutEngineClass\":\"Robot\",\"agentNameVersion\":\"Curl 7.64.1\",\"operatingSystemVersion\":\"??\",\"agentClass\":\"Robot\",\"layoutEngineVersion\":\"7.64.1\"}}]}\t" + TestPiiFields.domainSessionId + "\t2021-08-20 15:47:20.811\tcom.snowplowanalytics.snowplow\tpage_view\tjsonschema\t1-0-0\t365d7d64ebd2a7f0adae82f6d698365f\t"
val event = Event.parse(line).getOrElse(throw new RuntimeException("Event is invalid"))
val stream = Stream.emit[IO, Data](Data.Snowplow(event))

implicit val D = DB.interpreter[IO](igluClient.resolver, xa, Schema)

val action = for {
state <- State.init[IO](List(), igluClient.resolver)
_ <- stream.through(orderedPipe(Sink.sinkGood(state, igluClient, processor))).compile.drain.action
eventIds <- query.action
resultPiiFields <- queryPiiFields.action
} yield (eventIds, resultPiiFields)

val result = action.value.unsafeRunSync()
val ExpectedEventId = UUID.fromString("e0370811-c78d-476e-a8cc-1df51e6c4298")
result must beRight.like {
case (List(ExpectedEventId), List(TestPiiFields)) => ok
case (ids, resultPiiFields) => ko(s"Unexpected result. Event ids: $ids; pii fields: $resultPiiFields")
}
}

"sink a single self-describing JSON" >> {
val row = json"""{"schema":"iglu:com.getvero/bounced/jsonschema/1-0-0","data":{"bounce_type":"one","bounce_code":null}}"""
val json = SelfDescribingData.parse(row).getOrElse(throw new RuntimeException("Invalid SelfDescribingData"))
Expand Down

0 comments on commit 2c4d9ab

Please sign in to comment.