#Partycjonowanie

* Wikipedia odwiedziny strony

* Różnice pomiędzy partycjami a slots/cores
* Porównanie `repartition(n)` and `coalesce(n)`
* Shuffle partitions

https://dumps.wikimedia.org/other/pageviews/readme.html

In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import * 


schema = StructType([
    StructField("domain_code", StringType(), True),
    StructField("page_title", StringType(), False),
    StructField("count_views", IntegerType(), True),
    StructField("total_response_size",IntegerType(),True)
  ])

fileName = "dbfs:/FileStore/tables/pageviews_20200601_080000.gz"

initialDF = (spark.read
  .option("header", "true")
  .option("sep", " ")
  .option("header","True")
  .schema(schema)
  .csv(fileName))

## Partycje kontra Sloty

** *The Spark API uses the term **core** meaning a thread available for parallel execution.*<br/>*Here we refer to it as **slot** to avoid confusion with the number of cores in the underlying CPU(s)*<br/>*to which there isn't necessarily an equal number.*

### Slots/Cores

Sprawdzam ile jest slotów `SparkContext.defaultParallelism`

Dokumentacja <a href="https://spark.apache.org/docs/latest/configuration.html#execution-behavior" target="_blank">Spark Configuration, Execution Behavior</a>

> Może zależeć od manager clustra:
> * Local mode: number of cores on the local machine
> * Mesos fine grained mode: 8
> * **Others: total number of cores on all executor nodes or 2, whichever is larger**

Sprawdz jaki jest paralelism w spark context

In [0]:
sc.defaultParallelism

8

### Partitions

* Ile jest partycji

Jak sprawdzić ilość partycji 
* wykonaj konwersję do `RDD`
* zapytaj o `RDD` ilość partycji 


In [0]:
initialDF.rdd.getNumPartitions()

1

Dlaczego tylko jedna partycje, czy to może przez to że wczytuję nie podzielny plik ???<br>
Załaduj rozpakowany plik i wczytaj jeszcze raz

In [0]:
from pyspark.sql.functions import * 
from pyspark.sql.types import * 


schema = StructType([
    StructField("domain_code", StringType(), True),
    StructField("page_title", StringType(), False),
    StructField("count_views", IntegerType(), True),
    StructField("total_response_size",IntegerType(),True)
  ])

fileName = "/FileStore/tables/pageviews_20200601_080000"

unzippedDF = (spark.read
  .option("header", "true")
  .option("sep", " ")
  .option("header","True")
  .schema(schema)
  .csv(fileName))


Teraz lepiej, inny rodzaj pliku i od razu inna ilość partycji.

In [0]:
unzippedDF.rdd.getNumPartitions()

8

Zapisz do innej ścieżki i podejżyj ile jest plików ???

In [0]:
unzippedDF.write.format("parquet").mode("overwrite").save("/FileStore/tables/training/wikipedia/pageviews/")

