In [37]:
from pyspark.asql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.types import (BooleanType, ByteType, DateType, IntegerType,
                               LongType, ShortType, StringType, StructField,
                               StructType, TimestampType)
from pyspark.sql.window import Window
from itertools import chain

In [2]:
spark = SparkSession.builder \
        .master("local[*]") \
        .appName("b2w") \
        .getOrCreate()

In [3]:
df = spark.read \
        .format("json") \
        .option("header", "true") \
        .option("timestampFormat", "dd/MM/yyyy HH:mm:ss") \
        .load("dados_navegacionais")

In [12]:
df.printSchema()

root
 |-- cart_qty: long (nullable = true)
 |-- cart_total_value: double (nullable = true)
 |-- device_type: string (nullable = true)
 |-- freight_delivery_time: long (nullable = true)
 |-- freight_value: double (nullable = true)
 |-- load_timestamp: string (nullable = true)
 |-- page_type: string (nullable = true)
 |-- product_id: long (nullable = true)
 |-- product_unit_price: double (nullable = true)
 |-- search_query: string (nullable = true)
 |-- site_department_id: long (nullable = true)
 |-- url_location: string (nullable = true)
 |-- visit_id: string (nullable = true)



In [17]:
df_steps = df.select("load_timestamp", "visit_id", "page_type").sort("visit_id", "load_timestamp")

In [22]:
df_steps.show(n=10, truncate=False)

+-----------------------+-------------------------------------------+---------+--------------+
|load_timestamp         |visit_id                                   |page_type|passo_anterior|
+-----------------------+-------------------------------------------+---------+--------------+
|2019-02-20 17:01:16.849|1.008219369612927420192031653336-1550692399|product  |0             |
|2019-02-20 17:01:20.967|1.008219369612927420192031653336-1550692399|product  |0             |
|2019-02-20 17:01:33.950|1.008219369612927420192031653336-1550692399|product  |0             |
|2019-02-20 17:08:56.353|1.008219369612927420192031653336-1550692399|product  |0             |
|2019-02-20 17:09:26.401|1.008219369612927420192031653336-1550692399|product  |0             |
|2019-02-20 17:02:58.448|1.035071781647744220191811615858-1550692978|product  |0             |
|2019-02-20 17:03:32.805|1.035071781647744220191811615858-1550692978|product  |0             |
|2019-02-20 17:03:41.211|1.11986394964846882019203

In [28]:
df_count.show(n=50, truncate=False)

+---------+--------------+-----+
|page_type|passo_anterior|count|
+---------+--------------+-----+
|search   |product       |10336|
|product  |search        |12442|
|home     |product       |3467 |
|search   |home          |3100 |
|basket   |product       |2635 |
|payment  |basket        |797  |
|thankyou |payment       |389  |
|product  |thankyou      |244  |
|search   |basket        |811  |
|product  |payment       |360  |
|home     |search        |1598 |
|product  |home          |2276 |
|search   |payment       |161  |
|basket   |search        |345  |
|product  |basket        |1461 |
|payment  |search        |93   |
|basket   |payment       |260  |
|home     |basket        |324  |
|thankyou |product       |98   |
|thankyou |home          |16   |
|home     |thankyou      |104  |
|home     |payment       |127  |
|search   |thankyou      |110  |
|payment  |product       |248  |
|basket   |home          |144  |
|thankyou |basket        |10   |
|basket   |thankyou      |19   |
|payment  

In [14]:
step_window = Window.partitionBy().orderBy("visit_id", "load_timestamp")

In [24]:
df_steps = df_steps.withColumn("passo_anterior", F.when(df_steps.page_type != F.lag(df_steps.page_type).over(step_window), F.lag(df_steps.page_type).over(step_window)).otherwise(None))

In [25]:
df_count = df_steps.groupby("page_type", "passo_anterior").count()

In [33]:
df_count_final = df_count.select(F.col("passo_anterior"), F.col("page_type").alias("passo_atual"), F.col("count")).dropna()

In [35]:
tag_passos = {'home':0, 'search':1, 'product':2, 'basket':3,'payment':4, 'thankyou':5}

In [38]:
mapeamento_tags = F.create_map([F.lit(tag) for tag in chain(*tag_passos.items())])

In [40]:
df_count_final = df_count_final.withColumn('passo_anterior', mapeamento_tags[df_count_final['passo_anterior']])
df_count_final = df_count_final.withColumn('passo_atual', mapeamento_tags[df_count_final['passo_atual']])

In [41]:
df_count_final.show(n=50, truncate=False)

+--------------+-----------+-----+
|passo_anterior|passo_atual|count|
+--------------+-----------+-----+
|2             |1          |10336|
|1             |2          |12442|
|2             |0          |3467 |
|0             |1          |3100 |
|2             |3          |2635 |
|3             |4          |797  |
|4             |5          |389  |
|5             |2          |244  |
|3             |1          |811  |
|4             |2          |360  |
|1             |0          |1598 |
|0             |2          |2276 |
|4             |1          |161  |
|1             |3          |345  |
|3             |2          |1461 |
|1             |4          |93   |
|4             |3          |260  |
|3             |0          |324  |
|2             |5          |98   |
|0             |5          |16   |
|5             |0          |104  |
|4             |0          |127  |
|5             |1          |110  |
|2             |4          |248  |
|0             |3          |144  |
|3             |5   