In [9]:
%%bash
#@title Downloads
# The %%bash cell magic has to be the first line of the cell; the Colab title
# line therefore comes second (it is an ordinary comment for bash).
#
# Fetches the ten gzipped JSON partitions into data-engineer-test/, downloads
# and unpacks the ngrok binary, and installs pyspark. The cell is safe to
# re-run: partitions already present are skipped and existing files are
# overwritten instead of producing "*.1" duplicates or failing on mkdir.
set -e

BASE_URL="https://d3l36jjwr70u5l.cloudfront.net/data-engineer-test"
DATA_DIR="data-engineer-test"

mkdir -p "$DATA_DIR"
for i in 0 1 2 3 4 5 6 7 8 9; do
    part="part-0000${i}.json.gz"
    if [ ! -f "$DATA_DIR/$part" ]; then
        # Download next to the notebook first so that an interrupted transfer
        # never leaves a partial file inside the data directory.
        wget -nv -O "$part" "$BASE_URL/$part"
        mv "$part" "$DATA_DIR/"
    fi
done

wget -nv -O ngrok-stable-linux-amd64.zip https://bin.equinox.io/c/4VmDzA7iaHb/ngrok-stable-linux-amd64.zip
# unzip options go before the archive name; -d selects the target directory.
unzip -o ngrok-stable-linux-amd64.zip -d .
pip install pyspark

Starting job # 0 in a separate thread.


In [None]:
#@title Truncamento dos dados
# se estiver testando no jupyter notebook, rode esse script para truncar os dados

import os
from pyspark.sql import SparkSession

a = os.listdir("data-engineer-test/")

spark = SparkSession \
    .builder \
    .master("local") \
    .appName("EscaleTT") \
    .config("spark.driver.memory", "12g") \
    .config("spark.executor.memory", "12g") \
    .getOrCreate()

for i in a:
    spark.read.schema(schema).json("data-engineer-test/"+i).limit(10000).write.json("data-engineer-test-trunc/"+i)

!ls data-engineer-test-trunc

In [34]:
%%bash --bg
#@title Monitoramento de jobs 
# The %%bash cell magic has to be the first line of the cell, so the title and
# the notes below follow it as ordinary bash comments.
# HTTP tunnel to reach the Spark UI (port 4040).
# Run this cell and use the https://*.ngrok.io link found in the log file
# (ngrok-outfile) to open the job monitoring page.
./ngrok http 4040 -log ngrok-outfile &


Starting job # 10 in a separate thread.


In [None]:
# Print the ngrok log file; the public tunnel URL is looked up in this output.
!cat ngrok-outfile

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as f
from pyspark.sql.types import (
    LongType,
    FloatType,
    StringType,
    StructType,
    StructField,
    IntegerType
)

spark = (
    SparkSession.builder
    .master("local")
    .appName("EscaleTT")
    .config("spark.driver.memory", "12g")
    .config("spark.executor.memory", "12g")
    .getOrCreate()
)

# Truncated data for testing in the Jupyter notebook; use the .py file to run
# a job on the cluster.
datapath = "file:///content/data-engineer-test-trunc/part-00*.json.gz"
schema = StructType([
    StructField("n", IntegerType(), True),
    StructField("event", StringType(), True),
    StructField("version", FloatType(), True),
    StructField("platform", StringType(), True),
    StructField("os_family", StringType(), True),
    StructField("anonymous_id", StringType(), True),
    StructField("device_family", StringType(), True),
    StructField("browser_family", StringType(), True),
    StructField("device_sent_timestamp", StringType(), True)
])

# Load the events, remember which input file each row came from and turn the
# raw timestamp string into a formatted date-time string.
df = (
    spark.read
    .schema(schema)
    .json(datapath)
    .withColumn("file", f.input_file_name())
    # Keep the first 10 characters only (substring positions are 1-based).
    # NOTE(review): presumably the seconds part of a millisecond epoch - confirm.
    .withColumn("device_sent_timestamp", f.substring(f.col("device_sent_timestamp"), 1, 10))
    .withColumn("device_sent_timestamp", f.from_unixtime(f.col("device_sent_timestamp")))
)


# SESSIONIZATION
# IMPORTANT: the maximum session length was rounded to 1 hour instead of
# 30 minutes, so a session is identified by its hour bucket.
df = df.withColumn("session_timestamp", f.date_trunc("hour", "device_sent_timestamp"))

session_cols = [
    # f.col("anonymous_id"), # IMPORTANT: anonymous_id produced a unique hash for every row, so it was dropped
    # fingerprinting is done with the User-Agent families and the timestamp
    f.col("os_family"),
    f.col("device_family"),
    f.col("browser_family"),
    f.col("session_timestamp")
]

# concat() yields NULL as soon as one input is NULL, which would give those
# rows a NULL session_id and silently exclude them from every distinct count
# and from the join on session_id. Each part is therefore cast to string and
# NULLs are replaced by "", and a separator keeps adjacent fields from running
# into each other (e.g. "ab"+"c" vs "a"+"bc").
session_key_parts = [
    f.coalesce(part.cast("string"), f.lit("")) for part in session_cols
]
df = df.withColumn(
    "session_id",
    f.sha2(f.concat_ws("|", *session_key_parts), 256)
)

