Skip to content

BUG: cursor.bulkcopy() fails on Fabric Data Warehouse (ActiveDirectoryServicePrincipal) #623

@matheuskknd

Description

@matheuskknd

BUG: cursor.bulkcopy() fails with OS error 64 and BCP subprocess fails with federated auth token error against Microsoft Fabric Data Warehouse (ActiveDirectoryServicePrincipal)

Related Issues

Before opening this, I checked open issues and found #600, which describes a similar cursor.bulkcopy() failure. However, this report is distinct: the target is a Microsoft Fabric Data Warehouse endpoint using ActiveDirectoryServicePrincipal authentication (OAuth/Entra ID), not a traditional SQL Server instance with SQL Login. The errors and root-cause vectors are different, so I'm opening a separate report.


Describe the Bug

Two related bulk-insert paths fail when targeting a Microsoft Fabric Data Warehouse endpoint authenticated with ActiveDirectoryServicePrincipal, while regular SQL execution over the same mssql_python.connect() connection works without any issues.

Failure 1 — cursor.bulkcopy()

cursor.bulkcopy(...) raises a RuntimeError after ~23 s:

RuntimeError: Failed to connect to SQL Server: IO error: O nome da rede especificado não está mais disponível. (os error 64)

OS error 64 is ERROR_NETNAME_DELETED (Windows), indicating the Rust/native internal connection opened by bulkcopy loses the network session. Based on the team's own analysis in #600 (cursor.bulkcopy() opens a separate internal PyCoreConnection), the hypothesis is that this new internal connection cannot complete the federated authentication (Entra ID token exchange) required by the Fabric DW endpoint, which then drops the TCP session after the handshake timeout.

Failure 2 — bcp subprocess (subprocess.run(['bcp', ...]))

The BCP utility — invoked as a subprocess fallback — fails with exit code 1:

SQLState = 37000, NativeError = 24803
Error = [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]
Authentication token is missing in the federated authentication message.

When -G is used, BCP 14.0.3008.27+ and ODBC Driver 18 are documented to support Microsoft Entra authentication for Fabric. However, the specific ActiveDirectoryServicePrincipal (Service Principal) flow with non-interactive credentials (-U client_id -P client_secret) against Fabric Data Warehouse endpoints appears to be either unsupported or inadequately supported in BCP versions 17.0.4045.5 and 2018.186.02.01. The error suggests BCP could not obtain or transmit a valid OAuth token for the Fabric endpoint's federated authentication requirement, despite the ODBC Driver 18 handling this correctly when called directly by mssql-python.

What Works

  • Regular cursor.execute() / cursor.executemany() / cursor.fetchall() via mssql_python.connect()works perfectly despite being very slow for data insertion (it takes minutes for few thousand rows of two VARCHAR(10) columns).
  • The same SQL operations via pyodbc with the identical connection string — works perfectly (almost equally slow).

To Reproduce

Standalone repro script

The script below is fully self-contained. Redact SERVER_PREFIX, USER_NAME and PASSWORD (or supply them via the MSSQL_CONN_STR environment variable) before running.

# -*- coding: utf-8 -*-
"""
Standalone repro for mssql_python + Fabric Data Warehouse connectivity.

Goal:
- Prove that regular SQL execution works with mssql_python.
- Show that bulk operations may fail in the same session context:
  1) cursor.bulkcopy(...)
  2) bcp invoked via subprocess.run
"""

from __future__ import annotations

import csv
import os
import re
import subprocess
import sys
import tempfile
import time
import traceback
from typing import Dict, Iterable, List, Tuple

import mssql_python

SERVER_PREFIX: str = "***"
WAREHOUSE_DB: str = "wr_sample"
USER_NAME: str = "***"
PASSWORD: str = "***"

DEFAULT_CONN_STR = ("Driver=ODBC Driver 18 for SQL Server;"
                    f"Server={SERVER_PREFIX}.datawarehouse.fabric.microsoft.com;"
                    f"Database={WAREHOUSE_DB};"
                    "Encrypt=yes;"
                    "MultiSubnetFailover=no;"
                    "Authentication=ActiveDirectoryServicePrincipal;"
                    "TrustServerCertificate=no;"
                    "ApplicationIntent=ReadWrite;"
                    f"UID={USER_NAME};"
                    f"PWD={PASSWORD};")


