# Data Prep Google Analytics 360
### Date: 20/Apr/21
#### Author: Ismael R.

In [0]:
from pyspark.sql.functions import udf, col, explode, array, sum, count, avg, to_timestamp, date_format, concat, lit
from pyspark.sql.types import StructField, StructType, StringType, LongType, IntegerType

In [0]:
# Data lake configuration template: replace the placeholder values below
# with the settings of your own storage account before running the notebook.
DATALAKE = "INSERT YOUR DATALAKE NAME"
CONTAINER = "INSERT OUR CONTAINER NAME"
MOUNT_POINT = "/mnt/adlsqa-dataset"
KEY = "INSERT YOUR ACCESS KEY"

# Blob-storage settings derived from the values above (used when mounting).
_connector = "fs.azure.account.key.{}.blob.core.windows.net".format(DATALAKE)
_source = "wasbs://{}@{}.blob.core.windows.net".format(CONTAINER, DATALAKE)
_extra_configs = {_connector: KEY}

# Locations of the input file and of the datasets written by this notebook.
DATA_FILE = MOUNT_POINT + "/categories/data.json"
PRODUCT_FILE = MOUNT_POINT + "/products.parquet"
HITS_FILE = MOUNT_POINT + "/hits.parquet"
TRANSACTION_FILE = MOUNT_POINT + "/transaction.parquet"

In [0]:
#Data Lake Configurations and Environment variables
# SECURITY: the storage account access key must never be stored in the notebook.
# It is read at run time from a Databricks secret scope instead. The key that was
# previously hardcoded in this cell has been exposed and should be rotated.
DATALAKE = "neotrustadlsqa"
CONTAINER = "datasets"
MOUNT_POINT = "/mnt/neotrustadlsqa-dataset"
# Fill in the secret scope / secret name that hold the storage account access key.
SECRET_SCOPE = "INSERT YOUR SECRET SCOPE NAME"
SECRET_NAME = "INSERT YOUR SECRET NAME"
KEY = dbutils.secrets.get(scope=SECRET_SCOPE, key=SECRET_NAME)
# Blob-storage settings derived from the values above (used when mounting).
_connector = f"fs.azure.account.key.{DATALAKE}.blob.core.windows.net"
_source = f"wasbs://{CONTAINER}@{DATALAKE}.blob.core.windows.net"
_extra_configs = {_connector: KEY}
# Locations of the input file and of the datasets written by this notebook.
DATA_FILE = f"{MOUNT_POINT}/categories/data.json"
PRODUCT_FILE = f"{MOUNT_POINT}/products.parquet"
HITS_FILE = f"{MOUNT_POINT}/hits.parquet"
TRANSACTION_FILE = f"{MOUNT_POINT}/transaction.parquet"

In [0]:
def mount_datalake():
  """
    Mount the data lake storage on Databricks unless MOUNT_POINT is already mounted.

    Prints a status message in both cases and returns None.
  """
  mounted_points = [entry.mountPoint for entry in dbutils.fs.mounts()]

  # Guard clause: nothing to do when the mount point already exists.
  if MOUNT_POINT in mounted_points:
    print("The mount point is already mounted.")
    return

  dbutils.fs.mount(source=_source, mount_point=MOUNT_POINT, extra_configs=_extra_configs)
  print("The mount point is ready.")

def unmount_datalake():
  """
    Unmount the data lake mount point if it is currently mounted.

    Reads the notebook-level MOUNT_POINT and the ``dbutils`` global, prints a
    status message in both cases and returns None.
  """
  # This is a plain function, not a method: there is no ``self`` in scope, so the
  # previous ``self.dbutils`` raised NameError on every call. Use the global instead.
  mounted_points = [mount.mountPoint for mount in dbutils.fs.mounts()]

  if MOUNT_POINT in mounted_points:
    dbutils.fs.unmount(MOUNT_POINT)
    print(f"The mount point {MOUNT_POINT} is now unmounted.")
  else:
    print(f"The mount point {MOUNT_POINT} is already unmounted.")

