# Introduksjon til Delta Lake

Delta Lake er et databaselag som kan *legges over* parquet-filer i bøtter. Det kan gi oss mye av den funksjonaliteten vi har vært vant til i relasjonelle databaser og datavarehus. I denne notebooken vises hvordan man kan ta det i bruk på Dapla. 

## Oppsett

Hvis du logger deg inn på <https://jupyter.dapla.ssb.no/> kan du ta i bruk Delta Lake via Spark. Men for å gjøre det må du installere [delta-spark](https://pypi.org/project/delta-spark/). For å installere pakken må du jobbe i et [ssb-project](https://manual.dapla.ssb.no/jobbe-med-kode.html#ssb-project). I tillegg må du installere `delta-spark`-versjon som er kompatibel med Spark-versjonen som er installert på Dapla. Gjør derfor følgende:

1. [Opprett et ssb-project](https://manual.dapla.ssb.no/jobbe-med-kode.html#ssb-project) med kommandoen:  
`ssb-project create test-delta-lake`
2. I terminalen skriver du følgende for å sjekke Spark-versjonen som er installert:  
`spark-shell --version`
3. [Sjekk hvilken **delta-spark**-versjon](https://docs.delta.io/latest/releases.html) som er kompatibel med din Spark-versjon og installer den med kommandoen^[I eksempelet er det Spark V3.3.1 som er installert og jeg installerer derfor delta-spark v2.3]:   
`poetry add delta-spark@2.3`
4. Åpne en ny notebook og velg kernel `test-delta-lake`. 

Nå har du satt opp et virtuelt miljø med en PySpark-kernel som kjører en maskin (såkalt Pyspark local kernel), der du har installert delta-spark. Vi kan nå importere de bibliotekene vi trenger og sette igang en Spark-session. 

In [None]:
# Importerer biblioteker
import pyspark
from delta import *

Deretter initialiserer jeg en Spark-session. `%%capture_output` er med for å forhindre at det ikke blir printet ut sensitiv informasjon i en notebook som skal inn i Dapla-manualen. 

In [None]:
%%capture captured_output
%run /usr/local/share/jupyter/kernels/pyspark_local/init.py

## Genererer noe data

In [None]:
# Genererer noe data med Spark
data = spark.range(10, 15)
data.show()

## Skriver ut en Delta Lake Table

La oss skrive ut Spark DataFrame som en Delta Lake Table og bli kjent med strukturen i objektet:

In [None]:
%%time
data.write.format("delta").mode("overwrite").save(
    "gs://ssb-dapla-felles-data-delt-prod/temp4"
)

Vi kan deretter printe ut hva som ble opprettet når vi skrev ut en Delta Lake Table:

In [None]:
from dapla import FileClient

fs = FileClient.get_gcs_file_system()

fs.glob("gs://ssb-dapla-felles-data-delt-prod/temp4/**")

### Delta Lake Tabellstruktur

Mappenstrukturen du ser over er typisk for en Delta Lake-tabell. La oss bryte ned komponentene:

1. **`_delta_log/`**:
   - Dette er transaksjonsloggmappen for Delta Lake-tabellen. Den inneholder en serie med JSON-filer som registrerer alle transaksjoner gjort på tabellen.
   - Transaksjonsloggen er avgjørende for Delta Lakes ACID-transaksjonsegenskaper, som muliggjør funksjoner som atomiske forpliktelser, tilbakerullinger og tid-reise-forespørsler.

2. **`_delta_log/00000000000000000000.json`**:
   - Dette er en JSON-fil innenfor transaksjonsloggkatalogen. Hver JSON-fil i denne mappen tilsvarer en transaksjon (eller en batch av transaksjoner) gjort på tabellen.
   - Filnavnet er null-fylt og representerer transaksjonsversjonen. I dette tilfellet representerer `00000000000000000000.json` den aller første versjonen (versjon 0) av tabellen. Etter hvert som flere transaksjoner blir gjort, vil nye filer bli lagt til med økende versjonsnumre.

3. **`part-00000-450715bd-6b0c-4827-bb8a-b0265505ca72-c000.snappy.parquet`** og **`part-00001-977ed55f-5ce5-469f-8a1d-9eafb143c215-c000.snappy.parquet`**:
   - Dette er de faktiske datafilene til Delta Lake-tabellen, lagret i Parquet-format.
   - `.snappy.parquet`-utvidelsen indikerer at dataene er komprimert ved hjelp av Snappy-komprimeringsalgoritmen.
   - Filnavnene er typiske for Sparks navngivningskonvensjon for datadeler. Prefiksene `part-00000` og `part-00001` indikerer partisjonsnumre. De lange UUID-lignende strengene er unike identifikatorer for datafilene. Suffikset `c000` indikerer Spark-oppgaven som skrev filen.

Mappen representerer en Delta Lake-tabell med data lagret i Parquet-format og en transaksjonslogg som sporer endringer i tabellen. Tilstedeværelsen av `_delta_log`-mappen og innholdet er det som skiller en Delta Lake-tabell fra et vanlig Parquet-datasett.


## Lese inn tabell



In [None]:
deltaTable = DeltaTable.forPath(spark, "gs://ssb-dapla-felles-data-delt-prod/temp4")
deltaTable.toDF().show()

## Modifisere datasettet

La oss modifisere datasettet ved å bytte ut verdien `13` med `15` i `id`-kolonnen:

In [None]:
from pyspark.sql.functions import col, lit

# Update the cell with value 13 to 15
deltaTable.update(condition=(col("id") == 13), set={"id": lit(15)})

Et viktig poeng å få med seg her er at vi nå oppdaterte Delta Lake Table objektet både i minnet og på disk. La oss bevise det ved å lese inn fra disk:

In [None]:
deltaTable2 = DeltaTable.forPath(spark, "gs://ssb-dapla-felles-data-delt-prod/temp4")
deltaTable2.toDF().show()

Og deretter ved å printe ut det opprinnelige objektet vi leste inn:

In [None]:
deltaTable.toDF().show()

## Append data

La oss legge til verdiene `20` og `21` til datasettet. Først lager vi en Spark dataframe:

In [None]:
new_data = [(20,), (21,)]
new_df = spark.createDataFrame(new_data, ["id"])
new_df.show()

Deretter kan vi appendere det til vår opprinnelige dataframe:

In [None]:
new_df.write.format("delta").mode("append").save(
    "gs://ssb-dapla-felles-data-delt-prod/temp4"
)

In [None]:
deltaTable.toDF().show()

## Historien og metadata til filen

Nå som vi har gjort noen endringer kan vi se på historien til filen:

In [None]:
# Lister ut filene i bøtta
fs = FileClient.get_gcs_file_system()
fs.glob("gs://ssb-dapla-felles-data-delt-prod/temp4/**")

Vi ser av transaksjonsloggen i `_delta_log`-mappen at det nå har vært 3 transaksjoner på datasettet. vi ser også av navnene på parquet-filene, `part-00000` og `part-00001`, at det finnes to versjoner av filen. Hvis vi ønsker å bli bedre kjent med hva slags informasjon som blir lagret fra transaksjonene, så kan vi printe ut den siste transaksjonen som heter `00000000000000000002.json`:

In [None]:
import json

from dapla import FileClient

# Kobler oss på bøttene
fs = FileClient.get_gcs_file_system()

# Filsti
path = "gs://ssb-dapla-felles-data-delt-prod/temp4/_delta_log/00000000000000000002.json"

with fs.open(path, "r") as f:
    for line in f:
        data = json.loads(line)
        pretty_content = json.dumps(data, indent=4)
        print(pretty_content)
        print("-" * 50)  # Print separator for clarity

Her ser vi at vi får masse informasjon om endringen, både metadata om transaksjonen i `commitInfo`, og informasjon om nye data-filer i  `add`-delen. Vi ser at det er en veldig rik beskrivelse av endringer, men det kan være vanskeig å lese innholdet direkte. La oss heller bruke `history()`-funksjonen som kommer med `delta`:

In [None]:
history = deltaTable.history()
history.show()

Siden det blit trangt i tabellen over så kan vi velge hvilke variabler vi ønsker å se på:

In [None]:
# Oversikt over alle kolonner som finnes i historien
history.columns

In [None]:
# Velger de kolonnene jeg er interessert i
selected_history = deltaTable.history().select(
    "version", "timestamp", "operation", "operationParameters"
)

In [None]:
# Display the selected columns
selected_history.show(truncate=50)

## Egendefinert metadata

Delta Lake støtter også egendefinert metadata. Det kan for eksempel være nyttig hvis man ønsker å bruke Delta Lake som en backend for et GUI som lar brukeren oppdatere en tabell fra GUI-et. Da ønsker man typisk å lagre hvem som gjorde endringer og når det ble gjort. La oss legge på noe slik metadata:

In [None]:
# Leser inn filen
df = spark.read.format("delta").load("gs://ssb-dapla-felles-data-delt-prod/temp4")

In [None]:
#| label: show-meta
#| code-fold: true

# Lagrer egendefinert metadata i en json-fil
import json

metadata = {
    "comment": "Kontaktet oppgavegiver og kranglet!",
    "manueltEditert": "True",
    "maskineltEditert": "False",
}
metadata

Vi kan deretter appende dette til den siste versjonen av fila. 

In [None]:
(
    df.write.format("delta")
    .mode("append")
    .option("userMetadata", json.dumps(metadata))  # Serialize metadata to a string
    .save("gs://ssb-dapla-felles-data-delt-prod/temp4")
)

In [None]:
# Laster inn tabellen
deltaTable = DeltaTable.forPath(spark, "gs://ssb-dapla-felles-data-delt-prod/temp4")

# Henter ut historien
history_df = deltaTable.history()

In [None]:
# Show the operation details, including metadata
history_df.select(
    "version", "timestamp", "operation", "operationParameters", "userMetadata"
).show(truncate=10)

In [None]:
history_df.select("version", "userMetadata").show(truncate=50)

Vi ser at vi la til vår egen metadata i versjon 3 av fila. Vi kan printe ut den rå transaksjonsloggen som tidligere, men nå er vi på transaksjon 3 `00000000000000000003.json`:

In [None]:
from dapla import FileClient

# Kobler oss på bøttene
fs = FileClient.get_gcs_file_system()

# Filsti
path = "gs://ssb-dapla-felles-data-delt-prod/temp4/_delta_log/00000000000000000003.json"

with fs.open(path, "r") as f:
    for line in f:
        data = json.loads(line)
        pretty_content = json.dumps(data, indent=4)
        print(pretty_content)
        print("-" * 50)  # Print separator for clarity

Vi er da at metadataene ligger som forventet i metadata-delen. 