def _fix_common_connstr_typos(conn_str: str) -> str:
    """Normalizes common mistakes like UID:xxx/PWD:yyy to UID=xxx/PWD=yyy."""
    fixed = conn_str
    fixed = re.sub(r"(?i)(^|;)UID:", r"\1UID=", fixed)
    fixed = re.sub(r"(?i)(^|;)PWD:", r"\1PWD=", fixed)
    return fixed


def _sanitize_for_mssql_python(conn_str: str) -> str:
    """Remove Driver= from connection string (mssql_python controls it)."""
    return re.sub(r"(?i)(^|;)Driver=[^;]*;?", r"\1", conn_str).rstrip(";")


def _parse_conn_str(conn_str: str) -> Dict[str, str]:
    """Simple ODBC key=value parser sufficient for this repro."""
    params: Dict[str, str] = {}
    for chunk in conn_str.split(";"):
        part = chunk.strip()
        if not part:
            continue
        if "=" not in part:
            continue
        key, value = part.split("=", 1)
        params[key.strip().lower()] = value.strip()
    return params


def _print_header(title: str) -> None:
    print("\n" + "=" * 90)
    print(title)
    print("=" * 90)


def _print_ok(message: str) -> None:
    print(f"[OK] {message}")


def _print_fail(message: str) -> None:
    print(f"[FAIL] {message}")


def _exec_and_fetchall(cursor: mssql_python.Cursor, sql: str) -> List[tuple]:
    cursor.execute(sql)
    if cursor.description is None:
        return []
    return [tuple(row) for row in cursor.fetchall()]


def _run_pure_sql(cursor: mssql_python.Cursor, db: str, table_name: str) -> None:
    _print_header("STEP 1 - Pure SQL through mssql_python")

    statements = [
        f"USE {db}",
        f"DROP TABLE IF EXISTS {table_name}",
        f"CREATE TABLE {table_name} (ID INT)",
        f"INSERT INTO {table_name} VALUES (1)",
        f"SELECT * FROM {table_name}",
        f"SELECT 1 WHERE OBJECT_ID('{table_name}', 'U') IS NOT NULL",
    ]

    for idx, sql in enumerate(statements, start=1):
        print(f"\n[{idx}] SQL: {sql}")
        rows = _exec_and_fetchall(cursor, sql)
        if rows:
            print("    rows:", rows)

    _print_ok("Pure SQL path executed successfully.")


def _run_bulkcopy(cursor: mssql_python.Cursor, table_name: str) -> None:
    _print_header("STEP 2 - cursor.bulkcopy(...) in same connection")

    data: Iterable[Tuple[int]] = [(2,), (3,)]
    t0 = time.perf_counter()
    try:
        result = cursor.bulkcopy(
            table_name,
            data=data,
            batch_size=0,
            timeout=30,
            column_mappings=["ID"],
            keep_identity=False,
            check_constraints=False,
            table_lock=True,
            keep_nulls=True,
            fire_triggers=False,
            use_internal_transaction=True,
        )
        dt = time.perf_counter() - t0
        _print_ok(f"bulkcopy returned in {dt:.3f}s: {result}")
    except Exception as exc:  # noqa: BLE001
        dt = time.perf_counter() - t0
        _print_fail(f"bulkcopy crashed in {dt:.3f}s")
        print(f"    type: {type(exc).__name__}")
        print(f"    msg : {exc}")
        print("    traceback:")
        print(traceback.format_exc())


def _build_bcp_cmd(params: Dict[str, str], table_name: str, csv_path: str) -> List[str]:
    server = params.get("server", "")
    auth = params.get("authentication", "")

    cmd = [
        "bcp", table_name, "in", csv_path,
        "-S", server,
        "-c", "-t,", "-h", "TABLOCK",
    ]

    if auth.lower() in {"sqlpassword", "activedirectoryserviceprincipal"}:
        uid = params.get("uid")
        pwd = params.get("pwd")
        if uid:
            cmd.extend(["-U", uid])
        if pwd:
            cmd.extend(["-P", pwd])
    else:
        cmd.append("-T")

    return cmd


