# Porting of example from http://czep.net/16/session-ids-sql.html to Spark SQL

In [1]:
# Register the temporary view raw_events, seeded with hand-written sample
# events: one row per (user_id, event_time) pair.
# Python joins adjacent string literals, so the SQL text sent to Spark is a
# single statement.
spark.sql(
    "create or replace temporary view raw_events as ( "
    "    select * from ( "
    "        select 111 as user_id, timestamp '2016-05-01 17:00:00' as event_time union all "
    "        select 111 as user_id, timestamp '2016-05-01 17:01:00' as event_time union all "
    "        select 111 as user_id, timestamp '2016-05-01 17:02:00' as event_time union all "
    "        select 111 as user_id, timestamp '2016-05-01 17:03:00' as event_time union all "
    "        select 222 as user_id, timestamp '2016-05-01 18:00:00' as event_time union all "
    "        select 333 as user_id, timestamp '2016-05-01 19:00:00' as event_time union all "
    "        select 333 as user_id, timestamp '2016-05-01 19:10:00' as event_time union all "
    "        select 333 as user_id, timestamp '2016-05-01 19:20:00' as event_time union all "
    "        select 333 as user_id, timestamp '2016-05-01 19:30:00' as event_time union all "
    "        select 333 as user_id, timestamp '2016-05-01 20:01:00' as event_time union all "
    "        select 333 as user_id, timestamp '2016-05-01 20:02:00' as event_time union all "
    "        select 444 as user_id, timestamp '2016-05-01 23:01:00' as event_time union all "
    "        select 444 as user_id, timestamp '2016-05-01 23:21:00' as event_time union all "
    "        select 444 as user_id, timestamp '2016-05-01 23:59:00' as event_time union all "
    "        select 444 as user_id, timestamp '2016-05-02 00:01:00' as event_time union all "
    "        select 444 as user_id, timestamp '2016-05-02 00:21:00' as event_time union all "
    "        select 444 as user_id, timestamp '2016-05-02 23:59:00' as event_time union all "
    "        select 444 as user_id, timestamp '2016-05-03 00:05:00' as event_time "
    "    )"
    ")"
)

DataFrame[]

In [2]:
# Print the rows of the raw_events view.
spark.sql(
    "select * from raw_events"
).show()

+-------+-------------------+
|user_id|         event_time|
+-------+-------------------+
|    111|2016-05-01 17:00:00|
|    111|2016-05-01 17:01:00|
|    111|2016-05-01 17:02:00|
|    111|2016-05-01 17:03:00|
|    222|2016-05-01 18:00:00|
|    333|2016-05-01 19:00:00|
|    333|2016-05-01 19:10:00|
|    333|2016-05-01 19:20:00|
|    333|2016-05-01 19:30:00|
|    333|2016-05-01 20:01:00|
|    333|2016-05-01 20:02:00|
|    444|2016-05-01 23:01:00|
|    444|2016-05-01 23:21:00|
|    444|2016-05-01 23:59:00|
|    444|2016-05-02 00:01:00|
|    444|2016-05-02 00:21:00|
|    444|2016-05-02 23:59:00|
|    444|2016-05-03 00:05:00|
+-------+-------------------+



In [3]:
# Register the temporary view lagged_events: every event plus `prev`, the
# timestamp of the preceding event.
# The window is partitioned by calendar date AND user_id, so the first event a
# user has on a given day gets prev = null, even if they were active just
# before midnight.
spark.sql(
    "create or replace temporary view lagged_events as ( "
    "    select "
    "        user_id, "
    "        event_time, "
    "        lag(event_time) over (partition by date(event_time), user_id order by event_time) as prev "
    "    from "
    "        raw_events "
    ")"
)

DataFrame[]

In [4]:
# Print the rows of the lagged_events view.
spark.sql(
    "select * from lagged_events"
).show()

+-------+-------------------+-------------------+
|user_id|         event_time|               prev|
+-------+-------------------+-------------------+
|    111|2016-05-01 17:00:00|               null|
|    111|2016-05-01 17:01:00|2016-05-01 17:00:00|
|    111|2016-05-01 17:02:00|2016-05-01 17:01:00|
|    111|2016-05-01 17:03:00|2016-05-01 17:02:00|
|    444|2016-05-01 23:01:00|               null|
|    444|2016-05-01 23:21:00|2016-05-01 23:01:00|
|    444|2016-05-01 23:59:00|2016-05-01 23:21:00|
|    444|2016-05-02 00:01:00|               null|
|    444|2016-05-02 00:21:00|2016-05-02 00:01:00|
|    444|2016-05-02 23:59:00|2016-05-02 00:21:00|
|    222|2016-05-01 18:00:00|               null|
|    444|2016-05-03 00:05:00|               null|
|    333|2016-05-01 19:00:00|               null|
|    333|2016-05-01 19:10:00|2016-05-01 19:00:00|
|    333|2016-05-01 19:20:00|2016-05-01 19:10:00|
|    333|2016-05-01 19:30:00|2016-05-01 19:20:00|
|    333|2016-05-01 20:01:00|2016-05-01 19:30:00|
|    333|2016-05-01 20:02:00|2016-05-01 20:01:00|
+-------+-------------------+-------------------+


In [5]:
# Register the temporary view new_sessions, which flags the events that open a
# session (is_new_session = 1) and leaves all others at 0.
# An event opens a session when it has no previous event (prev is null) or when
# it is more than 30 minutes after the previous one. to_unix_timestamp converts
# both timestamps to seconds, so the 30-minute threshold is written as 1800.
spark.sql(
    "create or replace temporary view new_sessions as ( "
    "    select "
    "        user_id, "
    "        event_time, "
    "        case "
    "            when prev is null then 1 "
    "            when to_unix_timestamp(event_time) - to_unix_timestamp(prev) > 1800 then 1 "
    "            else 0 "
    "        end as is_new_session "
    "    from "
    "        lagged_events "
    ")"
)

DataFrame[]

In [6]:
# Print the rows of the new_sessions view.
spark.sql(
    "select * from new_sessions"
).show()

+-------+-------------------+--------------+
|user_id|         event_time|is_new_session|
+-------+-------------------+--------------+
|    111|2016-05-01 17:00:00|             1|
|    111|2016-05-01 17:01:00|             0|
|    111|2016-05-01 17:02:00|             0|
|    111|2016-05-01 17:03:00|             0|
|    444|2016-05-01 23:01:00|             1|
|    444|2016-05-01 23:21:00|             0|
|    444|2016-05-01 23:59:00|             1|
|    444|2016-05-02 00:01:00|             1|
|    444|2016-05-02 00:21:00|             0|
|    444|2016-05-02 23:59:00|             1|
|    222|2016-05-01 18:00:00|             1|
|    444|2016-05-03 00:05:00|             1|
|    333|2016-05-01 19:00:00|             1|
|    333|2016-05-01 19:10:00|             0|
|    333|2016-05-01 19:20:00|             0|
|    333|2016-05-01 19:30:00|             0|
|    333|2016-05-01 20:01:00|             1|
|    333|2016-05-01 20:02:00|             0|
+-------+-------------------+--------------+



In [7]:
# Register the temporary view session_index, which numbers each user's sessions.
# A running sum of is_new_session per user, ordered by event_time, goes up by
# one at every session-opening event, so all events in a session share a value.
# The explicit "rows between unbounded preceding and current row" frame makes
# the sum include exactly the rows up to and including the current one.
spark.sql(
    "create or replace temporary view session_index as ( "
    "    select "
    "        user_id, "
    "        event_time, "
    "        is_new_session, "
    "        sum(is_new_session) over (partition by user_id order by event_time rows between unbounded preceding and current row) as session_index "
    "    from "
    "        new_sessions "
    ")"
)

DataFrame[]

In [8]:
# Print the rows of the session_index view.
spark.sql(
    "select * from session_index"
).show()

+-------+-------------------+--------------+-------------+
|user_id|         event_time|is_new_session|session_index|
+-------+-------------------+--------------+-------------+
|    333|2016-05-01 19:00:00|             1|            1|
|    333|2016-05-01 19:10:00|             0|            1|
|    333|2016-05-01 19:20:00|             0|            1|
|    333|2016-05-01 19:30:00|             0|            1|
|    333|2016-05-01 20:01:00|             1|            2|
|    333|2016-05-01 20:02:00|             0|            2|
|    222|2016-05-01 18:00:00|             1|            1|
|    111|2016-05-01 17:00:00|             1|            1|
|    111|2016-05-01 17:01:00|             0|            1|
|    111|2016-05-01 17:02:00|             0|            1|
|    111|2016-05-01 17:03:00|             0|            1|
|    444|2016-05-01 23:01:00|             1|            1|
|    444|2016-05-01 23:21:00|             0|            1|
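|    444|2016-05-01 23:59:00|             1|            2|
|    444|2016-05-02 00:01:00|             1|            3|
|    444|2016-05-02 00:21:00|             0|            3|
|    444|2016-05-02 23:59:00|             1|            4|
|    444|2016-05-03 00:05:00|             1|            5|
+-------+-------------------+--------------+-------------+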

In [9]:
# Final result: every event labelled with a per-user session key of the form
# "<user_id>.<session_index>", sorted by user and time.
# Changes relative to the query this was ported from:
#   - concat() is used in place of the || operator
#   - string is used in place of varchar
# concat() accepts any number of arguments, so one call joins all three pieces;
# nesting two calls is unnecessary.
spark.sql("\
    select \
        concat(cast(user_id as string), '.', cast(session_index as string)) as user_id_session_index, \
        user_id, \
        event_time, \
        is_new_session, \
        session_index \
    from \
        session_index \
    order by  \
        user_id, event_time \
").show()

+---------------------+-------+-------------------+--------------+-------------+
|user_id_session_index|user_id|         event_time|is_new_session|session_index|
+---------------------+-------+-------------------+--------------+-------------+
|                111.1|    111|2016-05-01 17:00:00|             1|            1|
|                111.1|    111|2016-05-01 17:01:00|             0|            1|
|                111.1|    111|2016-05-01 17:02:00|             0|            1|
|                111.1|    111|2016-05-01 17:03:00|             0|            1|
|                222.1|    222|2016-05-01 18:00:00|             1|            1|
|                333.1|    333|2016-05-01 19:00:00|             1|            1|
|                333.1|    333|2016-05-01 19:10:00|             0|            1|
|                333.1|    333|2016-05-01 19:20:00|             0|            1|
|                333.1|    333|2016-05-01 19:30:00|             0|            1|
|                333.2|    3