## Convert SQL Server table definitions to Spark SQL table definitions for Fabric Lakehouse

### 1. Set the SQL Server DDL to be converted

In [None]:
# SQL Server DDL statements to convert (multiple tables are supported).
# Paste the target DDL into sql_input.
# Formatting expected by the parsing code in this notebook:
#   - each CREATE TABLE statement ends with ");"
#   - each column definition or table constraint sits on its own line
# The Sales primary key is declared once, through the named constraint;
# SQL Server rejects a table that declares PRIMARY KEY more than once.
sql_input = """
CREATE TABLE [dbo].[Sales] (
    [EnrollmentId] INT IDENTITY(1,1),
    [CustomerName] VARCHAR(100) NOT NULL,
    [Amount] DECIMAL(10,2),
    [CreatedDate] DATETIME,
    [IsActive] BIT,
    CONSTRAINT PK_Sales PRIMARY KEY (EnrollmentId)
);

CREATE TABLE [dbo2].[Products] (
    [ProductId] INT IDENTITY(1,1),
    [ProductName] VARCHAR(255) NOT NULL,
    [Price] DECIMAL(8,2),
    [CreatedOn] DATETIME DEFAULT GETDATE()
);
"""

### 2. Data Type Mapping

In [None]:
# Lookup table: SQL Server type name (upper case) -> Spark SQL type.
# Adjust entries to suit the target Lakehouse.
SQLSERVER_TO_SPARK_TYPE_MAP = {
    # --- Numeric ---
    "TINYINT":          "INT",
    "SMALLINT":         "INT",
    "INT":              "INT",
    "BIGINT":           "BIGINT",
    "DECIMAL":          "DECIMAL",        # precision/scale is appended during conversion
    "NUMERIC":          "DECIMAL",        # handled the same way as DECIMAL
    "FLOAT":            "DOUBLE",
    "REAL":             "FLOAT",          # switch to DOUBLE if accuracy matters more

    # --- Currency ---
    "MONEY":            "DECIMAL(19,4)",
    "SMALLMONEY":       "DECIMAL(10,4)",

    # --- Character strings ---
    "CHAR":             "STRING",
    "NCHAR":            "STRING",
    "VARCHAR":          "STRING",
    "NVARCHAR":         "STRING",
    "TEXT":             "STRING",
    "NTEXT":            "STRING",

    # --- Date and time ---
    "DATE":             "DATE",
    "TIME":             "STRING",         # no TIME type on the Spark side
    "DATETIME":         "TIMESTAMP",
    "DATETIME2":        "TIMESTAMP",
    "SMALLDATETIME":    "TIMESTAMP",
    "DATETIMEOFFSET":   "TIMESTAMP",      # the time zone offset is not kept

    # --- Boolean ---
    "BIT":              "BOOLEAN",

    # --- Binary ---
    "BINARY":           "BINARY",
    "VARBINARY":        "BINARY",
    "IMAGE":            "BINARY",

    # --- Special types ---
    # CURSOR and TABLE are deliberately absent: they cannot be converted, so
    # leaving them unmapped lets the unmapped-type check flag them.
    "UNIQUEIDENTIFIER": "STRING",         # usable as a UUID string
    "XML":              "STRING",
    "SQL_VARIANT":      "STRING",         # stored as a plain string
    "HIERARCHYID":      "STRING",         # special type, usually kept as an ID
    "GEOGRAPHY":        "STRING",         # spatial data, sometimes stored as WKT
    "GEOMETRY":         "STRING",
    "ROWVERSION":       "BINARY",         # BIGINT is a possible alternative
    "TIMESTAMP":        "BINARY",         # SQL Server TIMESTAMP is a row version
}


### 3. Check data type mapping