def _run_bcp_subprocess(params: Dict[str, str], table_name: str) -> None:
    _print_header("STEP 3 - subprocess.run(['bcp', ...]) in same target")

    with tempfile.NamedTemporaryFile(
        mode="w", suffix=".csv", encoding="utf-8", newline="", delete=False
    ) as tmp:
        csv_path = tmp.name
        writer = csv.writer(tmp)
        writer.writerow([4])
        writer.writerow([5])

    cmd = _build_bcp_cmd(params, table_name, csv_path)
    print("BCP command:")
    print(" ".join(cmd))

    try:
        completed = subprocess.run(cmd, capture_output=True, text=True, timeout=90)
        print(f"returncode={completed.returncode}")
        print("stdout:")
        print(completed.stdout.strip() or "<empty>")
        print("stderr:")
        print(completed.stderr.strip() or "<empty>")

        if completed.returncode == 0:
            _print_ok("bcp finished successfully.")
        else:
            _print_fail("bcp failed (non-zero exit code).")
    except FileNotFoundError:
        _print_fail("bcp executable not found in PATH.")
    except Exception as exc:  # noqa: BLE001
        _print_fail(f"bcp subprocess crashed: {type(exc).__name__}: {exc}")
        print(traceback.format_exc())
    finally:
        try:
            os.unlink(csv_path)
        except OSError:
            pass


def _run_pyodbc_test(conn_str: str, db: str, table_name: str) -> None:
    """Test the same SQL operations using pyodbc library."""
    _print_header("STEP 4 - Pure SQL through pyodbc (for comparison)")

    try:
        import pyodbc

        _print_ok("pyodbc imported successfully.")

        conn_pyodbc = None
        cursor_pyodbc = None

        try:
            conn_pyodbc = pyodbc.connect(conn_str, autocommit=True)
            cursor_pyodbc = conn_pyodbc.cursor()
            _print_ok("Connected with pyodbc.")

            statements = [
                f"USE {db}",
                f"DROP TABLE IF EXISTS {table_name}",
                f"CREATE TABLE {table_name} (ID INT)",
                f"INSERT INTO {table_name} VALUES (1)",
                f"SELECT * FROM {table_name}",
                f"SELECT 1 WHERE OBJECT_ID('{table_name}', 'U') IS NOT NULL",
            ]

            for idx, sql in enumerate(statements, start=1):
                print(f"\n[{idx}] SQL: {sql}")
                cursor_pyodbc.execute(sql)
                try:
                    rows = cursor_pyodbc.fetchall()
                    if rows:
                        print(f"    rows: {rows}")
                except pyodbc.ProgrammingError:
                    pass

            _print_ok("Pure SQL path with pyodbc executed successfully.")

        except Exception as exc:  # noqa: BLE001
            _print_fail("pyodbc execution failed.")
            print(f"    type: {type(exc).__name__}")
            print(f"    msg : {exc}")
            print("    traceback:")
            print(traceback.format_exc())

        finally:
            if cursor_pyodbc is not None:
                try:
                    cursor_pyodbc.execute(f"DROP TABLE IF EXISTS {table_name}")
                    _print_ok(f"Cleanup (pyodbc): dropped {table_name}")
                except Exception as cleanup_exc:  # noqa: BLE001
                    _print_fail(f"Cleanup (pyodbc) failed: {cleanup_exc}")

            if conn_pyodbc is not None:
                try:
                    conn_pyodbc.close()
                except Exception:
                    pass

    except ImportError:
        _print_fail("pyodbc not installed. Skipping pyodbc test.")


def main() -> int:
    conn_str = os.environ.get("MSSQL_CONN_STR", DEFAULT_CONN_STR).strip()
    conn_str = _fix_common_connstr_typos(conn_str)

    params = _parse_conn_str(conn_str)
    database = params.get("database", WAREHOUSE_DB)
    table_name = "dbo.AF_TMP_13456x"

    conn_str_orig = conn_str
    conn_str_mssql = _sanitize_for_mssql_python(conn_str)

    _print_header("CONNECTION INPUT")
    print("Server:", params.get("server", "<missing>"))
    print("Database:", database)
    print("Authentication:", params.get("authentication", "<missing>"))
    print("ApplicationIntent:", params.get("applicationintent", "<missing>"))

    conn: mssql_python.Connection | None = None
    cursor: mssql_python.Cursor | None = None

    try:
        conn = mssql_python.connect(conn_str_mssql, autocommit=True)
        cursor = conn.cursor()
        _print_ok("Connected with mssql_python.")

        _run_pure_sql(cursor, database, table_name)
        _run_bulkcopy(cursor, table_name)
        _run_bcp_subprocess(params, table_name)
        rows = _exec_and_fetchall(cursor, f"SELECT * FROM {table_name} ORDER BY ID")
        print("Rows currently in table:", rows)

        _run_pyodbc_test(conn_str_orig, database, table_name)

        _print_header("FINAL CHECK")

    except Exception as exc:  # noqa: BLE001
        _print_fail("Connection or SQL stage failed before bulk tests completed.")
        print(f"type: {type(exc).__name__}")
        print(f"msg : {exc}")
        print(traceback.format_exc())
        return 1
    finally:
        if cursor is not None:
            try:
                cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
                _print_ok(f"Cleanup: dropped {table_name}")
            except Exception as cleanup_exc:  # noqa: BLE001
                _print_fail(f"Cleanup failed: {cleanup_exc}")

        if conn is not None:
            try:
                conn.close()
            except Exception:
                pass

    return 0


