<a href="https://colab.research.google.com/github/tsandesfernandes/escale/blob/master/escale_002.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
#!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#!wget -q https://archive.apache.org/dist/spark/spark-2.4.4/spark-2.4.4-bin-hadoop2.7.tgz
#!tar xf spark-2.4.4-bin-hadoop2.7.tgz
#!pip install -q findspark

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.4.4-bin-hadoop2.7"

import findspark
findspark.init('spark-2.4.4-bin-hadoop2.7')

In [2]:

from google.colab import drive
drive.mount('/content/gdrive')
root_path = '/content/gdrive/My Drive/escale/'
os.chdir(root_path)

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [3]:
from pyspark.sql import SparkSession
from pyspark.sql import *
from pyspark.sql import functions as F
from pyspark.sql import types as T

from pyspark.sql.window import Window

In [4]:
spark = SparkSession.builder.master('local[*]').getOrCreate()
sc = spark.sparkContext

In [5]:
dir = os.listdir(".")

In [6]:
df = spark.read.json('/content/gdrive/My Drive/escale/part-00000.json.gz') #teste

In [7]:
df.printSchema()

root
 |-- anonymous_id: string (nullable = true)
 |-- browser_family: string (nullable = true)
 |-- device_family: string (nullable = true)
 |-- device_sent_timestamp: long (nullable = true)
 |-- event: string (nullable = true)
 |-- n: long (nullable = true)
 |-- os_family: string (nullable = true)
 |-- platform: string (nullable = true)
 |-- version: string (nullable = true)



In [8]:
#df.show(2)

In [9]:
schema = T.StructType([
                    T.StructField('anonymous_id', T.StringType(), False),
                    T.StructField('browser_family', T.StringType(), False),
                    T.StructField('device_family', T.StringType(), False),
                    T.StructField('device_sent_timestamp', T.LongType(), False),
                    T.StructField('event', T.StringType(), True),
                    T.StructField('n', T.LongType(), True),
                    T.StructField('os_family', T.StringType(), False),
                    T.StructField('plataform', T.StringType(), True),
                    T.StructField('version', T.StringType(), True),
                    T.StructField('filename', T.StringType(), True)   
])
df = spark.createDataFrame(sc.emptyRDD(), schema)


In [10]:
for file in dir:
  if 'part' in file:
    path = "{}/{}".format(os.getcwd(), file)
    json_df = spark.read.json(path)
    json_df = json_df.withColumn('filename', F.lit(path.split('/')[-1]))
    df = df.union(json_df)

In [11]:
#df = df.limit(40) #remover no final

In [12]:
df = df.withColumn('device_date', (df["device_sent_timestamp"]/1000).cast("timestamp")) 

In [13]:
#df.show(5)

In [14]:
#anonymous_id nao serve como sessao
#df.groupby('anonymous_id').count().alias('count').sort('count', ascending=False).show(20)

In [15]:
session_cols = [F.col('browser_family'), F.col('device_family'), F.col('os_family')]

In [16]:
df = df.withColumn('session_id', F.lit(F.concat(*session_cols)))

In [17]:
df = df.sort(F.col('session_id'), F.col('device_sent_timestamp'), ascending=True)

In [18]:
window = Window.partitionBy('session_id').orderBy('device_sent_timestamp')

In [19]:
df = df.withColumn("minutes_diff", F.coalesce( ( F.col('device_sent_timestamp') - F.lag(F.col('device_sent_timestamp')).over(window) ) / 60000,F.lit(0)) )


In [20]:
#df.show(5)

In [21]:
df = df.withColumn("is_new_session", F.when( (F.col('minutes_diff') > 30) | (F.col('minutes_diff') == 0.0) , F.lit(1) ).otherwise(F.lit(0)))

In [22]:
#df.show(20)

In [23]:
df.printSchema()

root
 |-- anonymous_id: string (nullable = true)
 |-- browser_family: string (nullable = true)
 |-- device_family: string (nullable = true)
 |-- device_sent_timestamp: long (nullable = true)
 |-- event: string (nullable = true)
 |-- n: long (nullable = true)
 |-- os_family: string (nullable = true)
 |-- plataform: string (nullable = true)
 |-- version: string (nullable = true)
 |-- filename: string (nullable = true)
 |-- device_date: timestamp (nullable = true)
 |-- session_id: string (nullable = true)
 |-- minutes_diff: double (nullable = false)
 |-- is_new_session: integer (nullable = false)



In [27]:
#resultado parte 1
resultado_1_df = df.groupby('filename').agg(F.sum(F.col('is_new_session')).alias('sessoes_unicas'))

