In [0]:
%spark.conf

spark.executor.instances=2
spark.executor.memory=1G
spark.kryoserializer.buffer.max=1024m

spark.sql.autoBroadcastJoinThreshold=20971520

Нужно скопировать себе эту тетрадку. Параграфы с генерацией данных и созданием семплов запускать не нужно, они оставлены для ознакомления

In [2]:
import org.apache.spark.mllib.random.RandomRDDs._
import java.time.LocalDate
import java.time.format.DateTimeFormatter

val dates = (0 to 14).map(LocalDate.of(2020, 11, 1).plusDays(_).format(DateTimeFormatter.ofPattern("yyyy-MM-dd"))).toSeq

def generateCity(r: Double): String = if (r < 0.9) "BIG_CITY" else "SMALL_CITY_" + scala.math.round((r - 0.9) * 1000)

def generateCityUdf = udf(generateCity _)

// spark.sql("drop table hw2.events_full")
spark.sql("create database hw_4")
for(i <- dates) {
    uniformRDD(sc, 10000000L, 1)
    .toDF("uid")
    .withColumn("date", lit(i))
    .withColumn("city", generateCityUdf($"uid"))
    .selectExpr("date", " sha2(cast(uid as STRING), 256) event_id", "city")
    .withColumn("skew_key", when($"city" === "BIG_CITY", lit("big_event")).otherwise($"event_id"))
    .write.mode("append")
    .partitionBy("date")
    .saveAsTable("hw_4.events_full")
}


In [3]:
spark.table("hw_4.events_full")
.select("event_id")
.sample(0.001)
.repartition(2)
.write.mode("overwrite")
.saveAsTable("hw_4.sample")


In [4]:

spark.table("hw_4.sample")
.limit(100)
.coalesce(1)
.write.mode("overwrite")
.saveAsTable("hw_4.sample_small")

In [5]:


spark.table("hw_4.events_full")
.select("event_id")
.sample(0.003)
.repartition(1)
.write.mode("overwrite")
.saveAsTable("hw_4.sample_big")

In [6]:


spark.table("hw_4.events_full")
.select("event_id")
.sample(0.015)
.repartition(1)
.write.mode("overwrite")
.saveAsTable("hw_4.sample_very_big")

Для упражнений сгрененирован большой набор синтетических данных в таблице hw4.events_full. Из этого набора данных созданы маленькие (относительно исходного набора) таблицы разного размера kotelnikov.sample_[small, big, very_big]. 

Ответить на вопросы:
 * какова структура таблиц
 * сколько в них записей 
 * сколько места занимают данные
 

In [8]:
%pyspark

tables = spark.sql("SHOW TABLES IN hw_4").collect()
    
for row in tables:
    print(row.tableName)
    spark.table("{}.{}".format(row.database, row.tableName)).printSchema()
    spark.table("{}.{}".format(row.database, row.tableName)).count()
    print("---------------------------------------------------------------------------------")

    

# Информация о том, сколько места занимают данные
# Посмотреть в HDFS UI

Получить планы запросов для джойна большой таблицы hw_4.events_full с каждой из таблиц hw_4.sample, hw_4.sample_big, hw_4.sample_very_big по полю event_id. В каких случаях используется BroadcastHashJoin? 

BroadcastHashJoin автоматически выполняется для джойна с таблицами, размером меньше параметра spark.sql.autoBroadcastJoinThreshold. Узнать его значение можно командой spark.conf.get("spark.sql.autoBroadcastJoinThreshold").

Решение: ячейка ниже.
BroadcastHashJoin используется при джоине events_full и sample.

In [11]:
%pyspark
from pyspark.sql.functions import *

