In [0]:
# Enumerate everything stored under the CRM landing folder.
files = dbutils.fs.ls('/Volumes/aitest/crm_bronze/crm_files')

# Print one full path per line so the available inputs can be checked by eye.
for entry in files:
    print(entry.path)

In [0]:
# Show the tables currently registered in the crm_bronze schema.
# The previous content of this cell, spark.table() without a table name,
# raised a TypeError (the table name is a required argument) and stopped a
# "Run All" at this point.
# NOTE(review): the original intent of this cell is unknown; listing the
# schema's tables is an assumption - replace with the intended table if needed.
display(spark.sql('SHOW TABLES IN crm_bronze'))

In [0]:
# Step 1: Base folder that holds the CSV exports; later cells build their
# input paths from this value.
files = 'dbfs:/Volumes/aitest/crm_bronze/crm_files/Dataset'

# Step 2: Load Account.csv, taking column names from the header row and
# letting Spark infer the column types.
df = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema='true')
    .load(f'{files}/Account.csv')
)

# Step 3: Render the loaded rows in the notebook output.
display(df)

Databricks data profile. Run in Databricks to view.

In [0]:
# Save the Account DataFrame as the Delta table crm_bronze.dlt_account,
# replacing whatever a previous run stored there. This is a plain
# DataFrameWriter.saveAsTable call (the "dlt_" prefix is only part of the name).
# Column mapping mode 'name' is requested - presumably so the CSV headers that
# contain spaces (e.g. `Last Activity`) can be kept as column names.
(
    df.write
    .format('delta')
    .options(**{'mergeSchema': 'true', 'delta.columnMapping.mode': 'name'})
    .mode('overwrite')
    .saveAsTable('crm_bronze.dlt_account')
)

In [0]:
%sql
-- Preview up to 100 account rows, ordered by `Last Activity` descending.
SELECT *
FROM crm_bronze.dlt_account
ORDER BY `Last Activity` DESC
LIMIT 100;

In [0]:
# Column descriptions to be added as comments in the table.
# Keys are column names of crm_bronze.dlt_account; values are the comment text.
column_descriptions = {
    "Account Fax": "Fax number associated with the account",
    "Account ID": "Unique identifier for each account",
    "Account Name": "The name of the account",
    "Account Number": "A unique number assigned to the account",
    "Account Rating": "Rating assigned to the account based on some criteria",
    "Account Site": "Location or site associated with the account",
    "Account Type": "Type or category of the account",
    "Application": "Application associated with the account",
    "Billing City": "City for billing purposes",
    "Billing Country": "Country for billing purposes",
    "Billing Geocode Accuracy": "Accuracy level of the geocode associated with billing information",
    "Billing State/Province": "State or province for billing purposes",
    "Create in Zendesk": "Indicates whether the account was created in Zendesk",
    "Created By ID": "Unique identifier of the user who created the account",
    "Created Date": "Date and time when the account was created",
    "Created Updated Flag": "Flag indicating whether the record was created or updated",
    "Deleted": "Indicates whether the account is deleted",
    "Domain Mapping": "Mapping of the domain associated with the account",
    "Industry": "Industry type associated with the account",
    "Is Demo Center": "Indicates whether the account is a demo center",
    "Jigsaw Company ID": "Identifier associated with the Jigsaw company",
    "Last Activity": "Date of the last activity associated with the account",
    "Last Modified By ID": "Unique identifier of the user who last modified the account",
    "Last Modified Date": "Date and time when the account was last modified",
    "Last Referenced Date": "Date when the account was last referenced",
    "Last Sync Date": "Date of the last synchronization",
    "Last Sync Status": "Status of the last synchronization",
    "Last Viewed Date": "Date when the account was last viewed",
    "Lead Type": "Type of lead associated with the account",
    "Life Science KeyAccount": "Indicates whether the account is a key account in life sciences"
}

# Add column descriptions as comments, one ALTER TABLE statement per column.
# The statement is assembled as text, so both interpolated values are escaped
# first: a backtick inside a column name is doubled, and backslashes and single
# quotes inside a description are backslash-escaped. Without this, a description
# containing an apostrophe would end the string literal early and the statement
# would fail to parse. A column missing from the table still raises and stops
# the loop, as before.
for column, description in column_descriptions.items():
    escaped_column = column.replace('`', '``')
    escaped_description = description.replace('\\', '\\\\').replace("'", "\\'")
    spark.sql(
        f"ALTER TABLE crm_bronze.dlt_account CHANGE COLUMN `{escaped_column}` "
        f"COMMENT '{escaped_description}'"
    )

