# Introduksjon til PySpark

[Apache Spark](https://spark.apache.org/) er et sterkt verktøy som utvider mulighetsrommet for databehandling med R og Python. Kjernefunksjonaliteten ligger i at den lar deg kjøre en jobb på flere maskiner samtidig, noe som ikke er mulig med klassiske rammeverk som **Pandas** og **Tidyverse**. Følgelig er det et rammeverk som blant annet er veldig egnet for å prosessere store datamengder eller gjøre store beregninger. Les om mer om [Apache Spark i Dapla-manualen](./spark.html)

I denne notebooken vises noen enkle eksempler på hvordan du kan jobbe med data med [PySpark](https://spark.apache.org/docs/latest/api/python/index.html), et Python-grensesnitt mot Spark. 

## Oppsett

Når du logger deg inn på Dapla kan du velge mellom 2 ferdigoppsatte *kernels* for å jobbe med PySpark:

1. Pyspark (local)
2. Pyspark (k8s cluster)

Den første lar deg bruke Spark på en enkeltmaskin, mens den andre lar deg distribuere kjøringen på mange maskiner avhengig av hvor store jobbene er. I eksemplene under brukes **Pyspark (local)**.

In [1]:
#| label: first-cell
#| code-fold: true
# Importer biblioteker
import numpy as np
from pyspark.sql import SparkSession
from pyspark.sql.functions import date_format, explode, expr, sequence
from pyspark.sql.types import DateType, DoubleType, StructField, StructType

# Initialierer en SparkSession
spark = (
    SparkSession.builder.master("local[2]")
    .appName("Dapla-manual-example")
    .getOrCreate()
)

I koden over importerer vi de bibliotekene vi skal bruke under. Grunnen til at vi importerer `pyspark.sql` er at dette er at **Spark SQL** er Apache Spark sin modul for å jobbe med strukturerte data. Og som navnet tilsier vil det si at vi kan blande Python og SQL i koden vår. Vi skal nærmere på hvordan man bruke SQL fra PySpark-notebook senere. 

Spark tilbyr også et eget grensesnitt, **Spark UI**, for å monitorere hva som skjer under en SparkSession. Vi kan bruke følgende kommando for å få opp en lenke til Spark UI i notebooken vår:

In [2]:
spark.sparkContext

Klikker du på **Spark UI**-lenken så tar den deg til dashboard som lar deg monitorere, debugge, optimalisere og forstå kjøringene dine. Det kan være et svært nyttig verktøy i mange tilfeller. 

## Generere data

Vi kan begynne med å generere en Spark DataFrame med en kolonne som inneholder månedlige datoer for perioden 2000M1-2023M8. 

In [3]:
# Generate a sequence of dates
dates_df = spark.range(1).select(
    explode(
        sequence(
            start=expr("date '2000-01-01'"),
            stop=expr("date '2023-08-01'"),
            step=expr("interval 1 month"),
        )
    ).alias("Date")
)
dates_df.show(5)

+----------+
|      Date|
+----------+
|2000-01-01|
|2000-02-01|
|2000-03-01|
|2000-04-01|
|2000-05-01|
+----------+
only showing top 5 rows



En Spark DataFrame er en distribuert samling av data som er organisert inn i kolonner. Siden Spark lar deg distribuere kjøringer på flere maskiner, er DataFrames optimalisert for å kunne splittes opp slik at de kan brukes på flere maskiner. Med andre er dette ikke det samme som en Pandas dataframe mange kjenner fra før. 

Over genererte vi en datokolonne. For å få litt mer data kan vi også generere 100 kolonner med tidsseriedata og så printer vi de 2 første av disse:

In [4]:
# Genererer random walk data
schema = StructType(
    [StructField(f"serie{i:02d}", DoubleType(), True) for i in range(100)]
)

data = [
    tuple((10 + np.random.normal(0, 1, 100)).cumsum().tolist())
    for _ in range(284)  # 284 months from 2000-01 to 2023-08
]

data_df = spark.createDataFrame(data, schema=schema)

data_df.select("serie00", "serie01").show(5)

+------------------+------------------+
|           serie00|           serie01|
+------------------+------------------+
| 7.265541501656777|16.791320476113313|
|10.026567688619423| 20.27479271345087|
|10.151112240824128|18.390601668853087|
|  9.23812028157886| 19.02831960059486|
| 9.480083508895559|19.287135174878998|
+------------------+------------------+
only showing top 5 rows



Til slutt kan vi joine de to datasettene sammen og lage noen kolonner som viser år, kvartal og måned. Deretter printer vi ut noen av kolonnene med kommandoen `show()`. 

In [5]:
# Legger til row index til DataFrame før join med dates_df
data_df = data_df.withColumn("row_index", expr("monotonically_increasing_id()"))

# Joiner de to datasettene
df = (
    dates_df.withColumn("row_index", expr("monotonically_increasing_id()"))
    .join(data_df, "row_index")
    .drop("row_index")
)

# Legger til år, kvartal og mnd
df = df.withColumn("Year", date_format(df.Date, "yyyy"))
df = df.withColumn("Quarter", expr("quarter(Date)"))
df = df.withColumn("Month", date_format(df.Date, "MM"))

df.select("Date", "Year", "Quarter", "Month", "serie00", "serie01").show(5)

+----------+----+-------+-----+------------------+------------------+
|      Date|Year|Quarter|Month|           serie00|           serie01|
+----------+----+-------+-----+------------------+------------------+
|2000-01-01|2000|      1|   01|11.400000236743876|22.549406739178536|
|2000-02-01|2000|      1|   02| 9.831046941220402|19.512681072618033|
|2000-03-01|2000|      1|   03|10.549444253386536| 21.55248520513885|
|2000-04-01|2000|      2|   04|10.841398016100063|19.738459409409366|
|2000-05-01|2000|      2|   05| 9.513312884451398| 18.45161583167829|
+----------+----+-------+-----+------------------+------------------+
only showing top 5 rows



Og med det har vi noe data vi kan jobbe med i resten av notebooken. 

## Skrive til Parquet

PySpark tilbyr mange opsjoner ved skriving til parquet-filer som vi kanskje ikke er vant til å forholde oss til med enklere rammeverk som Pandas. Den enkleste måten å skrive ut en fil er som følger:

```python
df.write.parquet(
    "gs://ssb-prod-dapla-felles-data-delt/temp/timeseries.parquet"
)
```

Dette vil fungere hvis filen ikke finnes fra før. Hvis den finnes fra før så vil den feile. Grunnen er at vi ikke har spesifisert hva vi ønsker at den skal gjøre. Vi kan velge mellom `overwrite`, `append`, `ignore` eller `errorifexists`. Sistnevnte er også default-oppførsel hvis du ikke ber den gjøre noe annet. 

Under bruker vi opsjonen `overwrite`, det vil si at den skriver over en evt eksisterende fil med samme navn. 

In [6]:
df.write.mode("overwrite").parquet(
    "gs://ssb-prod-dapla-felles-data-delt/temp/timeseries.parquet"
)

Vi kan inspisere hva som ble skrevet ved å liste ut innholder i bøtta. 

In [7]:
from dapla import FileClient

fs = FileClient.get_gcs_file_system()

fs.glob("gs://ssb-prod-dapla-felles-data-delt/temp/**")

['ssb-prod-dapla-felles-data-delt/temp/',
 'ssb-prod-dapla-felles-data-delt/temp/timeseries.parquet',
 'ssb-prod-dapla-felles-data-delt/temp/timeseries.parquet/',
 'ssb-prod-dapla-felles-data-delt/temp/timeseries.parquet/_SUCCESS',
 'ssb-prod-dapla-felles-data-delt/temp/timeseries.parquet/part-00000-a5d51fe9-a30a-42ae-967d-03c6121e7e1a-c000.snappy.parquet',
 'ssb-prod-dapla-felles-data-delt/temp/timeseries.parquet/part-00001-a5d51fe9-a30a-42ae-967d-03c6121e7e1a-c000.snappy.parquet']

Hvis denne parquet-filen hadde vært partisjonert etter en kolonne, så ville det vært egne undermapper med navnestruktur `column_name=value` som indikerte hva filen er partisjonert på. Siden vi her bruker en maskin og har et lite datasett, valgte Spark å ikke partisjonere. 

## PySpark og SQL

Du kan også skrive SQL med Spark. For å skrive SQL må vi først lage et `temporary view`. Her kaller vi de **tidsserie**.

In [8]:
df.createOrReplaceTempView("tidsserie")

Vi kan deretter skrive en SQL-statement som vi ønsker å kjøre på viewet:

In [9]:
query = "SELECT * FROM tidsserie WHERE Year > 2010"

Deretter kan vi bruke det til å filtrere datasettet:

In [10]:
result_df = spark.sql(query)
result_df.select("Date", "Year", "Quarter", "Month", "serie00", "serie01").show()

+----------+----+-------+-----+------------------+------------------+
|      Date|Year|Quarter|Month|           serie00|           serie01|
+----------+----+-------+-----+------------------+------------------+
|2011-01-01|2011|      1|   01| 9.854707963962703|19.133946043669454|
|2011-02-01|2011|      1|   02|10.179074587939663|  20.5107178162615|
|2011-03-01|2011|      1|   03|10.739177215503856| 21.06622572477892|
|2011-04-01|2011|      2|   04|  9.97595627489884| 20.49656707675814|
|2011-05-01|2011|      2|   05| 9.498355280804706| 18.63779112173551|
|2011-06-01|2011|      2|   06| 10.99071563452657| 21.66622684522529|
|2011-07-01|2011|      3|   07| 9.447888296305013|18.147813688294338|
|2011-08-01|2011|      3|   08|  9.26056155464723| 20.07349552968892|
|2011-09-01|2011|      3|   09|10.280652771851381| 19.91685876971767|
|2011-10-01|2011|      4|   10|  11.7096811436004|22.602236548172403|
+----------+----+-------+-----+------------------+------------------+

