# RA data


TODO:

(- remove duplicated columns, e.g. $"lpk_ver_nr" === $"ver_nr")<br>




In [1]:
spark.catalog.listTables.show(false)

+------------------+--------+-----------+---------+-----------+
|name              |database|description|tableType|isTemporary|
+------------------+--------+-----------+---------+-----------+
|raw_tarif_init    |default |null       |EXTERNAL |false      |
|raw_tarif_update  |default |null       |EXTERNAL |false      |
|stg_aufenthaltsart|default |null       |EXTERNAL |false      |
|stg_deckung       |default |null       |EXTERNAL |false      |
|stg_ecdetail      |default |null       |EXTERNAL |false      |
|stg_eckopf        |default |null       |EXTERNAL |false      |
|stg_ecprodukt     |default |null       |EXTERNAL |false      |
|stg_familie       |default |null       |EXTERNAL |false      |
|stg_gemeinde      |default |null       |EXTERNAL |false      |
|stg_leipkopf      |default |null       |EXTERNAL |false      |
|stg_leippos       |default |null       |EXTERNAL |false      |
|stg_schadenart    |default |null       |EXTERNAL |false      |
|stg_tarif_init    |default |null       

In [2]:
val df_leiPKopf = spark.table("default.stg_LeiPKopf")
df_leiPKopf.printSchema()