In [0]:
# Load Lead.csv into df1, taking column names from the header row and letting
# Spark infer the column types. `files` is the base folder set in an earlier cell.
df1 = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema='true')
    .load(f'{files}/Lead.csv')
)

# Render the loaded rows in the notebook output.
display(df1)

Databricks data profile. Run in Databricks to view.

In [0]:
# Save df1 as the Delta table crm_bronze.dlt_lead, replacing whatever a
# previous run stored there. This is a plain DataFrameWriter.saveAsTable call
# (the "dlt_" prefix is only part of the table name).
(
    df1.write
    .format('delta')
    .options(**{'mergeSchema': 'true', 'delta.columnMapping.mode': 'name'})
    .mode('overwrite')
    .saveAsTable('crm_bronze.dlt_lead')
)

In [0]:
%sql
-- Preview up to 100 rows of the lead table.
SELECT *
FROM crm_bronze.dlt_lead
LIMIT 100;

In [0]:
# Add comments to dlt_lead columns.
# Keys are column names of crm_bronze.dlt_lead; values are the comment text.
column_descriptions = {
    "Alyssa has been Notified": "Indicates whether Alyssa has been notified",
    "Auto Convert All Leads From This Company": "Automatically convert all leads from this company",
    "Bio Reactors used": "Information about the bio reactors used",
    "Cell Culture Media": "Type of cell culture media used",
    "Cell Type": "Type of cell",
    "City": "City of the lead or account",
    "Companion Lead": "Indicates if its a companion lead",
    "Converted": "Indicates if the lead has been converted",
    "Converted Account ID": "ID of the converted account",
    "Converted Opportunity ID": "ID of the converted opportunity",
    "Country": "Country of the lead or account",
    "Create in Zendesk": "Indicates whether the lead was created in Zendesk",
    "Created By eContacts": "User who created the lead in eContacts",
    "Created Date": "Date and time when the lead was created",
    "Dead Reason": "Reason for marking the lead as dead",
    "Email Opt Out": "Indicates if the lead has opted out of email communications",
    "Google Analytics Campaign": "Campaign information from Google Analytics",
    "Google Analytics Content": "Content information from Google Analytics",
    "Google Analytics Medium": "Medium information from Google Analytics",
    "Google Analytics Source": "Source information from Google Analytics",
    "Google Analytics Term": "Term information from Google Analytics",
    "Incompatible MS Details": "Details about compatibility with Microsoft",
    "Industry": "Industry associated with the lead",
    "isCreatedUpdatedFlag": "Flag indicating if the lead has been created or updated",
    "Key Account": "Indicates if the lead is a key account",
    "Last Status Change": "Date of the last status change for the lead",
    "Last Sync Date": "Date of the last synchronization",
    "Last Sync Status": "Status of the last synchronization",
    "Lead Application": "Application associated with the lead",
    "Lead ID": "Unique identifier for each lead",
    "Lead Source": "Source through which the lead was acquired",
    "Lead Status at Conversion": "Lead status at the time of conversion",
    "Lead Status Automation Override": "Override for lead status automation",
    "Lead Type": "Type or category of the lead",
    "LeadConSource": "Source of the lead conversion",
    "LeadRecordType": "Record type associated with the lead",
    "Location Text": "Textual description of the location",
    "LS Other Research Area": "Other research area in life sciences",
    "LS Research Area": "Research area in life sciences",
    "LS Team Notified": "Indicates whether the life sciences team has been notified",
    "Marketing Segmentation": "Segmentation information for marketing purposes",
    "Mass Spec Manufacturer": "Manufacturer of the mass spectrometer",
    "Mass Spec Type": "Type of mass spectrometer",
    "Media Provider": "Provider of the media",
    "Needs Score Synced": "Indicates if the score needs to be synced",
    "Next_Step__c (Leads)": "Next step in the lead process",
    "Notes": "Additional notes or comments",
    "Opted Out of Email": "Indicates if the lead has opted out of email communications",
    "Organization": "Organization associated with the lead",
    "Other Application": "Other application associated with the lead",
    "Other Dead Reason": "Other reason for marking the lead as dead",
    "Other Mass Spec Type": "Other type of mass spectrometer",
    "Other Research Area": "Other research area",
    "Pardot Conversion Date": "Date of conversion in Pardot",
    "Pardot Conversion Object Type": "Object type of conversion in Pardot",
    "Pardot Created Date": "Date when the lead was created in Pardot",
    "Pardot First Activity": "Date of the first activity recorded in Pardot",
    "Pardot First Referrer Query": "Referrer query for the first visit in Pardot",
    "Pardot First Referrer Type": "Referrer type for the first visit in Pardot",
    "Pardot Grade": "Grade assigned in Pardot",
    "Pardot Hard Bounced": "Indicates if the lead has hard bounced in Pardot",
    "Pardot Last Activity": "Date of the last activity recorded in Pardot",
    "Pardot Last Scored At": "Date when the lead was last scored in Pardot",
    "Pre-Act-on Working Lead": "Indicates if the lead is a pre-Act-on working lead",
    "Primary Application": "Primary application associated with the lead",
    "Product Category": "Category of the product",
    "Record Type ID": "Unique identifier for the record type",
    "Region": "Region associated with the lead",
    "Research Area": "Main research area of interest",
    "Secondary Application": "Secondary application associated with the lead",
    "Secondary Email": "Secondary email address",
    "SS Team Notified": "Indicates whether the social sciences team has been notified",
    "State/Province": "State or province of the lead or account",
    "Status": "Current status of the lead",
    "Status (Simplified)": "Simplified status of the lead",
    "Trained": "Indicates if the lead has been trained",
    "Web Form Applications": "Applications from web forms",
    "Web Lead Notification Sent": "Indicates if a notification has been sent for a web lead",
    "Zendesk Result": "Result in Zendesk",
    "Zendesk User Id": "User ID in Zendesk",
    "Zendesk_OutofSync": "Indicates if the lead is out of sync in Zendesk",
    "# Converted Accounts": "Number of converted accounts",
    "# Converted Opportunities": "Number of converted opportunities",
    "Campaign Membership Count": "Count of campaign memberships",
    "Conversion Rate": "Rate of lead conversion",
    "Lead Score": "Score assigned to the lead",
    "Lead Score1": "Another score assigned to the lead",
    "Location (Latitude)": "Latitude of the location",
    "Location (Longitude)": "Longitude of the location",
    "Number of Records": "Number of records associated with the lead",
    "Pardot Score": "Score assigned in Pardot",
    "Population Density": "Density of the population",
    "Total Leads": "Total number of leads"
}

