# Statistics

| SF | Duration |
| ----------- | ----------- |
| 10 | 00:00:32 |
| 100 | 00:02:18 |
| 1,000 | 00:04:00 |
| 5,000 | 00:10:54 |
| 10,000 | 00:19:20 |

# Setup

In [57]:
# --- Where the data lives (ADLS Gen2 account and container) ---
storageAccount = "synapsemigrationdemoadls"
container = "tpc-di"

# --- Which extract to process ---
scaleFactor = "SF1000"  # first path segment under the container
folder = "Batch1"
fileName = "CustomerMgmt.xml"

# --- XML parsing ---
rowTag = "TPCDI:Action"  # handed to the XML reader as its rowTag option

StatementMeta(TPCDI, 8, 3, Finished, Available)

In [58]:
# Full abfss:// URL of the source XML file: <scaleFactor>/<folder>/<fileName> inside the container.
filePath = (
    f"abfss://{container}@{storageAccount}.dfs.core.windows.net"
    f"/{scaleFactor}/{folder}/{fileName}"
)

# "temp" folder at the root of the same container.
tempPath = (
    f"abfss://{container}@{storageAccount}.dfs.core.windows.net"
    "/temp"
)

StatementMeta(TPCDI, 8, 4, Finished, Available)

# Read the XML file and register the raw data as a temp view, keeping every value as a string

In [59]:
# Load the XML with the spark-xml data source and expose it to SQL as v_CustomerMgmt.
# Schema inference is switched off so the raw values stay as strings; the typed
# casts are applied later in SQL.
(
    spark.read.format("com.databricks.spark.xml")
    .option("rowTag", rowTag)
    .option("inferSchema", False)
    .load(filePath)
    .createOrReplaceTempView("v_CustomerMgmt")
)

StatementMeta(TPCDI, 8, 5, Finished, Available)

In [49]:
# df = spark.read.format('com.databricks.spark.xml') \
#  .options(rowTag=rowTag, inferSchema=False) \
#  .load(filePath)

StatementMeta(TPCDI, 7, 50, Finished, Available)

In [50]:
# display(df)

StatementMeta(TPCDI, 7, 51, Finished, Available)

# Now build the CustomerMgmt DataFrame with nested values flattened and data types applied

In [60]:
# Flatten the raw v_CustomerMgmt rows into one typed, flat row per action:
#   * ids are cast to BIGINT, tax status / tier to INT, date of birth to DATE,
#     and the action timestamp is parsed with to_timestamp();
#   * empty strings in text attributes are turned into NULL via nullif(..., '');
#   * gender is upper-cased;
#   * status is derived from the action type (NEW/ADDACCT/UPDACCT/UPDCUST -> Active,
#     CLOSEACCT/INACT -> Inactive); decode() has no default here, so any other
#     action type yields NULL;
#   * each phone number is assembled as "+<country> (<area>) <local><ext>", where the
#     country and area parts are omitted when empty, the extension is appended with
#     no separator, and the whole value is NULL when the local number is missing/empty.
#
# The query is deliberately a plain string, not an f-string: it contains no
# placeholders, and an f-string would start interpreting any `{`/`}` that is
# later added to the SQL text.
df = spark.sql(
    """
  SELECT 
    cast(Customer._C_ID as BIGINT) customerid, 
    cast(Customer.Account._CA_ID as BIGINT) accountid,
    cast(Customer.Account.CA_B_ID as BIGINT) brokerid, 
    nullif(Customer._C_TAX_ID, '') taxid,
    nullif(Customer.Account.CA_NAME, '') accountdesc, 
    --cast(Customer.Account._CA_TAX_ST as TINYINT) taxstatus,
    cast(Customer.Account._CA_TAX_ST as INT) taxstatus,
    decode(_ActionType,
      "NEW","Active",
      "ADDACCT","Active",
      "UPDACCT","Active",
      "UPDCUST","Active",
      "CLOSEACCT","Inactive",
      "INACT","Inactive") status,
    nullif(Customer.Name.C_L_NAME, '') lastname, 
    nullif(Customer.Name.C_F_NAME, '') firstname, 
    nullif(Customer.Name.C_M_NAME, '') middleinitial, 
    nullif(upper(Customer._C_GNDR), '') gender,
    --cast(Customer._C_TIER as TINYINT) tier, 
    cast(Customer._C_TIER as INT) tier, 
    cast(Customer._C_DOB as DATE) dob,
    --cast(Customer._C_DOB as TIMESTAMP) dob,
    nullif(Customer.Address.C_ADLINE1, '') addressline1, 
    nullif(Customer.Address.C_ADLINE2, '') addressline2, 
    nullif(Customer.Address.C_ZIPCODE, '') postalcode,
    nullif(Customer.Address.C_CITY, '') city, 
    nullif(Customer.Address.C_STATE_PROV, '') stateprov,
    nullif(Customer.Address.C_CTRY, '') country, 
    nvl2(
      nullif(Customer.ContactInfo.C_PHONE_1.C_LOCAL, ''),
      concat(
          nvl2(nullif(Customer.ContactInfo.C_PHONE_1.C_CTRY_CODE, ''), '+' || Customer.ContactInfo.C_PHONE_1.C_CTRY_CODE || ' ', ''),
          nvl2(nullif(Customer.ContactInfo.C_PHONE_1.C_AREA_CODE, ''), '(' || Customer.ContactInfo.C_PHONE_1.C_AREA_CODE || ') ', ''),
          Customer.ContactInfo.C_PHONE_1.C_LOCAL,
          nvl(Customer.ContactInfo.C_PHONE_1.C_EXT, '')),
      cast(null as string)) phone1,
    nvl2(
      nullif(Customer.ContactInfo.C_PHONE_2.C_LOCAL, ''),
      concat(
          nvl2(nullif(Customer.ContactInfo.C_PHONE_2.C_CTRY_CODE, ''), '+' || Customer.ContactInfo.C_PHONE_2.C_CTRY_CODE || ' ', ''),
          nvl2(nullif(Customer.ContactInfo.C_PHONE_2.C_AREA_CODE, ''), '(' || Customer.ContactInfo.C_PHONE_2.C_AREA_CODE || ') ', ''),
          Customer.ContactInfo.C_PHONE_2.C_LOCAL,
          nvl(Customer.ContactInfo.C_PHONE_2.C_EXT, '')),
      cast(null as string)) phone2,
    nvl2(
      nullif(Customer.ContactInfo.C_PHONE_3.C_LOCAL, ''),
      concat(
          nvl2(nullif(Customer.ContactInfo.C_PHONE_3.C_CTRY_CODE, ''), '+' || Customer.ContactInfo.C_PHONE_3.C_CTRY_CODE || ' ', ''),
          nvl2(nullif(Customer.ContactInfo.C_PHONE_3.C_AREA_CODE, ''), '(' || Customer.ContactInfo.C_PHONE_3.C_AREA_CODE || ') ', ''),
          Customer.ContactInfo.C_PHONE_3.C_LOCAL,
          nvl(Customer.ContactInfo.C_PHONE_3.C_EXT, '')),
      cast(null as string)) phone3,
    nullif(Customer.ContactInfo.C_PRIM_EMAIL, '') email1,
    nullif(Customer.ContactInfo.C_ALT_EMAIL, '') email2,
    nullif(Customer.TaxInfo.C_LCL_TX_ID, '') lcl_tx_id, 
    nullif(Customer.TaxInfo.C_NAT_TX_ID, '') nat_tx_id, 
    to_timestamp(_ActionTS) update_ts,
    _ActionType ActionType
  FROM v_CustomerMgmt
"""
)

