In [1]:
# storage levels for persisted data (MEMORY_ONLY, DISK_ONLY...)
from pyspark import StorageLevel

from pyspark.sql import SparkSession

# common aggregation functions, plus the explicit broadcast-join hint
# NOTE: importing `max` here shadows Python's builtin `max` for the rest of the notebook
from pyspark.sql.functions import broadcast, col, collect_list, count, max

# The SparkSession is the entry point for working with DataFrames
spark = (
    SparkSession.builder
    .appName("Bootcamp PySpark")
    .getOrCreate()
)

# Runtime SQL settings for this notebook, applied in the order listed
runtime_sql_settings = (
    # -1 disables automatic broadcast joins (by default Spark broadcasts
    # datasets of up to 10 MB on its own)
    ("spark.sql.autoBroadcastJoinThreshold", "-1"),
    # do not fail on ambiguous self-joins, so a DataFrame can be joined with
    # itself without creating aliases first
    ("spark.sql.analyzer.failAmbiguousSelfJoin", "false"),
    # shuffles produce 4 partitions instead of the default 200
    ("spark.sql.shuffle.partitions", "4"),
)
for setting_name, setting_value in runtime_sql_settings:
    spark.conf.set(setting_name, setting_value)

25/08/06 19:05:44 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [2]:
# Read the events CSV with a header row and an inferred schema, keeping only
# rows that have a user_id.
# NOTE(review): the DataFrame is named `users`, but it holds the rows of
# events.csv (it is registered below as the "events" view).
events_csv_reader = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
)
users = (
    events_csv_reader
    .csv("/home/iceberg/data/events.csv")
    .where(col("user_id").isNotNull())
)

# Temporary views only live for the duration of the session,
# and can be queried directly with SQL
users.createOrReplaceTempView("events")

                                                                                

In [3]:
# Preview the first 3 rows of the DataFrame loaded from events.csv
users.show(3)

+-----------+---------+--------+--------------------+---+--------------------+
|    user_id|device_id|referrer|                host|url|          event_time|
+-----------+---------+--------+--------------------+---+--------------------+
| 1037710827|532630305|    NULL| www.zachwilson.tech|  /|2021-03-08 17:27:...|
|  925588856|532630305|    NULL|    www.eczachly.com|  /|2021-05-10 11:26:...|
|-1180485268|532630305|    NULL|admin.zachwilson....|  /|2021-02-17 16:19:...|
+-----------+---------+--------+--------------------+---+--------------------+
only showing top 3 rows



In [4]:
# Per-(user_id, device_id) aggregation over the "events" view: number of
# events and the list of distinct hosts seen for each pair.
events_aggregated_query = """
    SELECT user_id, 
           device_id, 
           COUNT(1) as event_counts, 
           COLLECT_LIST(DISTINCT host) as host_array
    FROM events
    GROUP BY user_id, device_id
"""

# Cache the aggregated DataFrame.
# Caching should be used with care (< 5GB), or to feed a broadcast join.
# Avoid any StorageLevel other than MEMORY_ONLY unless really necessary;
# the default used by cache() is MEMORY_AND_DISK.
eventsAggregated = spark.sql(events_aggregated_query).cache()

# If DISK_ONLY storage would be needed, a staging table is probably the
# better choice !!
staging_table_ddl = """
    CREATE TABLE IF NOT EXISTS bootcamp.events_aggregated_staging (
        user_id BIGINT,
        device_id BIGINT,
        event_counts BIGINT,
        host_array ARRAY<STRING>
    )
    PARTITIONED BY (ds STRING)
"""
spark.sql(staging_table_ddl)


DataFrame[]

In [5]:
# Display 3 rows of the per-(user_id, device_id) aggregation
eventsAggregated.show(3)

[Stage 5:>                                                          (0 + 2) / 2]

