In [None]:
#!pip install pyspark
#!pip install configparser

In [1]:
import os
import pyspark
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages com.amazonaws:aws-java-sdk-pom:1.10.34,org.apache.hadoop:hadoop-aws:2.7.2 pyspark-shell'

In [2]:
import configparser
config = configparser.ConfigParser()
#config.read(os.path.expanduser("~/.aws/credentials"))
#aws_profile='default'
#access_id = config.get(aws_profile, "aws_access_key_id") 
#access_key = config.get(aws_profile, "aws_secret_access_key")

In [3]:
from pyspark import SparkContext
from pyspark.sql import SparkSession

sc = SparkContext()
spark = SparkSession.builder.master("local").appName("exploratory").getOrCreate()

In [4]:
sc=spark.sparkContext
#hadoop_conf=sc._jsc.hadoopConfiguration()
#hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
#hadoop_conf.set("fs.s3n.awsAccessKeyId", access_id)
#hadoop_conf.set("fs.s3n.awsSecretAccessKey", access_key)

In [5]:
filePath = "./courses.json"
df = spark.read.json(filePath)

In [6]:
df.show()

+-------+--------------------+
|     Id|                Name|
+-------+--------------------+
|1199555| Engenharia Elétrica|
|1199521|Economia / Ciênci...|
|1199517|             Direito|
|1199491| Ciências Ambientais|
|1199573|  Engenharia Química|
|1199553|Engenharia de Pro...|
|1199536|Engenharia Ambiental|
|1199725|             Química|
|1199453|       Administração|
|1199701|         Odontologia|
|1199461|           Agronomia|
|1199741|      Serviço Social|
|1199532|Engenharia Aeroná...|
|1199544|    Engenharia Civil|
|1199724|Comunicação Socia...|
|1199704|           Pedagogia|
|1199699|            Nutrição|
|6495411|      Gestão Pública|
|1199687|            Medicina|
|1199734|Relações Internac...|
+-------+--------------------+
only showing top 20 rows



In [7]:
df.count()

15000000

In [8]:
anonymous_not_null = df.filter(df["anonymous_id"] != "")

In [9]:
anonymous_not_null.count()

14999959

In [10]:
df.filter("browser_family = ''" or "device_family = ''" or "os_family = ''").count()

0

In [17]:
from pyspark.sql.types import TimestampType
from pyspark.sql.functions import from_unixtime
import pyspark.sql.functions as f


df = df.withColumn('ts_click', from_unixtime(df.device_sent_timestamp / 1000, "yyyy-MM-dd HH:mm:ss:SSS"))

+--------------------+--------------+-------------+---------------------+----------------+---------+--------------------+
|        anonymous_id|browser_family|device_family|device_sent_timestamp|            name|os_family|            ts_click|
+--------------------+--------------+-------------+---------------------+----------------+---------+--------------------+
|90cb72a1-e6be-439...|         Other|        Other|           1545677124| RankingRendered|    Other|1970-01-18 18:21:...|
|75da4f56-c6ec-48a...|         Other|        Other|           1545677124|ShowcaseRendered|    Other|1970-01-18 18:21:...|
|0310725C-4468-444...|         Other|        Other|           1550713100| RankingRendered|    Other|1970-01-18 19:45:...|
|21ACF4C8-2861-41D...|         Other|        Other|           1552768774|  RankingClicked|    Other|1970-01-18 20:19:...|
|21ACF4C8-2861-41D...|         Other|        Other|           1552768774| ListingRendered|    Other|1970-01-18 20:19:...|
|E295D243-6E64-45D...|  

In [19]:
df.printSchema()

root
 |-- anonymous_id: string (nullable = true)
 |-- browser_family: string (nullable = true)
 |-- device_family: string (nullable = true)
 |-- device_sent_timestamp: long (nullable = true)
 |-- name: string (nullable = true)
 |-- os_family: string (nullable = true)
 |-- ts_click: string (nullable = true)



In [20]:
# Removing null anonymous_id, it doesn't make sense to count sessions into nonexistent keys/users.
df = df.where(df.anonymous_id != '')

In [21]:
df.createOrReplaceTempView("df")