StatementMeta(TPCDI, 8, 6, Finished, Available)

In [52]:
# display(df)

StatementMeta(TPCDI, 7, 53, Finished, Available)

In [61]:
# from pyspark.sql.functions import col, length, max

# df_test=df.select([max(length(col(name))).alias(name) for name in df.schema.names])
# display(df_test)

StatementMeta(TPCDI, 8, 7, Finished, Available)

SynapseWidget(Synapse.DataFrame, 5a536f46-8b2f-462b-b447-8fce00d3394b)

%%sql
-- Ad-hoc check: show the raw rows whose alternate e-mail is exactly 51 characters long.
-- NOTE(review): 51 presumably came from max(length(Customer.ContactInfo.C_ALT_EMAIL)) - confirm.
SELECT *
FROM v_CustomerMgmt
WHERE length(Customer.ContactInfo.C_ALT_EMAIL) = 51

In [53]:
# Print the column names, data types and nullability of df to verify the casts applied in the SQL query.
df.printSchema()

StatementMeta(TPCDI, 7, 54, Finished, Available)

root
 |-- customerid: long (nullable = true)
 |-- accountid: long (nullable = true)
 |-- brokerid: long (nullable = true)
 |-- taxid: string (nullable = true)
 |-- accountdesc: string (nullable = true)
 |-- taxstatus: integer (nullable = true)
 |-- status: string (nullable = true)
 |-- lastname: string (nullable = true)
 |-- firstname: string (nullable = true)
 |-- middleinitial: string (nullable = true)
 |-- gender: string (nullable = true)
 |-- tier: integer (nullable = true)
 |-- dob: date (nullable = true)
 |-- addressline1: string (nullable = true)
 |-- addressline2: string (nullable = true)
 |-- postalcode: string (nullable = true)
 |-- city: string (nullable = true)
 |-- stateprov: string (nullable = true)
 |-- country: string (nullable = true)
 |-- phone1: string (nullable = true)
 |-- phone2: string (nullable = true)
 |-- phone3: string (nullable = true)
 |-- email1: string (nullable = true)
 |-- email2: string (nullable = true)
 |-- lcl_tx_id: string (nullable = true)
 |-- na

# Save to ADLS Gen2 as pipe-delimited CSV

In [54]:
# Destination folder for the flattened output. It is built from the same `folder`
# setting as the input path (instead of a hard-coded "Batch1"), so switching the
# batch in the setup cell cannot make the notebook read one batch and write to another.
export_path = (
    f"abfss://{container}@{storageAccount}.dfs.core.windows.net"
    f"/{scaleFactor}/{folder}/CustomerMgmt"
)
print("export file path: " + export_path)

StatementMeta(TPCDI, 7, 55, Finished, Available)

export file path: abfss://tpc-di@synapsemigrationdemoadls.dfs.core.windows.net/SF10000/Batch1/CustomerMgmt


In [55]:
# Write df to export_path as header-less, pipe-delimited CSV, replacing any
# previous output in that folder. Dates and timestamps use fixed formats.
# Parquet alternative, kept for reference:
# df.write.parquet(export_path, mode = 'overwrite')
(
    df.write
    .mode("overwrite")
    .option("header", "false")
    .option("delimiter", "|")
    .option("dateFormat", "yyyy-MM-dd")
    .option("timestampFormat", "yyyy-MM-dd HH:mm:ss")
    .csv(export_path)
)

StatementMeta(TPCDI, 7, 56, Finished, Available)