# Parquet - Hvordan få oversikt og flate ut komplekse filstukturer

## Praktiske eksempler fra Sirius Næringsteamet

I Dapla er det filformatet Parquet som benyttes. 

Apache Parquet is a file format designed to support fast data processing for complex data, with several notable characteristics:
  
1. Columnar: Unlike row-based formats such as CSV or Avro, Apache Parquet is column-oriented – meaning the values of each table column are stored next to each other, rather than those of each record:
![Column](Parquet_column.png)
2. Open-source: Parquet is free to use and open source under the Apache Hadoop license, and is compatible with most Hadoop data processing frameworks.
3. Self-describing: In Parquet, metadata including schema and structure is embedded within each file, making it a self-describing file format. 

#### Hvordan gå fra dette:

![Column](parquet_raadata.PNG)

#### Til dette:

![Column](parquet_utflatet.PNG)

### Importerer nødvendige pakker - kode kjøres på Pyspark

In [1]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *

#### Lister ut alle filer i en mappe på Dapla

In [2]:
df1 = spark.read.path("/kilde/ske/skatt/naering/naeringsopplysninger/2020/syntetisk/*")
df1.show(10, False)

+---------------------------------------------------------------------+
|path                                                                 |
+---------------------------------------------------------------------+
|/kilde/ske/skatt/naering/naeringsopplysninger/2020/syntetisk/20210709|
|/kilde/ske/skatt/naering/naeringsopplysninger/2020/syntetisk/20210819|
+---------------------------------------------------------------------+



#### Lester inn ønsket Parquet-fil og tar ut en oversikt over filstrukturen

In [3]:
df1 = spark.read.path("/kilde/ske/skatt/naering/naeringsopplysninger/2020/syntetisk/20210819")

df1.printSchema()

root
 |-- skjermet: boolean (nullable = true)
 |-- norskIdentifikator: string (nullable = true)
 |-- inntektsaar: string (nullable = true)
 |-- regnskapsperiode: struct (nullable = true)
 |    |-- start: string (nullable = true)
 |    |-- slutt: string (nullable = true)
 |-- resultatregnskap: struct (nullable = true)
 |    |-- driftsinntekt: struct (nullable = true)
 |    |    |-- sumDriftsinntekt: struct (nullable = true)
 |    |    |    |-- beloep: double (nullable = true)
 |    |    |    |-- erOverstyrt: boolean (nullable = true)
 |    |    |-- salgsinntekter: struct (nullable = true)
 |    |    |    |-- salgsinntekt: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |-- salgsinntektstype: string (nullable = true)
 |    |    |    |    |    |-- beloep: struct (nullable = true)
 |    |    |    |    |    |    |-- beloep: double (nullable = true)
 |    |    |    |-- 

#### Tar ut første observasjon i datasettet

In [5]:
df1.head(1)

