In [None]:
import org.apache.spark.sql.types.{StringType, StructType, BooleanType}

/**
 * Explicit read schema for the JSON event files loaded into `events`.
 *
 * Supplying a schema makes the reader skip schema inference; only the fields
 * listed here are materialised, and a field missing from a record is read as null.
 *
 * The schema must list every column that later cells select. The `messages`
 * projection reads `object.filters.{region,query,sorting,locality}`,
 * `device.environmentId`, `actor.spt:userId` and `published`, so those are
 * declared here too; without them the projection cannot be resolved against
 * a DataFrame read with this schema.
 *
 * NOTE(review): the added fields are declared as StringType because their real
 * JSON types are not visible from this notebook - confirm against the data.
 */
private val SCHEMA = (new StructType)
    .add("@type", StringType)
    .add("actor", (new StructType)
      .add("spt:userId", StringType) // selected as user_id
    )
    .add("device", (new StructType)
      .add("deviceType", StringType)
      .add("environmentId", StringType) // selected as environment_id
    )
    .add("object", (new StructType)
      .add("@id", StringType)
      .add("@type", StringType)
      .add("attachments", StringType)
      .add("body", StringType)
      .add("inReplyTo", StringType)
      .add("isFirstMessage", BooleanType)
      .add("origin", StringType)
      .add("receiverConversationId", StringType)
      .add("senderConversationId", StringType)
      .add("subject", StringType)
      .add("messageType", StringType)
      .add("category", StringType)
      .add("filters", (new StructType)
          .add("region", StringType)
          .add("query", StringType)
          .add("sorting", StringType)
          .add("locality", StringType)
      )
      .add("publisher", (new StructType)
          .add("@id", StringType)
      )
    )
    .add("provider", (new StructType)
      .add("@type", StringType)
      .add("component", StringType)
      .add("productType", StringType)
    )
    .add("tracker", (new StructType)
      .add("type", StringType)
      .add("version", StringType)
    )
    .add("account", (new StructType)
      .add("accountId", StringType)
    )
    .add("target", (new StructType)
      .add("accountId", StringType)
    )
    .add("schema", StringType)
    .add("@id", StringType)
    .add("published", StringType)
    .add("day", StringType)
    .add("hour", StringType)

// Schema-inferring variant, kept for reference; the active read below passes SCHEMA instead.
//val events = spark.read.json("s3://schibsted-spt-common-prod/purple/messaging/client=subito/version=1/year=2017/month=10/day=6/hour=10")

// Read one hour partition of JSON events with the explicit schema and mark the
// result for caching so the following cells reuse it.
val events = spark
    .read
    .schema(SCHEMA)
    .json("s3://schibsted-spt-common-prod/purple/messaging/client=subito/version=1/year=2017/month=10/day=6/hour=10")
    .cache()

// Print the column tree of the loaded DataFrame.
events.printSchema()

In [None]:
import org.apache.spark.sql.functions._

/**
 * Turns the last segment of a `>`-separated category path into a lower-case,
 * dash-separated slug, e.g. "Veicoli > Auto, Moto/Scooter" -> "auto-moto-scooter".
 *
 * @param category category path; may be null when called as a Spark UDF on a NULL column
 * @return the slug of the last segment, "" when the path has no segment,
 *         or null when `category` is null
 */
val normalizeCategory = (category: String) => {
    if (category == null) null
    else {
        // String.split drops trailing empty segments, so ">" yields an empty array:
        // lastOption avoids the NoSuchElementException that `.last` would throw.
        val leaf = category.split(">").lastOption.getOrElse("")
        // Locale.ROOT keeps the lowering independent of the JVM default locale.
        leaf.toLowerCase(java.util.Locale.ROOT).trim
          .replace(" ", "-")
          .replace(",-", "-")
          .replace(",", "-")
          .replace("/", "-")
    }
}