In [None]:
def detect_unmapped_types(sql_text: str, type_map: dict) -> set:
    """Return the column data types used in ``sql_text`` that ``type_map`` lacks.

    Comments are stripped, then every ``CREATE TABLE ... ( ... );`` statement
    is scanned line by line. Each statement must end with ``);`` and each
    column definition is expected on its own line.

    Args:
        sql_text: SQL Server DDL containing zero or more CREATE TABLE statements.
        type_map: Mapping whose keys are upper-case SQL Server type names.

    Returns:
        The set of upper-cased type names found in column definitions that are
        not keys of ``type_map`` (empty when everything is mapped).
    """
    # Imported here so the function also works when it is run before the
    # notebook cell that imports ``re`` at module level.
    import re

    # Remove "--" line comments and "/* ... */" block comments.
    sql_text = re.sub(r'--.*', '', sql_text)
    sql_text = re.sub(r'/\*.*?\*/', '', sql_text, flags=re.DOTALL)

    # One captured body per CREATE TABLE statement.
    create_blocks = re.findall(
        r'CREATE\s+TABLE.*?\((.*?)\);', sql_text, flags=re.IGNORECASE | re.DOTALL
    )

    # Lines that open a table-level constraint or index, not a column. The
    # trailing \b keeps columns such as "ConstraintType" from being skipped.
    constraint_start = re.compile(
        r'(?:CONSTRAINT|PRIMARY\s+KEY|FOREIGN\s+KEY|UNIQUE|CHECK|INDEX)\b',
        re.IGNORECASE,
    )
    # Column name (bare, or bracketed and possibly containing spaces) followed
    # by the type name, which may itself be bracketed, e.g. "[Id] [int]".
    column_def = re.compile(r'(?:\[[^\]]+\]|\w+)\s+\[?([A-Za-z0-9_]+)\]?')

    used_types = set()
    for block in create_blocks:
        for raw_line in block.splitlines():
            line = raw_line.strip().rstrip(',')
            if not line or constraint_start.match(line):
                continue

            match = column_def.match(line)
            if match:
                used_types.add(match.group(1).upper())

    return {dtype for dtype in used_types if dtype not in type_map}


# Report every SQL Server type in sql_input that has no entry in the mapping
unmapped = detect_unmapped_types(sql_input, SQLSERVER_TO_SPARK_TYPE_MAP)
if not unmapped:
    print("All data types are mapped.")
else:
    print("Unmapped SQL Server data types found:")
    for dtype in unmapped:
        print(f"  - {dtype}")

### 4. Define the conversion function

In [None]:
import re

def remove_comments(sql_text):
    """Return ``sql_text`` with SQL comments stripped out.

    ``--`` comments are removed first, from the marker to the end of the line
    (the line break itself is kept). ``/* ... */`` comments are removed next
    and may span several lines.
    """
    line_comment = re.compile(r'--.*')
    block_comment = re.compile(r'/\*.*?\*/', flags=re.DOTALL)
    without_line_comments = line_comment.sub('', sql_text)
    return block_comment.sub('', without_line_comments)

def extract_create_statements(sql_text):
    """Return every ``CREATE TABLE ... ( ... );`` statement found in ``sql_text``.

    Matching is case-insensitive and spans line breaks; each statement runs
    from ``CREATE TABLE`` up to and including the first ``);`` after its
    opening parenthesis. Statements are returned in order of appearance.
    """
    statement_pattern = re.compile(
        r'CREATE\s+TABLE\s+.*?\(.*?\);', re.IGNORECASE | re.DOTALL
    )
    return [found.group(0) for found in statement_pattern.finditer(sql_text)]

# Lines that open a table-level constraint or index rather than a column.
# The trailing \b keeps columns such as "ConstraintType" from matching.
_TABLE_CONSTRAINT_RE = re.compile(
    r'(?:CONSTRAINT|PRIMARY\s+KEY|FOREIGN\s+KEY|UNIQUE|CHECK|INDEX)\b',
    re.IGNORECASE,
)

# Groups: 1 = bracketed column name, 2 = bare column name, 3 = type name
# (bare or bracketed), 4 = precision, 5 = scale. Whitespace is tolerated
# around the parenthesised arguments, e.g. "[decimal](18, 2)".
_COLUMN_DEF_RE = re.compile(
    r'(?:\[([^\]]+)\]|(\w+))'
    r'\s+\[?([A-Za-z_][A-Za-z0-9_]*)\]?'
    r'(?:\s*\(\s*(\d+)\s*(?:,\s*(\d+)\s*)?\))?'
)

# SQL Server treats DECIMAL/NUMERIC without arguments as (18,0); Spark's own
# default is different, so the SQL Server default is written out explicitly.
_SQLSERVER_DEFAULT_DECIMAL_ARGS = "(18,0)"


