Skip to content

Commit

Permalink
Merge pull request #7 from stewartbryson/stewart
Browse files Browse the repository at this point in the history
Code cleanup. New blog post.
  • Loading branch information
stewartbryson committed Sep 11, 2023
2 parents 991972c + dcbe8ec commit c109f89
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 25 deletions.
190 changes: 190 additions & 0 deletions blogs/customer.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
import sys, json
from snowflake.snowpark import Session, DataFrame
from snowflake.snowpark.types import *
from snowflake.snowpark.functions import *
from pathlib import Path

# Read the credentials.json file
with open("credentials.json") as jsonfile:
credentials_dict = json.load(jsonfile)

# build the session
session = (
Session
.builder
.configs(credentials_dict)
.create()
)

# glob the files
pathlist = (
Path('/Users/stewartbryson/dev/tpcdi-output/Batch1')
.glob("CustomerMgmt.xml")
)
stage_path = "@tpcdi/Batch1/"

for file in pathlist:
# put the file(s) in the stage
put_result = (
session
.file
.put(
str(file),
stage_path,
parallel=4,
auto_compress=True,
overwrite=False
)
)
for result in put_result:
print(f"File {result.source}: {result.status}")

# Simplifies retrieving XML elements
def get_xml_element(
column:str,
element:str,
datatype:str,
with_alias:bool = True
):
new_element = (
get(
xmlget(
col(column),
lit(element),
),
lit('$')
)
.cast(datatype)
)

# alias needs to be optional
return (
new_element.alias(element) if with_alias else new_element
)

# Simplifies retrieving XML attributes
def get_xml_attribute(
column:str,
attribute:str,
datatype:str,
with_alias:bool = True
):
new_attribute = (
get(
col(column),
lit(f"@{attribute}")
)
.cast(datatype)
)

# alias needs to be optional
return (
new_attribute.alias(attribute) if with_alias else new_attribute
)

# Simplifies the logic for constructing a phone number from multiple nested fields
def get_phone_number(
phone_id:str,
separator:str = '-'
):
return (
concat (
get_xml_element(f"phone{phone_id}", 'C_CTRY_CODE', 'STRING', False),
when(get_xml_element(f"phone{phone_id}", 'C_CTRY_CODE', 'STRING', False) == '', '')
.otherwise(separator),
get_xml_element(f"phone{phone_id}", 'C_AREA_CODE', 'STRING', False),
when(get_xml_element(f"phone{phone_id}", 'C_AREA_CODE', 'STRING', False) == '', '')
.otherwise(separator),
get_xml_element(f"phone{phone_id}", 'C_LOCAL', 'STRING', False),
when(get_xml_element(f"phone{phone_id}", 'C_EXT', 'STRING', False) == '', '')
.otherwise(" ext: "),
get_xml_element(f"phone{phone_id}", 'C_EXT', 'STRING', False)
)
.alias(f"c_phone_{phone_id}")
)


# Read the XML file into a DataFrame and show it
table_name = 'customer_mgmt'
df = (
session
.read
.option('STRIP_OUTER_ELEMENT', True) # Strips the TPCDI:Actions node
.xml(f"{stage_path}CustomerMgmt.xml")
.select(
# flatten out all of the nested elements
xmlget(col('$1'), lit('Customer'), 0).alias('customer'),
xmlget(col('customer'), lit('Name'), 0).alias('name'),
xmlget(col('customer'), lit('Address'), 0).alias('address'),
xmlget(col('customer'), lit('ContactInfo'), 0).alias('contact_info'),
xmlget(col('contact_info'), lit('C_PHONE_1')).alias('phone1'),
xmlget(col('contact_info'), lit('C_PHONE_2')).alias('phone2'),
xmlget(col('contact_info'), lit('C_PHONE_3')).alias('phone3'),
xmlget(col('customer'), lit('TaxInfo'), 0).alias('tax_info'),
xmlget(col('customer'), lit('Account'), 0).alias('account'),
# get the Action attributes
get_xml_attribute('$1','ActionType','STRING'),
get_xml_attribute('$1','ActionTS','STRING'),
)
.select(
# Handling Action attributes
to_timestamp(
col('ActionTs'),
lit('yyyy-mm-ddThh:mi:ss')
).alias('action_ts'),
col('ActionType').alias('ACTION_TYPE'),
# Get Customer Attributes
get_xml_attribute('customer','C_ID','NUMBER'),
get_xml_attribute('customer','C_TAX_ID','STRING'),
get_xml_attribute('customer','C_GNDR','STRING'),
try_cast(
get_xml_attribute('customer','C_TIER','STRING', False),
'NUMBER'
).alias('c_tier'),
get_xml_attribute('customer','C_DOB','DATE'),
# Get Name elements
get_xml_element('name','C_L_NAME','STRING'),
get_xml_element('name','C_F_NAME','STRING'),
get_xml_element('name','C_M_NAME','STRING'),
# Get Address elements
get_xml_element('address','C_ADLINE1','STRING'),
get_xml_element('address', 'C_ADLINE2', 'STRING'),
get_xml_element('address','C_ZIPCODE','STRING'),
get_xml_element('address','C_CITY','STRING'),
get_xml_element('address','C_STATE_PROV','STRING'),
get_xml_element('address','C_CTRY','STRING'),
# Get Contact Info elements
get_xml_element('contact_info','C_PRIM_EMAIL','STRING'),
get_xml_element('contact_info','C_ALT_EMAIL','STRING'),
# Contruct phone numbers from multi-nested elements
get_phone_number('1'),
get_phone_number('2'),
get_phone_number('3'),
# Get TaxInfo elements
get_xml_element('tax_info','C_LCL_TX_ID','STRING'),
get_xml_element('tax_info','C_NAT_TX_ID','STRING'),
# Get Account Attributes
get_xml_attribute('account','CA_ID','STRING'),
get_xml_attribute('account','CA_TAX_ST','NUMBER'),
# Get Account elements
get_xml_element('account','CA_B_ID','NUMBER'),
get_xml_element('account','CA_NAME','STRING'),
)
.write
.mode("overwrite")
.save_as_table(table_name)
)