/**
 * Turns the first segment of a `>`-separated category path into a lower-case,
 * dash-separated slug, e.g. "Veicoli e Moto > Auto" -> "veicoli-e-moto".
 *
 * @param category category path; may be null when called as a Spark UDF on a NULL column
 * @return the slug of the first segment, "" when the path has no segment,
 *         or null when `category` is null
 */
val normalizeMacroCategory = (category: String) => {
    if (category == null) null
    else {
        // String.split drops trailing empty segments, so ">" yields an empty array:
        // headOption avoids the ArrayIndexOutOfBoundsException that `(0)` would throw.
        val root = category.split(">").headOption.getOrElse("")
        // Locale.ROOT keeps the lowering independent of the JVM default locale.
        root.toLowerCase(java.util.Locale.ROOT).trim
          .replace(" ", "-")
          .replace(",-", "-")
          .replace(",", "-")
          .replace("/", "-")
    }
}

/**
 * Returns the trimmed first segment of a `>`-separated locality string.
 *
 * @param locality locality path; may be null when called as a Spark UDF on a NULL column
 * @return the first segment without surrounding whitespace, "" when there is
 *         no segment, or null when `locality` is null
 */
val normalizeRegion = (locality: String) => {
    if (locality == null) null
    // headOption: split(">") returns an empty array for inputs such as ">".
    else locality.split(">").headOption.getOrElse("").trim
}

/**
 * Returns the trimmed first segment of a `>`-separated locality string.
 *
 * NOTE(review): `region` is accepted but never read, so the result is the same
 * as `normalizeRegion(locality)`. The name suggests region and locality were
 * meant to be combined - confirm the intended output before relying on this.
 *
 * @param region   currently unused
 * @param locality locality path; may be null
 * @return the first segment of `locality` without surrounding whitespace, ""
 *         when there is no segment, or null when `locality` is null
 */
val provinceAndCity = (region: String, locality: String) => {
    if (locality == null) null
    // headOption: split(">") returns an empty array for inputs such as ">".
    else locality.split(">").headOption.getOrElse("").trim
}

/**
 * Extracts the ad id from an SDRN-style identifier.
 *
 * The value is lower-cased, the known namespace prefixes and the `id:ad:`
 * marker (plain or `%3a`-encoded) are removed, the `:list:` separator is
 * rewritten to `-`, and the part before the first `-` is returned.
 * Example: "sdrn:subito:classified:id:ad:123:list:456" -> "123".
 *
 * @param value raw identifier; may be null when called as a Spark UDF on a NULL column
 * @return the trimmed ad id, "" when nothing precedes the separator,
 *         or null when `value` is null
 */
private val selectAdID = (value: String) => {
    if (value == null) null
    else {
        // Locale.ROOT: the prefixes below are ASCII literals, so the lowering must
        // not depend on the JVM default locale.
        val parts = value.toLowerCase(java.util.Locale.ROOT)
          .replace("sdrn:subito:classified:classifiedad:id:ad:", "")
          .replace("sdrn:schibsted:classified:", "").replace("sdrn:regresssubito:classified:", "")
          .replace("sdrn:subito:classified:","").replace("sdrn:com.subito.subito:classified:", "")
          .replace("sdrn:subito:phonecontact:","")
          .replace("id%3aad%3a", "").replace("id:ad:","")
          .replace("%3alist%3a","-").replace(":list:", "-")
          .split("-")
        // split("-") returns an empty array for inputs such as "-"; fall back to "".
        parts.headOption.map(_.trim).getOrElse("")
    }
}

/**
 * Extracts the list id from an SDRN-style identifier.
 *
 * The value is lower-cased, the known namespace prefixes and the `id:ad:`
 * marker (plain or `%3a`-encoded) are removed, the `:list:` separator is
 * rewritten to `-`, and the part after the first `-` is returned.
 * Example: "sdrn:subito:classified:id:ad:123:list:456" -> "456".
 *
 * @param value raw identifier; may be null when called as a Spark UDF on a NULL column
 * @return the trimmed list id, or null when `value` is null or carries no list part
 */
