In [1]:
import json
import os
from typing import Union, Dict, Optional

import yaml

from snowflake.snowpark import Session
import snowflake.snowpark.functions as F
import snowflake.snowpark.types as T



In [2]:
def create_session(config: Union[str, Dict[str, str]],
                   connection: Optional[str] = None) -> Session:

    """Establishes a snowpark connection to snowflake.

    Uses connection parameters, passed via .json config file or directly in python dict.

    Args:
        config (str/Dict) : (Relative/absolute) path to .json config file or
                            dict of connection(s) params.
        connection (str) : Specific key of preferred connection parameters held in config.
                           Defaults to None, meaning a single set of connection parameters
                           should be passed.

    Returns:
        snowflake.snowpark.Session

    Raises:
        KeyError: If connection is given but is not a key of the loaded config.

    """

    if isinstance(config, str):  # File path passed
        # JSON text is UTF-8; be explicit rather than relying on the platform default encoding.
        with open(config, encoding='utf-8') as f:
            connection_parameters = json.load(f)
    else:  # Dict of connections passed
        connection_parameters = config

    if connection:  # A specific key passed specifying connection params in config
        if connection not in connection_parameters:
            # Only key names are reported, never the parameter values.
            raise KeyError(
                f"Connection '{connection}' not found in config. "
                f"Available keys: {list(connection_parameters)}"
            )
        connection_parameters = connection_parameters[connection]

    return Session.builder.configs(connection_parameters).create()

# Path to the .json credentials file (similar to snowSQL). '~' is expanded to the current
# user's home directory so the notebook is not tied to one machine; change the path if
# your config lives elsewhere.
session = create_session(config = os.path.expanduser('~/.snowpark/config.json'),
                         connection = 'SCS')  # Key of the connection entry to use from the config

In [3]:
# Read the account-specific settings from the setup YAML file
with open('../setup/setup.yaml', 'r') as spec_file:
    account_specs = yaml.safe_load(spec_file)

# Snowflake objects used by this notebook
UDF_STAGE = account_specs['UDF_STAGE']        # Stage passed as stage_location when registering UDFs
SOURCE_TABLE = account_specs['SOURCE_TABLE']  # Table the records are read from
TARGET_TABLE = "DESCRIBER_RESULTS"            # Table the results are written to

# Session context
SNOW_ROLE = account_specs['SNOW_ROLE']
SNOW_DATABASE = account_specs['SNOW_DATABASE']
SNOW_SCHEMA = account_specs['SNOW_SCHEMA']
SNOW_WAREHOUSE = account_specs['SNOW_WAREHOUSE']

# Apply the context in order: role, database, schema, warehouse
for activate, object_name in (
    (session.use_role, SNOW_ROLE),
    (session.use_database, SNOW_DATABASE),
    (session.use_schema, SNOW_SCHEMA),
    (session.use_warehouse, SNOW_WAREHOUSE),
):
    activate(object_name)

In [4]:
################################################
######## Edit the contents of this cell ########
################################################

##### Natural Language Description ######
# Each record is described by concatenating the items below, in order.
# Bare strings are column names; literal text must be wrapped in F.lit().
# This is an example and should be adapted to the columns of the source table.
description = (
    F.lit('Company '), 'ORGANIZATION_NAME',
    F.lit(' is offering a '), 'REWARD',
    F.lit(' '), 'REWARD_TYPE',
    F.lit(' for a '), 'PRODUCT_CATEGORY',
    F.lit(' category product. To receive the reward, '), 'QUALIFICATION',
    F.lit('. The product description is '), 'PRODUCT_DESCRIPTION',
    F.lit(' More info at '), 'HYPERLINK',
    F.lit('.'),
)

##### Replace Empty/Null Values ######
# Columns whose nulls are replaced with a constant, so the description still reads
# naturally when a value is omitted. Each column should be listed once.
fillna_columns = [
    'ORGANIZATION_NAME',
    'REWARD_TYPE',
    'PRODUCT_CATEGORY',
    'QUALIFICATION',
    'REWARD',
    'PRODUCT_DESCRIPTION',
    'HYPERLINK'
]
fillna_value = 'unknown' # Value to replace nulls

##### Mapping Value ######
# Columns whose literal value(s) need translating are listed in map_cols, and each
# one gets its own dictionary of {current value : intended value}.
# The dictionaries are passed to the UDFs that perform the translation.

# Column names as strings. Leave as an empty list if nothing needs mapping.
map_cols = ['REWARD_TYPE']

# Translation table for the REWARD_TYPE column
REWARD_TYPE = dict(
    Cash="cash back",
    Card="gift card",
    Code="discount code",
)

# One dictionary per entry of map_cols, in the same order
mappings = [REWARD_TYPE]


In [5]:
if map_cols:
    # Two handlers are registered under the same SQL name "map_values" with different
    # input_types: one for a single string key, one for an array of keys.
    # The Python functions have distinct names so the second definition does not
    # shadow the first inside the notebook.

    @F.udf(session=session,
           name="map_values",
           is_permanent=True,
           stage_location=UDF_STAGE,
           input_types=[T.StringType(), T.MapType()],
           return_type=T.StringType(),
           replace=True)
    def map_value(key, mapping):
        """Translate a single value through mapping.

        Returns mapping[key] when key is present, otherwise key unchanged.
        An empty-string result is returned as None.
        """
        val = mapping.get(key, key)
        if val == '':
            val = None
        return val

    @F.udf(session=session,
           name="map_values",
           is_permanent=True,
           stage_location=UDF_STAGE,
           input_types=[T.ArrayType(), T.MapType()],
           return_type=T.StringType(),
           replace=True)
    def map_values(key, mapping):
        """Translate every element of an array through mapping and join them into one string.

        Elements missing from mapping are kept unchanged. More than two elements are
        joined with ', ', otherwise with ' or '. A NULL array or an empty-string
        result is returned as None.
        """
        if key is None:  # NULL array: len(None) would raise and fail the whole query
            return None
        sep = ', ' if len(key) > 2 else ' or '
        val = sep.join(mapping.get(i, i) for i in key)
        if val == '':
            val = None
        return val

In [6]:
# Start from the source table
df = session.table(SOURCE_TABLE)

# Compute each column listed in map_cols with the map_values UDF,
# pairing the column with the dictionary at the same position in mappings
if map_cols:
    mapping_funcs = [
        F.call_udf('map_values', F.col(col_name), mappings[i])
        for i, col_name in enumerate(map_cols)
    ]
    df = df.with_columns(map_cols, mapping_funcs)

# Fill nulls, build the DESCRIPTION column and overwrite the target table
(
    df.fillna(fillna_value, subset = fillna_columns)
      .withColumn('DESCRIPTION', F.concat(*description))
      .write.mode("overwrite")
      .save_as_table(TARGET_TABLE)
)