In [1]:
# Base URI of the "bronze" storage container; input paths in later cells are appended to it.
bronze_location = "abfss://bronze@prsynapselab.dfs.core.windows.net/"
# NOTE(review): not referenced by any cell visible in this notebook — presumably meant as the
# save mode for the table writes; confirm.
write_mode="overwrite"


In [2]:
# Infer the claim schema from one specific part file, then apply that schema explicitly when
# reading every claim JSON file under the partition folders.
_claim_schema_sample_path = bronze_location + "historic_data/claim/year=2016/part-00000-6650129e-31bc-4bc3-9443-0c6529438b3d.c000.json"
_all_claims_path = bronze_location + "historic_data/claim/*/*.json"

claimSchema = spark.read.json(_claim_schema_sample_path).schema
claim_df = (
    spark.read
    .schema(claimSchema)
    .json(_all_claims_path)
)

In [3]:
# Print the schema of the loaded claim DataFrame.
claim_df.printSchema()

In [4]:
# Show claim_df through the notebook's display helper.
display(claim_df)

In [5]:
# Columns kept from the raw claim data, each paired with the name it carries afterwards.
# Only "id" -> "claim_id" and "use" -> "isuse" actually change name.
_claim_column_renames = [
    ("id", "claim_id"),
    ("resourceType", "resourceType"),
    ("status", "status"),
    ("billablePeriod", "billablePeriod"),
    ("created", "created"),
    ("patient", "patient"),
    ("prescription", "prescription"),
    ("provider", "provider"),
    ("total", "total"),
    ("use", "isuse"),
    ("insurance", "insurance"),
    ("diagnosis", "diagnosis"),
    ("procedure", "procedure"),
]

_source_columns = [source for source, _ in _claim_column_renames]
_target_columns = [target for _, target in _claim_column_renames]

claim_df_selected = claim_df.select(_source_columns).toDF(*_target_columns)

In [6]:
# Register the DataFrame as a temp view so the %%spark (Scala) cells below can query it
# with spark.sql("... from claim_df_selected").
claim_df_selected.createOrReplaceTempView("claim_df_selected")

In [7]:
%%spark
import org.apache.spark.sql.types.{StructType,ArrayType}
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.Column
import org.apache.spark.sql.functions.col
import org.apache.spark.sql.functions.explode_outer

/**
 * Recursively flattens the array and struct columns of `df` into top-level columns.
 *
 * On each pass the first column (in schema order) whose type is an array or a struct is
 * rewritten, and the function recurses on the result until no such column is left:
 *
 *  - an array column is replaced by `explode_outer` of itself; the exploded column keeps its
 *    name and moves to the end of the column list;
 *  - a struct column is replaced by one column per child field, named `parent_child`, and the
 *    new columns are appended to the end of the column list.
 *
 * Column names are backtick-quoted wherever they are referenced, so a name that contains a
 * dot, a space or a SQL keyword is resolved as that single column instead of being parsed as
 * an expression or a nested path. Columns of any other type (including maps) are left as is.
 *
 * Note that a flattened name such as `a_b` can collide with an existing column of the same
 * name; this function does not de-duplicate names.
 *
 * @param df the DataFrame to flatten
 * @return a DataFrame whose top-level columns contain no array or struct types
 */
def flattenDataFrame(df: DataFrame): DataFrame = {
  // Quote a name so Spark reads it as one identifier; embedded backticks are doubled.
  def quoted(name: String): String = "`" + name.replace("`", "``") + "`"

  val fields = df.schema.fields

  // Only one nested column is handled per pass; the recursion picks up the rest.
  val firstNested = fields.find { field =>
    field.dataType match {
      case _: ArrayType | _: StructType => true
      case _                            => false
    }
  }

  firstNested match {
    case None =>
      // Nothing left to flatten.
      df

    case Some(nested) =>
      val siblingNames = fields.map(_.name).filter(_ != nested.name)

      val projection: Array[Column] = nested.dataType match {
        case structType: StructType =>
          // Dots in output names are replaced so later passes never see a dotted column name.
          val siblingCols =
            siblingNames.map(name => col(quoted(name)).as(name.replace(".", "_")))
          val childCols = structType.fieldNames.map { child =>
            col(quoted(nested.name) + "." + quoted(child))
              .as((nested.name + "." + child).replace(".", "_"))
          }
          siblingCols ++ childCols

        case _ =>
          // Array column: explode_outer keeps rows whose array is null or empty.
          siblingNames.map(name => col(quoted(name))) :+
            explode_outer(col(quoted(nested.name))).as(nested.name)
      }

      flattenDataFrame(df.select(projection: _*))
  }
}

## Creating Claim Main Table

In [8]:
# Print the schema of the selected/renamed claim columns.
claim_df_selected.printSchema()

