<img src="https://github.com/richardcerny/bricksflow/raw/rc-bricksflow2.1/docs/img/databricks_icon.png?raw=true" width=100/>
# Bricksflow example 4.

## Productionalizing notebook in Bricksflow
It always takse some time to productionalize notebook.
What is usually necesary to do:
- cleaning a code from testing part
- comments some part of code
- all code is in functions
- remove unnecesary comments
- resolve ToDos
- replace hardcoded variable with config parameters
- test that it still works the same after clean up
- use linting tools (pylint, black, flake8)
- ...

In [0]:
%run ../../../app/install_master_package

In [0]:
from pyspark.sql import functions as F
from logging import Logger
from datalakebundle.table.TableManager import TableManager
from pyspark.sql import SparkSession
from pyspark.sql.dataframe import DataFrame
from databricksbundle.notebook.decorators import dataFrameLoader, transformation, dataFrameSaver
from datalakebundle.table.TableNames import TableNames

In [0]:
@dataFrameLoader(display=True)
def read_bronze_covid_tbl_template_2_confirmed_case(spark: SparkSession, tableNames: TableNames):
    return (
        spark
            .read
            .table(tableNames.getByAlias('bronze_covid.tbl_template_2_confirmed_cases'))
            .select('countyFIPS','County_Name', 'State', 'stateFIPS')
            .dropDuplicates()
    )

countyFIPS,County_Name,State,stateFIPS
1003,Baldwin County,AL,1
1011,Bullock County,AL,1
1001,Autauga County,AL,1
1009,Blount County,AL,1
1015,Calhoun County,AL,1
1005,Barbour County,AL,1
1017,Chambers County,AL,1
1007,Bibb County,AL,1
1013,Butler County,AL,1
0,Statewide Unallocated,AL,1


In [0]:
@dataFrameLoader(display=True)
def read_table_silver_covid_tbl_template_3_mask_usage(spark: SparkSession, tableNames: TableNames):
    return (
        spark
            .read
            .table(tableNames.getByAlias('silver_covid.tbl_template_3_mask_usage'))
            .limit(10) # only for test
      
            .withColumn('EXECUTE_DATE', F.to_date(F.col('EXECUTE_DATETIME')))
    )

COUNTYFP,NEVER,RARELY,SOMETIMES,FREQUENTLY,ALWAYS,EXECUTE_DATETIME,CONFIG_YAML_PARAMETER,EXECUTE_DATE
1001,0.053,0.074,0.134,0.295,0.444,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1003,0.083,0.059,0.098,0.323,0.436,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1005,0.067,0.121,0.12,0.201,0.491,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1007,0.02,0.034,0.096,0.278,0.572,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1009,0.053,0.114,0.18,0.194,0.459,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1011,0.031,0.04,0.144,0.286,0.5,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1013,0.102,0.053,0.257,0.137,0.451,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1015,0.152,0.108,0.13,0.167,0.442,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1017,0.117,0.037,0.15,0.136,0.56,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1019,0.135,0.027,0.161,0.158,0.52,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11


### How to join more dataframes using @transformation?

In [0]:
@transformation(read_bronze_covid_tbl_template_2_confirmed_case, read_table_silver_covid_tbl_template_3_mask_usage, display=True)
def join_covid_datasets(df1: DataFrame, df2: DataFrame):
    return (
        df1.join(df2, df1.countyFIPS == df2.COUNTYFP, how='right')
    )