spark.sql("""
WITH click_series AS (
    SELECT
        df.*,
        LAG(device_sent_timestamp) OVER(PARTITION BY anonymous_id ORDER BY device_sent_timestamp) AS click_next
    FROM df
)

SELECT
    cs.anonymous_id,
    cs.browser_family,
    cs.device_family,
    cs.device_sent_timestamp,
    cs.name,
    cs.os_family,
    SUM(CASE WHEN cs.device_sent_timestamp - cs.click_next < 30 * 60 * 1000 THEN 0 ELSE 1 END) 
        OVER(PARTITION BY cs.anonymous_id ORDER BY cs.click_next) AS session
FROM click_series AS cs""").show()

+--------------------+--------------+-------------+---------------------+--------------------+----------+-------+
|        anonymous_id|browser_family|device_family|device_sent_timestamp|                name| os_family|session|
+--------------------+--------------+-------------+---------------------+--------------------+----------+-------+
|0002c21c-316b-4f1...| Mobile Safari|       iPhone|        1554812413440|     ListingRendered|       iOS|      1|
|0002c21c-316b-4f1...| Mobile Safari|       iPhone|        1554812413517|    ShowcaseRendered|       iOS|      1|
|0002c21c-316b-4f1...| Mobile Safari|       iPhone|        1554812413529|    ShowcaseRendered|       iOS|      1|
|0002c21c-316b-4f1...| Mobile Safari|       iPhone|        1554812414110|            pageview|       iOS|      1|
|0002c21c-316b-4f1...| Mobile Safari|       iPhone|        1554812414506|Visit Property De...|       iOS|      1|
|00071d44-ddfa-4d4...|       Firefox|        Other|        1554847461485|            pag

In [22]:
from pyspark.sql.window import Window
import pyspark.sql.functions as f

df_lag = df.withColumn('click_next',
                        f.lag(df.device_sent_timestamp)
                                 .over(Window.partitionBy("anonymous_id").orderBy("device_sent_timestamp")))
df_sessions = df_lag.withColumn('session',
                        f.sum(f.when((df_lag.device_sent_timestamp - df_lag.click_next < 30 * 60 * 1000), 1).otherwise(0)) \
                                 .over(Window.partitionBy("anonymous_id").orderBy("click_next")))
df_sessions.show()

+--------------------+--------------+-------------+---------------------+--------------------+----------+--------------------+-------------+-------+
|        anonymous_id|browser_family|device_family|device_sent_timestamp|                name| os_family|            ts_click|   click_next|session|
+--------------------+--------------+-------------+---------------------+--------------------+----------+--------------------+-------------+-------+
|0002c21c-316b-4f1...| Mobile Safari|       iPhone|        1554812413440|     ListingRendered|       iOS|2019-04-09 09:20:...|         null|      0|
|0002c21c-316b-4f1...| Mobile Safari|       iPhone|        1554812413517|    ShowcaseRendered|       iOS|2019-04-09 09:20:...|1554812413440|      1|
|0002c21c-316b-4f1...| Mobile Safari|       iPhone|        1554812413529|    ShowcaseRendered|       iOS|2019-04-09 09:20:...|1554812413517|      2|
|0002c21c-316b-4f1...| Mobile Safari|       iPhone|        1554812414110|            pageview|       iOS|2

In [23]:
spark.sql("""
WITH click_series AS (
    SELECT
        df.*,
        LAG(device_sent_timestamp) OVER(PARTITION BY anonymous_id ORDER BY device_sent_timestamp) AS click_next
    FROM df
),
base AS (
SELECT
    cs.anonymous_id,
    cs.device_sent_timestamp,
    cs.ts_click,
    SUM(CASE WHEN cs.device_sent_timestamp - cs.click_next < 30 * 60 * 1000 THEN 0 ELSE 1 END) 
        OVER(PARTITION BY cs.anonymous_id ORDER BY cs.click_next) AS session
FROM click_series AS cs
WHERE ts_click IS NOT NULL)
SELECT * FROM base WHERE session > 1
ORDER BY 1
""").show()

