# hw5


**Задание 1**. Merge Sort Join и Hash Join

**Merge Sort Join**

Плюсы:

* Более эффективен для больших наборов данных, которые не помещаются в оперативную память, так как он не зависит от размера памяти
* Более эффективен если данные уже отсортированы
* Всегда работает за O(n log n)
* Требования к памяти исполнителей для выполнения Sort Merge Join значительно ниже, чем для Shuffle Hash и Broadcast Hash<br>

Недостатки:

* На маленьких наборах данных работает медленнее Hash Join
* Нужна предварительная сортировка 

**Hash Join**

Плюсы:

* Быстрый для меньших наборов данных, которые помещаются в память

Недостатки:

* Ограничен размером доступной памяти
* При работе с данными, которые не умещаются в память, производительность падает из-за IO операций 




In [1]:
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

// Соберите WordCount приложение, запустите на датасете ppkm_sentiment

val sparkk = SparkSession.builder()
  .appName("WordCount")
  .getOrCreate()

val df = sparkk.read.option("header", "true").csv("datasets/ppkmSentiment/ppkm_dataset.csv")

val wordsDf = df.select(explode(split(col("comment"), "\\s+")).as("word"))
  .groupByKey(_.getString(0))
  .count()
  .toDF("word", "count")

wordsDf.show()

+----------------+-----+
|            word|count|
+----------------+-----+
|            PPKM|  100|
|          patuhi|    3|
|        semangat|    4|
|           KEDUA|    1|
|   PENGOPTIMALAN|    1|
|          hingga|   11|
|           #PTKM|    4|
|          Dahulu|    1|
|          baik""|    1|
|        generasi|    1|
|          tumbuh|    1|
|          Mangga|    1|
|         artinya|    1|
|         tagihan|    1|
|          Jumlah|    1|
|        Tracking|    1|
|         operasi|    1|
|          #Sumut|    1|
|         Kapolda|    1|
|bulan.PEMERINTAH|    1|
+----------------+-----+
only showing top 20 rows



In [2]:
// Измените WordCount так, чтобы он удалял знаки препинания и приводил все слова к единому регистру

val cleanedWordsDf = df.select(explode(split(lower(regexp_replace(col("comment"), "[^\\w\\s]", "")), "\\s+")).as("word"))
  .groupByKey(_.getString(0))
  .count()
  .toDF("word", "count")

cleanedWordsDf.show()

+------------------+-----+
|              word|count|
+------------------+-----+
|          semangat|    8|
|            patuhi|    9|
|            hingga|   13|
|          generasi|    2|
|            tumbuh|    1|
|           pranowo|    2|
|           artinya|    1|
|           tagihan|    1|
|httpstco3jtlpo1pwo|    1|
|           operasi|    3|
|httpstcopah0sasp9w|    1|
|         erwin3977|    1|
|            absurd|    1|
|            bangun|    1|
|           pembuat|    1|
|             kabar|    1|
|              gini|    1|
|             bidan|    2|
|httpstcohamun5p96h|    1|
|            pidana|    1|
+------------------+-----+
only showing top 20 rows



In [3]:
// Измените выход WordCount так, чтобы сортировка была по количеству повторений, а список слов был во втором столбце, а не в первом

val sortedWordsDf = cleanedWordsDf.orderBy(desc("count"))
  .select("count", "word")

sortedWordsDf.show()

+-----+-------------+
|count|         word|
+-----+-------------+
|  134|         ppkm|
|  110|        mikro|
|   87|          dan|
|   77|           di|
|   71|      covid19|
|   65|   masyarakat|
|   59|    ppkmmikro|
|   59|           yg|
|   44|     kegiatan|
|   43|   pembatasan|
|   42| perpanjangan|
|   33|           rt|
|   31|          ada|
|   31|     berbasis|
|   29|         yang|
|   29|        covid|
|   28|        maret|
|   27|        untuk|
|   25|  humas_jogja|
|   25|jogjaistimewa|
+-----+-------------+
only showing top 20 rows



In [4]:
// Измените выход WordCount, чтобы формат соответствовал TSV

sortedWordsDf.write.option("sep", "\t").mode("overwrite").csv("wordcount_output.tsv")

In [5]:
// Добавьте в WordCount возможность через конфигурацию задать список стоп-слов, которые будут отфильтрованы во время работы приложения

val stopWordsDf = spark.read.text("datasets/ppkmSentiment/stopwordv1.txt")
val stopWords = stopWordsDf.as[String].collect()
val broadcastStopWords = spark.sparkContext.broadcast(stopWords)

val filteredWordsDf = wordsDf
  .filter(!col("word").isin(broadcastStopWords.value: _*))
filteredWordsDf.show()

+----------------+-----+
|            word|count|
+----------------+-----+
|            PPKM|  100|
|          patuhi|    3|
|        semangat|    4|
|           KEDUA|    1|
|   PENGOPTIMALAN|    1|
|           #PTKM|    4|
|          Dahulu|    1|
|          baik""|    1|
|        generasi|    1|
|          tumbuh|    1|
|          Mangga|    1|
|         artinya|    1|
|         tagihan|    1|
|          Jumlah|    1|
|        Tracking|    1|
|         operasi|    1|
|          #Sumut|    1|
|         Kapolda|    1|
|bulan.PEMERINTAH|    1|
|          absurd|    1|
+----------------+-----+
only showing top 20 rows