print(f"{table_name.upper()} table created.")

df = (
session
.table('customer_mgmt')
.select(
col('action_ts'),
col('c_id'),
col('c_tier'),
col('c_phone_1')
)
.show()
)
File renamed without changes.
16 changes: 1 addition & 15 deletions models/bronze/finwire/finwire_companies.sql
Original file line number Diff line number Diff line change
@@ -1,18 +1,4 @@

select
pts,
company_name,
to_number(cik) as cik,
status,
industry_id,
sp_rating,
to_date(nullif(trim(founding_date), ''), 'yyyymmdd') as founding_date,
address_line1,
address_line2,
postal_code,
city,
state_province,
country,
ceo_name,
description
*
from {{ source("finwire", "cmp") }}
30 changes: 20 additions & 10 deletions tpcdi.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,10 +144,10 @@ def get_xml_element(
with_alias:bool = True
):
new_element = get(xmlget(col(column), lit(element)), lit('$')).cast(datatype)
if with_alias:
return new_element.alias(element)
else:
return new_element
# alias needs to be optional
return (
new_element.alias(element) if with_alias else new_element
)

# Simplifies the DataFrame transformations for retrieving XML attributes
def get_xml_attribute(
Expand All @@ -157,10 +157,10 @@ def get_xml_attribute(
with_alias:bool = True
):
new_attribute = get(col(column), lit(f"@{attribute}")).cast(datatype)
if with_alias:
return new_attribute.alias(attribute)
else:
return new_attribute
# alias needs to be optional
return (
new_attribute.alias(attribute) if with_alias else new_attribute
)

# Simplifies the logic for constructing a phone number from multiple nested fields
def get_phone_number(
Expand Down Expand Up @@ -304,7 +304,7 @@ def get_phone_number(
# Get Contact Info elements
get_xml_element('contact_info','C_PRIM_EMAIL','STRING'),
get_xml_element('contact_info','C_ALT_EMAIL','STRING'),
# Contruct a phone number from multi-nested elements
# Contruct phone numbers from multi-nested elements
get_phone_number('1'),
get_phone_number('2'),
get_phone_number('3'),
Expand Down Expand Up @@ -456,7 +456,17 @@ def get_phone_number(
.withColumn('status', substring(col("line"), lit(89), lit(4)))
.withColumn('industry_id', substring(col("line"), lit(93), lit(2)))
.withColumn('sp_rating', substring(col("line"), lit(95), lit(4)))
.withColumn('founding_date', substring(col("line"), lit(99), lit(8)))
.withColumn(
'founding_date',
try_cast(
trim(
substring(
col("line"), lit(99), lit(8)
)
),
'DATE'
)
)
.withColumn('address_line1', substring(col("line"), lit(107), lit(80)))
.withColumn('address_line2', substring(col("line"), lit(187), lit(80)))
.withColumn('postal_code', substring(col("line"), lit(267), lit(12)))
Expand Down

0 comments on commit c109f89

Please sign in to comment.