# STAGE 1: number of distinct sessions per input file, as {file: count}
stage1 = (
    df.groupBy(f.col("file"))
    .agg(f.countDistinct(f.col("session_id")).alias("unique_sessions"))
    .rdd.collectAsMap()
)

# ETAPA 2
def unique_sessions(family):
    """Count distinct session ids for every value of the `family` column.

    Reads the notebook-level DataFrame `df` and returns a plain dict that maps
    each value of the given column to its number of distinct `session_id`s.
    """
    distinct_sessions = f.countDistinct(f.col("session_id")).alias("unique_sessions")
    per_family = df.groupBy(f.col(family)).agg(distinct_sessions)
    return per_family.rdd.collectAsMap()

# Distinct-session counts for each of the three family columns.
stage2 = {
    family: unique_sessions(family)
    for family in ("os_family", "device_family", "browser_family")
}

# STAGE 3
# First and last event timestamp of every session ...
session_start_and_end = (
    f.min("device_sent_timestamp").alias("session_start"),
    f.max("device_sent_timestamp").alias("session_end"),
)
# ... and the distance between them (difference of two unix timestamps).
session_duration = (
    f.unix_timestamp(f.col("session_end"))
    - f.unix_timestamp(f.col("session_start"))
)

df2 = (
    df.groupBy(f.col("session_id"))
    .agg(*session_start_and_end)
    .withColumn("session_duration", session_duration)
)

# Attach start, end and duration of the session to every event row.
df = df.join(df2, "session_id")

def median_session_duration(family):
    """Approximate median session duration for every value of `family`.

    Reads the notebook-level DataFrame `df`, which must already carry the
    `session_id` and `session_duration` columns (difference of two unix
    timestamps, i.e. seconds).

    Args:
        family: name of the column to group by, e.g. "os_family".

    Returns:
        dict mapping each value of the `family` column to the result of
        `percentile_approx(session_duration, 0.5)` for that group.
    """
    # `df` holds one row per event; keep one row per (family value, session) so
    # that a session with many events does not weigh more in the median.
    sessions = (
        df.select(f.col(family), f.col("session_id"), f.col("session_duration"))
        .dropDuplicates([family, "session_id"])
    )
    # Group by the requested column (the previous version always grouped by
    # "os_family", whatever argument was passed).
    return (
        sessions.groupBy(f.col(family))
        .agg(f.expr('percentile_approx(session_duration, 0.5)').alias("median_duration"))
        .rdd.collectAsMap()
    )

# Median session duration for each of the three family columns.
stage3 = {
    family: median_session_duration(family)
    for family in ("os_family", "browser_family", "device_family")
}

# Show the three results as the cell output.
(stage1, stage2, stage3)

In [142]:
import os
import json
import errno


def safe_write_json(dict_, path):
    """Serialize `dict_` as JSON into `path`, creating parent folders if needed.

    Args:
        dict_: JSON-serializable object to write.
        path: destination file; may be a bare file name or include directories.

    Raises:
        OSError: if the parent directory cannot be created or the file cannot
            be opened for writing.
    """
    parent = os.path.dirname(path)
    # A bare file name has an empty dirname and os.makedirs("") raises, so the
    # directory is only created when there is one. exist_ok=True also removes
    # the race between checking for the directory and creating it.
    if parent:
        os.makedirs(parent, exist_ok=True)
    with open(path, "w") as out_file:
        json.dump(dict_, out_file)


# Persist each stage under the path the REST API reads from. Every file gets
# its own stage result (previously stage1 was written to all three files).
safe_write_json(stage1, "sessions/unique/by_file.json")
safe_write_json(stage2, "sessions/unique/by_family.json")
safe_write_json(stage3, "sessions/median/by_family.json")

In [1]:
#@title REST API
%%writefile main.py

from typing import Optional, List
from fastapi import FastAPI, Query

app = FastAPI()


@app.get("/")
def read_root():
    return {}


# /sessions/unique?by=family
@app.get("/sessions/{metric}/")
def read_metric(metric:str, q: str = Query(default="file",alias="by")):
    metric = metric.strip().lower()
    if q not in ["file","family"]: return {}
    if metric not in ["median","unique"]: return {}

    return json.load(f"sessions/{median}/by_{q}.json")


# {
#     "metric": "median",
#     "by": "file"
# }
# @app.post("/jobs/")
# def read_metric(payload:dict):
#     pass

Overwriting main.py


In [25]:
%%bash --bg
# Install the API dependencies and, only if that succeeds, serve main.py.
# (The previous trailing "&&" had no command after it, which bash rejects
# as a syntax error.)
pip install fastapi uvicorn && uvicorn main:app --reload

Starting job # 7 in a separate thread.


In [None]:
%%bash --bg
# Expose local port 8000 through an ngrok HTTP tunnel so the API can be used
# from outside (presumably the uvicorn server, which listens on 8000 by default).
# NOTE(review): this logs to ngrok-outfile, the same file used by the
# Spark UI tunnel on port 4040 - confirm that sharing the log file is intended.
./ngrok http 8000 -log ngrok-outfile