# Apply each description with its own ALTER TABLE statement.
# The statement is assembled as text, so both interpolated values are escaped
# first: a backtick inside a column name is doubled, and backslashes and single
# quotes inside a description are backslash-escaped. Without this, a description
# containing an apostrophe would end the string literal early and the statement
# would fail to parse. A column missing from the table still raises and stops
# the loop, as before.
for column, description in column_descriptions.items():
    escaped_column = column.replace('`', '``')
    escaped_description = description.replace('\\', '\\\\').replace("'", "\\'")
    spark.sql(
        f"ALTER TABLE crm_bronze.dlt_lead CHANGE COLUMN `{escaped_column}` "
        f"COMMENT '{escaped_description}'"
    )

In [0]:
# Load Opportunity_Table.csv into df2, taking column names from the header row
# and letting Spark infer the column types. `files` is the base folder set in
# an earlier cell.
df2 = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema='true')
    .load(f'{files}/Opportunity_Table.csv')
)

# Render the loaded rows in the notebook output.
display(df2)

Databricks data profile. Run in Databricks to view.

In [0]:
# Save df2 as the Delta table crm_bronze.dlt_opportunityTable, replacing
# whatever a previous run stored there. This is a plain
# DataFrameWriter.saveAsTable call (the "dlt_" prefix is only part of the name).
(
    df2.write
    .format('delta')
    .options(**{'mergeSchema': 'true', 'delta.columnMapping.mode': 'name'})
    .mode('overwrite')
    .saveAsTable('crm_bronze.dlt_opportunityTable')
)

