In [1]:
schema="SalesLT"
table="CustomerAddress"
structure = """[
        {
            "physicalName": "CustomerID",
            "type": "Int32",
            "logicalType": "Int32",
            "name": "CustomerID",
            "physicalType": "int",
            "precision": 10,
            "scale": 255,
            "DotNetType": "System.Int32, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089"
        },
        {
            "physicalName": "AddressID",
            "type": "Int32",
            "logicalType": "Int32",
            "name": "AddressID",
            "physicalType": "int",
            "precision": 10,
            "scale": 255,
            "DotNetType": "System.Int32, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089"
        },
        {
            "physicalName": "AddressType",
            "type": "String",
            "logicalType": "String",
            "name": "AddressType",
            "physicalType": "nvarchar",
            "precision": 255,
            "scale": 255,
            "DotNetType": "System.String, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089"
        },
        {
            "physicalName": "rowguid",
            "type": "Guid",
            "logicalType": "Guid",
            "name": "rowguid",
            "physicalType": "uniqueidentifier",
            "precision": 255,
            "scale": 255,
            "DotNetType": "System.Guid, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089"
        },
        {
            "physicalName": "ModifiedDate",
            "type": "DateTime",
            "logicalType": "DateTime",
            "name": "ModifiedDate",
            "physicalType": "datetime",
            "precision": 23,
            "scale": 3,
            "DotNetType": "System.DateTime, mscorlib, Version=4.0.0.0, Culture=neutral, PublicKeyToken=b77a5c561934e089"
        }
    ]"""

In [2]:
from pyspark.sql.types import *

def convert_sql_datatype_to_structfield(sql_datatype, column_name):
    if sql_datatype == 'bigint':
        return StructField(column_name, LongType(), True)
    elif sql_datatype == 'binary':
        return StructField(column_name, BinaryType(), True)
    elif sql_datatype == 'bit':
        return StructField(column_name, BooleanType(), True)
    elif sql_datatype == 'char':
        return StructField(column_name, StringType(), True)
    elif sql_datatype == 'date':
        return StructField(column_name, DateType(), True)
    elif sql_datatype == 'datetime':
        return StructField(column_name, TimestampType(), True)
    elif sql_datatype == 'decimal':
        return StructField(column_name, DecimalType(), True)
    elif sql_datatype == 'float':
        return StructField(column_name, FloatType(), True)
    elif sql_datatype == 'int':
        return StructField(column_name, IntegerType(), True)
    elif sql_datatype == 'money':
        return StructField(column_name, DecimalType(), True)
    elif sql_datatype == 'nchar':
        return StructField(column_name, StringType(), True)
    elif sql_datatype == 'ntext':
        return StructField(column_name, StringType(), True)
    elif sql_datatype == 'numeric':
        return StructField(column_name, DecimalType(), True)
    elif sql_datatype == 'nvarchar':
        return StructField(column_name, StringType(), True)
    elif sql_datatype == 'real':
        return StructField(column_name, FloatType(), True)
    elif sql_datatype == 'smalldatetime':
        return StructField(column_name, TimestampType(), True)
    elif sql_datatype == 'smallint':
        return StructField(column_name, ShortType(), True)
    elif sql_datatype == 'smallmoney':
        return StructField(column_name, DecimalType(), True)
    elif sql_datatype == 'text':
        return StructField(column_name, StringType(), True)
    elif sql_datatype == 'time':
        return StructField(column_name, StringType(), True)
    elif sql_datatype == 'timestamp':
        return StructField(column_name, TimestampType(), True)
    elif sql_datatype == 'tinyint':
        return StructField(column_name, ByteType(), True)
    elif sql_datatype == 'uniqueidentifier':
        return StructField(column_name, StringType(), True)
    elif sql_datatype == 'varbinary':
        return StructField(column_name, BinaryType(), True)
    elif sql_datatype == 'xml':
        return StructField(column_name, BinaryType(), True)        
    elif sql_datatype == 'varchar':
        return StructField(column_name, StringType(), True)
    else:
        raise ValueError("Invalid SQL data type: {}".format(sql_datatype))


In [3]:
import json
import pandas as pd

df_pandas = pd.read_json(structure)
# create DataFrame
df_spark = spark.createDataFrame(df_pandas)

display(df_spark)

In [4]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, DoubleType, BooleanType, DecimalType, DateType, TimestampType

# Create the StructType by iterating over the columns of the table
fields = []
for row in df_spark.collect():
    column_name = row.name
    column_type = row.physicalType
    fields.append(convert_sql_datatype_to_structfield(column_type, column_name))

struct_type = StructType(fields)

# Print the StructType
print(struct_type)


In [5]:
#Return Struct Values
mssparkutils.notebook.exit(struct_type)  