# 07 Load Daily City Metrics

Load data into `DAILY_CITY_METRICS` with incremental processing: the table is created on the first run and upserted (merged) on subsequent runs.

In [None]:
-- Expose the active database and schema so the Python cell below can read them
-- through cells.sql_get_context (first column = database, second column = schema).
-- This won't be needed when we can pass variables to Notebooks!
SELECT
    current_database() AS DATABASE_NAME,
    current_schema()   AS SCHEMA_NAME

In [None]:
# Import python packages
import logging
from snowflake.core import Root

from snowflake.snowpark.context import get_active_session    # Snowpark drives the DataFrame pipeline further down

logger = logging.getLogger("demo_logger")

# The "sql_get_context" SQL cell above returns one row: (database, schema).
# Reading it here is a workaround that won't be needed when we can pass variables to Notebooks.
current_context_df = cells.sql_get_context.to_pandas()
database_name      = current_context_df.iat[0, 0]
schema_name        = current_context_df.iat[0, 1]

# Reuse the session the notebook is already running in.
session = get_active_session()
# session.use_schema(f"{database_name}.{schema_name}")   # intentionally left disabled

logger.info("07_load_daily_city_metrics start")

## Function to check if table exists

The check below uses the [Snowflake Python Management API](https://docs.snowflake.com/en/developer-guide/snowflake-python-api/snowflake-python-overview) to look up the table in the current database and schema.

In [None]:
def table_exists(session,
                 database_name='',
                 schema_name  ='',
                 table_name   =''):
    """Report whether a table called exactly `table_name` is listed in a schema.

    The schema's tables are listed through `Root(session)` with
    `like=table_name` as the filter, and each returned name is then compared
    to `table_name` with `==`, so only an exact (case-sensitive) match counts.

    Args:
        session: Session handed to `Root` to reach the account objects.
        database_name: Key used to select the database.
        schema_name: Key used to select the schema inside that database.
        table_name: Name to look for.

    Returns:
        True if a listed table has `name == table_name`, otherwise False.
    """
    schema_ref = Root(session).databases[database_name].schemas[schema_name]
    candidates = schema_ref.tables.iter(like=table_name)

    # any() stops at the first exact match, just like an early return in a loop.
    return any(candidate.name == table_name for candidate in candidates)

# Not used, SQL alternative to Python
def table_exists2(session,
                  database_name='',
                  schema_name  ='',
                  table_name   =''):
    """Check for a table by querying the database's INFORMATION_SCHEMA.TABLES.

    Runs a single `SELECT EXISTS (...)` statement through `session.sql` and
    returns the `TABLE_EXISTS` field of the first result row.

    NOTE(review): the three names are interpolated into the SQL text as-is
    (no quoting or escaping), so they should only come from trusted values.

    Args:
        session: Object providing `sql(query).collect()`.
        database_name: Database whose INFORMATION_SCHEMA is queried.
        schema_name: Value matched against TABLE_SCHEMA.
        table_name: Value matched against TABLE_NAME.

    Returns:
        The `TABLE_EXISTS` value of the first row returned by the query.
    """
    query_template = "SELECT EXISTS (SELECT * FROM {}.INFORMATION_SCHEMA.TABLES WHERE TABLE_SCHEMA = '{}' AND TABLE_NAME = '{}') AS TABLE_EXISTS"
    query          = query_template.format(database_name, schema_name, table_name)

    first_row = session.sql(query).collect()[0]
    return first_row['TABLE_EXISTS']

## Pipeline to update daily_city_metrics

In [None]:
import snowflake.snowpark.functions as F

table_name = "DAILY_CITY_METRICS"

# --- Source tables ---------------------------------------------------------
history_day  = session.table("FROSTBYTE_WEATHERSOURCE.ONPOINT_ID.HISTORY_DAY")
order_detail = session.table("ORDER_DETAIL")  # loaded from FROSTBYTE_RAW_STAGE 's3://sfquickstarts/data-engineering-with-snowpark-python/'
location     = session.table("LOCATION")      # loaded from FROSTBYTE_RAW_STAGE 's3://sfquickstarts/data-engineering-with-snowpark-python/'

# --- Join 1: order lines -> location ----------------------------------------
same_location = order_detail['LOCATION_ID'] == location['LOCATION_ID']
order_detail  = order_detail.join(location, same_location)

# --- Join 2: (orders + location) -> weather history -------------------------
# Match on the order's calendar date plus the location's country and city.
# The predicates are built from the already-joined `order_detail` frame.
same_day     = F.builtin("DATE")(order_detail['ORDER_TS']) == history_day['DATE_VALID_STD']
same_country = location['ISO_COUNTRY_CODE'] == history_day['COUNTRY']
same_city    = location['CITY'] == history_day['CITY_NAME']
order_detail = order_detail.join(history_day, same_day & same_country & same_city)

# --- Aggregate: one row per day / city / country ----------------------------
daily_by_city = order_detail.group_by(
    F.col('DATE_VALID_STD'),
    F.col('CITY_NAME'),
    F.col('ISO_COUNTRY_CODE'),
)

daily_totals = daily_by_city.agg(
    F.sum('PRICE').alias('DAILY_SALES_SUM'),
    F.avg('AVG_TEMPERATURE_AIR_2M_F').alias("AVG_TEMPERATURE_F"),
    F.avg("TOT_PRECIPITATION_IN").alias("AVG_PRECIPITATION_IN"),
)

# --- Final shape: rename to the target column names -------------------------
# Sales default to 0 when NULL; the two weather averages are rounded to 2 decimals.
final_agg = daily_totals.select(
    F.col("DATE_VALID_STD").alias("DATE"),
    F.col("CITY_NAME").alias("CITY_NAME"),
    F.col("ISO_COUNTRY_CODE").alias("COUNTRY_DESC"),
    F.builtin("ZEROIFNULL")(F.col("DAILY_SALES_SUM")).alias("DAILY_SALES"),
    F.round(F.col("AVG_TEMPERATURE_F"), 2).alias("AVG_TEMPERATURE_FAHRENHEIT"),
    F.round(F.col("AVG_PRECIPITATION_IN"), 2).alias("AVG_PRECIPITATION_INCHES"),
)


# Decide between "first load" and "incremental upsert" based on whether the target exists.
target_is_present = table_exists(session,
                                 database_name=database_name,
                                 schema_name=schema_name,
                                 table_name=table_name)

if target_is_present:
    # Upsert path: every column of final_agg is written, so columns added to
    # final_agg later are picked up automatically.
    cols_to_update = {name: final_agg[name] for name in final_agg.schema.names}

    dcm = session.table(table_name)   # "DAILY_CITY_METRICS" - target table to merge into

    # A target row and a source row are the same record when date, city and country agree.
    merge_keys = (
        (dcm['DATE'] == final_agg['DATE'])
        & (dcm['CITY_NAME'] == final_agg['CITY_NAME'])
        & (dcm['COUNTRY_DESC'] == final_agg['COUNTRY_DESC'])
    )

    # Matched rows are updated in place; unmatched source rows are inserted.
    merge_actions = [
        F.when_matched().update(cols_to_update),
        F.when_not_matched().insert(cols_to_update),
    ]

    dcm.merge(final_agg, merge_keys, merge_actions)

    logger.info(f"Successful update: {table_name}")

else:
    # First load: the table doesn't exist yet, so create it from the aggregate.
    final_agg.write.mode("overwrite").save_as_table(table_name)

    logger.info(f"Successfully created {table_name}")


## Debugging

In [None]:
-- Spot-check: preview a few rows of the target table after the load.
SELECT *
FROM DAILY_CITY_METRICS
LIMIT 10;