print("autoBroadcastJoinThreshold =", spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

events_full = spark.table("hw_4.events_full")
sample = spark.table("hw_4.sample")
sample_big = spark.table("hw_4.sample_big")
sample_very_big = spark.table("hw_4.sample_very_big")

#events_full.show()
print("size of the events_full table:", events_full.count())
print("size of the sample table:", sample.count())
print("size of the sample_big table:", sample_big.count())
print("size of the sample_very_big table:", sample_very_big.count())

events_full.join(sample, "event_id", "inner").explain()
print('-------------------------------------------------------')
events_full.join(sample_big, "event_id", "inner").explain()
print('-------------------------------------------------------')
events_full.join(sample_very_big, "event_id", "inner").explain()


Выполнить джойны с таблицами  hw_4.sample,  hw_4.sample_big в отдельных параграфах, чтобы узнать время выполнения запросов (например, вызвать .count() для результатов запросов). Время выполнения параграфа считается автоматически и указывается в нижней части по завершении.

events_full join sample время выполнения состовляет 20 секунд. Размер таблицы на выходе: 150397
events_full join sample_big время выполнения составляет 1 минута 3 секунды. Размер таблицы на выходе: 449135

Зайти в spark ui (ссылку сгенерировать в следующем папраграфе). Сколько tasks создано на каждую операцию? Почему именно столько? Каков DAG вычислений?  

events_full join sample: 82 tasks. Всего 2 стейджа: 91 и 92. 
Стейдж 91: FileScanRDD -> MapPartitionsRDD. Происходит поиск партиций и мэппинг. 
Стейдж 92: ShuffledRowRDD -> MapPartitionsRDD -> MapPartitionsRDD. Происходит shuffle данных по индексам значений, которые мы мерджим и затем партиционирование и мэпинг.
По первой таблице происходит Scan parquet -> Filter -> Project. По второй таблице (sample) идет Scan parquet -> Filter -> Project -> BroadcastExchange -> Происходит BroadcastHashJoin c первой таблицей
http://ca-spark-n-01.innoca.local:8088/proxy/application_1707128095337_0216/jobs/job?id=58 
http://ca-spark-n-01.innoca.local:8088/proxy/application_1707128095337_0254/SQL/execution?id=4

events_full join sample_big: 284 tasks. Всего 4 стейджа: 8, 9, 10 и 11. 
Стейдж 8: FileScanRDD -> MapPartitionsRDD. Происходит поиск партиций и мэппинг. 
Стейдж 9: FileScanRDD -> MapPartitionsRDD. Происходит поиск партиций и мэппинг.
Стейдж 10: ShuffledRowRDD -> MapPartitionsRDD -> ZippedPartitionsRDD -> MapPartitionsRDD. На данном этапе происходит дополнительный shuffle.
Стейдж 11: ShuffledRowRDD -> MapPartitionsRDD -> MapPartitionsRDD. Происходит shuffle данных по индексам значений, которые мы мерджим и затем партиционирование и мэпинг.
Главная разница по сравнению с предыдущим запросом (events_full join sample), что джоин происходит через SortMergeJoin, а не через BroadcastHashJoin, так как вторая таблица (sample_big) весит 27mb, что больше параметра spark.sql.autoBroadcastJoinThreshold равного 20mb 
http://ca-spark-n-01.innoca.local:8088/proxy/application_1707128095337_0219/jobs/job?id=4
http://ca-spark-n-01.innoca.local:8088/proxy/application_1707128095337_0254/SQL/execution?id=5


In [13]:
%pyspark

sc.setLocalProperty("callSite.short", "events_full join sample")

print(events_full.join(sample, "event_id", "inner").count())

In [14]:
%pyspark


sc.setLocalProperty("callSite.short", "events_full join sample_big")

print(events_full.join(sample_big, "event_id", "inner").count())

Оптимизировать джойн с таблицами hw_4.sample_big, hw_4.sample_very_big с помощью broadcast(df). Выполнить запрос, посмотреть в UI, как поменялся план запроса, DAG, количество тасков. Второй запрос не выполнится.

events_full join broadcast sample_big время выполнения состовляет 33 секунды. 
82tasks. Всего 2 стейджа: 21, 22.
Стейдж 21: FileScanRDD -> MapPartitionsRDD. Происходит поиск партиций и мэппинг.
Стейдж 22: ShuffledRowRDD -> MapPartitionsRDD -> MapPartitionsRDD -> MapPartitionsRDD -> FileScanRDD -> MapPartitionsRDD. Происходит shuffle данных по индексам значений, которые мы мерджим и затем партиционирование и мэпинг.
Джоин происходит через BroadcastHashJoin
http://ca-spark-n-01.innoca.local:8088/proxy/application_1707128095337_0254/SQL/execution?id=6

events_full join broadcast sample_very_big запрос не выполнился, так как размер sample_very_big, помещенный в broadcast состовляет 136mb, что больше значения spark.sql.autoBroadcastJoinThreshold равного 20mb


In [16]:
%pyspark
from pyspark.sql.functions import broadcast

sc.setLocalProperty("callSite.short", "events_full join broadcast sample_big")

print(events_full.join(broadcast(sample_big), "event_id", "inner").count())






In [17]:
%pyspark
from pyspark.sql.functions import broadcast

sc.setLocalProperty("callSite.short", "events_full join broadcast sample_very_big")

print(events_full.join(broadcast(sample_very_big), "event_id", "inner").count())



Таблица hw_4.sample_very_big оказывается слишком большой для бродкаста и не помещается полностью на каждой ноде, поэтому возникает исключение.


.
.
.

Отключить автоматический броадкаст командой spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1"). Сделать джойн с семплом hw_4.sample, сравнить время выполнения запроса.


In [21]:
%pyspark

spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "-1")


In [22]:
%pyspark

sc.setLocalProperty("callSite.short", "events_full join sample")

print(events_full.join(sample, "event_id", "inner").count())


6 минут 2 секунды в случае, когда отключен автоматический броадкастинг, по сравнению с 2 минутами 1 секундой со включенным бродкастингом.