root
 |-- lpk_id: integer (nullable = true)
 |-- lpk_ver_nr: integer (nullable = true)
 |-- lpk_fam_nr: integer (nullable = true)
 |-- lpk_deb_nr: integer (nullable = true)
 |-- lpk_vtg_nr: integer (nullable = true)
 |-- lpk_gde_id: short (nullable = true)
 |-- lpk_adr_id: integer (nullable = true)
 |-- lpk_ksk_nr: string (nullable = true)
 |-- lpk_ksk_nr_vst: string (nullable = true)
 |-- lpk_ksk_nr_vst2: string (nullable = true)
 |-- lpk_esr_nr: decimal(27,0) (nullable = true)
 |-- lpk_esr_nr_len: integer (nullable = true)
 |-- lpk_esr_kto: integer (nullable = true)
 |-- lpk_rech_nr: string (nullable = true)
 |-- lpk_rech_dat: timestamp (nullable = true)
 |-- lpk_rech_zahl_dat: timestamp (nullable = true)
 |-- lpk_rech_betr: decimal(19,4) (nullable = true)
 |-- lpk_korr_betr: decimal(19,4) (nullable = true)
 |-- lpk_kas_betr: decimal(19,4) (nullable = true)
 |-- lpk_beh_beg_dat: timestamp (nullable = true)
 |-- lpk_beh_end_dat: timestamp (nullable = true)
 |-- lpk_zaba_cd: string (nu

In [3]:
spark.table("default.stg_Versicherter").printSchema()

root
 |-- ver_nr: integer (nullable = true)
 |-- ver_id: integer (nullable = true)
 |-- ver_vea_id: integer (nullable = true)
 |-- ver_fam_nr: integer (nullable = true)
 |-- ver_name: string (nullable = true)
 |-- ver_vname: string (nullable = true)
 |-- ver_such1: string (nullable = true)
 |-- ver_sex_cd: string (nullable = true)
 |-- ver_geb_dat: timestamp (nullable = true)
 |-- ver_ahv: string (nullable = true)
 |-- ver_beruf: string (nullable = true)
 |-- ver_ziv_cd: string (nullable = true)
 |-- ver_nat_id: short (nullable = true)
 |-- ver_aea_id: integer (nullable = true)
 |-- ver_aea_beg_dat: timestamp (nullable = true)
 |-- ver_aea_end_dat: timestamp (nullable = true)
 |-- ver_beg_dat: timestamp (nullable = true)
 |-- ver_end_dat: timestamp (nullable = true)
 |-- ver_erf_uid: short (nullable = true)
 |-- ver_erf_dat: timestamp (nullable = true)
 |-- ver_mut_uid: short (nullable = true)
 |-- ver_mut_dat: timestamp (nullable = true)
 |-- ver_fehler: integer (nullable = true)
 |--

In [4]:
spark.table("default.stg_LeiPPos").count()

1127420

In [5]:
import org.apache.spark.sql.{DataFrame, SparkSession}
import java.time.LocalDate
import java.time.format.DateTimeFormatter
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.{lit, when, datediff, month, dayofmonth, last_day, round}

val join_type = "left" // "left_semi" would only take rows which match


In [6]:
val preciseMonth = false

/**
 * Function calculates the intersection of 2 date ranges in months (float). 
 * E.g. a customer has a membership from 12.05.2015 - 31.03.2020 and the target range is the year 2020,
 * results in 3.0 month.
 * @param startDate given start date, e.g customer membership start date
 * @param endDate given end date, e.g customer membership end date
 * @param rangeBegin reference start date
 * @param rangeEnd reference end date
 * @return month range
 */
def datesToMonths(startDate: Column, endDate: Column, rangeBegin: LocalDate, rangeEnd: LocalDate): Column = {
    val diff = datediff(lit(rangeEnd), startDate)
    // double check if there is a period within the specified range
    val colBegin = when(datediff(lit(rangeEnd), startDate) <= 0, null)
                .when(datediff(startDate, lit(rangeBegin)) <=0, rangeBegin)
                .otherwise(startDate)
    val colEnd = when(datediff(endDate, lit(rangeBegin)) <= 0, null)
                .when(datediff(lit(rangeEnd), endDate) <=0, rangeEnd)
                .otherwise(endDate)
   // compute the periode
   // -- TODO decide if computed with constant 30: m+d/30 or accurate
   val monthBeg = month(colBegin)-1 + (dayofmonth(colBegin) -1)/ dayofmonth(last_day(colBegin))
   val monthEnd = month(colEnd)-1 + dayofmonth(colEnd) / dayofmonth(last_day(colEnd))
   round( monthEnd - monthBeg, 2 )
   monthEnd - monthBeg
}
/**
 * Function joins the "Familie" table and the "Gemeinde" 
 *  - for entries in a selected period
 *  - only the Kanton
 *  - remove duplicates, e.g. if new address is in same Kanton
 * @param df_familie DataFrame for "Familie" table
 * @param df_gemeinde DataFrame for "Gemeinde" table
 * @param rangeBegin start date of considered period
 * @param rangeEnd end date of considered period
 */
def getFamKantons(df_familie: DataFrame, df_gemeinde: DataFrame, rangeBegin: LocalDate, rangeEnd: LocalDate) = {
    df_familie
        .where($"fam_end_dat" >= rangeBegin && $"fam_beg_dat" <= rangeEnd)
        .join(df_gemeinde.select("gde_id", "gde_kt"), $"fam_ra_gde_id" === $"gde_id", join_type).drop("fam_ra_gde_id")
        .withColumnRenamed("gde_kt", "Kanton")
        
}

In [7]:

val dayFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val rangeStart = LocalDate.parse("2020-01-01", dayFormat)
val tmp = ""
//if (tmp == "") {
  val rangeEnd = rangeStart.plusYears(1).minusDays(1)
//} else {
//  val rangeEnd = LocalDate.parse(tmp, dayFormat)
//}

//val df_leiPPos = spark.table("default.stg_LeiPPos").select("lpp_lpk_id", "lpp_id")
//val df_leiPKopf = spark.table("default.stg_LeiPKopf").select("lpk_id", "lpk_ver_id", "lpk_rech_nr")
//val df_vers = spark.table("default.stg_Versicherter").select("ver_id", "ver_nnss_nr", "ver_fam_nr", "ver_geb_dat", "ver_sex_cd")
//val df_familie = spark.table("default.stg_Familie").select("fam_nr", "fam_ra_gde_id", "fam_beg_dat", "fam_end_dat")
//val df_gemeinde = spark.table("default.stg_Gemeinde").select("gde_id", "gde_kt")
//val df_ecKopf = spark.table("default.stg_ECKopf").select("eck_id", "eck_rech_nr")
//val df_ecDetail = spark.table("default.stg_ECDetail").select("ecd_eck_id", "ecd_ecp_id")
//val df_ecProdukt = spark.table("default.stg_ECProdukt").select("ecp_id", "ecp_produkt_nr")

val df_leiPPos = spark.table("default.stg_LeiPPos")
val df_leiPKopf = spark.table("default.stg_LeiPKopf")
val df_vers = spark.table("default.stg_Versicherter")
val df_familie = spark.table("default.stg_Familie")
val df_gemeinde = spark.table("default.stg_Gemeinde")
val df_ecKopf = spark.table("default.stg_ECKopf")
val df_ecDetail = spark.table("default.stg_ECDetail")
val df_ecProdukt = spark.table("default.stg_ECProdukt")

val df_familieWithMonths = df_familie.withColumn("fam_versmon", datesToMonths($"fam_beg_dat", $"fam_end_dat", rangeStart, rangeEnd))

val df = df_leiPPos
       // filter for specified time range
      .filter($"LPP_BEH_BEG_DAT" >= rangeStart && $"LPP_BEH_BEG_DAT" <= rangeEnd )

      // BAG-Nummer
      //TODO
      .withColumn("BAG-Nummer", lit("TBD"))

      // Jahr
      //TODO
      .withColumn("Jahr", lit("TBD"))


      // AHV-Nr: LeiPPos.LPP_LPK_ID → LeiPKopf.LPK_VER_NR → Versicherter.VER_NNSS_NR
      .join(df_leiPKopf, $"lpp_lpk_id" === $"lpk_id", join_type).drop("lpp_lpk_id")
      .join(df_vers, $"lpk_ver_id" === $"ver_id", join_type).drop("lpk_ver_nr")
      .withColumnRenamed("ver_nnss_nr", "AHV-Nummer")

      // Wohnkanton: LeiPPos.LPP_LPK_ID → LeiPKopf.LPK_VER_NR → Versicherter.VER_FAM_NR → Familie.FAM_RA_GDE_ID (Test auf Zeitraum) → Gemeinde.GDE_KT
      .join(df_familieWithMonths, $"ver_fam_nr" === $"fam_nr", join_type).drop("ver_fam_nr")
      .join(df_gemeinde, $"fam_ra_gde_id" === $"gde_id", join_type).drop("fam_ra_gde_id")
      .withColumnRenamed("gde_kt", "Kanton")

      // Geburtsjahr: LeiPPos.LPP_LPK_ID → LeiPKopf.LPK_VER_NR → Versicherter.VER_GEB_DAT
      .withColumnRenamed("ver_geb_dat", "Geburtsjahr")

      // Geschlecht: LeiPPos.LPP_LPK_ID → LeiPKopf.LPK_VER_NR → Versicherter.VER_SEX_CD
      .withColumnRenamed("ver_sex_cd", "Geschlecht")

      // Spital Aufenthalt:
      // Exakte Ein/Austritte Spital/Reha o.ä. ECP: LeiPPos.LPP_LPK_ID → LeiPKopf.LPK_RECH_NR → ECKopf.ECK_RECH_NR, ~.ECK_ID → ECKopfXtraCaseDetail.EXC_ECK_ID, ~.EXC_BEG_DAT, ~.EXC_END_DAT (korrekte Summierung)
      // Exakte Ein/Austritte Spital/Reha o.ä. Sumex/Secon: LeiPPos.LPP_LPK_ID → LpkCaseDetail.LCD_BEG_DAT,~.LCD_END_DAT 
      // TODO
      .withColumn("Aufenthalt", lit("TBD"))

      // GTIN: LeiPPos.LPP_LPK_ID → LeiPKopf.LPK_RECH_NR → ECKopf.ECK_RECH_NR, ~.ECK_ID → ECDetail.ECD_ECP_ID→ ECProdukt.ECP_PRODUKT_NR (PharmaCode)
      // 13-stellig
      //TODO double check
      .join(df_ecKopf, $"LPK_RECH_NR" === $"ECK_RECH_NR", join_type)
      .join(df_ecDetail, $"ECK_ID" === $"ECD_ECK_ID", join_type).drop("ECD_ECK_ID")
      .join(df_ecProdukt, $"ECD_ECP_ID" === $"ECP_ID", join_type).drop("ECD_ECP_ID")
      .withColumnRenamed("ecp_produkt_nr", "GTIN")

      // PharmaCode (5-7 stellig)
      //TODO
      .withColumn("PharmaCode", lit("TBD"))

      // Packungen - Da die Leistungserbringer nicht immer ganze Packungen abgeben (z.B. Abgabe einzelner Tabletten),
        // ist die Packungsanzahl mit zwei Kommastellen anzugeben (kaufmännisch gerundet).
      //TODO
      .withColumn("Packungen", lit("TBD"))

      // Monate - Die angebrochenen Versicherungsmonate bei Beginn und Ende der Versicherungsdeckung sind
        // in der Datenlieferung taggenau zu berücksichtigen (siehe Kapitel 3.7.1).
        // Die Versicherungsmonate sind deshalb mit zwei Kommastellen anzugeben (kaufmännisch gerundet).
      .withColumnRenamed("fam_versmon", "Monate")

      // Kosten - Angegeben werden müssen die Kosten vor Abzug der Kostenbeteiligung
        //(d.h. die Bruttokosten). Jeweils in Franken mit zwei Kommastellen anzugeben (kaufmännisch gerundet).
      //TODO
      .withColumn("Kosten", lit("TBD"))

      // Kostenbeteiligung - Jeweils in Franken mit zwei Kommastellen anzugeben (kaufmännisch gerundet).
      //TODO
      .withColumn("Kostenbeteiligung", lit("TBD"))

      //Output: BAG-Nummer, Jahr, AHV-Nummer, Kanton, Geburtsjahr, Geschlecht, Aufenthalt, GTIN, PharmaCode, Packungen, Monate, Kosten, Kostenbeteiligung
      //.select("BAG-Nummer", "Jahr", "AHV-Nummer", "Kanton", "Geburtsjahr", "Geschlecht", "Aufenthalt", "GTIN", "PharmaCode", "Monate", "Kosten", "Kostenbeteiligung")
      //.select("AHV-Nummer", "Kanton", "Geburtsjahr", "Geschlecht", "GTIN")

df.printSchema()

root
 |-- lpp_id: integer (nullable = true)
 |-- lpp_typ: integer (nullable = true)
 |-- lpp_lpl_id: short (nullable = true)
 |-- lpp_sca_id: integer (nullable = true)
 |-- lpp_bha_id: string (nullable = true)
 |-- lpp_ktk_id: integer (nullable = true)
 |-- lpp_kma_id: integer (nullable = true)
 |-- lpp_lsa_id: short (nullable = true)
 |-- lpp_dkg_id: short (nullable = true)
 |-- lpp_dgp_id: integer (nullable = true)
 |-- lpp_vde_card: integer (nullable = true)
 |-- lpp_vde_id: integer (nullable = true)
 |-- lpp_vde_pk: integer (nullable = true)
 |-- lpp_beh_beg_dat: timestamp (nullable = true)
 |-- lpp_pos_betr: decimal(19,4) (nullable = true)
 |-- lpp_npl_id: short (nullable = true)
 |-- lpp_np_text: string (nullable = true)
 |-- lpp_np_betr: decimal(19,4) (nullable = true)
 |-- lpp_rl_betr: decimal(19,4) (nullable = true)
 |-- lpp_fr_betr: decimal(10,4) (nullable = true)
 |-- lpp_fr_tot_betr: decimal(10,4) (nullable = true)
 |-- lpp_sb_betr: decimal(10,4) (nullable = true)
 |-- lpp_

In [19]:
//df.select("AHV-Nummer").show()
df.filter(not($"Monate" === 12.0 )).select("lpp_id", "lpk_id", "AHV-Nummer", "fam_id", "fam_nr", "fam_beg_dat", "fam_end_dat", "Monate", "Kanton", "gde_id").show()
df.count()

+------+------+-------------+------+-------+-------------------+-------------------+------+------+------+
|lpp_id|lpk_id|   AHV-Nummer|fam_id| fam_nr|        fam_beg_dat|        fam_end_dat|Monate|Kanton|gde_id|
+------+------+-------------+------+-------+-------------------+-------------------+------+------+------+
|     1|424130|7567055082854|   214|1000194|2012-10-26 00:00:00|2020-01-31 00:00:00|   1.0|    ZH|     2|
|     1|424132|7567055082854|   214|1000194|2012-10-26 00:00:00|2020-01-31 00:00:00|   1.0|    ZH|     2|
|     1|424132|7567055082854| 26965|1000194|2020-02-01 00:00:00|2050-01-01 00:00:00|  11.0|    ZH|     2|
|     1|448471|7567055082854|   214|1000194|2012-10-26 00:00:00|2020-01-31 00:00:00|   1.0|    ZH|     2|
|     1|448471|7567055082854| 26965|1000194|2020-02-01 00:00:00|2050-01-01 00:00:00|  11.0|    ZH|     2|
|     1|448471|7567055082854|   214|1000194|2012-10-26 00:00:00|2020-01-31 00:00:00|   1.0|    ZH|     2|
|     1|448471|7567055082854|   214|1000194|20

5142703

In [8]:
spark.table("default.stg_familie").where($"fam_nr" === 1010834).show()

+------+-------+-------------------+-------------------+----------+----------+----------+----------+------------+------------+----------+-------------+-----------+-----------+-------------------+-----------+
|fam_id| fam_nr|        fam_beg_dat|        fam_end_dat|fam_fehler|fam_ges_id|fam_spr_id|fam_prt_id|fam_fkt_intv|fam_saf_intv|fam_gde_id|fam_ra_gde_id|fam_ext_ref|fam_mut_uid|        fam_mut_dat|fam_aktuell|
+------+-------+-------------------+-------------------+----------+----------+----------+----------+------------+------------+----------+-------------+-----------+-----------+-------------------+-----------+
|  9531|1010834|1900-01-01 00:00:00|2012-08-23 00:00:00|         0|      1000|         D|         1|           1|           0|       191|          191|       null|          2|2015-09-30 09:31:00|          0|
|  9532|1010834|2012-08-24 00:00:00|2015-01-07 00:00:00|         0|      1000|         D|         1|           1|           0|       243|          243|       null|     

In [9]:
import java.time.LocalDate
import java.time.format.DateTimeFormatter

val dayFormat = DateTimeFormatter.ofPattern("yyyy-MM-dd")
val rangeStart = LocalDate.parse("2020-01-01", dayFormat)
val tmp = ""
//if (tmp == "") {
  val rangeEnd = rangeStart.plusYears(1).minusDays(1)
//} else {
//  val rangeEnd = LocalDate.parse(tmp, dayFormat)
//}


val df_leiPPos = spark.table("default.stg_LeiPPos")//.as("leippos")
val df_leiPKopf = spark.table("default.stg_LeiPKopf")//.as("leipkopf")
val df_vers = spark.table("default.stg_Versicherter")
val df_familie = spark.table("default.stg_Familie")
val df_gemeinde = spark.table("default.stg_Gemeinde")
val df_ecKopf = spark.table("default.stg_ECKopf")
val df_ecDetail = spark.table("default.stg_ECDetail")
val df_ecProdukt = spark.table("default.stg_ECProdukt")

val join_type = "left" // "left_semi" would only take rows which match



In [10]:
var counts = List(df_leiPPos.count())

var df = df_leiPPos
      // Jahr
      .filter($"LPP_BEH_BEG_DAT" > rangeStart && $"LPP_BEH_BEG_DAT" < rangeEnd)

counts = counts :+ df.count() 
      // AHV-Nr: LeiPPos.LPP_LPK_ID → LeiPKopf.LPK_VER_NR → Versicherter.VER_NNSS_NR
df = df.join(df_leiPKopf, $"lpp_lpk_id" === $"lpk_id", join_type)
      .join(df_vers, $"lpk_ver_id" === $"ver_id", join_type)
      .withColumnRenamed("ver_nnss_nr", "AHV_Nr")

counts = counts :+ df.count() 

      // Wohnkanton: LeiPPos.LPP_LPK_ID → LeiPKopf.LPK_VER_NR → Versicherter.VER_FAM_NR → Familie.FAM_RA_GDE_ID (Test auf Zeitraum) → Gemeinde.GDE_KT
df = df.join(df_familie, $"ver_fam_nr" === $"fam_nr", join_type)
      .join(df_gemeinde, $"fam_ra_gde_id" === $"gde_id", join_type)
      .withColumnRenamed("gde_kt", "Wohnkanton")
val df2 = df

//counts = counts :+ df.count() 

      // Geburtsjahr: LeiPPos.LPP_LPK_ID → LeiPKopf.LPK_VER_NR → Versicherter.VER_GEB_DAT
df = df.withColumnRenamed("ver_geb_dat", "Geburtsjahr")

counts = counts :+ df.count() 

      // Geschlecht: LeiPPos.LPP_LPK_ID → LeiPKopf.LPK_VER_NR → Versicherter.VER_SEX_CD
df = df.withColumnRenamed("ver_sex_cd", "Geschlecht")

      // Spital Aufenthalt:
      // Exakte Ein/Austritte Spital/Reha o.ä. ECP: LeiPPos.LPP_LPK_ID → LeiPKopf.LPK_RECH_NR → ECKopf.ECK_RECH_NR, ~.ECK_ID → ECKopfXtraCaseDetail.EXC_ECK_ID, ~.EXC_BEG_DAT, ~.EXC_END_DAT (korrekte Summierung)
      // Exakte Ein/Austritte Spital/Reha o.ä. Sumex/Secon: LeiPPos.LPP_LPK_ID → LpkCaseDetail.LCD_BEG_DAT,~.LCD_END_DAT 
      // TODO

counts = counts :+ df.count() 

      // GTIN: LeiPPos.LPP_LPK_ID → LeiPKopf.LPK_RECH_NR → ECKopf.ECK_RECH_NR, ~.ECK_ID → ECDetail.ECD_ECP_ID→ ECProdukt.ECP_PRODUKT_NR (PharmaCode)
df = df.join(df_ecKopf, $"LPK_RECH_NR" === $"ECK_RECH_NR", join_type)
      .join(df_ecDetail, $"ECK_ID" === $"ECD_ECK_ID", join_type)
      .join(df_ecProdukt, $"ECD_ECP_ID" === $"ECP_ID", join_type)
      .withColumnRenamed("ecp_produkt_nr", "GTIN")
      

      //Output: AHV-Nummer ,Geburtsjahr, Geschlecht, Wohnkanton, Aufenthalt, GTIN-Code, Anzahl Packungen, Versicherungsmonate, Bruttokosten, Kostenbeteiligung
      .select("AHV_Nr", "Geburtsjahr", "Geschlecht", "Wohnkanton", "GTIN")

//counts = counts :+ df.count() 

print(counts)

List(1127420, 198712, 198712, 346648, 346648)

In [11]:
df2.show()

+----------+------+-------+----------+----------+----------+----------+----------+----------+----------+----------+------------+----------+----------+-------------------+------------+----------+-----------+-----------+-----------+-----------+---------------+-----------+----------------+---------------+----------------+-------------+-----------+-----------+----------+-----------+--------+---------+---------+------------+---------------+--------------+-------------+-----------+-------------+-----------+-------------+------------+--------------+-----------------+---------------+------------------+----------------+---------------+-----------------+--------------------+------------------+------+----------+----------+----------+----------+----------+----------+----------+--------------+---------------+--------------------+--------------+-----------+-----------+-------------------+-------------------+-------------+-------------+------------+-------------------+-------------------+-----------+

In [12]:
df2.select("fam_beg_dat").dtypes.foreach(f=>println(f._1+","+f._2))

fam_beg_dat,TimestampType


In [13]:
df_leiPPos.filter($"lpp_lpk_id" === 400974)

Error: identifier expected but '(' found. (4)Error: `=', `>:', or `<:' expected (49)

In [14]:
df_leiPKopf.filter($"lpk_id" === 400974).show()

+------+----------+----------+----------+----------+----------+----------+----------+--------------+---------------+--------------------+--------------+-----------+-----------+-------------------+-------------------+-------------+-------------+------------+-------------------+-------------------+-----------+-----------+----------+-----------+-----------+-------------------+-----------+-------------------+-----------+---------------+---------------+----------+--------------------+----------------+--------------------+------------------+----------+----------+----------+-----------+--------------+----------+--------------------+----------------+-------------------+------------------+-------------+-----------------+-----------------+--------+-------+----------+-------------+--------------------+--------------------+----------+----------+---------------+--------------+------------------------+-----------+--------------+
|lpk_id|lpk_ver_nr|lpk_fam_nr|lpk_deb_nr|lpk_vtg_nr|lpk_gde_id|lpk_adr_

In [15]:
df_vers.filter($"ver_nr" === 2008659).show()

+-------+------+----------+----------+--------+---------+---------+----------+-------------------+-----------+---------+----------+----------+----------+---------------+---------------+-------------------+-------------------+-----------+-------------------+-----------+-------------------+----------+-----------+-------------+
| ver_nr|ver_id|ver_vea_id|ver_fam_nr|ver_name|ver_vname|ver_such1|ver_sex_cd|        ver_geb_dat|    ver_ahv|ver_beruf|ver_ziv_cd|ver_nat_id|ver_aea_id|ver_aea_beg_dat|ver_aea_end_dat|        ver_beg_dat|        ver_end_dat|ver_erf_uid|        ver_erf_dat|ver_mut_uid|        ver_mut_dat|ver_fehler|ver_aktuell|  ver_nnss_nr|
+-------+------+----------+----------+--------+---------+---------+----------+-------------------+-----------+---------+----------+----------+----------+---------------+---------------+-------------------+-------------------+-----------+-------------------+-----------+-------------------+----------+-----------+-------------+
|2008659| 10896|   

In [16]:
df_familie.filter($"fam_nr" === 1008659).show()

+------+-------+-------------------+-------------------+----------+----------+----------+----------+------------+------------+----------+-------------+-----------+-----------+-------------------+-----------+
|fam_id| fam_nr|        fam_beg_dat|        fam_end_dat|fam_fehler|fam_ges_id|fam_spr_id|fam_prt_id|fam_fkt_intv|fam_saf_intv|fam_gde_id|fam_ra_gde_id|fam_ext_ref|fam_mut_uid|        fam_mut_dat|fam_aktuell|
+------+-------+-------------------+-------------------+----------+----------+----------+----------+------------+------------+----------+-------------+-----------+-----------+-------------------+-----------+
|  7814|1008659|1900-01-01 00:00:00|2015-09-01 00:00:00|         0|      1000|         D|         1|           1|           0|       133|          133|       null|          2|2015-09-30 09:31:00|          0|
|  7815|1008659|2015-09-02 00:00:00|2021-10-31 00:00:00|         0|      1000|         D|         1|           1|           0|       138|          138|       null|     

In [17]:
spark.table("default.btl_ra_data").printSchema

root
 |-- BAG-Nummer: void (nullable = true)
 |-- Jahr: void (nullable = true)
 |-- AHV_Nr: string (nullable = true)
 |-- Kanton: string (nullable = true)
 |-- Geburtsjahr: timestamp (nullable = true)
 |-- Geschlecht: string (nullable = true)
 |-- Aufenthalt: void (nullable = true)
 |-- GTIN: string (nullable = true)
 |-- PharmaCode: void (nullable = true)
 |-- Monate: double (nullable = true)
 |-- Kosten: void (nullable = true)
 |-- Kostenbeteiligung: void (nullable = true)



In [18]:
spark.table("default.btl_ra_data").filter(not($"Monate" === 12.0)).show()

+----------+----+-------------+------+-------------------+----------+----------+-------------+----------+------+------+-----------------+
|BAG-Nummer|Jahr|   AHV-Nummer|Kanton|        Geburtsjahr|Geschlecht|Aufenthalt|         GTIN|PharmaCode|Monate|Kosten|Kostenbeteiligung|
+----------+----+-------------+------+-------------------+----------+----------+-------------+----------+------+------+-----------------+
|       TBD| TBD|7566670203088|    ZH|2000-04-24 00:00:00|         F|       TBD|      00.1580|       TBD|   5.0|   TBD|              TBD|
|       TBD| TBD|7566670203088|    ZH|2000-04-24 00:00:00|         F|       TBD|      00.0141|       TBD|   5.0|   TBD|              TBD|
|       TBD| TBD|7566670203088|    ZH|2000-04-24 00:00:00|         F|       TBD|      00.0030|       TBD|   5.0|   TBD|              TBD|
|       TBD| TBD|7566670203088|    ZH|2000-04-24 00:00:00|         F|       TBD|      00.0010|       TBD|   5.0|   TBD|              TBD|
|       TBD| TBD|7566670203088|   