In [None]:
# READ DCRM ACCOUNTS TABLE FROM SILVER
from pyspark.sql.functions import col, concat

dcrm_accounts = (
    spark.read
    .format("delta")
    .load("abfss://Mythic@onelake.dfs.fabric.microsoft.com/silver_lakehouse.Lakehouse/Tables/nucleus__dcrm_accounts")
)

# Prefix every column with "dcrm_" so DCRM fields stay distinguishable from
# ICRM fields once the two tables are joined in later cells.
for column in dcrm_accounts.columns:
    dcrm_accounts = dcrm_accounts.withColumnRenamed(column, f"dcrm_{column}")

# Rebuild dcrm_account_id as <account id><OE id>.
# concat() yields NULL when either input is NULL.
dcrm_accounts = dcrm_accounts.withColumn(
    "dcrm_account_id",
    concat(col("dcrm_account_id"), col("dcrm_oe_id")),
)

# Preview only a handful of rows
display(dcrm_accounts.limit(10))

In [None]:
from pyspark.sql.functions import col, when, concat
from pyspark.sql import functions as F

# READ ICRM ACCOUNTS TABLE FROM SILVER
icrm_accounts = spark.read.format("delta").load("abfss://Mythic@onelake.dfs.fabric.microsoft.com/silver_lakehouse.Lakehouse/Tables/nucleus__icrm_accounts")

# Prefix every column with "icrm_" so ICRM fields stay distinguishable from
# DCRM fields after the joins below.
for column in icrm_accounts.columns:
    icrm_accounts = icrm_accounts.withColumnRenamed(column, "icrm_" + column)

# Rebuild icrm_account_id as <account id><BAC code>.
# concat() yields NULL when either input is NULL.
icrm_accounts = icrm_accounts.withColumn(
    "icrm_account_id",
    concat(col("icrm_account_id"), col("icrm_bac_code")),
)

# Clean up placeholder values: the literal strings "None" / "Unassigned" are
# replaced with an empty string.
icrm_accounts = icrm_accounts.withColumn(
    "icrm_f___i_territory_description",
    when(col("icrm_f___i_territory_description") == "None", "")
    .otherwise(col("icrm_f___i_territory_description")),
)
icrm_accounts = icrm_accounts.withColumn(
    "icrm_account_executive_p_c",
    when(col("icrm_account_executive_p_c").isin("None", "Unassigned"), "")
    .otherwise(col("icrm_account_executive_p_c")),
)
# icrm_account_executive_f_i used to get the same treatment (for the value
# "Unassigned Owner"); it is no longer cleaned here because the column was
# removed from the nucleus table.

# Look up PDNs by account name to correct PDNs in ICRM: when an ICRM account's
# DBA name equals a DCRM account name, the DCRM OE id replaces the ICRM PDN.
# NOTE(review): if several DCRM accounts share one name, this left join emits
# one row per match for the ICRM account - confirm names are unique in DCRM.
joined_df = icrm_accounts.alias("icrm").join(
    dcrm_accounts.alias("dcrm"),
    col("icrm.icrm_account_dba_name") == col("dcrm.dcrm_account_name"),
    "left",
)

# After the left join, dcrm.dcrm_oe_id is NULL either because no DCRM account
# matched or because the matched DCRM account has no OE id. In both cases keep
# the PDN ICRM already had; previously a name match with a NULL OE id wiped a
# valid ICRM PDN.
icrm_accounts_updated = joined_df.withColumn(
    "icrm_pdn",
    F.coalesce(col("dcrm.dcrm_oe_id"), col("icrm.icrm_pdn")),
)

# Keep only the ICRM columns (icrm_pdn now holds the corrected value); the
# DCRM lookup columns are dropped again.
result_df = icrm_accounts_updated.select(*[col(c) for c in icrm_accounts.columns])

display(result_df.limit(20))

In [None]:
# JOIN DCRM AND ICRM ACCOUNTS TABLES
# Imported here as well so this cell does not depend on an earlier cell's import.
from pyspark.sql import functions as F

# Right join: every DCRM account is kept, and the ICRM account whose
# (corrected) PDN equals the DCRM OE id is attached to it when one exists.
joined_accounts = result_df.join(dcrm_accounts, result_df.icrm_pdn == dcrm_accounts.dcrm_oe_id, "right")

# Keep only DCRM accounts that have an OE id (rows with a NULL dcrm_oe_id are dropped)
joined_accounts = joined_accounts.filter(dcrm_accounts.dcrm_oe_id.isNotNull())

# ICRM accounts that have no PDN and whose dealer status is Active.
# The columns carry the "icrm_" prefix added when the table was loaded, so the
# un-prefixed names "pdn" / "dealer_status" do not exist on this DataFrame.
# result_df (PDNs already corrected from DCRM) is used rather than the raw
# icrm_accounts so that an account whose PDN was filled in by the name lookup
# is not added a second time here.
icrm_accounts_filtered = result_df.filter(
    F.col("icrm_pdn").isNull()
    & (F.col("icrm_dealer_status") == "Active")
)