In [0]:
def load_dataset(filename, delimiter=','):
    """
      Load a csv, json, delta or parquet file as a Spark DataFrame.

      The format is chosen from the text after the last '.' in ``filename``;
      anything other than csv / json / delta is read as parquet.

      Args:
        filename: path of the file to read.
        delimiter: field separator, only used for csv files.

      Returns:
        The DataFrame, or None when the read itself raised an error. A summary
        (record count and columns) is printed on success, an error message on
        failure.
    """
    filetype = filename.split('.')[-1]
    dataframe = None
    try:
        reader = spark.read
        if filetype == 'csv':
            dataframe = reader.format('csv').options(header='true').options(delimiter=delimiter).load(filename)
        elif filetype == 'json':
            dataframe = reader.json(filename)
        elif filetype == 'delta':
            dataframe = reader.format('delta').load(filename)
        else:
            dataframe = reader.parquet(filename)
        # NOTE: count() runs a Spark job over the data just to build this summary.
        message = f"File: {filename}\n# of records: {dataframe.count()}\ncolumns:{dataframe.columns}"
    except Exception as error:
        # Catch Exception (not a bare except) so KeyboardInterrupt / SystemExit still
        # propagate, and keep the cause visible instead of discarding it.
        message = f"Unable to read File: {filename} ({error})"
    # The summary used to be built but only the failure message was ever printed.
    print(message)
    return dataframe

In [0]:
# Mount the data lake so the files under MOUNT_POINT can be read
# (only prints a message when it is already mounted).
mount_datalake()

In [0]:
# Load the raw Google Analytics export.
df = load_dataset(DATA_FILE)
# load_dataset returns None when the read fails; stop here with a clear error
# instead of an opaque AttributeError on the count below.
if df is None:
    raise RuntimeError(f"Unable to load dataset from {DATA_FILE}")
print(f"# of Records: {df.count()}")

In [0]:
# printSchema() writes the schema tree itself and returns None, so wrapping it
# in print() only appended a stray "None" to the output.
df.printSchema()

The sum of all Pageviews

In [0]:
# Total page views: cast the nested totals.pageviews field to integer, then
# sum it over the whole DataFrame (groupBy() with no keys = a single group).
(
    df.select("totals.pageviews")
    .withColumn("pageviews", col("pageviews").cast(IntegerType()))
    .groupBy()
    .agg(sum('pageviews').alias("Sum of pageviews"))
    .display()
)

Sum of pageviews
35737


The number of visits/sessions per user.

In [0]:
# Number of visitId rows counted for each visitor.
(
    df.select(col("fullVisitorID").alias("Unique Visitor ID"), "visitId")
    .groupBy("Unique Visitor ID")
    .agg(count("visitId").alias("# sessions / user"))
    .display()
)

Unique Visitor ID,# sessions / user
524897314891908894,1
3973742615068285577,1
5162435414990636261,1
1785355091652511358,1
1254397940815590467,1
7416689135354972628,1
2874554627333898075,1
6736851098864427390,1
1502981305148432638,1
1921879989553252986,1


Unique Sessions per day

In [0]:
# Spark datetime patterns are case-sensitive: 'MM' is month-of-year while 'mm' is
# minute-of-hour. The previous 'yyyymmdd' / 'yyyy-mm-dd' pair parsed the month
# digits as minutes and printed them back as minutes, so the result only looked
# right by accident.
(
    df.select(
        date_format(to_timestamp(col("date"), 'yyyyMMdd'), 'yyyy-MM-dd').alias("date"),
        "fullVisitorID",
        "visitId",
    )
    .groupBy("date")
    .count()
    .withColumnRenamed("count", " Sessions per day")
    .display()
)

date,Sessions per day
2016-08-08,28
2017-04-30,15
2017-05-04,26
2017-01-27,18
2016-10-15,24
2016-12-21,25
2017-04-16,15
2017-05-17,35
2017-01-12,25
2017-01-02,8


Average session duration in seconds per day

In [0]:
# Spark datetime patterns are case-sensitive: 'MM' is month-of-year while 'mm' is
# minute-of-hour. The previous 'yyyymmdd' / 'yyyy-mm-dd' pair parsed the month
# digits as minutes and printed them back as minutes, so the result only looked
# right by accident.
(
    df.select(
        date_format(to_timestamp(col("date"), 'yyyyMMdd'), 'yyyy-MM-dd').alias("date"),
        "totals.timeOnSite",
    )
    .groupBy("date")
    .agg(avg("timeOnSite").alias("avg session time (sec)"))
    .display()
)

The number of visits per day per browser.