In [25]:
resultado_1_dict = {}
for row in resultado_1_df.collect():
  resultado_1_dict[row['filename']] = row['sessoes_unicas']
  

In [26]:
resultado_1_dict

{'part-00000.json.gz': 8267,
 'part-00001.json.gz': 13953,
 'part-00002.json.gz': 19552,
 'part-00003.json.gz': 25187,
 'part-00004.json.gz': 30932,
 'part-00005.json.gz': 36500,
 'part-00006.json.gz': 41822,
 'part-00007.json.gz': 47512,
 'part-00008.json.gz': 53304,
 'part-00009.json.gz': 58894}

In [27]:
#resultado parte 2
def unique_by_column(column):
  return df.groupby(column).agg(F.sum(F.col('is_new_session')).alias('sessoes_unicas'))

In [28]:
columns = ['browser_family', 'os_family', 'device_family']
resultado_2_dict = {}
for column in columns:
  df_2 = unique_by_column(F.col(column))
  in_dict = {}
  for row in df_2.collect():
    in_dict[row[column]] = row['sessoes_unicas']
  resultado_2_dict[column] = in_dict

In [29]:
resultado_2_dict

{'browser_family': {'AdsBot-Google': 376,
  'Amazon Silk': 19,
  'Android': 7,
  'AppleMail': 24,
  'Applebot': 1,
  'BingPreview': 9,
  'Chrome': 16252,
  'Chrome Mobile': 210139,
  'Chrome Mobile iOS': 205,
  'Chromium': 77,
  'Edge': 149,
  'Edge Mobile': 53,
  'Facebook': 23350,
  'Firefox': 762,
  'Firefox Mobile': 166,
  'Firefox iOS': 88,
  'GooglePlusBot': 1,
  'Googlebot': 71965,
  'IE': 4,
  'IE Mobile': 6,
  'Investment Crawler': 3,
  'Maxthon': 8,
  'Mobile Safari': 4107,
  'Mobile Safari UIWebView': 2969,
  'Opera': 372,
  'Opera Mini': 2,
  'Opera Mobile': 370,
  'Other': 3821,
  'Pale Moon (Firefox Variant)': 2,
  'PetalBot': 9,
  'Pinterest': 3,
  'Puffin': 29,
  'SMTBot': 7,
  'Safari': 120,
  'SeaMonkey': 2,
  'UC Browser': 400,
  'Vivaldi': 21,
  'Yahoo! Slurp': 1,
  'Yandex Browser': 24},
 'device_family': {'XiaoMi Redmi 7': 277,
  'iPhone11,2': 162,
  'MS50L_4G': 6,
  'XiaoMi Redmi Go': 145,
  'KYY23': 1,
  'Samsung SM-G970N': 2,
  'iPad6,12': 2,
  'Pixel 2 XL': 4,

In [24]:
#parte 3
df = df.withColumn('30min_delta', F.coalesce( ( F.col('device_sent_timestamp') - F.lag(F.col('device_sent_timestamp')).over(window) ) / 60000 > 30, F.lit(True)) )

In [25]:
#window_delta = Window.partitionBy().orderBy(F.col('session_id'), F.col('device_sent_timestamp'))
df = df.withColumn('group_id', F.sum(F.col('30min_delta').cast('int')).over(window)   )

In [26]:
#df.show(20)

In [27]:
df_3 = df.groupby(F.col('browser_family'), F.col('device_family'),F.col('os_family'), F.col('group_id') ).agg(F.min(F.col('device_date')).alias('min_date'), F.max(F.col('device_date')).alias('max_date'))

In [28]:
#df_3.show(20)

In [None]:
df_3 = df_3.withColumn('diff_seconds', F.unix_timestamp(F.col('max_date')) - F.unix_timestamp(F.col('min_date')) )

In [30]:
#df_3.show(20)


In [31]:
#resultado parte 3
def median_by_column(column):
  return df_3.groupby(column).agg(F.expr('percentile_approx(diff_seconds, 0.5)').alias("mediana")).rdd.collectAsMap()

In [None]:
#2 fors estavam dando java memory heap  limite do google colab 12gb
columns = ['browser_family', 'os_family', 'device_family']
resultado_3_dict = {}
for column in columns:
  #df_4 = median_by_column(F.col(column))
  #in_dict = {}
  #for row in df_4.collect():
  #  in_dict[row[column]] = row['mediana']
  resultado_3_dict[column] = median_by_column(column)

In [None]:
resultado_3_dict