# Select Packages

To get started, let's select the packages we will need. In the **Packages** drop-down picker in the top right of the UI, search for the following package and click it to add it:

- snowflake-ml-python

After adding the package, click the **Start** button. Once the status shows **Active**, you're ready to run the rest of the Notebook.

## 1. Data Ingestion

The data for this notebook should already have been ingested into a table called cdc_data by following the steps outlined in the Example ML Flow Workload, found in our roachlong public repositories on GitHub. The data represents simulated credit card transactions. It is a deliberately simple dataset, because the goal is an end-to-end process for capturing, publishing, and analyzing high-volume transactional data in near real time. We'll use this notebook to explore some of that data.



### Import Libraries

In [None]:
# Snowpark for Python
from snowflake.snowpark.context import get_active_session
from snowflake.snowpark.types import DoubleType
import snowflake.snowpark.functions as F
import numpy as np
import pandas as pd

### Set Up and Establish a Secure Connection to Snowflake

Notebooks establish a Snowpark Session when the notebook is attached to the kernel. We're using a trial account for this tutorial, along with the X-Small COMPUTE_WH warehouse that comes with it.

In [None]:
-- Using Warehouse, Database, and Schema created during Setup
USE WAREHOUSE COMPUTE_WH;
USE DATABASE PAYMENTS;
USE SCHEMA PUBLIC;

In [None]:
# Get Snowflake Session object
session = get_active_session()
session.sql_simplifier_enabled = True

# Add a query tag to the session.
session.query_tag = {"origin":"sf_sit-is", 
                     "name":"e2e_ml_snowparkpython", 
                     "version":{"major":1, "minor":0,},
                     "attributes":{"is_quickstart":1, "source":"notebook"}}

# Current Environment Details
print('Connection Established with the following parameters:')
print('User      : {}'.format(session.get_current_user()))
print('Role      : {}'.format(session.get_current_role()))
print('Database  : {}'.format(session.get_current_database()))
print('Schema    : {}'.format(session.get_current_schema()))
print('Warehouse : {}'.format(session.get_current_warehouse()))

### Use Snowpark to read in data from the ingested cdc_data table

In our Example ML Flow Workload, we staged CDC-generated JSON files from an external S3 bucket. We then transformed the data and auto-loaded it into a table using a Snowflake pipe.

Next, for each source table, we'll:
* parse the CDC data based on file type, i.e. the source table name embedded in each file name,
* match each record to its corresponding resolved timestamp, and
* reduce the records for each resolved timestamp to the latest update.
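
To make the matching rule concrete, here is a small pure-Python sketch of what the SQL queries below do with file names. It assumes hypothetical file names with fixed-width timestamp prefixes, so that plain string comparison sorts them chronologically. It mirrors only the `split_part` rules and `where` conditions used in the queries. Each data file is assigned the smallest RESOLVED timestamp at or after its own timestamp. A file with no earlier checkpoint is dropped, because the `max(...)` subquery returns NULL for it. A file with no later checkpoint yet is also dropped.

```python
from bisect import bisect_left
from typing import Optional

# Hypothetical file names: fixed-width timestamp prefix, then the table name.
resolved_files = ["100.RESOLVED", "200.RESOLVED", "300.RESOLVED"]
data_files = [
    "050-address-0.ndjson",
    "150-address-1.ndjson",
    "200-customer-1.ndjson",
    "250-address-2.ndjson",
    "350-address-3.ndjson",
]

# split_part(filename, '.', 1): the checkpoint timestamps, sorted.
# String comparison is only chronological because all prefixes share a width.
checkpoints = sorted(name.split(".", 1)[0] for name in resolved_files)


def match_checkpoint(filename: str) -> Optional[str]:
    """Return the smallest checkpoint >= the file's timestamp, or None."""
    file_ts = filename.split("-", 1)[0]     # split_part(filename, '-', 1)
    i = bisect_left(checkpoints, file_ts)   # i = number of checkpoints < file_ts
    if i == 0 or i == len(checkpoints):
        # No earlier checkpoint (the SQL's max(...) is NULL) or none at/after it
        return None
    return checkpoints[i]


matches = {name: match_checkpoint(name) for name in data_files}
```

In the notebook itself this logic runs inside Snowflake. The sketch only shows which checkpoint a given file lands in.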

For more information on loading data, see documentation on [snowflake.snowpark.DataFrameReader](https://docs.snowflake.com/ko/developer-guide/snowpark/reference/python/api/snowflake.snowpark.DataFrameReader.html).




In [None]:
# Create a Snowpark DataFrame using a sql query to parse JSON fields
# And match records to a resolved time based on ingested file timestamp
addr_df = session.sql("""
    with checkpoint as (
        select data:resolved::varchar(32) as resolved,
               split_part(filename, '.', 1) as timestamp
        from cdc_data
        where filename like '%.RESOLVED'
    )
    select cdc.data:after:id::varchar(64) as addr_id,
           cdc.data:after:acct_num::varchar(19) as acct_num,
           cdc.data:after:street::varchar(64) as street,
           cdc.data:after:zip::varchar(9) as zip,
           cdc.data:after:lat::number(11, 8) as lat,
           cdc.data:after:lng::number(11, 8) as lng,
           cp.resolved as cp_resolved,
           cp.timestamp as cp_timestamp,
           split_part(cdc.filename, '-', 1) as file_timestamp,
           cdc.data:mvcc_timestamp::varchar(32) as mvcc_timestamp,
           cdc.data:updated::varchar(32) as updated,
           cdc.file_row_number as file_row_number
    from cdc_data cdc, checkpoint cp
    where cdc.filename like '%-address-%'
      and cp.timestamp = (select min(timestamp)
                          from checkpoint
                          where timestamp >= split_part(cdc.filename, '-', 1))
      and file_timestamp <= cp.timestamp
      and file_timestamp > (select max(timestamp)
                            from checkpoint
                            where timestamp < split_part(cdc.filename, '-', 1))
""")
addr_df.limit(5)
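
The `data:after:<field>::<type>` expressions above use Snowflake's path notation on the VARIANT column. They pull fields out of each CDC record's `after` object and cast them. The sketch below imitates that lookup in plain Python on a hypothetical record. The record's shape is inferred from the paths in the query, and its values are made up. The sketch does not reproduce Snowflake's exact casting rules; for example, it does not enforce `varchar(9)` lengths.

```python
import json
from decimal import Decimal

# Hypothetical CDC record shaped after the paths queried above; values are made up
raw = """
{"after": {"id": "a1", "acct_num": "4000123412341234", "street": "1 Main St",
           "zip": "10001", "lat": 40.750643, "lng": -73.996535},
 "updated": "1700000000000000000.0000000000",
 "mvcc_timestamp": "1699999999000000000.0000000000"}
"""

# parse_float=Decimal keeps numbers exact, closer to Snowflake's NUMBER type
record = json.loads(raw, parse_float=Decimal)
after = record["after"]                          # data:after

eight_places = Decimal("0.00000001")             # scale of number(11, 8)
row = {
    "ADDR_ID": str(after["id"]),                 # data:after:id::varchar(64)
    "ACCT_NUM": str(after["acct_num"]),          # data:after:acct_num::varchar(19)
    "ZIP": str(after["zip"]),                    # data:after:zip::varchar(9)
    "LAT": after["lat"].quantize(eight_places),  # data:after:lat::number(11, 8)
    "LNG": after["lng"].quantize(eight_places),  # data:after:lng::number(11, 8)
    "UPDATED": str(record["updated"]),           # data:updated::varchar(32)
}
```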

In [None]:
# We'll reduce the dataframe to only include the latest update
# associated with a single record for each resolved timestamp

# We need a pandas dataframe in order to use the .loc and groupby functions
addr_pd = addr_df.to_pandas()

# And get the latest record per ID and resolve time in a single file
addr_pd = addr_pd.loc[addr_pd
    .groupby(
        ['ADDR_ID', 'CP_RESOLVED', 'FILE_TIMESTAMP']
    )['FILE_ROW_NUMBER']
    .agg(pd.Series.idxmax)]

# Then reduce to the latest record across multiple files
addr_pd = addr_pd.loc[addr_pd
    .groupby(
        ['ADDR_ID', 'CP_RESOLVED']
    )['FILE_TIMESTAMP']
    .agg(pd.Series.idxmax)]

# Now our data should be unique by the entity ID and CP_RESOLVED
# And we'll set the index to later join with transaction data
addr_pd.set_index(['ACCT_NUM', 'CP_RESOLVED'], inplace=True)

addr_pd.head()
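
The two `idxmax` passes above keep one row for each entity and resolved timestamp: the row with the highest file row number in the latest file. A single sort followed by `drop_duplicates` expresses the same rule. This is a hedged sketch on made-up rows, not the notebook's own method. It assumes the `FILE_TIMESTAMP` strings are fixed-width, so that sorting them as text is chronological.

```python
import pandas as pd

# Made-up rows standing in for addr_pd
rows = pd.DataFrame({
    "ADDR_ID":         ["a1", "a1", "a1", "a2"],
    "CP_RESOLVED":     ["200", "200", "200", "200"],
    "FILE_TIMESTAMP":  ["150", "150", "180", "150"],
    "FILE_ROW_NUMBER": [1, 2, 1, 1],
    "STREET":          ["old", "mid", "new", "x"],
})

# Sort so the latest file, then the latest row within that file, comes last,
# then keep only that last row for each (ADDR_ID, CP_RESOLVED) pair.
latest = (
    rows.sort_values(["FILE_TIMESTAMP", "FILE_ROW_NUMBER"])
        .drop_duplicates(subset=["ADDR_ID", "CP_RESOLVED"], keep="last")
)
```

Sorting on strings also avoids calling `idxmax` on text columns.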

In [None]:
# Create a Snowpark DataFrame using a sql query to parse JSON fields
# And match records to a resolved time based on ingested file timestamp
city_df = session.sql("""
    with checkpoint as (
        select data:resolved::varchar(32) as resolved,
               split_part(filename, '.', 1) as timestamp
        from cdc_data
        where filename like '%.RESOLVED'
    )
    select cdc.data:after:id::varchar(64) as city_id,
           cdc.data:after:zip::varchar(9) as zip,
           cdc.data:after:city::varchar(32) as city,
           cdc.data:after:state::varchar(2) as state,
           cdc.data:after:city_pop::number(9, 0) as city_pop,
           cp.resolved as cp_resolved,
           cp.timestamp as cp_timestamp,
           split_part(cdc.filename, '-', 1) as file_timestamp,
           cdc.data:mvcc_timestamp::varchar(32) as mvcc_timestamp,
           cdc.data:updated::varchar(32) as updated,
           cdc.file_row_number as file_row_number
    from cdc_data cdc, checkpoint cp
    where cdc.filename like '%-city_loc-%'
      and cp.timestamp = (select min(timestamp)
                          from checkpoint
                          where timestamp >= split_part(cdc.filename, '-', 1))
      and file_timestamp <= cp.timestamp
      and file_timestamp > (select max(timestamp)
                            from checkpoint
                            where timestamp < split_part(cdc.filename, '-', 1))
""")
city_df.limit(5)

In [None]:
# We'll reduce the dataframe to only include the latest update
# associated with a single record for each resolved timestamp

# We need a pandas dataframe in order to use the .loc and groupby functions
city_pd = city_df.to_pandas()

# And get the latest record per ID and resolve time in a single file
city_pd = city_pd.loc[city_pd
    .groupby(
        ['CITY_ID', 'CP_RESOLVED', 'FILE_TIMESTAMP']
    )['FILE_ROW_NUMBER']
    .agg(pd.Series.idxmax)]

# Then reduce to the latest record across multiple files
city_pd = city_pd.loc[city_pd
    .groupby(
        ['CITY_ID', 'CP_RESOLVED']
    )['FILE_TIMESTAMP']
    .agg(pd.Series.idxmax)]

# Now our data should be unique by the entity ID and CP_RESOLVED
# And we'll set the index to later join with transaction data
city_pd.set_index(['ZIP', 'CP_RESOLVED'], inplace=True)

city_pd.head()

In [None]:
# Create a Snowpark DataFrame using a sql query to parse JSON fields
# And match records to a resolved time based on ingested file timestamp
cust_df = session.sql("""
    with checkpoint as (
        select data:resolved::varchar(32) as resolved,
               split_part(filename, '.', 1) as timestamp
        from cdc_data
        where filename like '%.RESOLVED'
    )
    select cdc.data:after:id::varchar(64) as cust_id,
           cdc.data:after:ssn::varchar(16) as ssn,
           cdc.data:after:cc_num::varchar(19) as cc_num,
           cdc.data:after:first::varchar(16) as first,
           cdc.data:after:last::varchar(16) as last,
           cdc.data:after:gender::varchar(1) as gender,
           cdc.data:after:job::varchar(64) as job,
           cdc.data:after:dob::date as dob,
           cdc.data:after:acct_num::varchar(19) as acct_num,
           cdc.data:after:profile::varchar(32) as profile,
           cp.resolved as cp_resolved,
           cp.timestamp as cp_timestamp,
           split_part(cdc.filename, '-', 1) as file_timestamp,
           cdc.data:mvcc_timestamp::varchar(32) as mvcc_timestamp,
           cdc.data:updated::varchar(32) as updated,
           cdc.file_row_number as file_row_number
    from cdc_data cdc, checkpoint cp
    where cdc.filename like '%-customer-%'
      and cp.timestamp = (select min(timestamp)
                          from checkpoint
                          where timestamp >= split_part(cdc.filename, '-', 1))
      and file_timestamp <= cp.timestamp
      and file_timestamp > (select max(timestamp)
                            from checkpoint
                            where timestamp < split_part(cdc.filename, '-', 1))
""")
cust_df.limit(5)

In [None]:
# We'll reduce the dataframe to only include the latest update
# associated with a single record for each resolved timestamp

# We need a pandas dataframe in order to use the .loc and groupby functions
cust_pd = cust_df.to_pandas()

# And get the latest record per ID and resolve time in a single file
cust_pd = cust_pd.loc[cust_pd
    .groupby(
        ['CUST_ID', 'CP_RESOLVED', 'FILE_TIMESTAMP']
    )['FILE_ROW_NUMBER']
    .agg(pd.Series.idxmax)]

# Then reduce to the latest record across multiple files
cust_pd = cust_pd.loc[cust_pd
    .groupby(
        ['CUST_ID', 'CP_RESOLVED']
    )['FILE_TIMESTAMP']
    .agg(pd.Series.idxmax)]

# Now our data should be unique by the entity ID and CP_RESOLVED
# And we'll set the index to later join with transaction data
cust_pd.set_index(['CC_NUM', 'CP_RESOLVED'], inplace=True)

cust_pd.head()

In [None]:
# Create a Snowpark DataFrame using a sql query to parse JSON fields
# And match records to a resolved time based on ingested file timestamp
merc_df = session.sql("""
    with checkpoint as (
        select data:resolved::varchar(32) as resolved,
               split_part(filename, '.', 1) as timestamp
        from cdc_data
        where filename like '%.RESOLVED'
    )
    select cdc.data:after:id::varchar(64) as merch_id,
           cdc.data:after:merchant::varchar(64) as merchant,
           cdc.data:after:merch_lat::number(11, 8) as merch_lat,
           cdc.data:after:merch_lng::number(11, 8) as merch_lng,
           cp.resolved as cp_resolved,
           cp.timestamp as cp_timestamp,
           split_part(cdc.filename, '-', 1) as file_timestamp,
           cdc.data:mvcc_timestamp::varchar(32) as mvcc_timestamp,
           cdc.data:updated::varchar(32) as updated,
           cdc.file_row_number as file_row_number
    from cdc_data cdc, checkpoint cp
    where cdc.filename like '%-merchant-%'
      and cp.timestamp = (select min(timestamp)
                          from checkpoint
                          where timestamp >= split_part(cdc.filename, '-', 1))
      and file_timestamp <= cp.timestamp
      and file_timestamp > (select max(timestamp)
                            from checkpoint
                            where timestamp < split_part(cdc.filename, '-', 1))
""")
merc_df.limit(5)

In [None]:
# We'll reduce the dataframe to only include the latest update
# associated with a single record for each resolved timestamp

# We need a pandas dataframe in order to use the .loc and groupby functions
merc_pd = merc_df.to_pandas()

# And get the latest record per ID and resolve time in a single file
merc_pd = merc_pd.loc[merc_pd
    .groupby(
        ['MERCH_ID', 'CP_RESOLVED', 'FILE_TIMESTAMP']
    )['FILE_ROW_NUMBER']
    .agg(pd.Series.idxmax)]

# Then reduce to the latest record across multiple files
merc_pd = merc_pd.loc[merc_pd
    .groupby(
        ['MERCH_ID', 'CP_RESOLVED']
    )['FILE_TIMESTAMP']
    .agg(pd.Series.idxmax)]

# Now our data should be unique by the entity ID and CP_RESOLVED
# And we'll set the index to later join with transaction data
merc_pd.set_index(['MERCH_ID', 'CP_RESOLVED'], inplace=True)

merc_pd.head()

In [None]:
# Create a Snowpark DataFrame using a sql query to parse JSON fields
# And match records to a resolved time based on ingested file timestamp
tran_df = session.sql("""
    with checkpoint as (
        select data:resolved::varchar(32) as resolved,
               split_part(filename, '.', 1) as timestamp
        from cdc_data
        where filename like '%.RESOLVED'
    )
    select cdc.data:after:id::varchar(64) as tran_id,
           cdc.data:after:cc_num::varchar(19) as cc_num,
           cdc.data:after:merch_id::varchar(64) as merch_id,
           cdc.data:after:trans_num::varchar(32) as trans_num,
           cdc.data:after:trans_date::date as trans_date,
           cdc.data:after:trans_time::time as trans_time,
           cdc.data:after:unix_time::number(10, 0) as unix_time,
           cdc.data:after:category::varchar(16) as category,
           cdc.data:after:amt::number(12, 2) as amt,
           cdc.data:after:is_fraud::boolean as is_fraud,
           cp.resolved as cp_resolved,
           cp.timestamp as cp_timestamp,
           split_part(cdc.filename, '-', 1) as file_timestamp,
           cdc.data:mvcc_timestamp::varchar(32) as mvcc_timestamp,
           cdc.data:updated::varchar(32) as updated,
           cdc.file_row_number as file_row_number
    from cdc_data cdc, checkpoint cp
    where cdc.filename like '%-transaction-%'
      and cp.timestamp = (select min(timestamp)
                          from checkpoint
                          where timestamp >= split_part(cdc.filename, '-', 1))
      and file_timestamp <= cp.timestamp
      and file_timestamp > (select max(timestamp)
                            from checkpoint
                            where timestamp < split_part(cdc.filename, '-', 1))
""")
tran_df.limit(5)
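
The five queries above differ only in the file-name pattern, the ID alias, and the projected fields. A hypothetical helper like `cdc_query` below, which is not part of the original notebook, could generate them from a small field map. The table name is interpolated directly into the SQL, so the helper should only ever receive trusted literal values.

```python
from typing import Dict


# Hypothetical helper (not from the original notebook): builds the per-table
# CDC query used above from a map of JSON keys under data:after to SQL types.
def cdc_query(table: str, id_alias: str, fields: Dict[str, str]) -> str:
    select_list = [f"cdc.data:after:id::varchar(64) as {id_alias}"]
    select_list += [
        f"cdc.data:after:{key}::{sql_type} as {key}"
        for key, sql_type in fields.items()
    ]
    select_list += [
        "cp.resolved as cp_resolved",
        "cp.timestamp as cp_timestamp",
        "split_part(cdc.filename, '-', 1) as file_timestamp",
        "cdc.data:mvcc_timestamp::varchar(32) as mvcc_timestamp",
        "cdc.data:updated::varchar(32) as updated",
        "cdc.file_row_number as file_row_number",
    ]
    columns = ",\n           ".join(select_list)
    # table is interpolated verbatim: pass trusted literals only
    return f"""
    with checkpoint as (
        select data:resolved::varchar(32) as resolved,
               split_part(filename, '.', 1) as timestamp
        from cdc_data
        where filename like '%.RESOLVED'
    )
    select {columns}
    from cdc_data cdc, checkpoint cp
    where cdc.filename like '%-{table}-%'
      and cp.timestamp = (select min(timestamp)
                          from checkpoint
                          where timestamp >= split_part(cdc.filename, '-', 1))
      and file_timestamp <= cp.timestamp
      and file_timestamp > (select max(timestamp)
                            from checkpoint
                            where timestamp < split_part(cdc.filename, '-', 1))
    """


merc_sql = cdc_query("merchant", "merch_id", {
    "merchant": "varchar(64)",
    "merch_lat": "number(11, 8)",
    "merch_lng": "number(11, 8)",
})
# In the notebook this string would be passed to session.sql(merc_sql)
```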

In [None]:
# Next we'll join the transaction data with our customer data fields
# based on the resolved timestamps of the transactions and customer data

# We need a pandas dataframe in order to use the pandas join function
tran_cust_pd = tran_df.to_pandas()
tran_cust_pd = tran_cust_pd.join(
    cust_pd,
    on=['CC_NUM', 'CP_RESOLVED'],
    how='left',
    rsuffix='_CUST'
)

# Find customer keys without a matching customer record
# i.e. the customer version has an earlier resolved timestamp
cust_missing = tran_cust_pd.loc[
    tran_cust_pd['SSN'].isnull()
][['CC_NUM', 'CP_RESOLVED']].drop_duplicates()  # one row per missing key

# Then reset the index and generate customer records based on the
# previous version but with the required transaction resolve time
cust_pd.reset_index(inplace=True)
for index, row in cust_missing.iterrows():
    cust_data = cust_pd.query("""
        CC_NUM == '{}' and \
        CP_RESOLVED < '{}'
    """.format(row['CC_NUM'], row['CP_RESOLVED']))
    cust_data = cust_data.loc[
        cust_data.groupby(['CC_NUM'])['CP_RESOLVED']
        .agg(pd.Series.idxmax)
    ]
    for _, cust in cust_data.iterrows():
        cust_pd.loc[len(cust_pd.index)] = {
            'CC_NUM': cust['CC_NUM'],
            'CP_RESOLVED': row['CP_RESOLVED'],
            'CUST_ID': cust['CUST_ID'],
            'SSN': cust['SSN'],
            'FIRST': cust['FIRST'],
            'LAST': cust['LAST'],
            'GENDER': cust['GENDER'],
            'JOB': cust['JOB'],
            'DOB': cust['DOB'],
            'ACCT_NUM': cust['ACCT_NUM'],
            'PROFILE': cust['PROFILE'],
            'CP_TIMESTAMP': cust['CP_TIMESTAMP'],
            'FILE_TIMESTAMP': cust['FILE_TIMESTAMP'],
            'MVCC_TIMESTAMP': cust['MVCC_TIMESTAMP'],
            'UPDATED': cust['UPDATED'],
            'FILE_ROW_NUMBER': cust['FILE_ROW_NUMBER'],
        }

# Now perform a second pass to join the transaction and customer data
cust_pd.set_index(['CC_NUM', 'CP_RESOLVED'], inplace=True)
tran_cust_pd = tran_df.to_pandas()
tran_cust_pd = tran_cust_pd.join(
    cust_pd,
    on=['CC_NUM', 'CP_RESOLVED'],
    how='left',
    rsuffix='_CUST'
)

# And confirm that there's no more missing data
cust_missing = tran_cust_pd.loc[
    tran_cust_pd['SSN'].isnull()
][['CC_NUM', 'CP_RESOLVED']].drop_duplicates()  # one row per missing key
cust_missing
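
The backfill loop above is a point-in-time (as-of) lookup. For each transaction key, it uses the latest customer version resolved at or before the transaction's checkpoint. pandas provides `merge_asof` for this pattern. The sketch below uses made-up data. It assumes `CP_RESOLVED` has already been converted to a sortable number, because `merge_asof` expects an ordered numeric or datetime `on` key. How to convert the notebook's string timestamps is left as an assumption.

```python
import pandas as pd

# Made-up data; CP_RESOLVED is assumed to be already converted to integers
transactions = pd.DataFrame({
    "CC_NUM": ["c1", "c1", "c2"],
    "CP_RESOLVED": [200, 300, 300],
    "AMT": [10.0, 20.0, 5.0],
})
customers = pd.DataFrame({
    "CC_NUM": ["c1", "c1", "c2"],
    "CP_RESOLVED": [100, 250, 300],
    "LAST": ["Smith", "Jones", "Lee"],
})

# Both inputs must be sorted by the "on" key. For each transaction row,
# direction="backward" picks the customer row with the same CC_NUM and the
# largest CP_RESOLVED that is <= the transaction's CP_RESOLVED.
tran_cust = pd.merge_asof(
    transactions.sort_values("CP_RESOLVED"),
    customers.sort_values("CP_RESOLVED"),
    on="CP_RESOLVED",
    by="CC_NUM",
    direction="backward",
)
```

Exact matches are included by default, so this covers both the first join pass and the "previous version" backfill in one call.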

In [None]:
# Next we'll join the transaction data with our address data fields
# based on the resolved timestamps of the transactions and address data

# We'll create a new pandas dataframe to get started
tran_addr_pd = tran_cust_pd.join(
    addr_pd,
    on=['ACCT_NUM', 'CP_RESOLVED'],
    how='left',
    rsuffix='_ADDR'
)

# Find address keys without a matching address record
# i.e. the address version has an earlier resolved timestamp
addr_missing = tran_addr_pd.loc[
    tran_addr_pd['STREET'].isnull()
][['ACCT_NUM', 'CP_RESOLVED']].drop_duplicates()  # one row per missing key

# Then reset the index and generate address records based on the
# previous version but with the required transaction resolve time
addr_pd.reset_index(inplace=True)
for index, row in addr_missing.iterrows():
    addr_data = addr_pd.query("""
        ACCT_NUM == '{}' and \
        CP_RESOLVED < '{}'
    """.format(row['ACCT_NUM'], row['CP_RESOLVED']))
    addr_data = addr_data.loc[
        addr_data.groupby(['ACCT_NUM'])['CP_RESOLVED']
        .agg(pd.Series.idxmax)
    ]
    for _, addr in addr_data.iterrows():
        addr_pd.loc[len(addr_pd.index)] = {
            'ACCT_NUM': addr['ACCT_NUM'],
            'CP_RESOLVED': row['CP_RESOLVED'],
            'ADDR_ID': addr['ADDR_ID'],
            'STREET': addr['STREET'],
            'ZIP': addr['ZIP'],
            'LAT': addr['LAT'],
            'LNG': addr['LNG'],
            'CP_TIMESTAMP': addr['CP_TIMESTAMP'],
            'FILE_TIMESTAMP': addr['FILE_TIMESTAMP'],
            'MVCC_TIMESTAMP': addr['MVCC_TIMESTAMP'],
            'UPDATED': addr['UPDATED'],
            'FILE_ROW_NUMBER': addr['FILE_ROW_NUMBER'],
        }

# Now perform a second pass to join the transaction and address data
addr_pd.set_index(['ACCT_NUM', 'CP_RESOLVED'], inplace=True)
tran_addr_pd = tran_cust_pd.join(
    addr_pd,
    on=['ACCT_NUM', 'CP_RESOLVED'],
    how='left',
    rsuffix='_ADDR'
)

# And confirm that there's no more missing data
addr_missing = tran_addr_pd.loc[
    tran_addr_pd['STREET'].isnull()
][['ACCT_NUM', 'CP_RESOLVED']].drop_duplicates()  # one row per missing key
addr_missing

In [None]:
# Next we'll join the transaction data with our city data fields
# based on the resolved timestamps of the transactions and city data

# We'll create a new pandas dataframe to get started
tran_city_pd = tran_addr_pd.join(
    city_pd,
    on=['ZIP', 'CP_RESOLVED'],
    how='left',
    rsuffix='_CITY'
)

# Find city keys without a matching city record
# i.e. the city version has an earlier resolved timestamp
city_missing = tran_city_pd.loc[
    tran_city_pd['CITY'].isnull()
][['ZIP', 'CP_RESOLVED']].drop_duplicates()  # one row per missing key

# Then reset the index and generate city records based on the
# previous version but with the required transaction resolve time
city_pd.reset_index(inplace=True)
for index, row in city_missing.iterrows():
    city_data = city_pd.query("""
        ZIP == '{}' and \
        CP_RESOLVED < '{}'
    """.format(row['ZIP'], row['CP_RESOLVED']))
    city_data = city_data.loc[
        city_data.groupby(['ZIP'])['CP_RESOLVED']
        .agg(pd.Series.idxmax)
    ]
    for _, city in city_data.iterrows():
        city_pd.loc[len(city_pd.index)] = {
            'ZIP': city['ZIP'],
            'CP_RESOLVED': row['CP_RESOLVED'],
            'CITY_ID': city['CITY_ID'],
            'CITY': city['CITY'],
            'STATE': city['STATE'],
            'CITY_POP': city['CITY_POP'],
            'CP_TIMESTAMP': city['CP_TIMESTAMP'],
            'FILE_TIMESTAMP': city['FILE_TIMESTAMP'],
            'MVCC_TIMESTAMP': city['MVCC_TIMESTAMP'],
            'UPDATED': city['UPDATED'],
            'FILE_ROW_NUMBER': city['FILE_ROW_NUMBER'],
        }

# Now perform a second pass to join the transaction and city data
city_pd.set_index(['ZIP', 'CP_RESOLVED'], inplace=True)
tran_city_pd = tran_addr_pd.join(
    city_pd,
    on=['ZIP', 'CP_RESOLVED'],
    how='left',
    rsuffix='_CITY'
)

# And confirm that there's no more missing data
city_missing = tran_city_pd.loc[
    tran_city_pd['CITY'].isnull()
][['ZIP', 'CP_RESOLVED']].drop_duplicates()  # one row per missing key
city_missing

In [None]:
# Next we'll join the transaction data with our merchant data fields
# based on the resolved timestamps of the transactions and merchant data

# We'll create a new pandas dataframe to get started
tran_merc_pd = tran_city_pd.join(
    merc_pd,
    on=['MERCH_ID', 'CP_RESOLVED'],
    how='left',
    rsuffix='_MERC'
)

# Find merchant keys without a matching merchant record
# i.e. the merchant version has an earlier resolved timestamp
merc_missing = tran_merc_pd.loc[
    tran_merc_pd['MERCHANT'].isnull()
][['MERCH_ID', 'CP_RESOLVED']].drop_duplicates()  # one row per missing key

# Then reset the index and generate merchant records based on the
# previous version but with the required transaction resolve time
merc_pd.reset_index(inplace=True)
for index, row in merc_missing.iterrows():
    merc_data = merc_pd.query("""
        MERCH_ID == '{}' and \
        CP_RESOLVED < '{}'
    """.format(row['MERCH_ID'], row['CP_RESOLVED']))
    merc_data = merc_data.loc[
        merc_data.groupby(['MERCH_ID'])['CP_RESOLVED']
        .agg(pd.Series.idxmax)
    ]
    for _, merc in merc_data.iterrows():
        merc_pd.loc[len(merc_pd.index)] = {
            'MERCH_ID': merc['MERCH_ID'],
            'CP_RESOLVED': row['CP_RESOLVED'],
            'MERCHANT': merc['MERCHANT'],
            'MERCH_LAT': merc['MERCH_LAT'],
            'MERCH_LNG': merc['MERCH_LNG'],
            'CP_TIMESTAMP': merc['CP_TIMESTAMP'],
            'FILE_TIMESTAMP': merc['FILE_TIMESTAMP'],
            'MVCC_TIMESTAMP': merc['MVCC_TIMESTAMP'],
            'UPDATED': merc['UPDATED'],
            'FILE_ROW_NUMBER': merc['FILE_ROW_NUMBER'],
        }

# Now perform a second pass to join the transaction and merchant data
merc_pd.set_index(['MERCH_ID', 'CP_RESOLVED'], inplace=True)
tran_merc_pd = tran_city_pd.join(
    merc_pd,
    on=['MERCH_ID', 'CP_RESOLVED'],
    how='left',
    rsuffix='_MERC'
)

# Find merchant keys again without a matching merchant record
# i.e. a transaction happened before the first merchant record resolved
merc_missing = tran_merc_pd.loc[
    tran_merc_pd['MERCHANT'].isnull()
][['MERCH_ID', 'CP_RESOLVED']].drop_duplicates()  # one row per missing key

# Then reset the index and generate merchant records based on the
# earliest version but with the required transaction resolve time
merc_pd.reset_index(inplace=True)
for index, row in merc_missing.iterrows():
    merc_data = merc_pd.query("""
        MERCH_ID == '{}' and \
        CP_RESOLVED >= '{}'
    """.format(row['MERCH_ID'], row['CP_RESOLVED']))
    merc_data = merc_data.loc[
        merc_data.groupby(['MERCH_ID'])['CP_RESOLVED']
        .agg(pd.Series.idxmin)
    ]
    for _, merc in merc_data.iterrows():
        merc_pd.loc[len(merc_pd.index)] = {
            'MERCH_ID': merc['MERCH_ID'],
            'CP_RESOLVED': row['CP_RESOLVED'],
            'MERCHANT': merc['MERCHANT'],
            'MERCH_LAT': merc['MERCH_LAT'],
            'MERCH_LNG': merc['MERCH_LNG'],
            'CP_TIMESTAMP': merc['CP_TIMESTAMP'],
            'FILE_TIMESTAMP': merc['FILE_TIMESTAMP'],
            'MVCC_TIMESTAMP': merc['MVCC_TIMESTAMP'],
            'UPDATED': merc['UPDATED'],
            'FILE_ROW_NUMBER': merc['FILE_ROW_NUMBER'],
        }

# Now perform a third pass to join the transaction and merchant data
merc_pd.set_index(['MERCH_ID', 'CP_RESOLVED'], inplace=True)
tran_merc_pd = tran_city_pd.join(
    merc_pd,
    on=['MERCH_ID', 'CP_RESOLVED'],
    how='left',
    rsuffix='_MERC'
)

# And confirm that there's no more missing data
merc_missing = tran_merc_pd.loc[
    tran_merc_pd['MERCHANT'].isnull()
][['MERCH_ID', 'CP_RESOLVED']].drop_duplicates()  # one row per missing key
merc_missing
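
The merchant cell adds a second fallback: when no earlier version exists, it uses the earliest later one. The sketch below expresses the same order of preference with two `merge_asof` passes. It is a hedged sketch, again on made-up data with integer checkpoints.

```python
import pandas as pd

# Made-up data; integer checkpoints stand in for the resolved timestamps
transactions = pd.DataFrame({
    "MERCH_ID": ["m1", "m1", "m2"],
    "CP_RESOLVED": [100, 300, 100],
})
merchants = pd.DataFrame({
    "MERCH_ID": ["m1", "m1", "m2"],
    "CP_RESOLVED": [200, 250, 150],
    "MERCHANT": ["Shop v1", "Shop v2", "Cafe"],
})

left = transactions.sort_values("CP_RESOLVED", kind="stable").reset_index(drop=True)
right = merchants.sort_values("CP_RESOLVED", kind="stable")


def asof(direction: str) -> pd.DataFrame:
    """As-of join on CP_RESOLVED within each MERCH_ID."""
    return pd.merge_asof(left, right, on="CP_RESOLVED", by="MERCH_ID",
                         direction=direction)


backward = asof("backward")  # latest version at or before the checkpoint
forward = asof("forward")    # earliest version at or after the checkpoint

# Prefer the earlier version; fall back to the later one only where it is missing
tran_merc = backward.assign(MERCHANT=backward["MERCHANT"].fillna(forward["MERCHANT"]))
```

Both passes share the same sorted `left` frame, so their rows line up index by index when `fillna` aligns them.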

In [None]:
print("Raw Data: {}".format(tran_df.count()))
print("Resolved: {}".format(tran_merc_pd.shape[0]))

tran_merc_pd.head()
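
Every join above is a left join, so no transaction rows are lost. The Resolved count matches Raw Data only if each dimension index is unique per key and `CP_RESOLVED`. Duplicate keys would multiply transaction rows. One way to guard against that is the `validate` argument of `pd.merge`, sketched here on made-up data:

```python
import pandas as pd
from pandas.errors import MergeError

# Made-up data: customer c1 has two rows for the same checkpoint
tran = pd.DataFrame({"CC_NUM": ["c1", "c2"], "CP_RESOLVED": [1, 1]})
cust = pd.DataFrame({
    "CC_NUM": ["c1", "c1", "c2"],
    "CP_RESOLVED": [1, 1, 1],
    "LAST": ["A", "B", "C"],
})
keys = ["CC_NUM", "CP_RESOLVED"]

# validate="many_to_one" raises if the right-hand keys are not unique,
# instead of silently duplicating transaction rows
try:
    pd.merge(tran, cust, on=keys, how="left", validate="many_to_one")
    duplicates_found = False
except MergeError:
    duplicates_found = True

# After deduplicating the dimension, the row count is preserved
unique_cust = cust.drop_duplicates(subset=keys, keep="last")
checked = pd.merge(tran, unique_cust, on=keys, how="left", validate="many_to_one")
```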

In [None]:
# Make a copy of the transaction data and drop the ID and bookkeeping columns
tran_pd = tran_merc_pd.copy().drop(columns=[
    "TRAN_ID",
    "MERCH_ID",
    "TRANS_NUM",
    "CP_RESOLVED",
    "CP_TIMESTAMP",
    "FILE_TIMESTAMP",
    "MVCC_TIMESTAMP",
    "UPDATED",
    "FILE_ROW_NUMBER",
    "CUST_ID",
    "CP_TIMESTAMP_CUST",
    "FILE_TIMESTAMP_CUST",
    "MVCC_TIMESTAMP_CUST",
    "UPDATED_CUST",
    "FILE_ROW_NUMBER_CUST",
    "ADDR_ID",
    "CP_TIMESTAMP_ADDR",
    "FILE_TIMESTAMP_ADDR",
    "MVCC_TIMESTAMP_ADDR",
    "UPDATED_ADDR",
    "FILE_ROW_NUMBER_ADDR",
    "CITY_ID",
    "CP_TIMESTAMP_CITY",
    "FILE_TIMESTAMP_CITY",
    "MVCC_TIMESTAMP_CITY",
    "UPDATED_CITY",
    "FILE_ROW_NUMBER_CITY",
    "CP_TIMESTAMP_MERC",
    "FILE_TIMESTAMP_MERC",
    "MVCC_TIMESTAMP_MERC",
    "UPDATED_MERC",
    "FILE_ROW_NUMBER_MERC"
])

# Look at descriptive stats on the DataFrame
tran_pd.describe()
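
The explicit drop list above follows a regular pattern. It contains five bookkeeping columns, repeated with the `_CUST`, `_ADDR`, `_CITY`, and `_MERC` suffixes, plus the ID columns. A hedged alternative is to select those columns with a regular expression. The frame below is a made-up subset of the joined columns, and the pattern assumes no feature column happens to share one of these names.

```python
import re

import pandas as pd

# Made-up subset of the joined column names
df = pd.DataFrame(columns=[
    "CC_NUM", "AMT", "TRAN_ID", "CP_RESOLVED", "CP_TIMESTAMP",
    "UPDATED_CUST", "FILE_ROW_NUMBER_ADDR", "MVCC_TIMESTAMP_MERC",
    "CITY", "CITY_ID",
])

# Bookkeeping columns, with or without a join suffix
BOOKKEEPING = re.compile(
    r"^(CP_TIMESTAMP|FILE_TIMESTAMP|MVCC_TIMESTAMP|UPDATED|FILE_ROW_NUMBER)"
    r"(_CUST|_ADDR|_CITY|_MERC)?$"
)
ID_COLUMNS = {"TRAN_ID", "MERCH_ID", "TRANS_NUM", "CP_RESOLVED",
              "CUST_ID", "ADDR_ID", "CITY_ID"}

to_drop = [c for c in df.columns if BOOKKEEPING.match(c) or c in ID_COLUMNS]
cleaned = df.drop(columns=to_drop)
```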

### Data cleaning

First, let's force the column headers to uppercase using Snowpark DataFrame operations, so the names are standardized when the columns are later written to a Snowflake table.

In [None]:
# Force headers to uppercase
tran_df = session.create_dataframe(tran_pd)
for colname in tran_df.columns:
    new_colname = str.upper(colname)
    tran_df = tran_df.with_column_renamed(colname, new_colname)

tran_df.limit(5)
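
Another option is to uppercase the column names in pandas before the upload, so the Snowpark DataFrame is created with uppercase names from the start. This is a hedged sketch. The frame is made up, and the `create_dataframe` call is left as a comment because it needs the notebook's live session. Checking `tran_df.columns` afterwards would confirm how Snowpark named the columns.

```python
import pandas as pd

# Made-up frame with mixed-case column names
sample_pd = pd.DataFrame({"cc_num": ["c1"], "Amt": [9.99], "IS_FRAUD": [False]})

# rename applies str.upper to every column label
sample_pd = sample_pd.rename(columns=str.upper)

# In the notebook: tran_df = session.create_dataframe(sample_pd)
```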

Check the schema.

In [None]:
list(tran_df.schema)

### Write cleaned data to a Snowflake table

In [None]:
tran_df.write.mode('overwrite').save_as_table('transaction')

In the next notebook, we will perform data transformations with the Snowpark ML Preprocessing API for feature engineering. 