countyFIPS,County_Name,State,stateFIPS,COUNTYFP,NEVER,RARELY,SOMETIMES,FREQUENTLY,ALWAYS,EXECUTE_DATETIME,CONFIG_YAML_PARAMETER,EXECUTE_DATE
1001.0,Autauga County,AL,1.0,1001,0.053,0.074,0.134,0.295,0.444,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1003.0,Baldwin County,AL,1.0,1003,0.083,0.059,0.098,0.323,0.436,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1005.0,Barbour County,AL,1.0,1005,0.067,0.121,0.12,0.201,0.491,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1007.0,Bibb County,AL,1.0,1007,0.02,0.034,0.096,0.278,0.572,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1009.0,Blount County,AL,1.0,1009,0.053,0.114,0.18,0.194,0.459,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1011.0,Bullock County,AL,1.0,1011,0.031,0.04,0.144,0.286,0.5,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1013.0,Butler County,AL,1.0,1013,0.102,0.053,0.257,0.137,0.451,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1015.0,Calhoun County,AL,1.0,1015,0.152,0.108,0.13,0.167,0.442,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
1017.0,Chambers County,AL,1.0,1017,0.117,0.037,0.15,0.136,0.56,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11
,,,,1019,0.135,0.027,0.161,0.158,0.52,2021-01-11T10:26:27.904+0000,This is a sample string config value,2021-01-11


In [0]:
@transformation(join_covid_datasets, display=False)
def agg_avg_mask_usage_per_county(df: DataFrame):
    return (
        df
          .groupBy('EXECUTE_DATE','County_Name', 'CONFIG_YAML_PARAMETER')
          .agg(F.avg('NEVER').alias('AVG_NEVER'),
              F.avg('RARELY').alias('AVG_RARELY'),
              F.avg('SOMETIMES').alias('AVG_SOMETIMES'),
              F.avg('FREQUENTLY').alias('AVG_FREQUENTLY'),
              F.avg('ALWAYS').alias('AVG_ALWAYS')
          )
    )

In [0]:
@transformation(agg_avg_mask_usage_per_county, display=True)
def standardize_dataset(df: DataFrame):
    return (
        df.withColumnRenamed('County_Name', 'COUNTY_NAME')
    )

EXECUTE_DATE,COUNTY_NAME,CONFIG_YAML_PARAMETER,AVG_NEVER,AVG_RARELY,AVG_SOMETIMES,AVG_FREQUENTLY,AVG_ALWAYS
2021-01-11,Autauga County,This is a sample string config value,0.053,0.074,0.134,0.295,0.444
2021-01-11,Baldwin County,This is a sample string config value,0.083,0.059,0.098,0.323,0.436
2021-01-11,Barbour County,This is a sample string config value,0.067,0.121,0.12,0.201,0.491
2021-01-11,Bibb County,This is a sample string config value,0.02,0.034,0.096,0.278,0.572
2021-01-11,Blount County,This is a sample string config value,0.053,0.114,0.18,0.194,0.459
2021-01-11,Bullock County,This is a sample string config value,0.031,0.04,0.144,0.286,0.5
2021-01-11,Butler County,This is a sample string config value,0.102,0.053,0.257,0.137,0.451
2021-01-11,Calhoun County,This is a sample string config value,0.152,0.108,0.13,0.167,0.442
2021-01-11,Chambers County,This is a sample string config value,0.117,0.037,0.15,0.136,0.56
2021-01-11,,This is a sample string config value,0.135,0.027,0.161,0.158,0.52


In [0]:
@dataFrameSaver(standardize_dataset)
def save_table_gold_tbl_template_4_mask_usage_per_count(df: DataFrame, logger: Logger, tableNames: TableNames, tableManager: TableManager):
    
    outputTableName = tableNames.getByAlias('gold_reporting.tbl_template_4_mask_usage_per_county')
    tableManager.recreate('gold_reporting.tbl_template_4_mask_usage_per_county')
    logger.info(f"Saving data to table: {outputTableName}")
    (
        df
            .select(
                 'EXECUTE_DATE',
                 'COUNTY_NAME',
                 'CONFIG_YAML_PARAMETER',
                 'AVG_NEVER',
                 'AVG_RARELY',
                 'AVG_SOMETIMES',
                 'AVG_FREQUENTLY',
                 'AVG_ALWAYS',
            )
            .write
            .option('partitionOverwriteMode', 'dynamic')
            .insertInto(outputTableName)
    )