# Union these ICRM-only records onto the joined accounts; the DCRM columns
# they lack are filled with NULL.
joined_accounts = joined_accounts.unionByName(icrm_accounts_filtered, allowMissingColumns=True)

# Columns written to the output table, in order.
# Temp removing 'ae_f_i_user_id' ,'ae_p_c_user_id'
# NOTE(review): 'dcrm_group_namne' looks like a typo of 'dcrm_group_name'; it is
# kept because it is the column name written to the output table.
cols = [
    'dcrm_account_id'
    ,'dcrm_account_name'
    ,'dcrm_additional_attributes'
    ,'ae_autofi_user_id'
    ,'ae_csg_user_id'
    ,'dcrm_dealer_status'
    ,'dcrm_group_id'
    ,'dcrm_group_namne'
    ,'dcrm_make'
    ,'dealer_id_pdn'
    ,'dcrm_oem'
    ,'dcrm_parent_account'
    ,'dcrm_rbc'
    ,'icrm_account_dba_name'
    ,'icrm_account_id'
    ,'dealer_id_bac'
    ,'icrm_dealer_group_code'
    ,'icrm_dealer_group_name'
    ,'icrm_dealer_status'
    ,'icrm_dealer_type'
    ,'icrm_f_i_territory_description'
    ,'icrm_f_i_relationship'
    ,'icrm_makes'
    ,'icrm_parent_account'
    ,'icrm_parent_bac'
    ,'icrm_dealer_id_pdn'
    ,'icrm_region_description'
    ,'icrm_sales_area_description'
    ,'name'
]

# (source column, output column) pairs. withColumnRenamed is a no-op when the
# source column does not exist, so listing alternative source spellings is safe.
column_renames = [
    ("dcrm_consumer_core_team___auto_finance_ae", "ae_autofi_user_id"),
    ("dcrm_consumer_core_team___csg_ae", "ae_csg_user_id"),
    ("dcrm_oe_id", "dealer_id_pdn"),
    ("icrm_account_executive_f&i", "ae_f_i_user_id"),
    ("icrm_account_executive_p&c", "ae_p_c_user_id"),
    # The ICRM cleanup cell addresses this column as icrm_account_executive_p_c,
    # so the "&" spelling above never matches it.
    ("icrm_account_executive_p_c", "ae_p_c_user_id"),
    ("icrm_bac_code", "dealer_id_bac"),
    ("icrm_f___i_territory_description", "icrm_f_i_territory_description"),
    ("icrm_f&i_relationship", "icrm_f_i_relationship"),
    # NOTE(review): presumably this column is sanitised the same way as
    # icrm_f___i_territory_description - confirm the real source column name.
    ("icrm_f___i_relationship", "icrm_f_i_relationship"),
    ("icrm_pdn", "icrm_dealer_id_pdn"),
    ("dcrm_group_name", "dcrm_group_namne"),
]
for source_name, output_name in column_renames:
    joined_accounts = joined_accounts.withColumnRenamed(source_name, output_name)

# "name" prefers the DCRM account name and falls back to the ICRM DBA name.
joined_accounts = joined_accounts\
    .withColumn("name", F.coalesce(F.col("dcrm_account_name"), F.col("icrm_account_dba_name")))\
    .select(*cols)

#Write to table
joined_accounts.write.format("delta")\
        .mode("overwrite")\
        .option("overwriteSchema", "true")\
        .save('abfss://Mythic@onelake.dfs.fabric.microsoft.com/smartsync_lakehouse.Lakehouse/Tables/nucleus_accounts_for_hubspot_companies')

display(joined_accounts.limit(20))
joined_accounts.count()

### New Companies for Insert into HubSpot

In [None]:
from pyspark.sql.functions import col, coalesce, when

# Read HubSpot companies from Silver
hubspot_companies = (
    spark.read
    .format("delta")
    .load("abfss://Mythic@onelake.dfs.fabric.microsoft.com/silver_lakehouse.Lakehouse/Tables/hubspot_sandbox__company")
)

# A nucleus account corresponds to a HubSpot company when either the ICRM PDN
# or the DCRM PDN agrees.
# NOTE(review): an account whose two PDNs are both NULL can never satisfy this
# condition, so it ends up in the insert set on every run - confirm intended.
pdn_match = (
    (joined_accounts.icrm_dealer_id_pdn == hubspot_companies.property_icrm_dealer_id_pdn)
    | (joined_accounts.dealer_id_pdn == hubspot_companies.property_dealer_id_pdn)
)

# Left join nucleus accounts to HubSpot companies, keeping the HubSpot id and
# the two HubSpot PDN properties alongside every nucleus column.
joined_df = joined_accounts.join(hubspot_companies, pdn_match, "left").select(
    joined_accounts["*"],
    hubspot_companies["id"],
    hubspot_companies["property_icrm_dealer_id_pdn"],
    hubspot_companies["property_dealer_id_pdn"],
)

# Accounts with no HubSpot id found no matching company: these are the new
# records to insert.
filtered_df = joined_df.filter(joined_df["id"].isNull())