In [0]:
# Column descriptions for crm_bronze.dlt_opportunityTable.
# Keys are column names; values are the comment text to attach to each column.
column_descriptions = {
    "Account ID": "Unique identifier for each account",
    "Backlog Rev": "Backlog revenue for the opportunity",
    "Bio Reactors used": "Information about the bio reactors used",
    "BM Test": "Information about the BM test",
    "Campaign ID": "Identifier for the associated marketing campaign",
    "Cell Culture Media": "Type of cell culture media used",
    "Cell Type": "Type of cell",
    "Close Date": "Date when the opportunity was closed",
    "Closed": "Indicates if the opportunity is closed",
    "Closed Lost Reason": "Reason for closing the opportunity as lost",
    "Competitive Product Details": "Details about competitive products",
    "Contact ID": "Unique identifier for the associated contact",
    "COVID Notes": "Notes related to COVID",
    "COVID Status": "Status related to COVID",
    "Created By ID": "Unique identifier of the user who created the opportunity",
    "Created by Lead Conversion": "Indicates if the opportunity was created by lead conversion",
    "Created Date": "Date and time when the opportunity was created",
    "Date Opportunity was Closed": "Date when the opportunity was closed",
    "Deleted": "Indicates if the opportunity is deleted",
    "DOR Distributor": "Distributor associated with DOR",
    "DOR Expiration": "Expiration date of DOR distributor",
    "Final Quote": "Indicates if a final quote is available for the opportunity",
    "Fiscal Period": "Fiscal period associated with the opportunity",
    "Fiscal Quarter": "Fiscal quarter associated with the opportunity",
    "Fiscal Year": "Fiscal year associated with the opportunity",
    "Forecast Category": "Forecast category for the opportunity",
    "Forecast Category1": "Another forecast category for the opportunity",
    "Forecast Q Commit": "Forecast commitment for the quarter",
    "Forecast Q Prior Commit": "Prior forecast commitment for the quarter",
    "Funding Source": "Source of funding for the opportunity",
    "Has Line Item": "Indicates if the opportunity has a line item",
    "Has Open Activity": "Indicates if there is an open activity related to the opportunity",
    "Has Overdue Task": "Indicates if there is an overdue task related to the opportunity",
    "Industry": "Industry associated with the opportunity",
    "Install This Quarter": "Indicates if the opportunity is scheduled for installation this quarter",
    "Interface Type": "Type of interface for the opportunity",
    "Internal Forecast": "Indicates if the forecast is internal",
    "Last Activity": "Date of the last activity related to the opportunity",
    "Last Modified By ID": "Unique identifier of the user who last modified the opportunity",
    "Last Modified Date": "Date and time when the opportunity was last modified",
    "Last Referenced Date": "Date when the opportunity was last referenced",
    "Last Stage Change Date": "Date of the last stage change for the opportunity",
    "Last Stage Change Date1": "Another date of the last stage change for the opportunity",
    "Last Viewed Date": "Date when the opportunity was last viewed",
    "LDO": "LDO associated with the opportunity",
    "LDO Priority Level": "Priority level of the LDO",
    "Lead Application": "Application associated with the lead",
    "Lead Source": "Source through which the lead was acquired",
    "LS Other Research Area": "Other research area in life sciences",
    "LS Research Area": "Research area in life sciences",
    "Mass Spec Manufacturer": "Manufacturer of the mass spectrometer",
    "Mass Spec Type": "Type of mass spectrometer",
    "Media Provider": "Provider of the media",
    "Opportunity ID": "Unique identifier for each opportunity",
    "Opportunity Type": "Type or category of the opportunity",
    "Order Finalized": "Indicates if the order is finalized",
    "Other Closed Lost Details": "Additional details for closed lost opportunities",
    "Other Mass Spec Type": "Other type of mass spectrometer",
    "Other Research Area": "Other research area",
    "Owner ID": "Unique identifier of the owner of the opportunity",
    "Price Book ID": "Identifier of the associated price book",
    "Primary Application": "Primary application associated with the opportunity",
    "Primary Application (FF)": "Another primary application associated with the opportunity",
    "Primary Contact": "Primary contact associated with the opportunity",
    "Product Category": "Category of the product",
    "Product of Interest": "Product of interest for the opportunity",
    "Purchase Agent": "Agent responsible for the purchase of the opportunity",
    "Quote ID": "Unique identifier for the associated quote",
    "Record Type ID": "Unique identifier for the record type of the opportunity",
    "Registered Vendor (confirmed)": "Indicates if the vendor is confirmed as registered",
    "Secondary Application (FF)": "Another secondary application associated with the opportunity",
    "Ship This Quarter": "Indicates if the opportunity is scheduled to ship this quarter",
    "Ship This Quarter List": "List of opportunities scheduled to ship this quarter",
    "Signing Authority": "Authority responsible for signing the opportunity",
    "Stage": "Current stage of the opportunity",
    "Standard Application": "Standard application associated with the opportunity",
    "System Modstamp": "Date and time of the last system modification for the opportunity",
    "Technical Owner": "Technical owner of the opportunity",
    "Training Date": "Date of training related to the opportunity",
    "Validated Customer Needs": "Indicates if the customer needs are validated",
    "Won": "Indicates if the opportunity is won",
    "# Close Date Extensions": "Number of times the close date has been extended",
    "# Close Date Month Extensions": "Number of months the close date has been extended",
    "Amount": "Amount associated with the opportunity",
    "Days Open": "Number of days the opportunity has been open",
    "Expected Amount": "Expected amount for the opportunity",
    "Probability (%)": "Probability of winning the opportunity",
    "Push Count": "Count of pushes for the opportunity"
}