+--------------------+---------------------+--------------------+-------+
|        anonymous_id|device_sent_timestamp|            ts_click|session|
+--------------------+---------------------+--------------------+-------+
|00097a4b-a4d7-4db...|        1554877642520|2019-04-10 03:27:...|      2|
|000C9C12-18B8-4A6...|        1554778843573|2019-04-09 00:00:...|      2|
|000C9C12-18B8-4A6...|        1554778842104|2019-04-09 00:00:...|      2|
|000C9C12-18B8-4A6...|        1554778811386|2019-04-09 00:00:...|      2|
|000C9C12-18B8-4A6...|        1554778841216|2019-04-09 00:00:...|      2|
|000C9C12-18B8-4A6...|        1554778780612|2019-04-08 23:59:...|      2|
|000C9C12-18B8-4A6...|        1554778809686|2019-04-09 00:00:...|      2|
|000C9C12-18B8-4A6...|        1554778810108|2019-04-09 00:00:...|      2|
|000C9C12-18B8-4A6...|        1554778811897|2019-04-09 00:00:...|      2|
|000C9C12-18B8-4A6...|        1554778841049|2019-04-09 00:00:...|      2|
|000C9C12-18B8-4A6...|        15547787

In [29]:
df = spark.sql("""
WITH click_series AS (
    SELECT
        df.*,
        LAG(device_sent_timestamp) OVER(PARTITION BY anonymous_id ORDER BY device_sent_timestamp) AS click_next
    FROM df
)

SELECT
    cs.anonymous_id,
    cs.browser_family,
    cs.device_family,
    cs.os_family,
    cs.name,
    cs.device_sent_timestamp,
    cs.ts_click,
    SUM(CASE WHEN cs.device_sent_timestamp - cs.click_next < 30 * 60 * 1000 THEN 0 ELSE 1 END) 
        OVER(PARTITION BY cs.anonymous_id ORDER BY cs.click_next) AS session
FROM click_series AS cs""")

In [38]:
df.createOrReplaceTempView("df_clean")
spark.sql("""
WITH family_session AS (
SELECT
    browser_family,
    session,
    COUNT(DISTINCT anonymous_id) AS count
FROM 
    df_clean
GROUP BY 1, 2)
SELECT
    fs.browser_family,
    SUM(fs.count) AS count
FROM
    family_session AS fs
GROUP by browser_family
ORDER BY 2 DESC""").show()

+--------------------+------+
|      browser_family| count|
+--------------------+------+
|       Chrome Mobile|215633|
|              Chrome|175101|
|       Mobile Safari| 57797|
|               Other| 37480|
|            Facebook| 16685|
|             Firefox| 15709|
|   Chrome Mobile iOS|  5440|
|                Edge|  5080|
|              Safari|  3317|
|Mobile Safari UIW...|  2532|
|                  IE|  2433|
|               Opera|  1686|
|      Firefox Mobile|   456|
|           IE Mobile|   295|
|         Edge Mobile|   142|
|        Opera Mobile|   132|
|            Chromium|   130|
|         Firefox iOS|    88|
|         FacebookBot|    60|
|           Googlebot|    54|
+--------------------+------+
only showing top 20 rows



In [40]:
def df_to_json(category_family):
    """
    Creates a dataframe with the count of some category family, looking for the user (anonymous_id) and his session
    """
#    df_count = df.groupBy(df[f"{category_family}_family", "session"]).count() \
#            .orderBy('count', ascending=False)
    df_count = spark.sql(f"""
                WITH family_session AS (
                SELECT
                    {category_family}_family,
                    session,
                    COUNT(DISTINCT anonymous_id) AS count
                FROM 
                    df_clean
                GROUP BY 1, 2)
                SELECT
                    fs.{category_family}_family,
                    SUM(fs.count) AS count
                FROM
                    family_session AS fs
                GROUP by 1
                ORDER BY 2 DESC""")
    df_count = df_count.toPandas()
    category_dict = df_count.set_index(f"{category_family}_family").to_dict()['count']

    import json
    with open(f"{category_family}_results.json", "w") as f:
        f.write(json.dumps(category_dict))

In [41]:
df_to_json("browser")
df_to_json("device")
df_to_json("os")