[Row(skjermet=None, norskIdentifikator='1000000001', inntektsaar='2020', regnskapsperiode=Row(start='2020-01-01T00:00:00Z', slutt='2020-31-12T23:59:59Z'), resultatregnskap=Row(driftsinntekt=Row(sumDriftsinntekt=Row(beloep=74364.0, erOverstyrt=None), salgsinntekter=Row(salgsinntekt=[Row(id='id_1_0', salgsinntektstype='3100', beloep=Row(beloep=88667.0)), Row(id='id_1_1', salgsinntektstype='3200', beloep=Row(beloep=23002.0))], id='id_1_0'), andreDriftsinntekter=Row(annenDriftsinntekt=None, id='id_1_0')), driftskostnad=Row(sumDriftskostnad=Row(beloep=68472.0, erOverstyrt=None), varekostnader=Row(varekostnad=[Row(id='id_1_0', varekostnadstype='4995', beloep=Row(beloep=68761.0)), Row(id='id_1_1', varekostnadstype='4500', beloep=Row(beloep=58811.0)), Row(id='id_1_2', varekostnadstype='4500', beloep=Row(beloep=7810.0))], id='id_1_0'), loennskostnader=Row(loennskostnad=[Row(id='id_1_0', loennskostnadstype='5400', beloep=Row(beloep=54844.0))], id='id_1_0'), andreDriftskostnader=Row(annenDriftsko

#### Ønsker bare å se på tema "Balanse" for å få bedre oversikt¶

In [7]:
df1 = spark.read.path("/kilde/ske/skatt/naering/naeringsopplysninger/2020/syntetisk/20210819").select('norskIdentifikator', 'balanse')
df1.printSchema()

root
 |-- norskIdentifikator: string (nullable = true)
 |-- balanse: struct (nullable = true)
 |    |-- anleggsmiddel: struct (nullable = true)
 |    |    |-- sumAnleggsmiddelSkattemessigVerdi: struct (nullable = true)
 |    |    |    |-- beloep: double (nullable = true)
 |    |    |    |-- erOverstyrt: boolean (nullable = true)
 |    |    |-- sumAnleggsmiddelRegnskapsmessigVerdi: struct (nullable = true)
 |    |    |    |-- beloep: double (nullable = true)
 |    |    |    |-- erOverstyrt: boolean (nullable = true)
 |    |    |-- balanseverdiForAnleggsmidler: struct (nullable = true)
 |    |    |    |-- id: string (nullable = true)
 |    |    |    |-- balanseverdiForAnleggsmiddel: array (nullable = true)
 |    |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |    |-- id: string (nullable = true)
 |    |    |    |    |    |-- anleggsmiddeltype: string (nullable = true)
 |    |    |    |    |    |-- skattemessigVerdi: struct (nullable = true)
 |    |    |    

#### Finnes 2 typer av datastukturer
    Struct
    Array

#### Behandling av STRUCT

*    sumAnleggsmiddelSkattemessigVerdi
*    sumAnleggsmiddelRegnskapsmessigVerdi
*
*omloepsmiddel
*    sumOmloepsmiddelSkattemessigVerdi
*    sumOmloepsmiddelRegnskapsmessigVerdi
*
*gjeldOgEgenkapital
*    sumLangsiktigGjeldSkattemessigVerdi
*    sumLangsiktigGjeldRegnskapsmessigVerdi
*    sumKortsiktigGjeldSkattemessigVerdi
*sumKortsiktigGjeldRegnskapsmessigVerdi
*    sumEgenkapital
*
*sumEiendelSkattemessigVerdi
*sumEiendelRegnskapsmessigVerdi
*sumGjeldOgEgenkapitalSkattemessigVerdi
*sumGjeldOgEgenkapitalRegnskapsmessigVerdi

### Eksempel på utflating av en enkelt STRUCT-variabel - sumAnleggsmiddelSkattemessigVerdi

In [4]:
#Tar ut bare sumAnleggsmiddelSkattemessigVerdi og renamer beloep til sumAnleggsmiddelSkattemessigVerdi
balanse_struct = df1.select(df1.norskIdentifikator,\
                          df1.balanse.anleggsmiddel.sumAnleggsmiddelSkattemessigVerdi.beloep.alias('sumAnleggsmiddelSkattemessigVerdi'),\
                              )

balanse_struct.printSchema()

root
 |-- norskIdentifikator: string (nullable = true)
 |-- sumAnleggsmiddelSkattemessigVerdi: double (nullable = true)



In [5]:
#lister ut de 10 første observasjonene
balanse_struct.show(10, False)

+------------------+---------------------------------+
|norskIdentifikator|sumAnleggsmiddelSkattemessigVerdi|
+------------------+---------------------------------+
|1000000001        |103.0                            |
|1000000002        |34576.0                          |
|1000000003        |64228.0                          |
|1000000004        |66903.0                          |
|1000000005        |51454.0                          |
|1000000006        |68572.0                          |
|1000000007        |92094.0                          |
|1000000008        |10248.0                          |
|1000000009        |63435.0                          |
|1000000010        |75831.0                          |
+------------------+---------------------------------+
only showing top 10 rows



### Eksempel på utflating av flere STRUCT-variabler i samme steg

In [6]:
#tar ut flere variabler samtidig og renamer beloep-variabler
balanse_struct = df1.select(df1.norskIdentifikator,\
                          df1.balanse.anleggsmiddel.sumAnleggsmiddelSkattemessigVerdi.beloep.alias('sumAnleggsmiddelSkattemessigVerdi'),\
                          df1.balanse.anleggsmiddel.sumAnleggsmiddelRegnskapsmessigVerdi.beloep.alias('sumAnleggsmiddelRegnskapsmessigVerdi'),\
                          df1.balanse.omloepsmiddel.sumOmloepsmiddelSkattemessigVerdi.beloep.alias('sumOmloepsmiddelSkattemessigVerdi'),\
                          df1.balanse.omloepsmiddel.sumOmloepsmiddelRegnskapsmessigVerdi.beloep.alias('sumOmloepsmiddelRegnskapsmessigVerdi'),\
                                                                             )
balanse_struct.printSchema()
balanse_struct.show(10, False)

root
 |-- norskIdentifikator: string (nullable = true)
 |-- sumAnleggsmiddelSkattemessigVerdi: double (nullable = true)
 |-- sumAnleggsmiddelRegnskapsmessigVerdi: double (nullable = true)
 |-- sumOmloepsmiddelSkattemessigVerdi: double (nullable = true)
 |-- sumOmloepsmiddelRegnskapsmessigVerdi: double (nullable = true)

+------------------+---------------------------------+------------------------------------+---------------------------------+------------------------------------+
|norskIdentifikator|sumAnleggsmiddelSkattemessigVerdi|sumAnleggsmiddelRegnskapsmessigVerdi|sumOmloepsmiddelSkattemessigVerdi|sumOmloepsmiddelRegnskapsmessigVerdi|
+------------------+---------------------------------+------------------------------------+---------------------------------+------------------------------------+
|1000000001        |103.0                            |69257.0                             |51372.0                          |74879.0                             |
|1000000002        |34576.

### Lagrer resultatet som en dataframe på dapla - som så kan leses inn i Python

In [7]:
balanse_struct.write\
    .option("valuation", "INTERNAL")\
    .option("state", "INPUT")\
    .path('/produkt/skatt/naering/temp/parquet_demo/balanse_struct')

## Behandling av ARRAYS - balanseverdiForAnleggsmiddel

#### Arrays er en kompleks datastruktur som kan inneholde flere forekomster av en identifikator

In [26]:
balanse_array = 'balanseverdiForAnleggsmidler'
balanse_array = df1.select('norskIdentifikator', 'balanse.anleggsmiddel.balanseverdiForAnleggsmidler.balanseverdiForAnleggsmiddel')

balanse_array.printSchema()
balanse_array.show(10, False)


root
 |-- norskIdentifikator: string (nullable = true)
 |-- balanseverdiForAnleggsmiddel: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- id: string (nullable = true)
 |    |    |-- anleggsmiddeltype: string (nullable = true)
 |    |    |-- skattemessigVerdi: struct (nullable = true)
 |    |    |    |-- beloep: double (nullable = true)
 |    |    |-- regnskapsmessigVerdi: struct (nullable = true)
 |    |    |    |-- beloep: double (nullable = true)
 |    |    |-- overfoeresIkkeTilSkattemeldingen: boolean (nullable = true)

+------------------+--------------------------------------------------------------------------------------------------------------------------------------+
|norskIdentifikator|balanseverdiForAnleggsmiddel                                                                                                          |
+------------------+----------------------------------------------------------------------------------------------------

#### Bruker EXPLODE funksjonen for å flate ut data

In [8]:
balanse_array = 'balanseverdiForAnleggsmidler'
balanse_array = df1.select('norskIdentifikator', 'balanse.anleggsmiddel.balanseverdiForAnleggsmidler.balanseverdiForAnleggsmiddel')

balanse_array = balanse_array.select('norskIdentifikator', 'balanseverdiForAnleggsmiddel' )\
         .withColumn('sonavn', F.explode('balanseverdiForAnleggsmiddel')).select('norskIdentifikator', 'sonavn.*').drop('balanseverdiForAnleggsmiddel').na.fill(0)


balanse_array.show(10, False)
balanse_array.printSchema()

+------------------+------+-----------------+-----------------+--------------------+--------------------------------+
|norskIdentifikator|id    |anleggsmiddeltype|skattemessigVerdi|regnskapsmessigVerdi|overfoeresIkkeTilSkattemeldingen|
+------------------+------+-----------------+-----------------+--------------------+--------------------------------+
|1000000002        |id_2_0|1130             |[21610.0]        |[24512.0]           |null                            |
|1000000002        |id_2_1|1117             |[70829.0]        |[72071.0]           |null                            |
|1000000003        |id_3_0|1221             |[32746.0]        |[49578.0]           |null                            |
|1000000003        |id_3_1|1221             |[31348.0]        |[98614.0]           |null                            |
|1000000003        |id_3_2|1312             |[30939.0]        |[82373.0]           |true                            |
|1000000004        |id_4_0|1380             |[95260.0]  

##### Nå er vi nesten i mål - Henter ut 'beloep'-verdiene og renamer disse

In [9]:
balanse_array = balanse_array\
        .select('norskIdentifikator', 'id', 'anleggsmiddeltype', 'skattemessigVerdi.beloep', 'regnskapsmessigVerdi.beloep', 'overfoeresIkkeTilSkattemeldingen')\
        .toDF('norskIdentifikator', 'id', 'anleggsmiddeltype', 'skattemessigVerdi', 'regnskapsmessigVerdi', 'overfoeresIkkeTilSkattemeldingen')
balanse_array.show(10, False)
balanse_array.printSchema()

+------------------+------+-----------------+-----------------+--------------------+--------------------------------+
|norskIdentifikator|id    |anleggsmiddeltype|skattemessigVerdi|regnskapsmessigVerdi|overfoeresIkkeTilSkattemeldingen|
+------------------+------+-----------------+-----------------+--------------------+--------------------------------+
|1000000002        |id_2_0|1130             |21610.0          |24512.0             |null                            |
|1000000002        |id_2_1|1117             |70829.0          |72071.0             |null                            |
|1000000003        |id_3_0|1221             |32746.0          |49578.0             |null                            |
|1000000003        |id_3_1|1221             |31348.0          |98614.0             |null                            |
|1000000003        |id_3_2|1312             |30939.0          |82373.0             |true                            |
|1000000004        |id_4_0|1380             |95260.0    

#### Lagrer resultatet som en dataframe på dapla - som så kan leses inn i Python


In [29]:
balanse_array.write\
    .option("valuation", "INTERNAL")\
    .option("state", "INPUT")\
    .path('/produkt/skatt/naering/temp/parquet_demo/balanse_array')