# Apply each description with its own ALTER TABLE statement.
# The statement is assembled as text, so both interpolated values are escaped
# first: a backtick inside a column name is doubled, and backslashes and single
# quotes inside a description are backslash-escaped. Without this, a description
# containing an apostrophe would end the string literal early and the statement
# would fail to parse. A column missing from the table still raises and stops
# the loop, as before.
for column, description in column_descriptions.items():
    escaped_column = column.replace('`', '``')
    escaped_description = description.replace('\\', '\\\\').replace("'", "\\'")
    spark.sql(
        f"ALTER TABLE crm_bronze.dlt_opportunityTable CHANGE COLUMN `{escaped_column}` "
        f"COMMENT '{escaped_description}'"
    )

In [0]:
# Load Opportunity_Product.csv into df3, taking column names from the header
# row and letting Spark infer the column types. `files` is the base folder set
# in an earlier cell.
df3 = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema='true')
    .load(f'{files}/Opportunity_Product.csv')
)

# Render the loaded rows in the notebook output.
display(df3)

Databricks data profile. Run in Databricks to view.

In [0]:
# Save df3 as the Delta table crm_bronze.dlt_opportunityProduct, replacing
# whatever a previous run stored there. This is a plain
# DataFrameWriter.saveAsTable call (the "dlt_" prefix is only part of the name).
(
    df3.write
    .format('delta')
    .options(**{'mergeSchema': 'true', 'delta.columnMapping.mode': 'name'})
    .mode('overwrite')
    .saveAsTable('crm_bronze.dlt_opportunityProduct')
)

In [0]:
%sql
-- Preview up to 100 rows of the opportunity product table.
SELECT *
FROM crm_bronze.dlt_opportunityProduct
LIMIT 100;

Databricks data profile. Run in Databricks to view.

In [0]:
# Add column descriptions to dlt_opportunityProduct.
# Keys are column names; values are the comment text to attach to each column.
column_descriptions = {
    "Created By ID": "Unique identifier of the user who created the record",
    "Created Date": "Date and time when the record was created",
    "Date": "Date associated with the record",
    "Deleted": "Indicates if the record is deleted",
    "Last Modified By ID": "Unique identifier of the user who last modified the record",
    "Last Modified Date": "Date and time when the record was last modified",
    "Last Referenced Date": "Date when the record was last referenced",
    "Last Viewed Date": "Date when the record was last viewed",
    "Line Description": "Description associated with the line item",
    "Line Item ID": "Unique identifier for each line item",
    "Opportunity ID": "Unique identifier for each opportunity",
    "Product Name": "Name of the product associated with the line item",
    "Price Book Entry ID": "Identifier of the associated price book entry",
    "Product Code": "Code associated with the product",
    "Product ID": "Unique identifier for each product",
    "System Modstamp": "Date and time of the last system modification for the record",
    "Discount": "Discount applied to the line item",
    "List Price": "List price of the product",
    "Quantity": "Quantity of the product in the line item",
    "Sales Price": "Price at which the product is sold",
    "Sort Order": "Order in which the line item is sorted",
    "Subtotal": "Subtotal amount for the line item",
    "Total Price": "Total price of the line item"
}