In [0]:
# Spark datetime patterns are case-sensitive: 'MM' is month-of-year while 'mm' is
# minute-of-hour. The previous 'yyyymmdd' / 'yyyy-mm-dd' pair parsed the month
# digits as minutes and printed them back as minutes, so the result only looked
# right by accident.
(
    df.select(
        date_format(to_timestamp(col("date"), 'yyyyMMdd'), 'yyyy-MM-dd').alias("date"),
        "device.browser",
        "visitId",
    )
    .groupBy("date", "browser")
    .agg(count("visitId").alias("# sessions / day"))
    .display()
)

date,browser,# sessions / day
2017-01-06,Chrome,12
2017-01-12,Chrome,21
2017-02-25,Chrome,20
2017-07-15,Chrome,12
2017-05-06,Safari,7
2016-11-05,Chrome,14
2017-01-19,Chrome,16
2017-07-01,Android Webview,2
2017-02-06,Chrome,23
2017-02-10,Chrome,20


Creating Product Dataset

In [0]:
# Two successive explodes: the first yields one row per element of hits.product,
# the second one row per element of that (array) element. Null entries are dropped.
exploded = df.select(explode(col("hits.product")))
products = exploded.select(explode(col("col"))).where(col("col").isNotNull())

In [0]:
# Keep the descriptive product fields, one row per productSKU.
# NOTE: dropDuplicates keeps an arbitrary row among those sharing a productSKU.
product_dataset = (
    products
    .select("col.productSKU", "col.v2ProductName", "col.v2ProductCategory", "col.productPrice")
    .dropDuplicates(subset=["productSKU"])
)

Saving the dataset as parquet

In [0]:
# Persist the product dataset, replacing any previous output at this path.
product_dataset.write.parquet(PRODUCT_FILE, mode="overwrite")
# Echo the output location as the cell result.
PRODUCT_FILE

In [0]:
# Render the de-duplicated product dataset built above.
product_dataset.display()

productSKU,v2ProductName,v2ProductCategory,productPrice
9180748,Android Lunch Kit,(not set),0
9180749,Android Glass Water Bottle with Black Sleeve,(not set),0
9180750,Android 24 oz Contigo Bottle,(not set),0
9180751,Android 24 oz Contigo Bottle,(not set),0
9180752,Android 24 oz Contigo Bottle,(not set),0
9180755,Android Sticker Sheet Ultra Removable,(not set),0
9180756,Windup Android,(not set),0
9180757,Yoga Block,(not set),0
9180758,Straw Beach Mat,(not set),0
9180759,Google Lunch Bag,(not set),0


Creating the hits dataset

In [0]:
# One row per element of the hits array, keeping the session identifiers and date;
# the exploded element lands in a column named "col". Null elements are dropped.
exp_hits = df.select("fullVisitorId", "visitId", "date", explode(col("hits")))
hits = exp_hits.where(exp_hits["col"].isNotNull())

In [0]:
# Build a timestamp from the session date plus the hit's hour and minute, and
# flatten the page path and eventInfo fields into top-level columns.
# NOTE: this overwrites `hits`, so re-running this cell on its own fails because
# the "col" struct is no longer present.
hits = hits.select(
    "fullVisitorId",
    "visitId",
    to_timestamp(
        concat('date', lit(' '), 'col.hour', lit(':'), 'col.minute'),
        'yyyyMMdd H:m',
    ).alias('date'),
    'col.hitNumber',
    'col.page.pagePath',
    'col.eventInfo.*',
)

In [0]:
# Persist the hits dataset, replacing any previous output at this path.
hits.write.parquet(HITS_FILE, mode="overwrite")
# Echo the output location as the cell result.
HITS_FILE

In [0]:
# Render the flattened hits dataset built above.
hits.display()

