# Materialize query results on Synapse Serverless SQL using CETAS

This notebook shows how to materialize query results in Synapse Serverless SQL using a straightforward pattern based on CREATE EXTERNAL TABLE AS SELECT (CETAS).

Each time the notebook is executed:
- The previous materialized external table (if it exists) is dropped.
- The corresponding data files in Azure Data Lake Storage are deleted.
- A new external table is created using CETAS based on a specified query.
- After successful creation, column-level statistics are automatically generated using `CREATE STATISTICS` for all columns in the new table.

This approach can be useful for materializing complex queries into reusable external tables.

> ⚠️ **Disclaimer:** Although this notebook was developed by a Microsoft employee, it is not an officially supported solution. Customers should review and test the code thoroughly before using it in production scenarios.


## 🔧 Configuration instructions

Before running this notebook, please review and update the configuration values and SQL query below:

1. `external_table`
- **What it is**: The name of the external table to be created.
- **Example**: `"Sales_Materialized"`

---

2. `workspace_name`
- **What it is**: The name of your Synapse workspace (without the `-ondemand.sql.azuresynapse.net` suffix).
- **Example**: `"myworkspace"`

---

3. `database_name`
- **What it is**: The name of the SQL database where the external table will be created.
- **Example**: `"reportingdb"`

---

4. `materialized_table_output_path`
- **What it is**: The ADLS Gen2 path (ABFSS URI) where CETAS will write the external table files.
- ⚠️ **Warning**: This folder will be fully deleted each time the notebook runs.
- **Example**: `"abfss://tables@<storageaccount>.dfs.core.windows.net/folder/materialized_table/"`

---

5. CETAS SQL (`cetas_sql`)
- **What it is**: Customize the `SELECT` statement and source path in the `OPENROWSET` clause.
- **Example**: 
```sql
  WITH (
  LOCATION = '/folder/',
  DATA_SOURCE = [data_source_name],
  FILE_FORMAT = [file_format_name]
  )
AS
  SELECT * FROM OPENROWSET(
      BULK 'https://<storageaccount>.dfs.core.windows.net/<container>/source/*.parquet',
      FORMAT = 'PARQUET'
  ) AS [result]
```


In [None]:
from notebookutils import mssparkutils
# -------------------------
# CONFIGURATION
# -------------------------
CONFIG = {
    "external_table": "",           # e.g., 'Sales_Materialized'
    "workspace_name": "",          # e.g., 'myworkspace'
    "database_name": "",               # e.g., 'reportingdb'

    # ADLS path where CETAS will write the output files.
    # ⚠️⚠️⚠️ IMPORTANT: All existing contents in this folder will be deleted before recreating the external table    
    "materialized_table_output_path": "abfss://tables@storageaccount.dfs.core.windows.net/folder/materialized_table/"
}

cetas_sql = f"""
CREATE EXTERNAL TABLE {CONFIG['external_table']}
    WITH (
    LOCATION = '/folder_name/',
    DATA_SOURCE = [data_source_name],
    FILE_FORMAT = [file_format_name]
    )
AS
    SELECT
       *
    FROM ...
"""


## Step 1: Define Utility Functions
These functions will handle JDBC connection, table operations, and file deletion in ADLS.


In [None]:
def get_jdbc_connection(config):
    """Establish a JDBC connection to Synapse Serverless using AAD token."""
    token = mssparkutils.credentials.getToken("DW")
    jdbc_url = (
        f"jdbc:sqlserver://{config['workspace_name']}-ondemand.sql.azuresynapse.net:1433;"
        f"database={config['database_name']};"
        "encrypt=true;"
        "trustServerCertificate=false;"
        "hostNameInCertificate=*.sql.azuresynapse.net;"
        "loginTimeout=30;"
    )
    props = spark._sc._jvm.java.util.Properties()
    props.setProperty("accessToken", token)
    props.setProperty("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver")
    return spark._sc._jvm.java.sql.DriverManager.getConnection(jdbc_url, props)


def table_exists(stmt, table_name):
    """Check whether the external table exists in Synapse metadata."""
    query = f"""
        SELECT 1
        FROM sys.external_tables
        WHERE name = '{table_name.split('.')[-1]}'
    """
    rs = stmt.executeQuery(query)
    return rs.next()