if __name__ == "__main__":
    sys.exit(main())

requirements_bulk_copy_issue.txt (pinned dependencies used in the repro)

#
# This file is autogenerated by pip-compile with Python 3.12
# by the following command:
#
#    pip-compile --output-file=requirements_bulk_copy_issue.txt --strip-extras requirements_bulk_copy_issue.in
#
azure-core==1.41.0
    # via azure-identity
azure-identity==1.25.3
    # via mssql-python
certifi==2026.5.20
    # via requests
cffi==2.0.0
    # via cryptography
charset-normalizer==3.4.7
    # via requests
cryptography==48.0.0
    # via
    #   azure-identity
    #   msal
    #   pyjwt
idna==3.18
    # via requests
msal==1.37.0
    # via
    #   azure-identity
    #   msal-extensions
msal-extensions==1.3.1
    # via azure-identity
mssql-python==1.8.0
    # via -r requirements_bulk_copy_issue.in
pycparser==3.0
    # via cffi
pyjwt==2.13.0
    # via
    #   msal
    #   pyjwt
pyodbc==5.3.0
    # via -r requirements_bulk_copy_issue.in
requests==2.34.2
    # via
    #   azure-core
    #   msal
typing-extensions==4.15.0
    # via
    #   azure-core
    #   azure-identity
urllib3==2.7.0
    # via requests

Full Program Output

==========================================================================================
CONNECTION INPUT
==========================================================================================
Server: ***.datawarehouse.fabric.microsoft.com
Database: wr_sample
Authentication: ActiveDirectoryServicePrincipal
ApplicationIntent: ReadWrite
[OK] Connected with mssql_python.

==========================================================================================
STEP 1 - Pure SQL through mssql_python
==========================================================================================

[1] SQL: USE wr_sample

[2] SQL: DROP TABLE IF EXISTS dbo.AF_TMP_13456x

[3] SQL: CREATE TABLE dbo.AF_TMP_13456x (ID INT)

[4] SQL: INSERT INTO dbo.AF_TMP_13456x VALUES (1)

[5] SQL: SELECT * FROM dbo.AF_TMP_13456x
    rows: [(1,)]

[6] SQL: SELECT 1 WHERE OBJECT_ID('dbo.AF_TMP_13456x', 'U') IS NOT NULL
    rows: [(1,)]
[OK] Pure SQL path executed successfully.

==========================================================================================
STEP 2 - cursor.bulkcopy(...) in same connection
==========================================================================================
[FAIL] bulkcopy crashed in 23.683s
    type: RuntimeError
    msg : Failed to connect to SQL Server: IO error: O nome da rede especificado não está mais disponível. (os error 64)
    traceback:
