# Leser inn bibliotek og genererer testdata

## Leser inn bibliotek og setter opp spark session

In [None]:
import dapla as dp
import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import datetime
spark

In [None]:
%run ../../ssb_sparktools/processing/processing.py

In [None]:
myspark = SparkSession.builder.getOrCreate()

## Lager testdata 

#### Hierarkiske data

In [None]:
hierarki_schema = StructType([
                              StructField('persid', StringType(), False),
                              StructField('dato', StringType(), False),
                              StructField('arbeidsgiver',ArrayType(
                                                                      StructType([StructField('navn',StringType(),True),
                                                                                  StructField('adresse',StringType(),True),
                                                                                  StructField('ansatte', ArrayType(
                                                                                      StructType([StructField('navn', StringType(), True),
                                                                                                  StructField('adresse',StringType(),True)])))
                                                                                 ])
                                                                     )),
                              StructField('utdanning',ArrayType(
                                                                      StructType([StructField('utdanningsinstitusjon',StringType(),True),
                                                                                  StructField('adresse',StringType(),True),
                                                                                  StructField('utdanning', ArrayType(
                                                                                      StructType([StructField('fag', StringType(), True),
                                                                                                  StructField('eksamensdato',StringType(),True)])))
                                                                                 ])
                                                                     ))
                             ])
hierarkidata_raw = [('#ID1', '01Jan2020', [('Industri AS', 'Jernveien 24', [('Per', 'Storgata 3'),('Kari', 'Toppen 2')])], [('Mek Skole', 'Mek veien 1',[('Mekaniskefag', '21Jun2013'), ('Byggingeniør', '11Jun2018')])]),
                    ('#ID2', '02Mar2020', [('Lommerusk AS', 'Sliteveien 23', [('Espen', 'Ukjent'),('Ronny', 'Kaiegata 2')])], [('Harde Skole', 'Kjeppveien 10', [('Grunnskole', '19Jun2014')])]),
                    ('#ID3', '15Feb2020', [('Papir AS', 'Papirveien 24', [('Ole', 'Storgata 3'),('Siri', 'Toppen 3')])], [('Skogen Skole', 'Treveien 5', [('Papirfag', '21Jun2014'), ('Papiringeniør', '11Jun2012')])])]
hierarki_testdata = myspark.createDataFrame(hierarkidata_raw, hierarki_schema)

#### Skriver ut testdata som er laget

In [None]:
hierarki_testdata.printSchema()

In [None]:
hierarki_testdata.show()

# Test av funksjonen *cross_sectional*

### Uten spark_session som parameter

In [None]:
dato_hendelsedata = hierarki_testdata.withColumn('dato', F.to_timestamp('dato', "ddMMMyyyy"))
tverrsnitt = cross_sectional(dato_hendelsedata, 'dato', ['persid'])

In [None]:
tverrsnitt.show()

### Med spark_session som parameter

In [None]:
dato_hendelsedata = hierarki_testdata.withColumn('dato', F.to_timestamp('dato', "ddMMMyyyy"))
tverrsnitt = cross_sectional(dato_hendelsedata, 'dato', ['persid'])

In [None]:
tverrsnitt.show()

### Tar tverrsnitt på en gitt dato

In [None]:
REFDATO = '2020-03-01 00:00:00'
referansedato = datetime.strptime(REFDATO, '%Y-%m-%d %H:%M:%S')

In [None]:
tverrsnitt_co = cross_sectional(dato_hendelsedata, 'dato', ['persid'], coDate=referansedato)

In [None]:
tverrsnitt_co.show()

# Test av funksjonen *unpack_parquet*

### Uten spark_session som parameter

In [None]:
pakketut = unpack_parquet(hierarki_testdata)
for k in pakketut.keys():
    print(k)

### Med spark_session som parameter

In [None]:
pakketut = unpack_parquet(hierarki_testdata, spark_session=myspark)
for k in pakketut.keys():
    print(k)

### Viser utpakket data

In [None]:
pakketut['arbeidsgiver'].show()

In [None]:
pakketut['arbeidsgiver_ansatte'].show()

In [None]:
pakketut['utdanning'].show()

In [None]:
pakketut['utdanning_utdanning-child'].show()

In [None]:
print(f'Totalt antall dataframes: {len(pakketut)}')

In [None]:
test_dict_less = unpack_parquet(hierarki_testdata, levels=1, spark_session=myspark)
print(f'Totalt antall dataframes: {len(test_dict_less)}')