def drop_external_table(stmt, table_name):
    """Drop the specified external table."""
    stmt.execute(f"DROP EXTERNAL TABLE {table_name}")
    print(f"✅ Dropped external table `{table_name}`.")


def delete_adls_path(path):
    """Delete the contents of the given ADLS path."""
    if mssparkutils.fs.rm(path, recurse=True):
        print(f"✅ Deleted ADLS path: {path}")
    else:
        print(f"⚠️ Path not found or already deleted: {path}")


def create_external_table(stmt, config):
    """Create a new external table using CETAS with the specified configuration."""
    location_folder = config['materialized_table_output_path'].rstrip('/').split('/')[-1] 
    stmt.execute(cetas_sql)
    print(f"✅ Created external table `{config['external_table']}`.")

def create_statistics_for_external_table(config):
    """Generate and execute CREATE STATISTICS statements for each column of the external table."""
    conn = None
    stmt = None
    try:
        conn = get_jdbc_connection(config)
        stmt = conn.createStatement()

        # Step 1: Generate the CREATE STATISTICS statements
        table_name = config["external_table"].split('.')[-1]
        stats_sql = f"""
        SELECT
           'CREATE STATISTICS [' + 'Stats_' + c.name + '] ON [' + schema_name(o.schema_id) + '].[' + object_name(o.object_id) + '] ([' + c.name + ']) WITH FULLSCAN;' AS cmd_create
        FROM sys.objects AS o
        INNER JOIN sys.columns AS c 
           ON o.object_id = c.object_id
        LEFT JOIN sys.stats_columns AS sc 
           ON sc.object_id = c.object_id AND sc.column_id = c.column_id
        LEFT JOIN sys.stats AS s
           ON s.object_id = sc.object_id AND s.stats_id = sc.stats_id
        WHERE o.name = '{table_name}'
        """

        rs = stmt.executeQuery(stats_sql)
        commands = []

        while rs.next():
            commands.append(rs.getString(1))

        if not commands:
            print(f"ℹ️ No statistics to create for table `{table_name}`.")
            return

        # Step 2: Execute each CREATE STATISTICS command
        print(f"📊 Creating statistics for {len(commands)} columns in `{table_name}`...")
        for cmd in commands:
            try:
                stmt.execute(cmd)
                print(f"✅ Executed: {cmd}")
            except Exception as e:
                print(f"❌ Failed: {cmd}\n   Reason: {e}")

    except Exception as e:
        print(f"❌ Error while generating statistics: {e}")
        raise
    finally:
        if stmt:
            stmt.close()
        if conn:
            conn.close()
        print("🔚 Statistics creation step completed.")



## Step 2: Main Logic Function
This function orchestrates all the steps: drop table, delete files, and recreate the table.


In [None]:
def refresh_external_table(config):
    """Main orchestration function to refresh the external table using CETAS and optionally create column statistics."""
    conn = None
    stmt = None
    table_created = False  # track if CETAS was successful

    try:
        # Step 1: Connect and drop table if it exists
        conn = get_jdbc_connection(config)
        stmt = conn.createStatement()

        if table_exists(stmt, config["external_table"]):
            drop_external_table(stmt, config["external_table"])
        else:
            print(f"ℹ️ Table `{config['external_table']}` does not exist. Skipping DROP.")

        # Step 2: Remove previous output files from ADLS
        delete_adls_path(config["materialized_table_output_path"])

        # Step 3: Recreate external table using CETAS
        create_external_table(stmt, config)
        table_created = True  # only set this if no exception was raised

    except Exception as e:
        print(f"❌ Error during external table refresh: {e}")
        raise

    finally:
        if stmt:
            stmt.close()
        if conn:
            conn.close()
        print("✅ External table refresh step completed.\n")

    # Step 4: Create statistics only if CETAS succeeded
    if table_created:
        create_statistics_for_external_table(config)
    else:
        print("⚠️ Skipping statistics creation because the external table was not created.")


## Step 3: Execute
Run the refresh function to apply all changes.


In [None]:
refresh_external_table(CONFIG)