# Bronze to silver
Prerequisite: the setup must have been executed once beforehand to mount the container and create the database with its table.
Upserts the data from the health_status_updates.csv file in the bronze folder into the silver 'health_data' table.

## Process the data


In [0]:
from pyspark.sql.types import IntegerType, StringType, DoubleType, StructField, StructType

# Root of the mounted container and the bronze CSV that lives beneath it.
mount_point = '/mnt/health-updates'
health_data_path = f"{mount_point}/bronze/health_status_updates.csv"

# Explicit schema for the CSV, built one field at a time as
# (name, type, nullable). DATE_PROVIDED is read as a plain string here.
health_data_schema = (
    StructType()
    .add("STATUS_UPDATE_ID", IntegerType(), False)
    .add("PATIENT_ID", IntegerType(), False)
    .add("DATE_PROVIDED", StringType(), False)
    .add("FEELING_TODAY", StringType(), True)
    .add("IMPACT", StringType(), True)
    .add("INJECTION_SITE_SYMPTOMS", StringType(), True)
    .add("HIGHEST_TEMP", DoubleType(), True)
    .add("FEVERISH_TODAY", StringType(), True)
    .add("GENERAL_SYMPTOMS", StringType(), True)
    .add("HEALTHCARE_VISIT", StringType(), True)
)

# Load the bronze file; the first line of the CSV is treated as a header.
health_data = spark.read.csv(
    path=health_data_path,
    schema=health_data_schema,
    header=True,
)

In [0]:
from pyspark.sql.functions import to_date, current_timestamp
# DATE_PROVIDED arrives as an 'MM/dd/yyyy' string; convert it to a date
# while keeping the column name.
date_provided = to_date(health_data['DATE_PROVIDED'], 'MM/dd/yyyy').alias('DATE_PROVIDED')

# Extra column holding the current timestamp for this load.
updated_timestamp = current_timestamp().alias("UPDATED_TIMESTAMP")

# Columns that follow DATE_PROVIDED and are carried over unchanged.
unchanged_columns = [
    'FEELING_TODAY',
    'IMPACT',
    'INJECTION_SITE_SYMPTOMS',
    'HIGHEST_TEMP',
    'FEVERISH_TODAY',
    'GENERAL_SYMPTOMS',
    'HEALTHCARE_VISIT',
]

health_data = health_data.select(
    'STATUS_UPDATE_ID',
    'PATIENT_ID',
    date_provided,
    *unchanged_columns,
    updated_timestamp,
)

## Merging into the table

In [0]:
# upsert into health_data table
from pyspark.sql.functions import current_timestamp
from delta.tables import DeltaTable

# Open the silver Delta table. The path is derived from mount_point (set in
# the first cell) so the bronze and silver locations share a single root.
deltaTable = DeltaTable.forPath(spark, f"{mount_point}/silver/health_data")

# Columns copied as-is from the incoming batch ('src') into the table.
source_columns = [
    "status_update_id",
    "patient_id",
    "date_provided",
    "feeling_today",
    "impact",
    "injection_site_symptoms",
    "highest_temp",
    "feverish_today",
    "general_symptoms",
    "healthcare_visit",
]

# One mapping drives both the update and the insert branch, so the two can
# never drift apart. updated_timestamp is stamped with the current time
# rather than copied from the source row.
upsert_values = {column: f"src.{column}" for column in source_columns}
upsert_values["updated_timestamp"] = current_timestamp()

# Upsert keyed on status_update_id: rows already present in the table are
# overwritten with the source values, rows not yet present are inserted.
(
    deltaTable.alias('tgt')
    .merge(
        health_data.alias('src'),
        'tgt.status_update_id = src.status_update_id',
    )
    .whenMatchedUpdate(set=upsert_values)
    .whenNotMatchedInsert(values=upsert_values)
    .execute()
)

In [0]:
# End the notebook run, handing this message back as its exit value.
exit_message = 'Bronze to silver complete'
dbutils.notebook.exit(exit_message)