+-----------+----------+------------+--------------------+
|    user_id| device_id|event_counts|          host_array|
+-----------+----------+------------+--------------------+
|-2147421007|-807271869|           1|[admin.zachwilson...|
|-2147340867|1324700293|           1|  [www.eczachly.com]|
|-2147051672| 583904608|           1|  [www.eczachly.com]|
+-----------+----------+------------+--------------------+
only showing top 3 rows



                                                                                

In [9]:
# Number of rows in the aggregated DataFrame (one row per user_id/device_id group)
eventsAggregated.count()

141935

In [11]:
# Join the DataFrame read from the CSV with the cached aggregation.
# The only benefit of cached datasets (like temp views) is when they are
# used more than once.

# Rename device_id on the aggregated side only to avoid an ambiguity error
eventsRenamed = eventsAggregated.withColumnRenamed("device_id", "event_device_id")

events_with_aggregates = users.join(eventsRenamed, on="user_id")

# NOTE(review): event_counts is computed per (user_id, device_id), so
# max(event_counts) is the largest per-device count, not a sum -- confirm the
# alias "total_hits" reflects the intent.
# NOTE(review): the join yields one row per (event, device) pair of a user, so
# collect_list may repeat device ids -- confirm duplicates are acceptable.
usersAndDevices = (
    events_with_aggregates
    .groupBy("user_id")
    .agg(
        max("event_counts").alias("total_hits"),
        collect_list("event_device_id").alias("devices"),
    )
)

# Note that this join is not scalable at all...
usersAndDevices.explain()



== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- ObjectHashAggregate(keys=[user_id#17], functions=[max(event_counts#61L), collect_list(event_device_id#682, 0, 0)])
   +- ObjectHashAggregate(keys=[user_id#17], functions=[partial_max(event_counts#61L), partial_collect_list(event_device_id#682, 0, 0)])
      +- Project [user_id#17, event_device_id#682, event_counts#61L]
         +- SortMergeJoin [user_id#17], [user_id#687], Inner
            :- Sort [user_id#17 ASC NULLS FIRST], false, 0
            :  +- Exchange hashpartitioning(user_id#17, 4), ENSURE_REQUIREMENTS, [plan_id=457]
            :     +- Filter isnotnull(user_id#17)
            :        +- FileScan csv [user_id#17] Batched: false, DataFilters: [isnotnull(user_id#17)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int>
            +- Sort [user_id#687 ASC NULLS FIRST], false, 0
             

In [13]:
# Testing the execution plan with a broadcast join.
# `broadcast` comes from pyspark.sql.functions and is imported together with
# the other functions at the top of the notebook instead of mid-notebook.
# Automatic broadcasting is disabled (autoBroadcastJoinThreshold = -1), so the
# explicit hint on the aggregated side is what requests the broadcast here.
usersAndDevices_broadcast = (
    users.join(broadcast(eventsRenamed), on="user_id")
    .groupBy("user_id")
    .agg(
        max("event_counts").alias("total_hits"),
        collect_list("event_device_id").alias("devices"),
    )
)

# Print the physical plan to compare it with the non-broadcast version
usersAndDevices_broadcast.explain()


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- ObjectHashAggregate(keys=[user_id#17], functions=[max(event_counts#61L), collect_list(event_device_id#682, 0, 0)])
   +- Exchange hashpartitioning(user_id#17, 4), ENSURE_REQUIREMENTS, [plan_id=505]
      +- ObjectHashAggregate(keys=[user_id#17], functions=[partial_max(event_counts#61L), partial_collect_list(event_device_id#682, 0, 0)])
         +- Project [user_id#17, event_device_id#682, event_counts#61L]
            +- BroadcastHashJoin [user_id#17], [user_id#783], Inner, BuildRight, false
               :- Filter isnotnull(user_id#17)
               :  +- FileScan csv [user_id#17] Batched: false, DataFilters: [isnotnull(user_id#17)], Format: CSV, Location: InMemoryFileIndex(1 paths)[file:/home/iceberg/data/events.csv], PartitionFilters: [], PushedFilters: [IsNotNull(user_id)], ReadSchema: struct<user_id:int>
               +- BroadcastExchange HashedRelationBroadcastMode(List(cast(input[0, int, true] as bigint)),false), [pla

In [14]:
# show() prints the first 3 rows; the trailing expression displays the
# Python type of the object
usersAndDevices.show(3)
type(usersAndDevices)

[Stage 54:>                                                         (0 + 1) / 1]

+-----------+----------+--------------------+
|    user_id|total_hits|             devices|
+-----------+----------+--------------------+
|-2147470439|         3|[378988111, 37898...|
|-2147421007|         1|        [-807271869]|
|-2147326548|         1|         [532630305]|
+-----------+----------+--------------------+
only showing top 3 rows



                                                                                

pyspark.sql.dataframe.DataFrame

In [15]:
# take(n) returns a plain Python list of Row objects.
# Run the action once and reuse the result: a bare `take(1)` that is not the
# last expression of the cell is never displayed, so calling it a second time
# inside type(...) only repeated the Spark job.
first_rows = usersAndDevices.take(1)
type(first_rows)

list

In [16]:
# Drop the cached data of eventsAggregated; no later cell uses the cache
eventsAggregated.unpersist()

DataFrame[user_id: int, device_id: int, event_counts: bigint, host_array: array<string>]

In [18]:
# Stop the Spark session at the end of the notebook
spark.stop()