In [9]:
%%spark
// Main claim table: every column of the temp view except the three nested collections,
// which are written to their own tables in later cells.
val claim_main_df = spark
  .sql("select * from claim_df_selected")
  .drop("insurance", "diagnosis", "procedure")


In [10]:
%%spark
// Flatten any remaining array/struct columns of the main claim data into top-level columns.
val claim_main_df_flattened = flattenDataFrame(claim_main_df)

In [11]:
%%spark
// Show the flattened main claim data through the notebook's display helper.
display(claim_main_df_flattened)

In [12]:
%%spark
// Base URI of the "silver" storage container; the table files are written beneath it.
val silver_location = "abfss://silver@prsynapselab.dfs.core.windows.net/"

// Coalesce to at most 200 partitions and save the flattened main claim data as the Delta
// table fhir.claim_main_hash, with its files under <silver_location>claim_main.
// NOTE(review): no save mode is set, so Spark's default applies — presumably a rerun fails
// once the table exists; confirm whether overwrite is intended.
val claimMainWriter = claim_main_df_flattened
  .coalesce(200)
  .write
  .format("delta")
  .option("path", silver_location + "claim_main")

claimMainWriter.saveAsTable("fhir.claim_main_hash")


## Creating Claim Insurance Table

In [13]:
%%spark
// Keep only the claim key and the nested insurance column.
val claim_insurance_df = spark.sql("select claim_id, insurance from claim_df_selected")
// Flatten any arrays/structs under `insurance` into top-level columns.
val claim_insurance_df_flattened = flattenDataFrame(claim_insurance_df)
// Show the flattened result through the notebook's display helper.
display(claim_insurance_df_flattened)

In [14]:
%%spark
// Base URI of the "silver" storage container; the table files are written beneath it.
val silver_location = "abfss://silver@prsynapselab.dfs.core.windows.net/"

// Coalesce to at most 200 partitions and save the flattened insurance data as the Delta
// table fhir.claim_insurance_hash, with its files under <silver_location>claim_insurance.
// NOTE(review): no save mode is set, so Spark's default applies — presumably a rerun fails
// once the table exists; confirm whether overwrite is intended.
val claimInsuranceWriter = claim_insurance_df_flattened
  .coalesce(200)
  .write
  .format("delta")
  .option("path", silver_location + "claim_insurance")

claimInsuranceWriter.saveAsTable("fhir.claim_insurance_hash")

## Creating Claim Diagnosis Table

In [15]:
%%spark
// Keep only the claim key and the nested diagnosis column.
val claim_diagnosis_df = spark.sql("select claim_id, diagnosis from claim_df_selected")
// Flatten any arrays/structs under `diagnosis` into top-level columns.
val claim_diagnosis_df_flattened = flattenDataFrame(claim_diagnosis_df)
// Show the flattened result through the notebook's display helper.
display(claim_diagnosis_df_flattened)

In [16]:
%%spark
// Base URI of the "silver" storage container; the table files are written beneath it.
val silver_location = "abfss://silver@prsynapselab.dfs.core.windows.net/"

// Coalesce to at most 200 partitions and save the flattened diagnosis data as the Delta
// table fhir.claim_diagnosis_hash, with its files under <silver_location>claim_diagnosis.
// NOTE(review): no save mode is set, so Spark's default applies — presumably a rerun fails
// once the table exists; confirm whether overwrite is intended.
val claimDiagnosisWriter = claim_diagnosis_df_flattened
  .coalesce(200)
  .write
  .format("delta")
  .option("path", silver_location + "claim_diagnosis")

claimDiagnosisWriter.saveAsTable("fhir.claim_diagnosis_hash")

## Creating Claim Procedure Table

In [17]:
%%spark
// Keep only the claim key and the nested procedure column.
val claim_procedure_df = spark.sql("select claim_id, procedure from claim_df_selected")
// Flatten any arrays/structs under `procedure` into top-level columns.
val claim_procedure_df_flattened = flattenDataFrame(claim_procedure_df)
// Show the flattened result through the notebook's display helper.
display(claim_procedure_df_flattened)

In [18]:
%%spark
// Base URI of the "silver" storage container; the table files are written beneath it.
val silver_location = "abfss://silver@prsynapselab.dfs.core.windows.net/"

// Coalesce to at most 200 partitions and save the flattened procedure data as the Delta
// table fhir.claim_procedure_hash, with its files under <silver_location>claim_procedure.
// NOTE(review): no save mode is set, so Spark's default applies — presumably a rerun fails
// once the table exists; confirm whether overwrite is intended.
val claimProcedureWriter = claim_procedure_df_flattened
  .coalesce(200)
  .write
  .format("delta")
  .option("path", silver_location + "claim_procedure")

claimProcedureWriter.saveAsTable("fhir.claim_procedure_hash")