# Apply each description with its own ALTER TABLE statement.
# The statement is assembled as text, so both interpolated values are escaped
# first: a backtick inside a column name is doubled, and backslashes and single
# quotes inside a description are backslash-escaped. Without this, a description
# containing an apostrophe would end the string literal early and the statement
# would fail to parse. A column missing from the table still raises and stops
# the loop, as before.
for column, description in column_descriptions.items():
    escaped_column = column.replace('`', '``')
    escaped_description = description.replace('\\', '\\\\').replace("'", "\\'")
    spark.sql(
        f"ALTER TABLE crm_bronze.dlt_opportunityProduct CHANGE COLUMN `{escaped_column}` "
        f"COMMENT '{escaped_description}'"
    )

In [0]:
# Load User_Table.csv into df4, taking column names from the header row and
# letting Spark infer the column types. `files` is the base folder set in an
# earlier cell.
df4 = (
    spark.read
    .format('csv')
    .options(header='true', inferSchema='true')
    .load(f'{files}/User_Table.csv')
)

# Render the loaded rows in the notebook output.
display(df4)

Databricks data profile. Run in Databricks to view.

In [0]:
# Save df4 as the Delta table crm_bronze.dlt_userTable, replacing whatever a
# previous run stored there. This is a plain DataFrameWriter.saveAsTable call
# (the "dlt_" prefix is only part of the table name).
(
    df4.write
    .format('delta')
    .options(**{'mergeSchema': 'true', 'delta.columnMapping.mode': 'name'})
    .mode('overwrite')
    .saveAsTable('crm_bronze.dlt_userTable')
)

In [0]:
%sql
-- Preview up to 100 rows of the user table.
SELECT *
FROM crm_bronze.dlt_userTable
LIMIT 100;

Databricks data profile. Run in Databricks to view.

