# POC - Sharepoint Lakeflow Connect Ingestion

This document records the POC and provides step-by-step guidelines for project 2749.

We are testing a couple of new Databricks features:
- **Read Excel with Spark**
- **Lakeflow SharePoint connector**

Here is an example of reading an Excel file directly with Spark (in this case from a Volume):
```
df = (spark.read
        .format("excel")
        .option("headerRows", 1)                   # optional
        .load("/Volumes/uc_raw_d/default/budget_forecast_commerce_pl_volume_poc/XX01-Dummy Brand Ltd/Finance/Forecast/XX Forecast.xlsx"))

display(df)
```

And here is the same kind of read using the SharePoint connector feature:
```
df = (spark.read
        .format("excel")
        .option("databricks.connection", "sharepoint_connection_dev")
        .option("headerRows", 1)                    # optional
        .load("https://hablandodedata.sharepoint.com/sites/Budget_Forecast_Commerce_PandL/Shared%20Documents/XX01-Dummy%20Brand%20Ltd/Finance/Forecast/XX%20Forecast.xlsx"))
        
display(df)
```
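
The connector is given the file's SharePoint URL with spaces percent-encoded as `%20`. As a minimal sketch, such a URL can be built from a human-readable folder path with the standard library alone. The helper name `sharepoint_file_url` is hypothetical and not part of any Databricks API, and whether the connector accepts other encodings was not tested in this POC.

```python
from urllib.parse import quote

def sharepoint_file_url(site_url: str, *path_parts: str) -> str:
    """Join a site URL and path segments, percent-encoding each segment."""
    # Spaces become %20; parentheses stay literal, matching the URLs used in this document.
    encoded = [quote(part, safe="()") for part in path_parts]
    return "/".join([site_url.rstrip("/"), *encoded])

url = sharepoint_file_url(
    "https://hablandodedata.sharepoint.com/sites/Budget_Forecast_Commerce_PandL",
    "Shared Documents",
    "XX01-Dummy Brand Ltd",
    "Finance",
    "Forecast",
    "XX Forecast.xlsx",
)
print(url)
```

The printed value can be passed as-is to `.load(...)` in the example above.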

The POC notebook is divided into three parts, one per ingestion approach:
- (Micro-)batch processing with COPY INTO
  - Autoloader + availableNow
- Near real-time (streaming) processing with Autoloader
- DLT (Delta Live Tables)

Each ingestion approach is tested against the same use cases, and the results are reported per use case:
- Deletion of the Excel file
- Rename of the Excel file
- Schema evolution (new column)
- Column value modified
- Column data type change
- New records added (new rows)
- DQ checks

## Conclusion

And then write the result as a Delta table with `overwrite` to handle schema evolution, scheduled with Jobs on jobs compute to optimize costs (which also leaves room for micro-batching). **This is the ideal technical solution for ingesting the SharePoint Excel files.**

2. Another recommendation is a two-step ingestion: follow the same steps as above, but write the Delta output table with a timestamp (this is the key difference), and then ingest it with Autoloader (or even start exploring DLT). This can run in parallel with point `1`.

3. Other recommendations depend on more changes from the business side, for example switching from Excel to CSV in order to handle incremental loads (not a single file) and schema evolution. The CSV files would also need to be written with a timestamp (for uniqueness).



## Micro-batch Processing (COPY INTO)

In [0]:
%sql

CREATE TABLE IF NOT EXISTS uc_curated_d.default.sharepoint_excel_table_budget
  -- The source has a column 'Local Currency' whose name contains a space, which is not allowed by default.
  -- Open question: should the business change or check this at the source?
  -- For now this is handled by enabling column mapping.
  TBLPROPERTIES ('delta.columnMapping.mode' = 'name');


COPY INTO uc_curated_d.default.sharepoint_excel_table_budget
  FROM "https://perfettivanmelle.sharepoint.com/sites/Budget_Forecast_Commerce_PandL/Shared%20Documents/XX01-Dummy%20Brand%20Ltd/Finance/Budgets/Act_Budget_Forecast_input_file_XX01_B2025.xlsx"
  FILEFORMAT = EXCEL
  FORMAT_OPTIONS ('databricks.connection' = 'sharepoint_connection_dev', 'headerRows' = '1')
  COPY_OPTIONS ('mergeSchema' = 'true');

--SELECT * FROM uc_curated_d.default.sharepoint_excel_table_budget;
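
An alternative to column mapping is to rename the offending columns before writing, as the commented-out `withColumnRenamed` calls later in this notebook suggest. The following is a minimal sketch of a generic rename. The helper names `sanitize_column_name` and `rename_map` are hypothetical, and the character set is the one Delta reports as invalid in column names when column mapping is not enabled.

```python
import re

# Characters Delta rejects in column names when column mapping is not enabled.
_INVALID_CHARS = re.compile(r"[ ,;{}()\n\t=]")

def sanitize_column_name(name: str) -> str:
    """Replace every invalid character with an underscore."""
    return _INVALID_CHARS.sub("_", name.strip())

def rename_map(columns):
    """Return {old_name: new_name} for the columns that need renaming."""
    return {
        c: sanitize_column_name(c)
        for c in columns
        if sanitize_column_name(c) != c
    }

print(rename_map(["scenario", "Local Currency", "eur_currency"]))
# {'Local Currency': 'Local_Currency'}
```

On a Spark DataFrame the mapping could be applied with `df.withColumnsRenamed(rename_map(df.columns))` (available from Spark 3.4). The trade-off is that the target column names then no longer match the Excel headers.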


## Micro-batch Processing (Autoloader + availableNow)

In [0]:
%sql

CREATE TABLE IF NOT EXISTS uc_curated_d.default.sharepoint_excel_table_forecast
  -- The source has a column 'Local Currency' whose name contains a space, which is not allowed by default.
  -- Open question: should the business change or check this at the source?
  -- For now this is handled by enabling column mapping.
  TBLPROPERTIES ('delta.columnMapping.mode' = 'name');

In [0]:
# Auto Loader is a streaming ingestion mechanism (Structured Streaming),
# but with the availableNow trigger it can also be run as a micro-batch job.

# First, create the stream by reading the Excel data from SharePoint.
df = (spark.readStream.format("cloudFiles")    
    .option("cloudFiles.format", "excel")
    .option("databricks.connection", "sharepoint_connection_dev")
    .option("inferColumnTypes", True) # optional
    .option("headerRows", 1)
    .option("cloudFiles.schemaLocation", "/Volumes/uc_raw_d/default/budget_forecast_commerce_pl_volume_poc/XX01-Dummy Brand Ltd/autoloader_meta/sharepoint_excel_table_forecast/schema/")
    .option("cloudFiles.schemaEvolutionMode", "none")
    #.option("cloudFiles.schemaEvolutionMode", "addNewColumns") # Not supported for excel format
    .option("cloudFiles.allowOverwrites", "true")
    .load("https://perfettivanmelle.sharepoint.com/sites/Budget_Forecast_Commerce_PandL/Shared%20Documents/XX01-Dummy%20Brand%20Ltd/Finance/Forecast/XX%20Forecast.xlsx")
)

# display(df)
# df = df.withColumnRenamed("Local Currency", "Local_Currency")

# Second, write the stream to a Delta table in the target location.
# The availableNow trigger processes all files that arrived before the query start time, then stops.
(df.writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "/Volumes/uc_raw_d/default/budget_forecast_commerce_pl_volume_poc/XX01-Dummy Brand Ltd/autoloader_meta/sharepoint_excel_table_forecast/checkpoint/")
    .trigger(availableNow=True)
    .toTable("uc_curated_d.default.sharepoint_excel_table_forecast"))

## Near Real-Time Streaming (Autoloader)

In [0]:
%sql

CREATE TABLE IF NOT EXISTS uc_curated_d.default.sharepoint_excel_table_actual
  -- The source has a column 'Local Currency' whose name contains a space, which is not allowed by default.
  -- Open question: should the business change or check this at the source?
  -- For now this is handled by enabling column mapping.
  TBLPROPERTIES ('delta.columnMapping.mode' = 'name');

In [0]:
# First, create the stream by reading the Excel data from SharePoint.
df = (spark.readStream.format("cloudFiles")    
    .option("cloudFiles.format", "excel")
    .option("databricks.connection", "sharepoint_connection_dev")
    .option("inferColumnTypes", True)
    .option("headerRows", 1)
    .option("cloudFiles.schemaLocation", "/Volumes/uc_raw_d/default/budget_forecast_commerce_pl_volume_poc/XX01-Dummy Brand Ltd/autoloader_meta/sharepoint_excel_table_actual/schema/")
    .option("cloudFiles.schemaEvolutionMode", "none")
    .load("https://perfettivanmelle.sharepoint.com/sites/Budget_Forecast_Commerce_PandL/Shared%20Documents/XX01-Dummy%20Brand%20Ltd/Finance/Historical%20data%20(Actuals)/Act_Budget_Forecast_input_file_XX01_A2024_20251022.xlsx")
)

# display(df)
# df = df.withColumnRenamed("Local Currency", "Local_Currency")

# Second, write the stream to a Delta table in the target location.
# No trigger is set, so the query keeps running and picks up changes continuously.
(df.writeStream
    .format("delta")
    .option("mergeSchema", "true")
    .option("checkpointLocation", "/Volumes/uc_raw_d/default/budget_forecast_commerce_pl_volume_poc/XX01-Dummy Brand Ltd/autoloader_meta/sharepoint_excel_table_actual/checkpoint/")
    .toTable("uc_curated_d.default.sharepoint_excel_table_actual"))


## DLT + sharepoint connector feature

As mentioned in the [documentation](https://docs.databricks.com/aws/en/ingestion/sharepoint#ingest-sharepoint-files-in-lakeflow-spark-declarative-pipelines), it should become possible (presumably once the feature reaches GA) to use DLT with Lakeflow to ingest SharePoint files, but this is currently not possible.

**Note:**
The SharePoint connector requires Databricks Runtime 17.3 or above. This runtime is not yet available in a Lakeflow Spark Declarative Pipelines release. To see the Databricks Runtime versions used with a Lakeflow Spark Declarative Pipelines release, see the release notes for that release.
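
A notebook or job can guard against an unsupported runtime before calling the connector. The sketch below assumes the compute exposes its runtime version in the `DATABRICKS_RUNTIME_VERSION` environment variable in a `major.minor` form such as `17.3`. The helper names are hypothetical, and any value that cannot be parsed is treated as unsupported.

```python
import os
import re

MIN_RUNTIME = (17, 3)  # minimum runtime stated in the note above

def parse_runtime_version(raw: str):
    """Return (major, minor) for strings starting with 'N.M', else None."""
    match = re.match(r"^(\d+)\.(\d+)", raw or "")
    if match is None:
        return None
    return int(match.group(1)), int(match.group(2))

def supports_sharepoint_connector(raw: str) -> bool:
    """True when the parsed runtime version is at least MIN_RUNTIME."""
    version = parse_runtime_version(raw)
    return version is not None and version >= MIN_RUNTIME

# Assumption: this variable is set on the compute that runs the notebook.
raw_version = os.environ.get("DATABRICKS_RUNTIME_VERSION", "")
print(supports_sharepoint_connector(raw_version))
```

Comparing integer tuples rather than strings avoids ordering mistakes such as `"17.10" < "17.3"`.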

In [0]:
from pyspark import pipelines as dp

# Once supported, the solution would look something like this:
# Read a specific Excel file from SharePoint in a materialized view

@dp.table
def sharepoint_excel_table_budget():
  return (spark.read.format("excel")
    .option("databricks.connection", "sharepoint_connection_dev")
    .option("headerRows", 1)                   # optional
    #.option("inferColumnTypes", True)            # optional
    #.option("dataAddress", "'Sheet1'!A1:M20")  # optional
    .load("https://perfettivanmelle.sharepoint.com/sites/Budget_Forecast_Commerce_PandL/Shared%20Documents/XX01-Dummy%20Brand%20Ltd/Finance/Budgets/Act_Budget_Forecast_input_file_XX01_B2025.xlsx"))



## Extended - Data Quality and Production Proof

One of the advantages of ingesting data through Databricks, with any of the ingestion methods (COPY INTO, Autoloader, DLT, batch processing), is the capability to also apply Data Quality rules (also known as 'expectations') to guarantee a certain level of maturity and quality of the data object.

In [0]:
# Ideally installed from a requirements.txt file by default, or with uv
#%pip install databricks-labs-dqx==0.8.0
#dbutils.library.restartPython()

In [0]:
# import libraries
import uuid
import yaml
from pyspark.sql.functions import col, current_timestamp
from datetime import datetime
from databricks.labs.dqx.engine import DQEngine
from databricks.sdk import WorkspaceClient

In [0]:
# Set variables
connector_name = "sharepoint_connection_dev"
URL = "https://perfettivanmelle.sharepoint.com/sites/Budget_Forecast_Commerce_PandL/Shared%20Documents/XX01-Dummy%20Brand%20Ltd/Finance/Budgets/Act_Budget_Forecast_input_file_XX01_B2025.xlsx"
STATE_TABLE = "uc_curated_d.default.sharepoint_file_state"
TARGET_TABLE = "uc_curated_d.default.sharepoint_excel_table_budget"
QUARANTINE_TABLE = f"{TARGET_TABLE}_quarantined"
RULES_PATH = "budget_rules.yml"  # adjust

In [0]:
# THIS IS A ONE TIME ACTION, to create a state/logging table

spark.sql("""
CREATE TABLE IF NOT EXISTS uc_curated_d.default.sharepoint_file_state (
  source_url STRING,
  last_modified_ts TIMESTAMP,
  last_length_bytes BIGINT
)
USING DELTA
""")

# Collect current metadata of one excel file

meta = (
  spark.read.format("binaryFile")
    .option("databricks.connection", connector_name)
    .load(URL)
    .select(
      col("path").alias("source_url"),
      col("modificationTime").alias("last_modified_ts"),
      col("length").alias("last_length_bytes")
    )
)

# Insert current metadata of one excel file
#meta.write.format("delta").mode("overwrite").saveAsTable("uc_curated_d.default.sharepoint_file_state")
meta.write.format("delta").mode("append").saveAsTable("uc_curated_d.default.sharepoint_file_state")

In [0]:
%sql
select * from uc_curated_d.default.sharepoint_file_state

source_url,last_modified_ts,last_length_bytes
https://perfettivanmelle.sharepoint.com/sites/Budget_Forecast_Commerce_PandL/Shared%20Documents/XX01-Dummy%20Brand%20Ltd/Finance/Budgets/Act_Budget_Forecast_input_file_XX01_B2025.xlsx,2026-01-29T14:31:35.000Z,141497


In [0]:
# Load helper functions -> in the pvm_commons repo

def get_sharepoint_file_metadata(url: str, connection: str):
    # IMPORTANT: column names match the state table
    return (
        spark.read.format("binaryFile")
          .option("databricks.connection", connection)
          .load(url)
          .select(
              col("path").alias("source_url"),
              col("modificationTime").alias("last_modified_ts"),
              col("length").alias("last_length_bytes")
          )
    )

def file_has_changed(url: str, connection: str, state_table: str) -> bool:
    meta = get_sharepoint_file_metadata(url, connection).collect()[0]

    prev = (
        spark.table(state_table)
          .filter(col("source_url") == url)
          .select("last_modified_ts", "last_length_bytes")
          .limit(1)
          .collect()
    )

    if not prev:
        return True  # no state yet → treat as changed

    return not (
        prev[0]["last_modified_ts"] == meta["last_modified_ts"] and
        prev[0]["last_length_bytes"] == meta["last_length_bytes"]
    )

def update_file_state(url: str, connection: str, state_table: str):
    meta = get_sharepoint_file_metadata(url, connection)
    meta.createOrReplaceTempView("new_state")

    spark.sql(f"""
    MERGE INTO {state_table} t
    USING new_state s
    ON t.source_url = s.source_url
    WHEN MATCHED THEN UPDATE SET
      t.last_modified_ts  = s.last_modified_ts,
      t.last_length_bytes = s.last_length_bytes
    WHEN NOT MATCHED THEN INSERT (
      source_url, last_modified_ts, last_length_bytes
    ) VALUES (
      s.source_url, s.last_modified_ts, s.last_length_bytes
    )
    """)

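The comparison inside `file_has_changed` can be exercised without a cluster. The sketch below is only an illustration: an in-memory dict stands in for the state table, and the names `FileMeta` and `has_changed` are hypothetical. The timestamps and sizes are the two states of the budget file shown in the state table outputs in this notebook.

```python
from dataclasses import dataclass
from datetime import datetime
from typing import Dict, Optional

@dataclass(frozen=True)
class FileMeta:
    last_modified_ts: datetime
    last_length_bytes: int

def has_changed(current: FileMeta, previous: Optional[FileMeta]) -> bool:
    """A file counts as changed when no state exists or either field differs."""
    if previous is None:
        return True
    return current != previous

state: Dict[str, FileMeta] = {}  # stand-in for the Delta state table
url = "https://example.sharepoint.com/sites/demo/file.xlsx"  # placeholder URL

first = FileMeta(datetime(2026, 1, 29, 14, 31, 35), 141497)
print(has_changed(first, state.get(url)))   # True: no state yet
state[url] = first
print(has_changed(first, state.get(url)))   # False: same timestamp and size

edited = FileMeta(datetime(2026, 1, 29, 17, 36, 17), 141504)
print(has_changed(edited, state.get(url)))  # True: file was modified
```

Checking both the modification time and the size is a cheap heuristic. It does not detect an edit that leaves both values unchanged.
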

In [0]:
# 1) Check if the file was modified
if not file_has_changed(URL, connector_name, STATE_TABLE):
    dbutils.notebook.exit("SKIP: file not modified")

# 2) Full load (only happens if the file changed)
df = (
  spark.read.format("excel")
    .option("databricks.connection", connector_name)
    .option("headerRows", "1")
    .option("inferColumnTypes", "true")
    .load(URL)
)

# 3) Load checks from YAML
with open(RULES_PATH, "r") as f:
    checks = yaml.safe_load(f)

# 4) Inject allowed legal entities from reference table
allowed_entities = [
    r["entity_id"]
    for r in spark.table("uc_hive_metastore_d.curated.gl_sherpa_legal_entity") # to adjust
                 .select("entity_id").distinct().collect()
]

for c in checks:
    if c.get("name") == "legal_entity_allowed":
        c["check"]["arguments"]["allowed"] = allowed_entities

# 5) Validate checks (optional but recommended)
status = DQEngine.validate_checks(checks)
if getattr(status, "has_errors", False):
    raise ValueError(f"DQX checks validation failed: {status}")

# 6) Apply checks + split into valid/quarantined
dq_engine = DQEngine(WorkspaceClient())
valid_df, quarantined_df = dq_engine.apply_checks_by_metadata_and_split(df, checks)

# 7) Write results (target and quarantine tables)
(valid_df.write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .saveAsTable(TARGET_TABLE))

# TODO: replace the hard-coded rename of 'Local Currency' with a general fix.
# The rename is needed here presumably because the quarantine table is created without column mapping.
# mergeSchema is used because overwriteSchema only takes effect in overwrite mode.
(quarantined_df
    .withColumn("_quarantine_ts", current_timestamp())
    .withColumnRenamed("Local Currency", "Local_Currency")
    .write
    .format("delta")
    .mode("append")
    .option("mergeSchema", "true")
    .saveAsTable(QUARANTINE_TABLE))

# 8) Update state table
update_file_state(URL, connector_name, STATE_TABLE)
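
Step 4 relies on `budget_rules.yml`, which is not shown in this notebook, containing a rule named `legal_entity_allowed` whose arguments have an `allowed` key. Below is a sketch of that injection on plain Python dicts. The rule layout is an assumption modeled on DQX metadata checks, the second rule is purely illustrative, and `inject_allowed_values` is a hypothetical helper.

```python
import copy

def inject_allowed_values(checks, rule_name, allowed):
    """Return a copy of `checks` with `allowed` set on the rule called `rule_name`."""
    updated = copy.deepcopy(checks)
    matched = False
    for rule in updated:
        if rule.get("name") == rule_name:
            rule["check"]["arguments"]["allowed"] = list(allowed)
            matched = True
    if not matched:
        # Fail loudly instead of silently running without the reference list.
        raise KeyError(f"no rule named {rule_name!r} in checks")
    return updated

# Assumed shape of the parsed YAML rules (illustrative only).
checks = [
    {
        "name": "legal_entity_allowed",
        "criticality": "error",
        "check": {
            "function": "is_in_list",
            "arguments": {"column": "legal_entity", "allowed": []},
        },
    },
    {
        "name": "year_not_null",
        "criticality": "error",
        "check": {"function": "is_not_null", "arguments": {"column": "year"}},
    },
]

resolved = inject_allowed_values(checks, "legal_entity_allowed", ["NL01", "DK01"])
print(resolved[0]["check"]["arguments"]["allowed"])  # ['NL01', 'DK01']
```

Unlike the loop in the cell above, this version raises when the rule is missing, so a renamed rule in the YAML file does not go unnoticed.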


In [0]:
display(quarantined_df)

scenario,year,period,legal_entity,profit_center,country,material,sales_org,dist_channel,division,plant,customer,plan_customer,sales_order,promo_ind,ship_to_party,document_type,combo,sales_volume_uom,sales_volume_3rd_party,sales_volume_icy,sales_volume_KG_3rd_party,unit_sales_3rd_party,Local Currency,eur_currency,gross_sales_finished_products,gross_sales_gumbase_and_flavors,gross_sales_raw_pack_and_semifinished,gross_sales_other,gpr_base_allow_invoice,gpr_base_allow_retro,gpr_acr_base_allowances,list_price_adjustments,gpr_base_allow_pln_invoice,gpr_acr_base_allow_pln,trr_grth_bonus_invoice,trr_grth_bonus_retro,trr_acr_grth_bonus,trr_acr_grth_bonus_plan,cts_pay_terms_invoice,cts_pay_terms_retro,cts_acr_pay_terms,cts_logistics_invoice,cts_logistics_retro,cts_acr_logistics,cts_clearance_invoice,cts_clearance_retro,cts_acr_clearance,cts_cooperation_invoice,cts_cooperation_retro,cts_acr_cooperation,cts_intl_agmt_invoice,cts_intl_agmt_retro,cts_acr_intl_agmt,cts_plan_invoice,cts_acr_plan,ce_asst_and_dist_invoice,ce_asst_and_dist_retro,ce_acr_asst_and_dist,ce_lists_intro_invoice,ce_lists_intro_retro,ce_acr_lists_intro,ce_vis_and_plmt_invoice,ce_vis_and_plmt_retro,ce_acr_vis_and_plmt,ce_co_marketing_invoice,ce_co_marketing_retro,ce_acr_co_marketing,ce_plan_invoice,ce_plan_retro,ce_acr_plan,pr_promo_invoice,pr_promo_retro,pr_acr_promo,pr_trial_and_lty_invoice,pr_trial_and_lty_retro,pr_acr_trial_and_lty,pr_promo_plan_invoice,pr_acr_promo_plan,pr_free_goods_PVM_invoice,pr_3rd_party_items_retro,pr_promo_plcmt_invoice,pr_acr_promo_plcmt,pr_promo_vis_invoice,pr_promo_vis_retro,pr_acr_promo_vis,np_non_perform_invoice,np_non_perform_retro,np_acr_non_perform,np_acr_non_perform_plan,sales_tax,revenue_services,cogs_sfp_std_cogs,cogs_fp_std_cogs_raw,cogs_fp_std_cogs_pack,cogs_fp_std_cogs_dl,cogs_fp_std_cogs_util,cogs_fp_std_cogs_deprec,cogs_fp_std_cogs_maint,cogs_fp_std_cogs_oth_fact_oh,cogs_fp_std_cogs_ga_oh,cogs_fp_std_cogs_grp_chg,cogs_fp_std_cogs_co_pack,cogs_fp_std_cogs_fp_icy,cogs_fp_std_cogs_fp_log,cogs_fp_std_cogs_duty,cogs_fp_std_cogs_trad_goods,cogs_fp_std_cogs_pvm_ingr,cogs_fptg_net_sales_icy_principal_to_lrd,cogs_rmpmsfp_net_sales_co_manufacturer,cogs_rmpmtg_standard_cogs_1,other_cogs_freight,other_cogs_import_duties,other_cogs_variances,other_cogs_standard_coverage_copacking,other_cogs_variances_actual_vs_standard_coverage_copacking,other_cogs_standard_coverage_royalties_3rd_party,other_cogs_variances_actual_vs_standard_coverage_royalties_3rd_party,other_cogs_standard_coverage_packaging_tax,other_cogs_variances_actual_vs_standard_coverage_packaging_tax,other_cogs_consumption_pm_co_packing,other_cogs_consumption_fp_co_packing,other_cogs_consumption_tg_icy_co_packing,other_cogs_consumption_tg_3rd_party_co_packing,3rd_party_log_cost_ship_to_3rd_party_road,3rd_party_log_cost_ship_to_3rd_party_sea,3rd_party_log_cost_ship_to_3rd_party_intermodal,3rd_party_log_cost_ship_to_3rd_party_other_transport,3rd_party_log_cost_ship_to_3rd_party_customs_doc,3rd_party_log_cost_ship_to_3rd_party_exceptional_log_costs,3rd_party_log_cost_ship_to_3rd_party_intra_oc,3rd_party_outbound_handling_costs_ship_to_3rd_party_vse,variable_costs_sales_force_not_on_payroll,sales_commissions,merchandising,sales_adv_wk_media_spend,sales_adv_wk_media_accr_comm,sales_adv_wk_media_accr_non_comm,sales_adv_non_wk_media_agency_fees_spend,sales_adv_non_wk_media_agency_fees_accr_comm,sales_adv_non_wk_media_agency_fees_accr_non_comm,sales_adv_non_wk_content_spend,sales_adv_non_wk_content_accr_comm,sales_adv_non_wk_content_accr_non_comm,purchases_expensed_visibility_mat_from_3rd_party,co_packing_services_indirect_promotion,purchases_expensed_visibility_design_cost,purchases_expensed_promotional_mat_from_3rd_party,consumer_promotions_services,purchases_expensed_consumer_activation_sampling,consumer_activation_services,brand_adv_wk_media_spend_retail,brand_adv_wk_media_spend_other,brand_adv_wk_media_accr_comm_retail,brand_adv_wk_media_accr_comm_other,brand_adv_wk_media_accr_non_comm_retail,brand_adv_wk_media_accr_non_comm_other,brand_adv_non_wk_media_ag_fees_spend_retail,brand_adv_non_wk_media_ag_fees_spend_other,brand_adv_non_wk_media_ag_fees_accr_comm_retail,brand_adv_non_wk_media_ag_fees_accr_comm_other,brand_adv_non_wk_media_ag_fees_accr_non_comm_retail,brand_adv_non_wk_media_ag_fees_accr_non_comm_other,brand_adv_non_wk_media_sel_media_fee_icy,brand_adv_non_wk_content_spend_retail,brand_adv_non_wk_content_spend_other,brand_adv_non_wk_content_accr_comm_retail,brand_adv_non_wk_content_accr_comm_other,brand_adv_non_wk_content_accr_non_comm_retail,brand_adv_non_wk_content_accr_non_comm_other,research_brand_marketing,research_market_data,research_data_analytics_tools,_errors,_warnings
BUD,2025,2,XX01,1023,DK,1023,DK01,10,10,DK01,1023,,,,,,,,,,1167.59,,DKK,EUR,293558.47,,,,31046.56,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,31495.4,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,,141265.9,,,,,,,,148.12,,,,,,2551.46,,,,,,,,,,3822.13,100.76,,,,,,,,,2072.83,,,228.72,,,,,,,,,,,,,,,,,,,,,,,,,,"List(List(legal_entity_allowed, Value 'XX01' in Column 'legal_entity' is not in the allowed list: [NL03, DY99, IT04, US03, RU01, NL04, PL01, CN02, DE01, PH03, US01, CA01, PH01, MX01, IT06, ES01, AE01, IT02, NL16, FR01, NL05, NL06, NL02, SG01, ID01, IT01, SK01, CZ01, NL12, PL02, IN01, PT01, VN01, NL08, IT03, NL15, IT07, DE02, ZA01, DE04, TR01, HK01, IT08, LU02, MY01, CH01, BD01, NL01, NG01, NP01, JP01, CN03, KR01, GB01, NL07, NL11, LU04, DE03, CH03, IT05, CH02, BR01, NL99, DE05, DK01, BR02, NL09, NL10, LU03, LK01, NL13, NL14, US02, GR01, LU01, IE01, CN01, PH02], List(legal_entity), null, is_in_list, 2026-02-02T13:34:11.311Z, Map()))",


In [0]:
%sql
select * from uc_curated_d.default.sharepoint_file_state

source_url,last_modified_ts,last_length_bytes
https://perfettivanmelle.sharepoint.com/sites/Budget_Forecast_Commerce_PandL/Shared%20Documents/XX01-Dummy%20Brand%20Ltd/Finance/Budgets/Act_Budget_Forecast_input_file_XX01_B2025.xlsx,2026-01-29T17:36:17.000Z,141504