fullVisitorId,visitId,date,hitNumber,pagePath,eventAction,eventCategory,eventLabel
5162435414990636261,1494216917,2017-05-07T21:15:00.000+0000,1,/home,,,
5162435414990636261,1494216917,2017-05-07T21:15:00.000+0000,2,/google+redesign/apparel/mens/mens+outerwear,,,
5162435414990636261,1494216917,2017-05-07T21:16:00.000+0000,3,/google+redesign/apparel/mens/mens+outerwear,Quickview Click,Enhanced Ecommerce,Google Men's Zip Hoodie
5162435414990636261,1494216917,2017-05-07T21:16:00.000+0000,4,/google+redesign/apparel/mens/mens+outerwear/quickview,,,
5162435414990636261,1494216917,2017-05-07T21:16:00.000+0000,5,/google+redesign/apparel/mens/mens+outerwear,Quickview Click,Enhanced Ecommerce,Google Men's Zip Hoodie
5162435414990636261,1494216917,2017-05-07T21:16:00.000+0000,6,/google+redesign/apparel/mens/mens+outerwear/quickview,,,
5162435414990636261,1494216917,2017-05-07T21:16:00.000+0000,7,/google+redesign/apparel/mens/mens+outerwear,Quickview Click,Enhanced Ecommerce,Google Men's Zip Hoodie
5162435414990636261,1494216917,2017-05-07T21:16:00.000+0000,8,/google+redesign/apparel/mens/mens+outerwear/quickview,,,
5162435414990636261,1494216917,2017-05-07T21:16:00.000+0000,9,/google+redesign/apparel/mens/mens+outerwear,Quickview Click,Enhanced Ecommerce,Google Men's Zip Hoodie
5162435414990636261,1494216917,2017-05-07T21:16:00.000+0000,10,/google+redesign/apparel/mens/mens+outerwear/quickview,,,


Creating the transaction dataset, which relates the hit events to the product dataset.

In [0]:
# One row per element of the hits array, keeping the session identifiers and date;
# the exploded element lands in a column named "col". Null elements are dropped.
exp_transactions = df.select("fullVisitorId", "visitId", "date", explode(col("hits")))
transactions = exp_transactions.where(exp_transactions["col"].isNotNull())

In [0]:
# Build a timestamp from the session date plus the hit's hour and minute, and
# keep the hit's product array and transaction struct for the next step.
# NOTE: this overwrites `transactions`, so re-running this cell on its own fails
# because the "col" struct is no longer present.
transactions = transactions.select(
    "fullVisitorId",
    "visitId",
    to_timestamp(
        concat('date', lit(' '), 'col.hour', lit(':'), 'col.minute'),
        'yyyyMMdd H:m',
    ).alias('date'),
    'col.hitNumber',
    'col.product',
    "col.transaction",
)

In [0]:
# Keep only hits that carry a product array, then emit one row per product.
transactions = transactions.where(transactions["product"].isNotNull())
transactions = transactions.select(
    "fullVisitorId", "visitId", "date", "hitNumber", explode(col('product')), "transaction"
)
# Flatten the product and transaction structs into top-level columns.
hits_products = transactions.select(
    "fullVisitorId",
    "visitId",
    "date",
    "hitNumber",
    'col.productSKU',
    'col.productPrice',
    "transaction.transactionId",
    "transaction.transactionRevenue",
    "transaction.transactionShipping",
    "transaction.transactionTax",
    "transaction.currencyCode",
)

In [0]:
# Persist the transaction dataset, replacing any previous output at this path.
hits_products.write.parquet(TRANSACTION_FILE, mode="overwrite")
# Echo the output location as the cell result.
TRANSACTION_FILE

In [0]:
# Render the per-product transaction dataset built above.
hits_products.display()

fullVisitorId,visitId,date,hitNumber,productSKU,productPrice,transactionId,transactionRevenue,transactionShipping,transactionTax,currencyCode
5162435414990636261,1494216917,2017-05-07T21:15:00.000+0000,2,GGOEGAAX0313,31990000,,,,,USD
5162435414990636261,1494216917,2017-05-07T21:15:00.000+0000,2,GGOEGAAX0358,44790000,,,,,USD
5162435414990636261,1494216917,2017-05-07T21:15:00.000+0000,2,GGOEGAAX0359,44790000,,,,,USD
5162435414990636261,1494216917,2017-05-07T21:15:00.000+0000,2,GGOEGAAX0362,41590000,,,,,USD
5162435414990636261,1494216917,2017-05-07T21:15:00.000+0000,2,GGOEGAAX0568,87990000,,,,,USD
5162435414990636261,1494216917,2017-05-07T21:15:00.000+0000,2,GGOEGAAX0592,55990000,,,,,USD
5162435414990636261,1494216917,2017-05-07T21:15:00.000+0000,2,GGOEGAAX0593,55990000,,,,,USD
5162435414990636261,1494216917,2017-05-07T21:15:00.000+0000,2,GGOEGAAX0598,79190000,,,,,USD
5162435414990636261,1494216917,2017-05-07T21:15:00.000+0000,2,GGOEGAAX0595,59990000,,,,,USD
5162435414990636261,1494216917,2017-05-07T21:15:00.000+0000,2,GGOEGAAX0596,59990000,,,,,USD