Traceback (most recent call last):
  File "C:\repo\samples\mssql_python_bulkcopy_vs_bcp_datalake_minimal.py", line 119, in _run_bulkcopy
    result = cursor.bulkcopy(
             ^^^^^^^^^^^^^^^^
  File "c:\repo\.venv\Lib\site-packages\mssql_python\cursor.py", line 3036, in bulkcopy
    raise type(e)(str(e)) from None
RuntimeError: Failed to connect to SQL Server: IO error: O nome da rede especificado não está mais disponível. (os error 64)

==========================================================================================
STEP 3 - subprocess.run(['bcp', ...]) in same target
==========================================================================================
BCP command:
bcp dbo.AF_TMP_13456x in C:\Users\myuser\AppData\Local\Temp\tmpgw2cuj0g.csv -S ***.datawarehouse.fabric.microsoft.com -c -t, -h TABLOCK -U *** -P ***
returncode=1
stdout:
SQLState = 37000, NativeError = 24803
Error = [Microsoft][ODBC Driver 18 for SQL Server][SQL Server]Authentication token is missing in the federated authentication message.
stderr:
<empty>
[FAIL] bcp failed (non-zero exit code).
Rows currently in table: [(1,)]

==========================================================================================
STEP 4 - Pure SQL through pyodbc (for comparison)
==========================================================================================
[OK] pyodbc imported successfully.
[OK] Connected with pyodbc.

[1] SQL: USE wr_sample

[2] SQL: DROP TABLE IF EXISTS dbo.AF_TMP_13456x

[3] SQL: CREATE TABLE dbo.AF_TMP_13456x (ID INT)

[4] SQL: INSERT INTO dbo.AF_TMP_13456x VALUES (1)

[5] SQL: SELECT * FROM dbo.AF_TMP_13456x
    rows: [(1,)]

[6] SQL: SELECT 1 WHERE OBJECT_ID('dbo.AF_TMP_13456x', 'U') IS NOT NULL
    rows: [(1,)]
[OK] Pure SQL path with pyodbc executed successfully.
[OK] Cleanup (pyodbc): dropped dbo.AF_TMP_13456x

==========================================================================================
FINAL CHECK
==========================================================================================
[OK] Cleanup: dropped dbo.AF_TMP_13456x

Expected Behavior

  • cursor.bulkcopy() should successfully bulk-insert rows into a Fabric Data Warehouse table when the parent connection was authenticated with ActiveDirectoryServicePrincipal. The internal connection opened by bulkcopy should reuse or re-acquire the OAuth token from the same credential context, rather than opening a raw TCP session that is torn down by the Fabric endpoint during the federated auth handshake.
  • BCP subprocess — if this path is intended as a fallback, it should either use an authentication mode compatible with Fabric DW (e.g., via an Azure AD access token passed through -G / token-based auth), or document clearly that BCP-based bulk copy is not supported for ActiveDirectoryServicePrincipal against Fabric endpoints.

Further Technical Details

Field Value
mssql-python version 1.8.0
pyodbc version 5.3.0
Python version 3.12.8
Operating system Windows 10 / Windows 11 (x64)
ODBC Driver ODBC Driver 18 for SQL Server
BCP version (1st binary) 17.0.4045.5
BCP version (2nd binary) 2018.186.02.01
Target server type Microsoft Fabric Data Warehouse
Authentication method ActiveDirectoryServicePrincipal (Entra ID, client credentials flow)
cursor.bulkcopy() error RuntimeError: Failed to connect to SQL Server: IO error: ... (os error 64)
BCP subprocess error SQLState = 37000, NativeError = 24803 — Authentication token is missing in the federated authentication message.

Additional Context

  • Normal cursor.execute() and cursor.fetchall() over the same mssql_python connection succeed without issue — so the credential and connection string are valid.
  • pyodbc with the identical connection string (including Driver=) also works for all DML/query operations.
  • The cursor.bulkcopy() failure takes ~23 s before raising, which matches a TCP-level timeout — consistent with the Fabric DW endpoint dropping the unauthenticated internal connection.
  • The OS error 64 (ERROR_NETNAME_DELETED) is a Windows network error meaning the remote side closed the network session, further supporting the theory that the internal bulk-copy connection never completes the OAuth/federated token exchange.
  • BCP NativeError 24803 ("Authentication token is missing in the federated authentication message") is a well-known Fabric error when a client connects without a valid Entra ID token. While ODBC Driver 18 + BCP support Authentication=ActiveDirectoryServicePrincipal in theory (via -G -U -P), the actual OAuth token negotiation and acquisition for Service Principal credentials against Fabric Data Warehouse appears to fail or be unsupported in the tested BCP versions. The ODBC layer (used by pyodbc and mssql-python) handles this correctly, suggesting the problem is specific to BCP's token handling for this scenario.

Metadata

Metadata

Assignees

Labels

triage neededFor new issues, not triaged yet.

Type

No type
No fields configured for issues without a type.

Projects

No projects

Milestone

No milestone

Relationships

None yet

Development

No branches or pull requests

Issue actions