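This notebook ingests the raw photon stream for the last `number_of_days_to_load` days (3 by default: today plus the two previous days), merges any new records into `photon.photon_raw_partitioned`, transforms the hits into the `photon` view, and writes the result back out to the `photon.photon` table. The stream contains two versions of the record schema, which can be told apart by inspecting each record: v1 records carry the page URL in `data.url`, while v2 records leave `data.url` null and carry it in `data.serverReferrer`.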

In [2]:
# Print the schema of the raw photon table that the ingest below merges new records into.
raw_table_df = spark.table('photon.photon_raw_partitioned')
raw_table_df.printSchema()

In [3]:
import datetime as dt
from pyspark.sql.types import IntegerType

# Number of calendar days of raw files to ingest: today plus the (N - 1) days before it.
number_of_days_to_load = 3

def days_to_load_fuction():
  """
    Return the day offset of the oldest day being loaded, e.g. -2 when number_of_days_to_load is 3
    (0 or negative for any number_of_days_to_load >= 1).

    Registered below as the SQL function days_to_load() so the SQL cell can filter with
    date_add(current_date, days_to_load()) over the same window of days as the files read here.
    The misspelled Python name is kept in case other cells reference it.
  """
  # Same value as the original range(0, n)[-1] * -1, without an IndexError when n == 0.
  return 1 - number_of_days_to_load

# Register with an explicit IntegerType: without a return type the UDF returns a string, and
# date_add() then relies on an implicit string-to-int cast (rejected by Spark 3 for non-literal strings).
spark.udf.register('days_to_load', days_to_load_fuction, IntegerType())

# One glob per day, newest first, using the raw zone's yyyy/mm/dd directory layout.
# `today` is read once so the list cannot straddle midnight.
# NOTE(review): dt.date.today() is the driver's local date while the data is keyed on addedUtc;
# confirm the cluster clock is UTC.
# NOTE(review): the path has no leading "/", so it resolves against the filesystem working directory.
today = dt.date.today()
files_to_load = [
  (today - dt.timedelta(days=days_back)).strftime("mnt/cc-data-lake/1-raw/photon/raw/%Y/%m/%d/*")
  for days_back in range(number_of_days_to_load)
]

files_to_load

In [4]:
from pyspark.sql.functions import *

# Read every JSON file for the selected days, then derive a `date` column from addedUtc;
# merge_stream_into_table uses `date` to decide which target partitions the rows belong to.
raw_json_df = spark.read.json(files_to_load)
df = raw_json_df.withColumn("date", to_date(col("addedUtc")))

In [5]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from datetime import *
from hashlib import sha256
from k2databricks import k2
import os, re


def merge_stream_into_table(input_df, output_table, dedupe_columns, validations, rows_per_split):
  """
    Takes input data that may span multiple target partitions, dedupes it against existing data in the
    partitions, and appends only the new rows to output_table.

    :param input_df: DataFrame to load. Must have a `date` column (used to find the target partitions)
      and every column of output_table.
    :param output_table: name of an existing metastore table that has a `date` column, e.g. 'photon.photon'.
    :param dedupe_columns: column names identifying a record. Duplicates within the input are collapsed
      (one arbitrary row per key is kept), and keys already present in the target partitions are dropped.
    :param validations: filter conditions (anything DataFrame.filter accepts); rows failing any are dropped.
    :param rows_per_split: target number of rows per output split; determines the repartition count.
  """
  def write_log(s):
    print("%s %s: %s" % (datetime.now(), output_table, s))

  input_df = input_df.cache()
  delta_df = None
  try:
    input_row_count = input_df.count()
    # find unique list of dates in the input stream, to determine which target partitions data will be loaded into.
    partition_dates = input_df.selectExpr("collect_list(DISTINCT date)").collect()[0][0]
    write_log("Found %s input records for partitions %s" % (input_row_count, ", ".join([str(d) for d in sorted(partition_dates)])))

    # validate & scrub the input data
    valid_df = input_df
    for validation in validations:
      valid_df = valid_df.filter(validation)

    valid_row_count = valid_df.count()
    write_log("Data validation scrubbed %s records" % (input_row_count - valid_row_count))

    # dedupe the input against itself first
    dedupe_df = valid_df.dropDuplicates(dedupe_columns)
    dedupe_row_count = dedupe_df.count()
    write_log("Deduped %s records within input stream" % (valid_row_count - dedupe_row_count))

    # load any existing data from the target partitions, then dedupe the input against it.
    # The final selectExpr puts columns in the table's order, because insertInto matches by position.
    existing_df = spark.table(output_table).filter(col("date").isin(partition_dates))
    delta_df = dedupe_df \
      .join(existing_df, dedupe_columns, "leftanti") \
      .selectExpr(existing_df.columns) \
      .cache()

    output_row_count = delta_df.count()
    removed_row_count = dedupe_row_count - output_row_count
    write_log("Deduped %s records against existing partition data" % (removed_row_count))
    if output_row_count:
      # compute the number of splits based upon how many rows remain following the dedupe process.
      # Floor division keeps this an int (a float would break repartition under Python 3).
      num_splits = 1 + output_row_count // rows_per_split
      write_log("Loading %s records using %s splits" % (output_row_count, num_splits))

      # Presumably needed for dynamic-partition inserts into Hive-format tables; harmless otherwise.
      spark.conf.set("hive.exec.dynamic.partition.mode", "nonstrict")
      # insertInto writes into the metastore table; save() would treat output_table as a file path.
      delta_df.repartition(num_splits).write.insertInto(output_table, overwrite=False)
    else:
      write_log("No new records to load after dedupe was performed")
  finally:
    # Release cached data even if a count or the write fails.
    input_df.unpersist()
    if delta_df is not None:
      delta_df.unpersist()
  


In [6]:
# Merge the freshly read raw files into the raw table, deduping on (addedUtc, cookieId).
merge_stream_into_table(
  input_df=df,
  output_table='photon.photon_raw_partitioned',
  dedupe_columns=['addedUtc', 'cookieId'],
  validations=[],
  rows_per_split=100000
)

In [7]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
import urlparse
import httpagentparser

def parse_query_string(url):
  """
    Parse the query string of `url` into a {name: value} dict (a map in SQL).

    Repeated parameters keep only the last value and blank values are dropped. Returns None
    (a null map in SQL) when url is None or cannot be split, instead of raising and failing
    the Spark task; this matches parse_domain's best-effort behavior.
  """
  if url is None:
    return None
  try:
    query_string = urlparse.urlsplit(url).query
  except ValueError:
    # e.g. a malformed IPv6 netloc such as "http://[::1"
    return None
  return dict(urlparse.parse_qsl(query_string))

def parse_domain(url):
  """Return the network-location (netloc) part of `url`, or None if it cannot be parsed."""
  try:
    return urlparse.urlparse(url).netloc
  except Exception:
    # Best effort: a bad URL yields a null domain rather than failing the job.
    return None

def parse_user_agent(ua):
  """
    Detect bot / browser / OS from a user-agent string with httpagentparser.

    Returns a dict shaped like parse_user_agent_schema; keys the parser did not fill are None.
  """
  parsed = httpagentparser.detect(ua)
  return {
    "bot": parsed.get("bot"),
    "browser": parsed.get("browser"),
    # httpagentparser reports the operating system under "platform"
    "os": parsed.get("platform")
  }

# Spark return type of parse_user_agent.
parse_user_agent_schema = StructType([
  StructField("bot", BooleanType()),
  StructField("browser", StructType([
    StructField("name", StringType()),
    StructField("version", StringType())
  ])),
  StructField("os", StructType([
    StructField("name", StringType()),
    StructField("version", StringType())
  ])),
])

# Expose the helpers to SQL (used by the `photon` view); spark.udf.register replaces the
# deprecated sqlContext.registerFunction.
spark.udf.register("parse_query_string", parse_query_string, MapType(StringType(), StringType()))
spark.udf.register("parse_domain", parse_domain, StringType())
spark.udf.register("parse_user_agent", parse_user_agent, parse_user_agent_schema)

In [8]:
%sql
-- Defines the temp view `photon`: hit records from photon.photon_raw_partitioned,
-- normalized across the two raw record layouts (v1 and v2) and reshaped into
-- url, device and utm structs.
-- Relies on the Python UDFs days_to_load, parse_query_string, parse_domain and
-- parse_user_agent registered in earlier cells.
CREATE OR REPLACE TEMP VIEW photon
AS

WITH
-- Hit records only, limited to addedUtc dates on or after current_date + days_to_load().
-- days_to_load() returns a day offset (for example -2), intended to cover the same days
-- as the files read by the ingest cell.
-- NOTE(review): this filters on addedUtc rather than the date partition column, so it
-- probably scans every partition. Check the query plan before changing it, since a
-- Python UDF inside a partition filter may not be supported.
photon_live_hits AS (
  SELECT
    * 
  FROM photon.photon_raw_partitioned
  WHERE type = 'hit'
  AND date(addedUtc) >= date_add(current_date,days_to_load())
),

-- v1 layout: the page URL is in data.url.
photon_v1 AS (
  SELECT * FROM photon_live_hits WHERE data.url IS NOT NULL
),

-- v2 layout: data.url is absent and the page URL is in data.serverReferrer.
-- Hits that have neither field match neither CTE and are dropped.
photon_v2 AS (
  SELECT * FROM photon_live_hits WHERE data.url IS NULL AND data.serverReferrer IS NOT NULL
),

-- Map both layouts onto one shape. v1 records carry no URL-verification flag, referrer
-- or full URL, so those struct fields are typed NULLs. Query parameters and domain are
-- parsed from the page URL by the Python UDFs.
normalized AS (
  SELECT
    addedUtc,
    cookieId,
    struct(
      data.url AS page_url,
      boolean(NULL) AS page_url_verified_by_photon,
      string(NULL) AS referring_url,
      string(NULL) AS full_url,
      parse_query_string(data.url) AS query_params,
      parse_domain(data.url) AS domain
    ) AS url,
    parse_user_agent(data.userAgent) AS parsed_user_agent,
    data
  FROM photon_v1

  UNION ALL

  -- v2: page URL from serverReferrer, plus the fields only this layout provides.
  SELECT
    addedUtc,
    cookieId,
    struct(
      data.serverReferrer AS page_url,
      data.urlVerified AS page_url_verified_by_photon,
      data.clientReferrer AS referring_url,
      data.clientUrl AS full_url,
      parse_query_string(data.serverReferrer) AS query_params,
      parse_domain(data.serverReferrer) AS domain
    ) AS url,
    parse_user_agent(data.userAgent) AS parsed_user_agent,
    data
  FROM photon_v2
),

-- Final column layout of the view. date is derived from addedUtc and is the column
-- merge_stream_into_table uses to pick target partitions. The utm fields are NULL when
-- the corresponding utm_* query parameter is absent.
transformed AS (
  SELECT
    date(addedUtc) AS date,
    addedUtc AS activity_time,
    cookieId AS cc_cookie_id,
    data.ip AS ip,
    url,
    struct(
      data.platform,
      data.colorDepth AS color_depth,
      data.screenHeight AS screen_height,
      data.screenWidth AS screen_width,
      parsed_user_agent.bot AS self_identified_as_bot,
      parsed_user_agent.browser,
      parsed_user_agent.os
    ) AS device,
    struct(
      url.query_params["utm_campaign"] AS campaign,
      url.query_params["utm_source"] AS source,
      url.query_params["utm_content"] AS content,
      url.query_params["utm_medium"] AS medium,
      url.query_params["utm_term"] AS term
    ) AS utm,
    data
  FROM normalized
)

SELECT * FROM transformed;

In [9]:
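# Previous approach, kept for reference only: rebuild the entire photon.photon table from the photon view in a single overwrite (partitioned by date, stored as ORC).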
#spark.table("photon").write.partitionBy("date").saveAsTable("photon.photon", format="orc", mode="overwrite")

In [10]:
# Append newly transformed hits to photon.photon, skipping any (activity_time, cc_cookie_id)
# pair already present in the input or in the affected date partitions.
merge_stream_into_table(
  input_df=spark.table('photon'),
  output_table='photon.photon',
  dedupe_columns=['activity_time', 'cc_cookie_id'],
  validations=[],
  rows_per_split=100000
)