# Remove the HubSpot helper columns so only nucleus columns are written
insert_df = filtered_df.drop("id", "property_icrm_dealer_id_pdn", "property_dealer_id_pdn")

display(insert_df)
(
    insert_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .save('abfss://Mythic@onelake.dfs.fabric.microsoft.com/smartsync_lakehouse.Lakehouse/Tables/hubspot_companies_insert')
)


insert_df.count()

### Existing Companies to Update in HubSpot

In [None]:
from functools import reduce
from operator import or_

from pyspark.sql.functions import col, coalesce, when

# Read HubSpot companies from Silver
hubspot_companies = spark.read.format("delta").load("abfss://Mythic@onelake.dfs.fabric.microsoft.com/silver_lakehouse.Lakehouse/Tables/hubspot_sandbox__company")

# A nucleus account corresponds to a HubSpot company when either the ICRM PDN
# or the DCRM PDN agrees.
pdn_match = (
    (joined_accounts.icrm_dealer_id_pdn == hubspot_companies.property_icrm_dealer_id_pdn)
    | (joined_accounts.dealer_id_pdn == hubspot_companies.property_dealer_id_pdn)
)

# Inner join: only accounts that already exist in HubSpot. This frame still
# carries every HubSpot property, which the change detection below needs.
matched_companies = joined_accounts.join(hubspot_companies, pdn_match, "inner")

# Nucleus columns plus the HubSpot id of the matched company
joined_df2 = matched_companies.select(joined_accounts["*"], hubspot_companies["id"])
#joined_df2.count()


# (nucleus column, HubSpot property) pairs compared to detect changes.
compared_columns = [
    ("dcrm_account_id", "property_dcrm_account_id"),
    ("dcrm_account_name", "property_dcrm_account_name"),
    ("dcrm_additional_attributes", "property_dcrm_additional_attributes"),
    ("ae_autofi_user_id", "property_ae_autofi_user_id"),
    ("ae_csg_user_id", "property_ae_csg_user_id"),
    ("dcrm_dealer_status", "property_dcrm_dealer_status"),
    ("dcrm_group_id", "property_dcrm_group_id"),
    ("dcrm_group_namne", "property_dcrm_group_name"),
    ("dcrm_make", "property_dcrm_make"),
    ("dcrm_oem", "property_dcrm_oem"),
    ("dcrm_parent_account", "property_dcrm_parent_account"),
    ("dcrm_rbc", "property_dcrm_rbc"),
    ("icrm_account_dba_name", "property_icrm_account_dba_name"),
    ("icrm_account_id", "property_icrm_account_id"),
    ("dealer_id_bac", "property_dealer_id_bac"),
    ("icrm_dealer_group_code", "property_icrm_dealer_group_code"),
    ("icrm_dealer_group_name", "property_icrm_dealer_group_name"),
    ("icrm_dealer_status", "property_icrm_dealer_status"),
    ("icrm_dealer_type", "property_icrm_dealer_type"),
    ("icrm_f_i_territory_description", "property_icrm_f_i_territory_description"),
    ("icrm_f_i_relationship", "property_icrm_f_i_relationship"),
    ("icrm_makes", "property_icrm_makes"),
    ("icrm_parent_account", "property_icrm_parent_account"),
    ("icrm_parent_bac", "property_icrm_parent_bac"),
    ("icrm_dealer_id_pdn", "property_icrm_dealer_id_pdn"),
    ("icrm_region_description", "property_icrm_region_description"),
    ("icrm_sales_area_description", "property_icrm_sales_area_description"),
    ("name", "property_name"),
]

# Identify records to update: at least one compared pair differs.
# A plain != evaluates to NULL when either side is NULL, and filter() drops
# NULL, so a value appearing where HubSpot had none (or the reverse) went
# undetected. eqNullSafe treats NULL as a comparable value: NULL vs NULL is
# equal, NULL vs a value is different.
# NOTE(review): the ICRM cleanup writes "" for some placeholder values; "" and
# a NULL HubSpot property compare as different here - confirm that is desired.
update_conditions = reduce(
    or_,
    [
        ~joined_accounts[nucleus_col].eqNullSafe(hubspot_companies[hubspot_col])
        for nucleus_col, hubspot_col in compared_columns
    ],
)

# Filter while the HubSpot properties are still present, then project down to
# the same columns as joined_df2.
update_df = matched_companies.filter(update_conditions).select(
    joined_accounts["*"], hubspot_companies["id"]
)
#update_df.count()

# One row per HubSpot company.
# NOTE(review): when several nucleus rows match the same HubSpot id, which one
# survives dropDuplicates is not controlled here.
final_update_df = update_df.dropDuplicates(["id"])

display(final_update_df)
final_update_df.write.format("delta")\
    .mode("overwrite")\
    .option("overwriteSchema", "true")\
    .save('abfss://Mythic@onelake.dfs.fabric.microsoft.com/smartsync_lakehouse.Lakehouse/Tables/hubspot_companies_update')

final_update_df.count()