# Introduksjon til Spark
Apache Spark er et åpent kildekode rammeverk for distribuert databehandling av _store_ datamengder. Via grensesnittet til Spark, har man muligheten til å skrive programmer med implisitt data parallellisme med støtte for sanntidsanalyse og avanserte verktøy som maskinlæring.

![](https://files.training.databricks.com/images/sparkcluster.png)

#### Spark Sesjonen
**'Spark Sesjonen'** (_SparkSession_) er inngangspunktet til Spark for å jobbe med DataFrames og Dataset. Via den får man tilgang til Spark funksjonaliteter inkludert lesing av data, håndtering av konfigurasjoner og tilgang til den underliggende 'Spark Konteksten' (_SparkContext_).

I teorien kan man opprette så mange sesjoner som man ønsker, f.eks. om man ønsker at flere brukere skal kunne kjøre kode mot samme kluster.

I en Databricks notebook, vil **'Spark Sesjonen'** allerede være instansiert på forhånd.

_Sørg for at du har et cluster tilgjengelig, før du starter på notebooken her. For å fest en Databricks notebook til et cluster [se her.](https://docs.databricks.com/en/notebooks/notebook-ui.html#compute-resources-for-notebooks)_

**Spark Sesjon** er tilgjengelig i Databricks gjennom den globale notebook variabelen `spark`.

In [0]:
# Kjør denne cellen for å skrive ut Spark Sesjonen
spark

![](https://files.training.databricks.com/images/105/unified-engine.png)

Det er noen viktige grensesnitt man må kjenne til for å kunne bruke Spark. Spesielt med tanke på at Spark er et prosseserings verktøy for store datamengder.

#### Dataset
Dataset er den nyeste abstraksjonen av en distribuert kolleksjon av data, og kan sees på som en kombinasjon av DataFrames og RDDer. Det gir et typet grensesnitt som er tilgjengelig i RDDer, samtidig som det gir mye av den samme funksjonaliteten fra DataFrames. Dataset APIet er tilgjengelig i Scala og Java.
#### DataFrame
DataFrame er en samling av distribuerte Rad-typer. Disse gir et fleksibelt grensesnitt og er liknende i konsept til DataFrames, for de som er kjent med det gjennom Pandas (Python bibliotek) og R-språket. DataFrames APIet er tilgjengelig i Python.
#### RDD (Resilient Distributed Dataset)
Apache Spark's første abstraksjon av en distribuert kolleksjon av data var RDD. Det er et grensesnitt over en sekvens av dataobjekter som består av en eller flere typer som er distribuert utover et kluster av maskiner. Selv om dette er den opprinnelige datastrukturen som er tilgjengelig, bør nye brukere hovedsaklig fokusere på Dataset/DataFrame som eksponerer mye av funksjonaliteten til RDDer.


La oss bruke **Spark Sesjonen** til å lese inn en fil og opprette en DataFrame.

In [0]:
# Les inn en text fil
min_1_dataframe = spark.read.text("/databricks-datasets/learning-spark-v2/SPARK_README.md")

print(min_1_dataframe)

DataFrame[value: string]


Normalt ville man forventet at `print` faktisk skrev ut verdien til DataFramen over, men det er ikke det som skjer. Dette er fordi Spark tillater brukeren å gjøre 2 forskjellige operasjoner på en DataFrame, dette er det vi kaller for **transformasjoner** og **handlinger**.

#### Transformasjoner
Transformasjoner i Spark er late av natur, og de vil ikke bli utført før man på kaller en **handling**. Dette vil si at man kan bygge oppe en større transformasjons rekke, i en interaktiv notebook, over flere celler før en eneste transformasjon har skjedd på dataene. Noen vanlige transformasjoner i Spark inkluderer:
- `filter()`
- `select()`
- `groupBy()`
- `withColumn()`

#### Handlinger
Handlinger er operasjoner som utløser utførelsen av en Spark-jobb og returnerer et resultat. Jobben består av å kjøre alle de tidligere transformasjonene for å få tilbake et faktisk resultat. En handling er sammensatt av en eller flere jobber som består av oppgaver som vil bli utført av arbeiderne parallelt der det er mulig.
Noen vanlige handlinger i Spark inkluderer:
- `show()`
- `count()`
- `take()`
- `collect()`

![](https://files.training.databricks.com/images/wiki-book/pagecounts/trans_and_actions.png)




In [0]:
# En transformasjon
filtered = min_1_dataframe.filter(min_1_dataframe.value.contains("Spark"))

In [0]:
# To handlinger
print(min_1_dataframe.show(5))
print(filtered.show(5))

+--------------------+
|               value|
+--------------------+
|      # Apache Spark|
|                    |
|Spark is a fast a...|
|high-level APIs i...|
|supports general ...|
+--------------------+
only showing top 5 rows

None
+--------------------+
|               value|
+--------------------+
|      # Apache Spark|
|Spark is a fast a...|
|rich set of highe...|
|and Spark Streami...|
|You can find the ...|
+--------------------+
only showing top 5 rows

None


Databricks tilbyr en litt penere utskrifts funskjon, som heter `display`.

In [0]:
min_1_dataframe.display()
filtered.display()

value
# Apache Spark
Spark is a fast and general cluster computing system for Big Data. It provides
"high-level APIs in Scala, Java, Python, and R, and an optimized engine that"
supports general computation graphs for data analysis. It also supports a
"rich set of higher-level tools including Spark SQL for SQL and DataFrames,"
"MLlib for machine learning, GraphX for graph processing,"
and Spark Streaming for stream processing.
## Online Documentation
"You can find the latest Spark documentation, including a programming"
"guide, on the [project web page](http://spark.apache.org/documentation.html)"


value
# Apache Spark
Spark is a fast and general cluster computing system for Big Data. It provides
"rich set of higher-level tools including Spark SQL for SQL and DataFrames,"
and Spark Streaming for stream processing.
"You can find the latest Spark documentation, including a programming"
## Building Spark
Spark is built using [Apache Maven](http://maven.apache.org/).
"To build Spark and its example programs, run:"
"[""Building Spark""](http://spark.apache.org/docs/latest/building-spark.html)."
The easiest way to start using Spark is through the Scala shell:



Hovedårsaken til at Spark består av handlinger og transformasjoner, er at dette gir en enkel måte å optimalisere hele beregningskjeden på. Fremfor å fokusere på de individuelle delene, prøver Spark å se helheten i jobben den blir gitt. Dette gjør Spark eksepsjonelt rask i visse komputeringer, da det kan utføre alle relevante komputasjoner i parallell. Teknisk sett, så "pipeliner" Spark denne beregningen, noe vi kan se i bildet nedenfor. Dette innebærer at visse beregninger kan utføres samtidig (`withColumn()`,`filter()`,`sum()`) fremfor å måtte utføre én operasjon for alle dataelementene, og deretter den neste operasjonen.

![transformasjoner og handlinger](https://files.training.databricks.com/images/eLearning/ucdavis/transformations-narrow.png)

Apache Spark kan også holde resultater i minnet, i motsetning til andre rammeverk som umiddelbart skriver til disk etter hver oppgave (Hadoop).

#### Arkitekturen til Apache Spark

La oss ta en rask titt på Apache Sparks arkitektur. Som nevnt tidligere, lar Apache Spark deg distribuere arbeidslasten din utover et nettverk av maskiner, og dette oppnås gjennom en **driver-arbeider** type arkitektur. Her er det en driver node i et kluster av maskiner, ledsaget av arbeider noder. Driveren sender arbeid til arbeiderene og instruerer dem om å hente data enten fra minnet eller fra disk (eller fra en annen datakilde som GCS eller PostGIS).

Diagrammet nedenfor viser et eksempel på et Apache Spark-kluster. I hovedsak finnes det en Driver-node som kommuniserer med executor-noder. Hver av disse executor-nodene har `slots` som logisk sett fungerer som cpu kjerne.

![spark-arkitektur](https://files.training.databricks.com/images/105/spark_cluster_slots.png)

Driveren sender oppgaver til de ledige `slots` på executorne når det er arbeid som må gjøres:

![spark-arkitektur](https://files.training.databricks.com/images/105/spark_cluster_tasks.png)

Du kan se detaljene i din Apache Spark-applikasjon i Apache Spark web UI. Web UI er tilgjengelig i Databricks ved å gå til "Clusters" og deretter klikke på "View Spark UI"-lenken for klusteret ditt. Det er også tilgjengelig ved å klikke øverst til høyre på denne notatboken der du velger klusteret som er koblet til denne notatboken.

På et høyt nivå består hver Apache Spark-applikasjon av et driverprogram som starter forskjellige parallelle operasjoner på arbeider-nodene, Java Virtual Machines (JVMs), som kjører enten i et kluster eller lokalt på samme maskin. I Databricks, er notebook grensesnittet driverprogrammet. Dette driverprogrammet oppretter distribuerte datasett på klusteret, og deretter utføres operasjoner (transformasjoner og handlinger) på disse datasettene. Driverprogrammet får tilgang til Apache Spark gjennom et Spark Sesjon-objekt.

#### Et Siste Eksempel

La oss sette sammen alle prinsippene vi har introdusert over og sette det i en kontekst ved å lese inn en fil, Gjøre noen transformasjoner med datasettet og til slutt avslutte med en handling.

Vi skal, med andre ord, bygge opp en plan for hvordan vi skal få tilgang til dataene, og deretter endelig utføre den planen med en handling. Vi går gjennom en prosess med å analysere forespørselen, bygger opp en plan, sammenligne dem og til slutt utfører vi den.

![Spark Query Plan](https://files.training.databricks.com/images/wiki-book/pageviews/catalyst.png)

Vi kommer ikke til å gå mer innpå den underliggende optimalisereren her, men man kan [lese mer her.](https://www.databricks.com/glossary/catalyst-optimizer)

La oss lese inn et av datasettene som databricks tilbyr.
- Formatet på filen er `csv`
- Filen inneholder overskrift på første raden
- `inferSchema` betyr at vi lar Spark automatisk oppdage datatypene til kolonnene
   - Kostnaden vi betaler for dette er at filen må leses mer enn en gang

In [0]:
path = "/databricks-datasets/Rdatasets/data-001/csv/ggplot2/diamonds.csv"

diamanter = spark.read.format("csv")\
  .option("header","true")\
  .option("inferSchema", "true")\
  .load(path)

Når vi har lest inn dataene, er vi klare til å kjøre noen komputasjoner. Men før vi kan utføre noen transformasjoner på datasettet, må vi vite hvordan dataene ser ut. Vi kan gjøre dette i form av en handling, enten ved hjelp av `show()` eller `display()`.

In [0]:
diamanter.display()

_c0,carat,cut,color,clarity,depth,table,price,x,y,z
1,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
2,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
3,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
4,0.29,Premium,I,VS2,62.4,58.0,334,4.2,4.23,2.63
5,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75
6,0.24,Very Good,J,VVS2,62.8,57.0,336,3.94,3.96,2.48
7,0.24,Very Good,I,VVS1,62.3,57.0,336,3.95,3.98,2.47
8,0.26,Very Good,H,SI1,61.9,55.0,337,4.07,4.11,2.53
9,0.22,Fair,E,VS2,65.1,61.0,337,3.87,3.78,2.49
10,0.23,Very Good,H,VS1,59.4,61.0,338,4.0,4.05,2.39


Styrken til `display` er at vi raskt og enkelt, ved hjelp av noen klikk, kan lage visualisjoner for å lettere kunne forstå dataene. La oss lage et plot hvor vi kan sammenligne pris, color og cut. Ved å trykke på `+` ikonet på toppen av tabellen kan vi velge å legge til en visualisering fane. For å få frem dataene i visualisjonen må vi kjøre cellen på nytt.

In [0]:
diamanter.display()

_c0,carat,cut,color,clarity,depth,table,price,x,y,z
1,0.23,Ideal,E,SI2,61.5,55.0,326,3.95,3.98,2.43
2,0.21,Premium,E,SI1,59.8,61.0,326,3.89,3.84,2.31
3,0.23,Good,E,VS1,56.9,65.0,327,4.05,4.07,2.31
4,0.29,Premium,I,VS2,62.4,58.0,334,4.2,4.23,2.63
5,0.31,Good,J,SI2,63.3,58.0,335,4.34,4.35,2.75
6,0.24,Very Good,J,VVS2,62.8,57.0,336,3.94,3.96,2.48
7,0.24,Very Good,I,VVS1,62.3,57.0,336,3.95,3.98,2.47
8,0.26,Very Good,H,SI1,61.9,55.0,337,4.07,4.11,2.53
9,0.22,Fair,E,VS2,65.1,61.0,337,3.87,3.78,2.49
10,0.23,Very Good,H,VS1,59.4,61.0,338,4.0,4.05,2.39


Databricks visualization. Run in Databricks to view.

La oss gjøre noen enkle transformasjoner på datasettet, før vi analyserer den midlertidige planen Spark har for å påføre disse transformasjon på dataene.

Vi skal gjøre noen veldig enkle transformasjoner, først grupperer vi dataene på to variabler, cut og color, før vi beregner gjennomsnittsprisen basert på disse to variablene. Etter dette ønsker vi å slå sammen det nye datasettet med det originale datasettet på kolonnen color, og velge carat og gjennomsnittsprisen.

In [0]:
gruppert_data = diamanter.groupBy("cut", "color").avg("price")

joined_data = gruppert_data\
  .join(diamanter, on='color', how='inner')\
  .select("`avg(price)`", "carat")

Disse transformasjonene er nå på en måte fullført, men ingenting har skjedd. Som du ser ovenfor, har ikke Spark retunert et resultat tilbake!

Grunnen til det er at disse beregningene er late av natur. Dette er en intelligent optimalisering som muliggjør at enhver beregning kan bli rekalkulert fra kildedataene, noe som gjør at Apache Spark kan håndtere eventuelle feil som oppstår underveis, og håndtere forsinkelser på en vellykket måte. For det andre, kan Apache Spark optimalisere beregningen slik at transformasjonen på dataene kan utføres mer effektivt, ved å lage en plan for hvordan dette arbeidet vil utføres. Dette vil ikke si at man slipper å tenke på hvilken transformasjoner som bør komme før andre transformasjoner. Mer om dette senere.

For å få en følelse av hva denne planen består av, kan vi bruke explain-metoden. Husk at ingen av beregningene våre har blitt utført ennå, så alt denne explain-metoden gjør er å fortelle oss hvordan du beregner dette eksakte datasettet.

In [0]:
joined_data.explain()

== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   ResultQueryStage 2, Statistics(sizeInBytes=63.1 MiB, ColumnStat: N/A, isRuntime=true)
   +- *(3) Project [avg(price)#422, carat#428]
      +- *(3) BroadcastHashJoin [color#52], [color#430], Inner, BuildLeft, false
         :- ShuffleQueryStage 1, Statistics(sizeInBytes=1120.0 B, rowCount=35, ColumnStat: N/A, isRuntime=true)
         :  +- Exchange SinglePartition, EXECUTOR_BROADCAST, [plan_id=217]
         :     +- *(2) HashAggregate(keys=[cut#51, color#52], functions=[finalmerge_avg(merge sum#456, count#457L) AS avg(price#56)#421])
         :        +- AQEShuffleRead coalesced
         :           +- ShuffleQueryStage 0, Statistics(sizeInBytes=2016.0 B, rowCount=35, ColumnStat: N/A, isRuntime=true)
         :              +- Exchange hashpartitioning(cut#51, color#52, 200), ENSURE_REQUIREMENTS, [plan_id=182]
         :                 +- *(1) HashAggregate(keys=[cut#51, color#52], functions=[partial_avg(pric

Prøv å se om du forstår noe av planen over. Vi skal ikke gå noe dypere inn på dette her, men for mer [info se her.](https://docs.databricks.com/en/optimizations/cbo.html)

In [0]:
joined_data.count()

269700

Vi kan undersøke hvilken plan Spark brukte under eksekveringen av jobben ved å trykke på pilen ved siden av `(4) Spark Jobs` på bunnen av cellen over.
Etter at jobben har fått kjørt ferdig, kan vi trykke på `View` linken, etterfulgt av `DAG Visualization` knappen. Dette gir oss direkte tilgang til Spark UIen fra notebooken våres.

Dette er den Direkte Asykliske Grafen (DAG) av alle beregningene vi nettopp har kjørt. Dette blir ofte kalt for datalineage, da det gir oss en oversikt over hva som har skjedd med dataene fra vi leste de inn til vi har prosessert de ferdig.

Det er takket være de late transformasjonen at denne DAGen kan bli generert. Under genereringen av stegene som inngår i DAGen, optimaliserer Spark en del ting underveis. Dette er en av hovedgrunnen til å bruke DataFrame og Dataset i stedet for RDD APIet. Du vil se eksempler på `WholeStageCodeGen` og `tungsten` i planene, og disse er en del av forbedringene i Spark SQL, [som du kan lese mer om på Databricks sine sider.](https://www.databricks.com/glossary/tungsten)

#### Caching

Caching er en viktig funksjon i Apache Spark som lar deg lagre data i minnet under beregninger. Dette kan bidra til å øke hastigheten på tilgangen til ofte brukte tabeller eller datastykker. Caching er spesielt nyttig for iterative algoritmer som arbeider gjentatte ganger med de samme dataene. Men det er viktig å merke seg at caching ikke er en universalløsning for alle hastighetsproblemer. Andre viktige konsepter som data-partisjonering, data-klustering og bøttefordeling kan ha en mye større effekt på utførelsen av jobben din enn caching. Husk imidlertid at disse er alle verktøy i verktøykassen din!

For å cache en DataFrame eller RDD, kan du enkelt bruke `cache`-funksjonen

In [0]:
joined_data.cache()

DataFrame[avg(price): double, carat: double]

Caching, som en transformasjon, utføres lat. Det betyr at dataene ikke blir lagret i minnet før du kaller en handling (action) på datasettet.

Vi har fortalt Apache Spark at den skal `cache` datasettet vårt etter at vi har utført en handling. La oss påføre en handling på dataene med en `count` to ganger. Første gang vil dette opprette datasettet, cache den i minnet og deretter returnere resultatet. Andre gang, i stedet for å beregne hele datasettet på nytt, vil den bare hente versjonen som allerede er i minnet.

In [0]:
joined_data.count()

269700

In [0]:
joined_data.count()

269700

Som vi kan se, utføres `count` nummer to flere ganger raskere enn count nummer en.

For å se om en DataFrame er cachet kan vi kjøre følgende celle.

In [0]:
print(joined_data.storageLevel.useMemory)

True


Vi kan printe antall parisjoner datasettet har. For å kunne gjøre det må vi hente informasjon fra det underliggende Resiliant Distributed Dataset (RDD).

In [0]:
print(f"DataFrame består av {joined_data.rdd.getNumPartitions()} partisjoner")

DataFrame består av 1 partisjoner


Man får ikke helt de store parallelle operasjonen på dette datasettet.