In [24]:
spark.conf.set("spark.sql.autoBroadcastJoinThreshold", "26214400")

In [25]:
spark.sql("clear cache")

.
.
.

В процессе обработки данных может возникнуть перекос объёма партиций по количеству данных (data skew). В таком случае время выполнения запроса может существенно увеличиться, так как данные распределятся по исполнителям неравномерно. В следующем параграфе происходит инициализация датафрейма, этот параграф нужно выполнить, изменять код нельзя. В задании нужно работать с инициализированным датафреймом.

Датафрейм разделен на 30 партиций по ключу city, который имеет сильно  неравномерное распределение.

In [27]:
%pyspark 
from pyspark.sql.functions import col

skew_df = spark.table("hw_4.events_full")\
.where("date = '2020-11-01'")\
.repartition(30, col("city"))\
.cache()

skew_df.count()

Посчитать количество event_count различных событий event_id , содержащихся в skew_df с группировкой по городам. Результат упорядочить по event_count.

В spark ui в разделе jobs выбрать последнюю, в ней зайти в stage, состоящую из 30 тасков (из такого количества партиций состоит skew_df). На странице стейджа нажать кнопку Event Timeline и увидеть время выполнения тасков по экзекьюторам. Одному из них выпала партиция с существенно большим количеством данных. Остальные экзекьюторы в это время бездействуют -- это и является проблемой, которую предлагается решить далее.



In [29]:
%pyspark
from pyspark.sql.functions import *
sc.setLocalProperty("callSite.short", "skew task")

skew_df.printSchema()
skew_df.show()

skew_df \
.groupBy("city") \
.agg(count("event_id").alias("event_count")) \
.orderBy(col("event_count").desc()) \
.show()


Один из способов решения проблемы агрегации по неравномерно распределенному ключу является предварительное перемешивание данных. Его можно сделать с помощью метода repartition(p_num), где p_num -- количество партиций, на которые будет перемешан исходный датафрейм

In [32]:
%pyspark
from pyspark.sql.functions import *

sc.setLocalProperty("callSite.short", "repartition_task")
skew_df = spark.table("hw_4.events_full")
skew_df.repartition(30).show()

Другой способ исправить неравномерность по ключу -- создание синтетического ключа с равномерным распределением. В нашем случае неравномерность исходит от единственного значения city='BIG_CITY', которое часто повторяется в данных и при группировке попадает к одному экзекьютору. В таком случае лучше провести группировку в два этапа по синтетическому ключу CITY_SALT, который принимает значение BIG_CITY_rand (rand -- случайное целое число) для популярного значения BIG_CITY и CITY для остальных значений. На втором этапе восстанавливаем значения CITY и проводим повторную агрегацию, которая не занимает времени, потому что проводится по существенно меньшего размера данным. 

Такая же техника применима и к джойнам по неравномерному ключу, см, например https://itnext.io/handling-data-skew-in-apache-spark-9f56343e58e8

Что нужно реализовать:
* добавить синтетический ключ
* группировка по синтетическому ключу
* восстановление исходного значения
* группировка по исходной колонке

In [34]:
%pyspark
import pyspark.sql.functions as f
from pyspark.sql.functions import *

# Добавляем колонку с "солью": для BIG_CITY - случайное целое от 0 до 20 включительно, для SMALL_CITY - 21, 
# и выводим кусочки датафрейма для BIG_CITY и SMALL_CITY для контроля правильности выполненной процедуры

sc.setLocalProperty("callSite.short", "big_city_select_and_salt")


events_full = spark.table("hw_4.events_full")

print("Количество записей в таблице:", events_full.count())
print("-------------------------------------------------------------------------")

salt = f.expr("""pmod(round(rand() * 1000000, 0), 23)""").cast("integer")
salted = events_full.withColumn("salt", salt)

print("Уникальные значения синтетического ключа salt")

salted \
    .select('salt') \
    .distinct() \
    .orderBy(f.col("salt").asc()) \
    .show(30) \
    
print("-------------------------------------------------------------------------")
print("Выыод рандомных семплов")
    
salted.sample(0.1).show(30, False)

print("-------------------------------------------------------------------------")

salted_agg_df = salted \
.groupBy("salt", "city")\
.agg(f.count("*").alias("count")) \
    .orderBy(f.col("count").desc())
    
print("Вывод группировки по синтетическому ключу salt")
    
salted_agg_df.show(30, False)

print("-------------------------------------------------------------------------")

print("Вывод группировки по исходному ключу")

agg_df = salted_agg_df \
    .groupBy(f.col("city")).agg(f.sum("count").alias("final_sum")) \
    .orderBy(f.col("final_sum").desc())
    
agg_df.show(30, False)

print("-------------------------------------------------------------------------")

print("Вывод количества событий в таблице-результате")

agg_df.agg(f.sum("final_sum")).show()

