### Preprocessing Google Analytics Dataset (from Google Merchandise Store)

The main goal is to test how PySpark handles data processing on large datasets. We will start with 1.0 GB, then move to 7.0 GB, and finally work up to roughly 25.0 GB.

Things to keep in mind for this project:

- Exploring the dataset
- Preprocessing and creating cool new features along the way
- Learning
- Enjoying it

#### Context
The Google Merchandise Store sells Google branded merchandise. The data is typical of what you would see for an ecommerce website.

#### Content
The sample dataset contains Google Analytics 360 data from the Google Merchandise Store, a real ecommerce store. It includes the following kinds of information:

* Traffic source data: information about where website visitors originate. This includes data about organic traffic, paid search traffic, display traffic, etc.
* Content data: information about the behavior of users on the site. This includes the URLs of pages that visitors look at, how they interact with content, etc.
* Transactional data: information about the transactions that occur on the Google Merchandise Store website.

Features/Columns in data:
* fullVisitorId — A unique identifier for each user
* channelGrouping — The channel via which the user came to the Store
* date — The date on which the user visited
* device — The specifications for the device used to access
* geoNetwork — This section contains information about the geography of the user
* socialEngagementType — Engagement type, either “Socially Engaged” or “Not Socially Engaged”
* totals — This section contains aggregate values across the session
* trafficSource — This section contains information about the Traffic Source from which the session originated
* visitId — An identifier for this session. This is part of the value usually stored as the _utmb cookie. This is only unique to the user
* visitNumber — The session number for this user. If this is the first session, then this is set to 1
* visitStartTime — The timestamp (expressed as POSIX time)
* hits — This row and nested fields are populated for any and all types of hits. Provides a record of all page visits
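
As a small illustration of the POSIX encoding used by `visitStartTime`, the sketch below converts the minimum value that appears in the `describe()` output of this notebook (1501657203) into a UTC datetime using only the standard library. The helper name `posix_to_utc` is hypothetical and not part of the notebook.

```python
from datetime import datetime, timezone

def posix_to_utc(ts: int) -> datetime:
    """Convert POSIX seconds to a timezone-aware UTC datetime."""
    return datetime.fromtimestamp(ts, tz=timezone.utc)

# Minimum visitStartTime taken from the describe() output in this notebook
start = posix_to_utc(1501657203)
print(start.isoformat())         # 2017-08-02T07:00:03+00:00
print(start.strftime("%Y%m%d"))  # 20170802, same layout as the `date` column
```

The resulting calendar day matches the minimum of the `date` column (20170802), which is a cheap sanity check that both columns describe the same sessions.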

In [1]:
# Import SparkSession
from pyspark.sql import SparkSession
from pyspark.sql.window import Window
import pyspark.sql.functions as F
from datetime import datetime
import calendar
import timeit

# Create SparkSession 
spark = SparkSession.builder \
      .master("local[8]") \
      .appName("SparkTraining.com") \
      .getOrCreate() 

In [2]:
df = spark.read.options(header=True, inferSchema = True, quote = '"', escape = '"', multiLine = True).csv("./test.csv")
df.printSchema()

root
 |-- channelGrouping: string (nullable = true)
 |-- date: integer (nullable = true)
 |-- device: string (nullable = true)
 |-- fullVisitorId: decimal(20,0) (nullable = true)
 |-- geoNetwork: string (nullable = true)
 |-- sessionId: string (nullable = true)
 |-- socialEngagementType: string (nullable = true)
 |-- totals: string (nullable = true)
 |-- trafficSource: string (nullable = true)
 |-- visitId: integer (nullable = true)
 |-- visitNumber: integer (nullable = true)
 |-- visitStartTime: integer (nullable = true)



In [3]:
df.select(['date','fullVisitorId','visitId','visitStartTime']).describe().show()

+-------+--------------------+--------------------+-------------------+--------------------+
|summary|                date|       fullVisitorId|            visitId|      visitStartTime|
+-------+--------------------+--------------------+-------------------+--------------------+
|  count|              804684|              804684|             804684|              804684|
|   mean|2.0174964498142127E7|45087500686953208...|1.513338532552003E9|1.5133385335090122E9|
| stddev|   4573.100966819984|3.098113840258274...|  6676000.459978136|   6676000.428767434|
|    min|            20170802|        259678714014|         1501656404|          1501657203|
|    max|            20180430|98911902128782375692|         1525157818|          1525157818|
+-------+--------------------+--------------------+-------------------+--------------------+



#### Manipulating dates

In [4]:
# 1.- Preprocessing dates in PySpark
# `date` is inferred as an integer (e.g. 20171016), so cast it to string before parsing
df2 = df.withColumn("datetime", F.to_date(F.col("date").cast("string"), "yyyyMMdd"))
# Extract year, month, and day of month as integers
df3 = df2.withColumn("year", F.year("datetime"))\
        .withColumn("month", F.month("datetime"))\
        .withColumn("day", F.dayofmonth("datetime"))
df3.select("date", "datetime", "year", "month", "day").show(10)

+--------+----------+----+-----+---+
|    date|  datetime|year|month|day|
+--------+----------+----+-----+---+
|20171016|2017-10-16|2017|   10| 16|
|20171016|2017-10-16|2017|   10| 16|
|20171016|2017-10-16|2017|   10| 16|
|20171016|2017-10-16|2017|   10| 16|
|20171016|2017-10-16|2017|   10| 16|
|20171016|2017-10-16|2017|   10| 16|
|20171016|2017-10-16|2017|   10| 16|
|20171016|2017-10-16|2017|   10| 16|
|20171016|2017-10-16|2017|   10| 16|
|20171016|2017-10-16|2017|   10| 16|
+--------+----------+----+-----+---+
only showing top 10 rows
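
For checking a few values by hand, the same parsing can be sketched in plain Python. Note that the pattern syntax differs: Spark uses `yyyyMMdd`, while Python's `strptime` uses `%Y%m%d`. The helper `parse_yyyymmdd` is a hypothetical name used only for this illustration.

```python
from datetime import date, datetime

def parse_yyyymmdd(value: int) -> date:
    """Parse an integer such as 20171016 into a date."""
    return datetime.strptime(str(value), "%Y%m%d").date()

d = parse_yyyymmdd(20171016)
parts = (d.year, d.month, d.day)
print(d.isoformat(), parts)  # 2017-10-16 (2017, 10, 16)
```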



#### Time between Sessions by Visitor

#### Primitive way to calculate time between logins/sessions

In [5]:
# 2.- Time Between Sessions by Visitor
df4 = df3.select(F.col("fullVisitorId"),F.col("datetime"),F.col("year"),F.col("month"),F.col("day")).sort("fullVisitorId", "datetime")
# Collapse multiple visits by the same visitor on the same day into a single row using .distinct()
df4 = df4.distinct()
df4.show()

+-------------------+----------+----+-----+---+
|      fullVisitorId|  datetime|year|month|day|
+-------------------+----------+----+-----+---+
|1434440881633966770|2017-10-16|2017|   10| 16|
|4012482813988067872|2017-10-16|2017|   10| 16|
|6442523930144577585|2017-10-16|2017|   10| 16|
|6416990411545001239|2017-10-16|2017|   10| 16|
|1994773412357945288|2017-10-16|2017|   10| 16|
|5527875182371598421|2017-10-16|2017|   10| 16|
|  63897075477117625|2017-10-16|2017|   10| 16|
|4392517427630048868|2017-11-30|2017|   11| 30|
| 231355554632373132|2017-11-30|2017|   11| 30|
|1216522698917837803|2017-11-30|2017|   11| 30|
| 209493828541647398|2017-11-30|2017|   11| 30|
| 282872886065230921|2017-11-30|2017|   11| 30|
| 585439608615618519|2017-11-30|2017|   11| 30|
|1129596338043901560|2017-11-30|2017|   11| 30|
|3142691974796850629|2017-11-30|2017|   11| 30|
|6268170711858015260|2017-11-30|2017|   11| 30|
|8519904975907632432|2017-11-30|2017|   11| 30|
|3477015762961788473|2017-11-30|2017|   

In [6]:
start = timeit.default_timer()
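# Renamed copy of df4 so that the columns do not clash in the self-join
df4d = df4.select(F.col("fullVisitorId").alias("fullVisitorIdD"), 
                  F.col("datetime").alias("datetimeD")).sort("fullVisitorIdD","datetimeD")

# Join the dataframe with itself, matching each visit to the same visitor's visit on the previous day.
# Note: the condition datetime == datetimeD + 1 day only matches consecutive-day visits,
# so diff_days below is always 1 or null (longer gaps are not captured by this approach).
df5 = df4.join(df4d,(df4.fullVisitorId == df4d.fullVisitorIdD) & (df4.datetime == F.date_add(df4d.datetimeD, 1)),"left")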

df5 = df5.withColumn("diff_days", F.datediff(F.col("datetime"),F.col("datetimeD")))

df5.groupBy("fullVisitorId").agg(F.sum("diff_days").alias("sum_diff_days"),\
                                F.avg("diff_days").alias("avg_diff_days")).show(10)

stop = timeit.default_timer()
print("Program Executed in "+str(stop - start))

+-----------------+-------------+-------------+
|    fullVisitorId|sum_diff_days|avg_diff_days|
+-----------------+-------------+-------------+
| 4499797999887844|            1|          1.0|
|22649038761799046|            1|          1.0|
|23869313799981694|            1|          1.0|
|34710606159448409|         null|         null|
|45431075147774863|         null|         null|
|49173489736183166|         null|         null|
|59005825026777770|         null|         null|
|64202398468545145|         null|         null|
|71866025174152017|         null|         null|
|73166438954880919|         null|         null|
+-----------------+-------------+-------------+
only showing top 10 rows

Program Executed in 14.542431500000006


#### Cooler way to calculate time between logins/sessions (using Window Functions)

In [7]:
# Use the lag window function to fetch each visitor's previous visit date (lead would give the next one)
start = timeit.default_timer()
windowSpec = Window.partitionBy("fullVisitorId").orderBy(F.col("datetime").asc())
df5_cooler = df4.withColumn("lag",F.lag("datetime",1).over(windowSpec))

df5_cooler = df5_cooler.withColumn("diff_days", F.datediff(F.col("datetime"),F.col("lag")))

df5_cooler.groupBy("fullVisitorId").agg(F.sum("diff_days").alias("sum_diff_days"),\
                                F.avg("diff_days").alias("avg_diff_days")).show(10)


stop = timeit.default_timer()
print("Program Executed in "+str(stop - start))

+---------------+-------------+-------------+
|  fullVisitorId|sum_diff_days|avg_diff_days|
+---------------+-------------+-------------+
| 53049821714864|         null|         null|
| 95085510298525|         null|         null|
|106364568517340|            1|          1.0|
|118334805178127|         null|         null|
|130646294093000|         null|         null|
|133092152601524|         null|         null|
|141778304660879|         null|         null|
|155706826600807|         null|         null|
|227122480450743|         null|         null|
|248459207073320|         null|         null|
+---------------+-------------+-------------+
only showing top 10 rows

Program Executed in 11.944977799999997
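
The two approaches are not equivalent: the self-join in In [6] only matches visits exactly one day apart, while `lag` pairs every visit with the previous one regardless of the gap. The plain-Python sketch below makes the difference visible on a tiny sample. The sample rows and the helper `gaps_by_visitor` are hypothetical and only mimic the logic, not the Spark execution.

```python
from datetime import date
from itertools import groupby

# Hypothetical sample of (visitor_id, visit_date) rows, one per visitor per day
visits = [
    ("A", date(2017, 10, 16)),
    ("A", date(2017, 10, 17)),
    ("A", date(2017, 10, 27)),
    ("B", date(2017, 11, 30)),
]

def gaps_by_visitor(rows):
    """Return {visitor: [days between consecutive visits]}, like lag + datediff."""
    result = {}
    for visitor, group in groupby(sorted(rows), key=lambda r: r[0]):
        dates = [d for _, d in group]
        result[visitor] = [(cur - prev).days for prev, cur in zip(dates, dates[1:])]
    return result

gaps = gaps_by_visitor(visits)
# Keep only one-day gaps, which is all a date_add(..., 1) join condition can match
consecutive_only = {v: [g for g in gs if g == 1] for v, gs in gaps.items()}

print(gaps)              # {'A': [1, 10], 'B': []}
print(consecutive_only)  # {'A': [1], 'B': []}
```

Visitor B has a single visit day and therefore no gap at all, which corresponds to the `null` rows in both outputs above.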


#### Let's get the number of sessions in the last 3, 6, and 9 months (static picture)
In this exercise we compute a static picture of our visitors. This kind of data is better suited to supporting day-to-day operations than to an analytical process.

In [8]:
# We can create a dataframe with sessions per visitor for each month,
# and from that derive 3, 6, and 9 months from the last day in our dataset (Static Picture = SP).
# df4 holds one row per visitor per day, so "aux" counts distinct visit days per month.
dfSP = df4.withColumn("aux", F.lit(1))\
            .groupBy("fullVisitorId").pivot("month").sum("aux").na.fill(value=0)

# Creating new column names to replace month 
newColumns = ['fullVisitorId']+[calendar.month_abbr[int(col)] for col in dfSP.columns[1:]]
dfSP = dfSP.toDF(*newColumns)
dfSP.show()

+-------------------+---+---+---+---+---+---+---+---+---+
|      fullVisitorId|Jan|Feb|Mar|Apr|Aug|Sep|Oct|Nov|Dec|
+-------------------+---+---+---+---+---+---+---+---+---+
| 428679113694403880|  0|  0|  2|  0|  0|  0|  0|  0|  0|
|1909885162226675764|  0|  1|  1|  1|  0|  0|  1|  4|  0|
|3027759290441300182|  0|  0|  0|  0|  0|  0|  0|  0|  1|
| 354105861304045159|  0|  0|  0|  0|  0|  1|  0|  0|  0|
|3570064048664276854|  0|  0|  0|  0|  0|  1|  0|  0|  0|
| 846336123810068918|  0|  0|  0|  3|  0|  0|  0|  0|  0|
|  12602602100442033|  0|  1|  0|  0|  0|  0|  0|  0|  0|
|5969364060748176916|  0|  0|  1|  0|  0|  0|  0|  0|  0|
|7867912365879879182|  0|  0|  0|  0|  1|  0|  0|  0|  0|
|6731636299042416168|  0|  0|  0|  0|  0|  0|  0|  1|  0|
|4193161673322046819|  4|  0|  0|  0|  0|  0|  0|  0|  0|
|4860229664741834970|  0|  0|  0|  0|  0|  0|  0|  0|  1|
| 243543595910373348|  0|  0|  1|  0|  0|  0|  0|  0|  0|
|6434529131595737032|  0|  1|  1|  0|  0|  0|  0|  0|  0|
| 312978540207

In [None]:
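# Now we can get the last 3, 6, and 9 months by adding new columns to our new dataset.
# Caveat: pivot() orders the month columns by month number (Jan..Dec), not chronologically.
# This data runs from 2017-08 to 2018-04, so the trailing columns (Oct, Nov, Dec)
# are not the most recent months; a year-month pivot key would be needed for that.
cols_list3M = dfSP.columns[-3:]
cols_list6M = dfSP.columns[-6:]
cols_list9M = dfSP.columns[-9:]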

# Creating an addition expression using `join`
expression3M = '+'.join(cols_list3M)
expression6M = '+'.join(cols_list6M)
expression9M = '+'.join(cols_list9M)

# Adding the columns for 3, 6, and 9 months
dfSP = dfSP.withColumn("last_3m", F.expr(expression3M))\
    .withColumn("last_6m", F.expr(expression6M))\
    .withColumn("last_9m", F.expr(expression9M))

dfSP.show()
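
Because the pivot in In [8] keys on the month number alone, column position does not reflect chronology when the data spans two calendar years. A minimal plain-Python sketch of a chronological alternative is shown below: it maps each (year, month) pair to a single increasing index and counts visit days in the most recent N months of the data. The function names and the sample rows are hypothetical, and this is only one possible way to define "last N months".

```python
from collections import Counter

def month_index(year: int, month: int) -> int:
    """Map (year, month) to a single integer that increases chronologically."""
    return year * 12 + (month - 1)

def sessions_last_n_months(visit_months, n):
    """Count visit days per visitor over the n most recent calendar months in the data."""
    latest = max(month_index(y, m) for _, y, m in visit_months)
    totals = Counter()
    for visitor, y, m in visit_months:
        if latest - month_index(y, m) < n:
            totals[visitor] += 1
    return totals

# Hypothetical rows: (visitor_id, year, month), one row per visit day
rows = [
    ("A", 2017, 11), ("A", 2018, 2), ("A", 2018, 4),
    ("B", 2017, 8), ("B", 2017, 12),
]

last_3m = sessions_last_n_months(rows, 3)  # Feb-Apr 2018
last_6m = sessions_last_n_months(rows, 6)  # Nov 2017-Apr 2018
last_9m = sessions_last_n_months(rows, 9)  # Aug 2017-Apr 2018
print(last_3m["A"], last_6m["A"], last_9m["A"])  # 2 3 3
print(last_3m["B"], last_6m["B"], last_9m["B"])  # 0 1 2
```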

#### Let's get the number of sessions within 1, 3, and 5 months (dynamic picture)
In this exercise the main goal is to determine how many sessions each visitor had in the 1, 3, and 5 months leading up to their last interaction.
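
The calculation described above can be sketched in plain Python as follows. This is an illustration under stated assumptions rather than the notebook's implementation: the window is taken as the N calendar months ending at (and including) each visitor's last visit, one row represents one visit day, and the helpers `months_before` and `sessions_within` plus the sample rows are hypothetical. In PySpark a similar cutoff could presumably be built with `F.add_months` and a per-visitor window maximum.

```python
import calendar
from datetime import date

def months_before(d: date, n: int) -> date:
    """Return the date n calendar months before d, clamping the day to the month length."""
    idx = d.year * 12 + (d.month - 1) - n
    year, month0 = divmod(idx, 12)
    day = min(d.day, calendar.monthrange(year, month0 + 1)[1])
    return date(year, month0 + 1, day)

def sessions_within(visits, n_months):
    """Count each visitor's visit days in the n months up to and including their last visit."""
    last = {}
    for visitor, d in visits:
        last[visitor] = max(d, last.get(visitor, d))
    counts = {visitor: 0 for visitor in last}
    for visitor, d in visits:
        # Assumed window: (last visit - n months, last visit]
        if d > months_before(last[visitor], n_months):
            counts[visitor] += 1
    return counts

# Hypothetical rows: (visitor_id, visit_date)
visits = [
    ("A", date(2018, 4, 20)), ("A", date(2018, 4, 1)),
    ("A", date(2018, 2, 10)), ("A", date(2017, 12, 5)),
    ("B", date(2017, 9, 15)),
]

within_1m = sessions_within(visits, 1)
within_3m = sessions_within(visits, 3)
within_5m = sessions_within(visits, 5)
print(within_1m)  # {'A': 2, 'B': 1}
print(within_3m)  # {'A': 3, 'B': 1}
print(within_5m)  # {'A': 4, 'B': 1}
```

Unlike the static picture, the reference date here differs per visitor, so two visitors with identical counts may have been active in entirely different periods.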