def clean_column_line(line, type_map):
    """Convert one line of a SQL Server column list to a Spark column definition.

    Only the column name and data type at the start of the line are read;
    anything after them (IDENTITY, DEFAULT, NULL / NOT NULL, an inline
    PRIMARY KEY, ...) is ignored and therefore not carried over.

    Args:
        line: A single line taken from inside ``CREATE TABLE ( ... )``.
        type_map: Mapping of upper-case SQL Server type names to Spark types.
            Types missing from the mapping fall back to ``STRING``.

    Returns:
        ``"  <name> <spark type>"`` for a column definition, or ``None`` for
        blank lines, table-level constraints, and lines that do not look like
        a column definition.
    """
    line = line.strip().rstrip(',')
    # Skip only lines that START with a constraint keyword, so a column that
    # carries an inline PRIMARY KEY is still converted.
    if not line or _TABLE_CONSTRAINT_RE.match(line):
        return None

    match = _COLUMN_DEF_RE.match(line)
    if not match:
        return None

    bracketed_name, bare_name, raw_type, precision, scale = match.groups()
    col_name = bracketed_name if bracketed_name is not None else bare_name
    # Names with spaces or other special characters need backtick quoting.
    if not re.fullmatch(r'[A-Za-z0-9_]+', col_name):
        col_name = "`" + col_name.replace("`", "``") + "`"

    spark_type = type_map.get(raw_type.upper(), "STRING")
    if spark_type == "DECIMAL":
        if precision is None:
            spark_type += _SQLSERVER_DEFAULT_DECIMAL_ARGS
        elif scale is None:
            spark_type += f"({precision})"
        else:
            spark_type += f"({precision},{scale})"

    return f"  {col_name} {spark_type}"

def convert_sqlserver_to_spark(sql_text: str, type_map: dict, table_format="delta", remove_schema=False) -> list:
    """Convert SQL Server CREATE TABLE statements to Spark SQL DDL.

    Args:
        sql_text: SQL Server DDL; comments are stripped before parsing.
        type_map: Mapping of SQL Server type names to Spark SQL types.
        table_format: Value written after ``USING`` in the generated DDL.
        remove_schema: When true, a ``schema.table`` name is reduced to the
            part after the last dot.

    Returns:
        A list of ``(table_name, spark_ddl)`` tuples, one per CREATE TABLE
        statement, in the order the statements appear.
    """
    name_pattern = re.compile(r'CREATE\s+TABLE\s+(\[?\w+\]?\.?\[?\w+\]?)', re.IGNORECASE)
    body_pattern = re.compile(r'\((.*)\)', re.DOTALL)

    converted_tables = []
    for statement in extract_create_statements(remove_comments(sql_text)):
        # Table name with the square brackets removed; fixed fallback name
        # when no name can be recognised.
        name_hit = name_pattern.search(statement)
        if name_hit is None:
            qualified_name = "converted_table"
        else:
            qualified_name = name_hit.group(1).replace('[', '').replace(']', '')

        target_name = qualified_name
        if remove_schema and '.' in qualified_name:
            target_name = qualified_name.split('.')[-1]

        # Everything between the first "(" and the last ")" is the column list.
        body_hit = body_pattern.search(statement)
        if body_hit is None:
            continue

        candidates = (
            clean_column_line(raw_line, type_map)
            for raw_line in body_hit.group(1).splitlines()
        )
        column_defs = [column for column in candidates if column]

        joined_columns = ",\n".join(column_defs)
        ddl = f"CREATE TABLE {target_name} (\n{joined_columns}\n) USING {table_format};"
        converted_tables.append((target_name, ddl))

    return converted_tables

### 5. Create table with converted DDL

In [None]:
# Convert every CREATE TABLE statement in sql_input to Spark SQL DDL.
# For a Lakehouse that uses schemas, pass remove_schema=False instead.
# To execute the generated DDL with Spark SQL, uncomment the spark.sql line.
converted = convert_sqlserver_to_spark(
    sql_input,
    type_map=SQLSERVER_TO_SPARK_TYPE_MAP,
    table_format="delta",
    remove_schema=True,
)

for table_name, spark_sql in converted:
    print(f"\n--- Creating table: {table_name} ---\n", spark_sql, sep="\n")
    #spark.sql(spark_sql)