private val selectListID = (value: String) => {
    if (value == null) null
    else {
        // Locale.ROOT: the prefixes below are ASCII literals, so the lowering must
        // not depend on the JVM default locale.
        val parts = value.toLowerCase(java.util.Locale.ROOT)
          .replace("sdrn:subito:classified:classifiedad:id:ad:", "")
          .replace("sdrn:schibsted:classified:", "").replace("sdrn:regresssubito:classified:", "")
          .replace("sdrn:subito:classified:","").replace("sdrn:com.subito.subito:classified:", "")
          .replace("sdrn:subito:phonecontact:","")
          .replace("id%3aad%3a", "").replace("id:ad:","")
          .replace("%3alist%3a","-").replace(":list:", "-")
          .split("-")
        // Identifiers without a list part have fewer than two pieces; indexing (1)
        // directly would throw ArrayIndexOutOfBoundsException, so return null instead.
        parts.lift(1).map(_.trim).orNull
    }
}

// Expose the single-argument string helpers to Spark SQL under snake_case names.
// `provinceAndCity` is not part of this list: it is not registered as a UDF.
Seq(
    "normalize_category"       -> normalizeCategory,
    "normalize_macro_category" -> normalizeMacroCategory,
    "normalize_region"         -> normalizeRegion,
    "select_ad_id"             -> selectAdID,
    "select_list_id"           -> selectListID
).foreach { case (sqlName, fn) => spark.udf.register(sqlName, fn) }

// Keep only "View" events whose object is a "Listing", flatten the nested fields
// used by the queries below into top-level columns, and mark the result for caching.
// Alternative filter kept for reference: filter(($"@type" === "Launch"))
val messages = events
    .where($"@type" === "View" && $"object.@type" === "Listing")
    .select(
        $"object.filters.region".as("region"),
        $"object.filters.query".as("query"),
        $"object.filters.sorting".as("sorting"),
        $"object.filters.locality".as("locality"),
        $"device.deviceType".as("device_type"),
        $"device.environmentId".as("environment_id"),
        $"tracker.type".as("tracker_type"),
        $"tracker.version".as("tracker_version"),
        $"object.category".as("category"),
        $"actor.spt:userId".as("user_id"),
        $"@type".as("event_type"),
        $"published"
    )
    .cache()

// Make the projection queryable from SQL as the `messages` view.
messages.createOrReplaceTempView("messages")

In [None]:
// Count the rows of the `messages` view.
val data = messages.sparkSession
    .sql("SELECT count(*) from messages ")
    .cache()

// Print up to 1000 rows without truncating cell contents.
data.show(1000, truncate = false)

In [None]:
// All rows of the `messages` view for one hard-coded user id.
val data = messages.sparkSession
    .sql("SELECT * from messages where user_id = 18832846")
    .cache()

// Print up to 1000 rows without truncating cell contents.
data.show(1000, truncate = false)

In [None]:
// Fetch a single row of the `messages` view to inspect its shape.
val data = messages.sparkSession
    .sql("SELECT * from  messages limit 1 ")
    .cache()

// Print up to 1000 rows without truncating cell contents.
data.show(1000, truncate = false)

In [None]:
// Distinct (category, tracker_type, device_type, tracker_version) combinations for
// rows with a category and a tracker type other than 'Android', ordered by category.
// The query caps the result at 10000 rows; only the first 1000 are printed.
val data = messages.sparkSession
    .sql("SELECT category, tracker_type, device_type, tracker_version from messages where tracker_type <> 'Android' and category is not null group by category, tracker_type, device_type, tracker_version order by category limit 10000 ")
    .cache()

// Print up to 1000 rows without truncating cell contents.
data.show(1000, truncate = false)