# Create DMA dimension from sources
## Take Bronze data and load it into Silver

In [0]:
%python
# Select the operation to perform when processing data:
#   Daily   - load only changes
#   Reload  - flush the data and reload
#   Rebuild - drop and recreate the tables
#
# NOTE: dbutils.widgets.removeAll() is deliberately not called here.
# Databricks documents that a widget cannot be created in the same cell that
# removes widgets, and removing the dropdown on every run would reset the
# selection to 'Daily' before it is read, so 'Reload'/'Rebuild' could not be
# chosen interactively. Re-declaring an existing widget is expected to keep
# its current value -- TODO confirm on the target runtime.
dbutils.widgets.dropdown('Operations', 'Daily', ['Daily', 'Reload', 'Rebuild'])

# Read once; the Rebuild and Reload cells below branch on this value.
operations = dbutils.widgets.get('Operations')


## Rebuild the table if needed

In [0]:
%python
# Rebuild mode: replace neighbor.silver.dma with a fresh, empty table.
# Any other mode leaves the existing table definition untouched.
#
# HHPopulationPct uses scale 6 because the load step stores percentages as
# fractions (value / 100). At scale 2, any household share below 0.5% would be
# rounded to 0.00 when written to the table.
# NOTE(review): MedianAge and UnemploymentRate are kept as STRING to avoid
# changing types downstream readers may rely on, although the load step
# computes UnemploymentRate as a decimal -- confirm whether STRING is intended.
if operations == 'Rebuild':
    # Plain string: the statement has no placeholders to interpolate.
    rebsql = '''
    CREATE OR REPLACE TABLE neighbor.silver.dma (
        DMAID INTEGER,
        DMAName STRING,
        Region STRING,
        DMARank  INTEGER,
        TVHouseHoldPop DECIMAL(18,2),
        Population DECIMAL(18,2),
        HHPopulationPct DECIMAL(18,6),
        MedianAge STRING,
        UnemploymentRate STRING,
        UPDATEDATEUTC TIMESTAMP
      )
    '''
    spark.sql(rebsql)
else:
    print("This operation is skipped")

## Reload data if needed

In [0]:
%python
# Reload mode: empty neighbor.silver.dma so the load step that follows
# repopulates it from scratch. Every other mode leaves the rows in place.
if operations != 'Reload':
    print("This cell is skipped.")
else:
    # No interpolation is needed, so a plain literal carries the statement.
    asppsql = "\nTRUNCATE TABLE neighbor.silver.dma\n"
    spark.sql(asppsql)


## Load data into silver schema

In [0]:
# Upsert the DMA dimension into neighbor.silver.dma, keyed on DMAID.
#
# Source rows: distinct combinations from bronze reservations joined to
# bronze dma_detail on the DMA name, so only DMAs that appear in reservations
# are loaded.
#
# Value normalisation done in the staging SELECT:
#   * TVHHS / Pop      - values like '7.45M' are scaled by 1,000,000; anything
#                        else is cast directly (NULL if not numeric).
#   * PctUSHHS         - values like '6.4%' are converted to a fraction (/100).
#   * UnemployRate     - values greater than 1 are treated as percentages and
#                        divided by 100; others are kept as-is.
#
# UPDATEDATEUTC is stamped with current_timestamp() on both insert and update;
# previously no branch wrote it, so the column was always NULL. Because every
# matched row is updated on each run, the stamp reflects the last load time,
# not the last time the row's values changed.
#
# NOTE(review): Region, MedAge and UnemployRate are unqualified, so it is not
# visible here which bronze table supplies them. If the staging SELECT yields
# more than one distinct row per DMAID (e.g. differing Region values), the
# MERGE may fail on multiple source matches -- confirm source uniqueness.
merSQL = '''
MERGE INTO neighbor.silver.dma AS tgt
USING (
    SELECT DISTINCT
        dd.DMAID AS DMAID,
        res.dma AS DMAName,
        Region AS Region,
        dd.rank AS DMARank,
        CASE
            WHEN dd.TVHHS RLIKE '^[0-9.]+M' THEN TRY_CAST(REPLACE(dd.TVHHS, 'M', '') AS DECIMAL(18,2)) * 1000000
            ELSE TRY_CAST(dd.TVHHS AS DECIMAL(18,2))
        END AS TVHouseHoldPop,
        CASE
            WHEN dd.PctUSHHS RLIKE '^[0-9.]+%' THEN TRY_CAST(REPLACE(dd.PctUSHHS, '%', '') AS DECIMAL(18,2)) / 100
            ELSE TRY_CAST(dd.PctUSHHS AS DECIMAL(18,2))
        END AS HHPopulationPct,
        CASE
            WHEN dd.Pop RLIKE '^[0-9.]+M' THEN TRY_CAST(REPLACE(dd.Pop, 'M', '') AS DECIMAL(18,2)) * 1000000
            ELSE TRY_CAST(dd.Pop AS DECIMAL(18,2))
        END AS Population,
        MedAge AS MedianAge,
        CASE
            WHEN TRY_CAST(UnemployRate AS DECIMAL(18,2)) > 1 THEN TRY_CAST(UnemployRate AS DECIMAL(18,2)) / 100
            ELSE TRY_CAST(UnemployRate AS DECIMAL(18,2))
        END AS UnemploymentRate
    FROM neighbor.bronze.reservations res
    JOIN neighbor.bronze.dma_detail dd ON res.dma = dd.dmaName
) AS stg
ON tgt.DMAID = stg.DMAID
WHEN MATCHED THEN UPDATE SET
    tgt.DMAName = stg.DMAName,
    tgt.DMARank = stg.DMARank,
    tgt.Region = stg.Region,
    tgt.TVHouseHoldPop = stg.TVHouseHoldPop,
    tgt.HHPopulationPct = stg.HHPopulationPct,
    tgt.Population = stg.Population,
    tgt.MedianAge = stg.MedianAge,
    tgt.UnemploymentRate = stg.UnemploymentRate,
    tgt.UPDATEDATEUTC = current_timestamp()
WHEN NOT MATCHED THEN INSERT (
    DMAID, DMAName, Region, DMARank, TVHouseHoldPop, HHPopulationPct, Population, MedianAge, UnemploymentRate, UPDATEDATEUTC
) 
VALUES (
    stg.DMAID,
    stg.DMAName,
    stg.Region,
    stg.DMARank,
    stg.TVHouseHoldPop,
    stg.HHPopulationPct,
    stg.Population,
    stg.MedianAge,
    stg.UnemploymentRate,
    current_timestamp()
)
'''
spark.sql(merSQL)