In [0]:
%python
# Add column descriptions to dlt_userTable.
# Keys are column names; values are the comment text to attach to each column.
column_descriptions = {
    "Active": "Indicates whether the user is active",
    "Activity Reminders Popup": "Enables activity reminders popup for the user",
    "Admin Info Emails": "Receives admin information emails",
    "Allow Forecasting": "Permission to use forecasting",
    "Apex Pages Developer Mode": "Developer mode for Apex pages",
    "Auto Bcc": "Automatically includes the user in Bcc",
    "Auto Bcc Stay In Touch": "Auto Bcc for Stay In Touch emails",
    "Auto-login To Call Center": "Automatically logs in to the Call Center",
    "Cache Diagnostics": "Diagnostics for cache",
    "Call Center ID": "ID of the associated Call Center",
    "Can View Not Assigned Prospects": "Can view prospects not assigned to them",
    "Chatter Answers User": "Chatter Answers user",
    "Chatter Email Highlights Frequency": "Frequency of Chatter email highlights",
    "City": "City of the user",
    "Country": "Country of the user",
    "Create LEX Apps WT Shown": "Work together shown in Lightning Experience Apps",
    "Created By ID": "Unique identifier of the user who created the record",
    "Created Date": "Date and time when the record was created",
    "Default Notification Frequency when Joining Groups": "Default notification frequency when joining groups",
    "Delegated Approver ID": "ID of the delegated approver",
    "Department": "Department of the user",
    "Dis Comment After Like Email": "Disable comment after like email notification",
    "Dis Mentions Comment Email": "Disable mentions comment email notification",
    "Dis Prof Post Comment Email": "Disable profile post comment email notification",
    "Disable All Feeds Email": "Disable all feeds email notification",
    "Disable Bookmark Email": "Disable bookmark email notification",
    "Disable Change Comment Email": "Disable change comment email notification",
    "Disable Endorsement Email": "Disable endorsement email notification",
    "Disable File Share Notifications For Api": "Disable file share notifications for API",
    "Disable Followers Email": "Disable followers email notification",
    "Disable Later Comment Email": "Disable later comment email notification",
    "Disable Like Email": "Disable like email notification",
    "Disable Mentions Post Email": "Disable mentions post email notification",
    "Disable Message Email": "Disable message email notification",
    "Disable Profile Post Email": "Disable profile post email notification",
    "Disable Share Post Email": "Disable share post email notification",
    "Enable Auto Sub For Feeds": "Enable auto subscribe for feeds",
    "Event Reminders Checkbox Default": "Default for event reminders checkbox",
    "Exclude Mail App Attachments": "Exclude mail app attachments",
    "Extension": "Extension of the user",
    "Favorites Show Top Favorites": "Show top favorites in favorites",
    "Favorites WT Shown": "Work together shown in favorites",
    "Flow User": "User with flow access",
    "Full Name": "Full name of the user",
    "Geocode Accuracy": "Accuracy of the geocode",
    "Global Nav Bar WT Shown": "Work together shown in global navigation bar",
    "Global Nav Grid Menu WT Shown": "Work together shown in global navigation grid menu",
    "Has Celebration Badge": "Indicates whether the user has a celebration badge",
    "Has Profile Photo": "Indicates whether the user has a profile photo",
    "Hide Bigger Photo Callout": "Hide bigger photo callout",
    "Hide Browse Product Redirect Confirmation": "Hide browse product redirect confirmation",
    "Hide Chatter Onboarding Splash": "Hide Chatter onboarding splash",
    "Hide CSN Desktop Task": "Hide CSN desktop task",
    "Hide CSN Get Chatter Mobile Task": "Hide CSN get Chatter mobile task",
    "Hide End User Onboarding Assistant Modal": "Hide end user onboarding assistant modal",
    "Hide Invoices Redirect Confirmation": "Hide invoices redirect confirmation",
    "Hide Lightning Migration Modal": "Hide Lightning migration modal",
    "Hide Online Sales App Welcome Mat": "Hide online sales app welcome mat",
    "Hide Second Chatter Onboarding Splash": "Hide second Chatter onboarding splash",
    "Hide Sfx Welcome Mat": "Hide Sfx welcome mat",
    "Hide Statements Redirect Confirmation": "Hide statements redirect confirmation",
    "HideS1BrowserUI": "Hide S1 browser UI",
    "Individual ID": "Unique identifier of the individual",
    "Info Emails": "Receives informational emails",
    "Is Approver": "Indicates whether the user is an approver",
    "Language": "Language preference of the user",
    "Last Login": "Date and time of the last login",
    "Last Modified By ID": "Unique identifier of the user who last modified the record",
    "Last Modified Date": "Date and time when the record was last modified",
    "Last Name": "Last name of the user",
    "Last Referenced Date": "Date when the record was last referenced",
    "Last Viewed Date": "Date when the record was last viewed",
    "Lightning Experience Preferred": "Indicates whether Lightning Experience is preferred",
    "Locale": "Locale preference of the user",
    "Manage Escalated Assets": "Manages escalated assets",
    "Manager ID": "Unique identifier of the users manager",
    "Marketing User": "Indicates whether the user is a marketing user",
    "Native Email Client": "Uses the native email client",
    "NetSuite User ID": "User ID in NetSuite",
    "New Lightning Report Run Page Enabled": "Enable new Lightning report run page",
    "Nickname": "Nickname of the user",
    "Offline Edition Trial Expiration Date": "Expiration date of the offline edition trial",
    "Offline User": "Indicates whether the user is an offline user",
    "Out of office message": "Out of office message",
    "Pardot Api Key": "API key for Pardot",
    "Pardot Api Version": "API version for Pardot",
    "Pardot User Id": "User ID in Pardot",
    "Pardot User Key": "User key in Pardot",
    "Path Assistant Collapsed": "Path assistant collapsed",
    "Preview Custom Theme": "Preview custom theme",
    "Preview Lightning": "Preview Lightning",
    "Profile ID": "Unique identifier of the users profile",
    "Receive No Notifications As Approver": "Does not receive notifications as an approver",
    "Receive Notifications As Delegated Approver": "Receives notifications as a delegated approver",
    "Record Home Reserved WT Shown": "Work together shown in record home reserved",
    "Record Home Section Collapse WT Shown": "Work together shown in record home section collapse",
    "Reminder Sound Off": "Sound for reminders"
}

# Apply each description with its own ALTER TABLE statement.
# The statement is assembled as text, so both interpolated values are escaped
# first: a backtick inside a column name is doubled, and backslashes and single
# quotes inside a description are backslash-escaped. Without this, a description
# containing an apostrophe would end the string literal early and the statement
# would fail to parse.
# This cell is deliberately best-effort: a failure on one column is printed
# and the loop moves on to the next column instead of stopping.
for column, description in column_descriptions.items():
    escaped_column = column.replace('`', '``')
    escaped_description = description.replace('\\', '\\\\').replace("'", "\\'")
    try:
        spark.sql(
            f"ALTER TABLE crm_bronze.dlt_userTable CHANGE COLUMN `{escaped_column}` "
            f"COMMENT '{escaped_description}'"
        )
    except Exception as e:
        print(f"Error with column {column}: {e}")