## Beregning av statistiske størrelser med pyspark
### I denne noten viser vi hvordan vi kan beregne statistiske størrelser i pyspark. Utgangspunkt for beregninger er parquet datasett som leses inn fra dapla.

#### Innhenter verktøy fra bibliotek
Import-stegene henter inn bibliotek med kode og funksjoner utviklet ekstern.

In [None]:
from datetime import datetime
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import SQLContext
import pyspark.sql.functions as F
import numpy as np
from pyspark.sql import Row


#### Vis tilgjengelige datasett
Kjører metoden read.path for å få oversikt over hvilke parquet datasett som er tilrettelagt i tilknytning til veilderens lagringsområde i sky. <br> Oversikten blir lest inn i egen dataframe - df_datsets.<br>
Aktuelt lagringsområde blir lagt inn som parameter (string objekt som vi definerer selv) INNDATA_PATH.

In [None]:
INNDATA_PATH = '/felles/veiledning/datasett/*'
df_datasets = spark.read.path(INNDATA_PATH)


df_datsets skrives ut i output vindu. 

In [None]:
df_datasets.show(100, False)

Parquet datasettene df_sammensatt_pyspark og df_sammensatt_python som vises når man kjører paragrafen over, skal i utgangspunktet være identiske.<br> 
De er produsert i henholdsvis pyspark og python. I vårt eksempel tar vi utgangspunkt i  df_sammensatt_pyspark.

#### Leser inn parquet datasett
Leser inn parquet datasett df_sammensatt_pyspark, selekterer de variable vi skal bruke og etablerer dataframen df_areal_bnp_og_innbyggerantall

In [None]:
df_areal_bnp_og_innbyggerantall = spark.read.path('/felles/veiledning/datasett/df_sammensatt_pyspark').select('Land', 'Areal', 'BNP', 'Innbyggerantall')
df_areal_bnp_og_innbyggerantall.show()

#### Beregninger 
Vi ønsker som utgangspunkt å beregne størrelsene count, mean, std, min, max og median for alle variable i df_areal_bnp_og_innbyggerantall. Ved å kjøre pyspark metoden describe på datasettet får vi ut størrelsene count, mean, std, min og max. Dvs at pyspark describe har ikke støtte for å beregne medianen.
### Beregninger - eksempel 1
Vi ønsker som utgangspunkt å beregne størrelsene count, mean, std, min, max og median for alle variable i df_areal_bnp_og_innbyggerantall. Ved å kjøre metoden summary på datasettet får vi ut  størrelsene count, mean, std, min, max, 25% percentile, 50% percentile og 75% percentile.  Vi betrakter 50% percentile som medianen. Dvs at vi får ut alle de størrelser vi ønsker å beregne i dette eksempelet kun ved å describe metoden i pandas. 

##### 1. Kjører ut statistiske størrelser
Kjører ut statistiske størrelser fra dataframe df_areal_bnp_og_innbyggerantall med metoden summary. Resultat fra metode blir lagt i dataframen df_stat.

In [None]:
df_stat = df_areal_bnp_og_innbyggerantall.summary()
df_stat.show()

##### 2. Transformerer df_stat 
Transformerer df_stat (med bl.a. pivot funksjonen) slik at rader i df_stat blir kolonner i dataframe df_stat_piv. Formål er å gjøre output mer oversiktlig ved at alle størrelser på en gitt variabel blir liggende i samme rad. Dette kan være hensiktsmessig i de tilfeller antall "statistikkvaiable" er høyt (dvs langt høyere enn de fire som inngår i eksempel).

In [None]:
var_lst = list(df_stat.columns)
var_lst.remove('summary')
i = 1
for col in var_lst:
    df_tmp = df_stat.groupBy().pivot('summary').agg(F.sum(col)).withColumn('variabel',F.lit(col))
    if i == 1:
        df_stat_piv = df_tmp 
    else:
        df_stat_piv = df_stat_piv.unionByName(df_tmp)
    i = i + 1

In [None]:
df_stat_piv.show(100, False)

### Beregninger eksempel 2
I eksempel 1 fikk vi beregnet de størrelsene vi var ute etter kun ved å bruke metoden summary. Hvis vi ønsker å beregne størrelser som ikke er støttet av summary metoden kan vi bruke andre funksjoner og/eller bibliotek.

I eksempel 2 ønsker vi å beregne skewness gjennom å bruke en funksjon fra pyspark.sql modulen. Videre ønsker vi i samme steg å inkludere skewness "sammen" med størrelsene vi får ved å kjøre summary metoden. Resultat skal transformeres på tilsvarende måte som i eksempel 1.

Vi ønsker å etablere datasett hvor skewness er satt sammen med størrelsene vi får fra summary. For å få til dette må vi gjennomføre flere steg.

##### 1. Kjører ut statistiske størrelser med describe
Kjører ut statistiske størrelser fra dataframe df_areal_bnp_og_innbyggerantall med metoden describe. Resultat fra metode blir lagt i dataframen df_stat.

In [None]:
df_stat_adv = df_areal_bnp_og_innbyggerantall.summary()
df_stat_adv.show()

Printer schemaet til df_stat_adv

In [None]:
df_stat_adv.printSchema()

##### 2. Beregner skewness
Beregner størrelsen skewness for hver kolonne, med unntak av summary kolonne, i df_areal_bnp_og_innbyggerantall med funksjonen skewness. Beregningene gjøres i en for loop. Resultat fra funksjon blir returnert inn i egne dataframes (ett pr variabel fra dataframen df_stat_adv).  Datasettene kobles så sammen slik at vi får ett datasett der skewness for alle variable er samlet i en rad. 

In [None]:
var_lst = list(df_stat_adv.columns)
var_lst.remove('summary')
i = 1
for col in var_lst:
     df_tmp = df_areal_bnp_og_innbyggerantall.agg(F.skewness(col).alias(col))
     if i == 1:
         df_stat_ske = df_tmp
     else:        
         df_stat_ske = df_stat_ske.join(df_tmp)
     i = i + 1  
df_stat_ske = df_stat_ske.withColumn("summary", F.lit('skewness'))    
df_stat_ske.show()    

##### 3. Slår sammen df_stat og df_stat_ske
Slår sammen df_stat og df_stat_ske. Viktig at man her bruker unionByName fordi rekkefølgen på variablene ikke er identisk.

In [None]:
df_stat_sammensatt = df_stat_adv.unionByName(df_stat_ske)
df_stat_sammensatt.show()

##### 4. Transformerer df_stat_sammensatt 
Transformerer df_stat (med bl.a. pivot funksjonen) slik at rader i df_stat blir kolonner i dataframe df_stat_piv. Formål er å gjøre output mer oversiktlig ved at alle størrelser på en gitt variabel blir liggende i samme rad. Dette kan være hensiktsmessig i de tilfeller antall "statistikkvariable" er høyt (dvs langt høyere enn de fire som inngår i eksempel).

In [None]:
var_lst = list(df_stat_sammensatt.columns)
var_lst.remove('summary')
i = 1
for col in var_lst:
    df_tmp = df_stat_sammensatt.groupBy().pivot('summary').agg(F.sum(col)).withColumn('variabel',F.lit(col))
    if i == 1:
        df_stat_sammensatt_piv = df_tmp 
    else:
        df_stat_sammensatt_piv = df_stat_sammensatt_piv.unionByName(df_tmp)
    i = i + 1
    
    


In [None]:
df_stat_sammensatt_piv.show(100, False)