In [0]:
display(dbutils.fs.ls("/FileStore/tables/training/wikipedia/pageviews/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/training/wikipedia/pageviews/_SUCCESS,_SUCCESS,0,1747225783000
dbfs:/FileStore/tables/training/wikipedia/pageviews/_committed_2763607988450499655,_committed_2763607988450499655,810,1747225783000
dbfs:/FileStore/tables/training/wikipedia/pageviews/_started_2763607988450499655,_started_2763607988450499655,0,1747225746000
dbfs:/FileStore/tables/training/wikipedia/pageviews/part-00000-tid-2763607988450499655-09d9bde2-5753-4ef8-834b-d2b9c999cefe-4-1-c000.snappy.parquet,part-00000-tid-2763607988450499655-09d9bde2-5753-4ef8-834b-d2b9c999cefe-4-1-c000.snappy.parquet,11328616,1747225781000
dbfs:/FileStore/tables/training/wikipedia/pageviews/part-00001-tid-2763607988450499655-09d9bde2-5753-4ef8-834b-d2b9c999cefe-5-1-c000.snappy.parquet,part-00001-tid-2763607988450499655-09d9bde2-5753-4ef8-834b-d2b9c999cefe-5-1-c000.snappy.parquet,11402618,1747225781000
dbfs:/FileStore/tables/training/wikipedia/pageviews/part-00002-tid-2763607988450499655-09d9bde2-5753-4ef8-834b-d2b9c999cefe-6-1-c000.snappy.parquet,part-00002-tid-2763607988450499655-09d9bde2-5753-4ef8-834b-d2b9c999cefe-6-1-c000.snappy.parquet,10846905,1747225781000
dbfs:/FileStore/tables/training/wikipedia/pageviews/part-00003-tid-2763607988450499655-09d9bde2-5753-4ef8-834b-d2b9c999cefe-7-1-c000.snappy.parquet,part-00003-tid-2763607988450499655-09d9bde2-5753-4ef8-834b-d2b9c999cefe-7-1-c000.snappy.parquet,11134210,1747225781000
dbfs:/FileStore/tables/training/wikipedia/pageviews/part-00004-tid-2763607988450499655-09d9bde2-5753-4ef8-834b-d2b9c999cefe-8-1-c000.snappy.parquet,part-00004-tid-2763607988450499655-09d9bde2-5753-4ef8-834b-d2b9c999cefe-8-1-c000.snappy.parquet,11303647,1747225781000
dbfs:/FileStore/tables/training/wikipedia/pageviews/part-00005-tid-2763607988450499655-09d9bde2-5753-4ef8-834b-d2b9c999cefe-9-1-c000.snappy.parquet,part-00005-tid-2763607988450499655-09d9bde2-5753-4ef8-834b-d2b9c999cefe-9-1-c000.snappy.parquet,11270606,1747225782000
dbfs:/FileStore/tables/training/wikipedia/pageviews/part-00006-tid-2763607988450499655-09d9bde2-5753-4ef8-834b-d2b9c999cefe-10-1-c000.snappy.parquet,part-00006-tid-2763607988450499655-09d9bde2-5753-4ef8-834b-d2b9c999cefe-10-1-c000.snappy.parquet,10173674,1747225781000


In [0]:
initialDF.count() 

6177086


* To nie przypadek źe mam **8 slots** i **8 partitions**
* Spark sprawdza ile jest **slots**, i na rozmiar danych i domyślnie ustawia ilość partycji.
* Nawet jeśli zwiększe ilość danych Spark wczyta **8 partycji**.
</br>



Wczytuję kopię danych ale już podzielonych na partycję

In [0]:
alternateDF = (spark.read
  .format("parquet").load("/FileStore/tables/training/wikipedia/pageviews/"))

print("Partycje: " + str(alternateDF.rdd.getNumPartitions()))

Partycje: 8


**1** Co się stanie jeśli będę miał duży plik z **200 partycjami** i **256 slotów**?

**2** Co jeśli będę miał bardzo duży plik **200 partycji** i będę miał tylko **8 slotów**, jak długo potrwa ładowanie w porównianiu z datasetem który ma tylko 8 partycji?

**2** Jakie mam opcję jeśli mam (**200 partycji** i **8 slotów**) jeśli nie jestem w stanie zwiększyć ilośći slotów?

### Użyj każdego Slot/Core

Poza kilkoma wyjątkami staraj się dopasować ilość **partycji do ilośći slotów **.

Dzięki temu **wszystkie sloty zostaną użyte** i każdy będzie miał przypisany **task**.



Mając 5 partycji i 8 slotów **3 sloty nie będą użyte**.

Mając 9 partycji i 8 slotów **job zajmię 2x więcej czasu**.
* Np może to zająć 10 sekund, żeby przetwożyć pierwszych 8  a potem kolejne 10 sekund na ostatnią partycję = 20s.

### Ile Partycji?

Podstawowa wartość sugerowana to **200MB na partycję (cached)**.
* Nie patrz na rozmiar na dysku: CSV zajmuje dużo miejsca na dysku ale mniej w RAM: String "12345" = 10B, Integer 12345=4B.
* Parquet skompresowane na dysku ale nie w RAM.
* Relacyjne bazy i inne źródła .....?

Wartość **200** pochodzi z doświadczeń Databricks oparty na wydajnośći. 

Jeśli masz wykonawce o niższym RAM (np JVMs with 6GB) możesz  obniżyć tą wartość.

Ile RAM Np 8 partycji * 200MB = 1.6GB


**Pytanie:** Jeśli moje dane będą miały 10 partycji co powinien zrobić ?...
* zredukować ilość partycji (1x ilość slotów)
* czy zwiększyć (2x ilość slotów)

**Odpowiedź** To zależy od ilośći danych w partycji
* Wczytaj dane. 
* Cache.
* Sprawdź wielkość partycji.
* Jeśli jest powyżej > 200MB to rozważ zwiększenie ilośći partycji.
* Jeśli jest poniżej < 200MB to możesz zmiejszyć ilość partycji.

**Celem jest użycie jak najmniejszej liczby partycji i utrzymanie poziomu slotów (przynajmniej 1 x partycji)**.

## `coalesce()` i `repartition()`


**`coalesce(n)`** :
> Returns a new Dataset that has exactly numPartitions partitions, when fewer partitions are requested.<br/>
> If a larger number of partitions is requested, it will stay at the current number of partitions.

**`repartition(n)`** :
> Returns a new Dataset that has exactly numPartitions partitions.

Różnice
* `coalesce(n)` transformacja **narrow** zmiejsza ilość partycji.
* `repartition(n)` transformacja **wide** może być użyta do zmiejszenia lub zwiększenia ilośći partycji.


Kiedy użyć jednej lub drugiej.
* `coalesce(n)` nie wywoła shuffle.
* `coalesce(n)` nie gwarantuje równej dystrybujci rekordów na wszystkich partycjach. Może się skończyć z partycjami zawierającymi 80% danych.
* `repartition(n)` jako transformacja **wide** doda koszt shuffle
* `repartition(n)` będzie miało relatywnie równą dystrybujcę danych w partycjach.

In [0]:

repartitionedDF = alternateDF.repartition(10)

print("Partitions: " + str(repartitionedDF.rdd.getNumPartitions()))

Partitions: 10


## Cache

Back to list...
0. Cache the data
0. Adjust the `spark.sql.shuffle.partitions`
0. Perform some basic ETL (i.e., convert strings to timestamp)
0. Possibly re-cache the data if the ETL was costly

We just balanced the number of partitions to the number of slots.

Depending on the size of the data and the number of partitions, the shuffle operation can be fairly expensive (though necessary).

Let's cache the result of the `repartition(n)` call..
* Or more specifically, let's mark it for caching.
* The actual cache will occur later once an action is performed
* Or you could just execute a count to force materialization of the cache.

In [0]:
df = repartitionedDF.cache()

##spark.sql.shuffle.partitions


0. Adjust the `spark.sql.shuffle.partitions`
0. Perform some basic ETL (i.e., convert strings to timestamp)
0. Possibly re-cache the data if the ETL was costly

The next problem has to do with a side effect of certain **wide** transformations.

So far, we haven't hit any **wide** transformations other than `repartition(n)`
* But eventually we will... 
* Let's illustrate the problem that we will **eventually** hit
* We can do this by simply sorting our data.

In [0]:

(repartitionedDF
  .orderBy("page_title")        # sortuje dane 
  .rdd.foreach(lambda x: ...))  # nie robi nic poza wywołaniem joba

In [0]:
spark.conf.get("spark.sql.shuffle.partitions")

'200'

In [0]:
spark.conf.set("spark.sql.shuffle.partitions",100)


* Jedna akcja.
* Spark wykonał 3 zadania(jobs).
* Sprawdź plan wykonania.
* **Exchange rangepartitioning**
  

In [0]:

# Sprawdz exmplain ze wszystkimi rekordami
(repartitionedDF
  .orderBy("count_views")
  .explain())


# Sprawdz exmplain z 3M rekordami
(repartitionedDF
  .orderBy("count_views")
  .limit(3000000)
  .explain())


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- == Initial Plan ==
   Sort [count_views#224 ASC NULLS FIRST], true, 0
   +- Exchange rangepartitioning(count_views#224 ASC NULLS FIRST, 100), ENSURE_REQUIREMENTS, [plan_id=284]
      +- InMemoryTableScan [domain_code#222, page_title#223, count_views#224, total_response_size#225]
            +- InMemoryRelation [domain_code#222, page_title#223, count_views#224, total_response_size#225], StorageLevel(disk, memory, deserialized, 1 replicas)
                  +- AdaptiveSparkPlan isFinalPlan=false
                  +- == Initial Plan ==
                     Exchange RoundRobinPartitioning(10), REPARTITION_BY_NUM, [plan_id=198]
                     +- FileScan parquet [domain_code#222,page_title#223,count_views#224,total_response_size#225] Batched: true, DataFilters: [], Format: Parquet, Location: InMemoryFileIndex(1 paths)[dbfs:/FileStore/tables/training/wikipedia/pageviews], PartitionFilters: [], PushedFilters: [], ReadSchema: str

Dodatkowe zadania (job) zostały wywołane ilością danych w DataFrame

In [0]:

(repartitionedDF
  .orderBy("count_views") 
  .limit(3000000)                 
  .count())        

com.databricks.backend.common.rpc.CommandCancelledException
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$5(SequenceExecutionState.scala:136)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3(SequenceExecutionState.scala:136)
	at com.databricks.spark.chauffeur.SequenceExecutionState.$anonfun$cancel$3$adapted(SequenceExecutionState.scala:133)
	at scala.collection.immutable.Range.foreach(Range.scala:158)
	at com.databricks.spark.chauffeur.SequenceExecutionState.cancel(SequenceExecutionState.scala:133)
	at com.databricks.spark.chauffeur.ExecContextState.cancelRunningSequence(ExecContextState.scala:728)
	at com.databricks.spark.chauffeur.ExecContextState.$anonfun$cancel$1(ExecContextState.scala:446)
	at scala.Option.getOrElse(Option.scala:189)
	at com.databricks.spark.chauffeur.ExecContextState.cancel(ExecContextState.scala:446)
	at com.databricks.spark.chauffeur.ExecutionContextManagerV1.can

Only 1 job.

Spark's Catalyst Optimizer is optimizing our jobs for us!

### Kolejny Problem

* Uruchom orginalny dataframe.
* Przejrzyj wszystkie zadania.
* Sprawdź ile jest partycji w ostatnim jobies!

In [0]:

funkyDF = (repartitionedDF
  .orderBy("count_views"))

funkyDF.rdd.foreach(lambda x: ...)

Czy w różnych jobach jest różna ilość partycji ?

In [0]:
funkyDF.rdd.getNumPartitions()

4


Wartość 200 jest domyślną i opartą na doświadczeniu, pasuje do większości scenariuszy.

Moźesz to zmienić w konfiguracji `spark.sql.shuffle.partitions`


In [0]:
spark.conf.get("spark.sql.shuffle.partitions")

'100'

Zmień na 8

In [0]:
spark.conf.set("spark.sql.shuffle.partitions", "8")

Czy jeśli zmienię różne typy operacji na datasecie to będę miał różną liczbę partycji ?
Ponowne wykonanie dla porównania.

In [0]:
betterDF = (repartitionedDF.orderBy("count_views","page_title")
            .groupBy("domain_code").agg(count("page_title"))
  )
                    
betterDF.rdd.foreach(lambda x: ...)

betterDF.rdd.getNumPartitions()

1