#Azure SQL to CosmosDB Data Modeling and Migration
https://github.com/shan-sharma/pyspark-azuresql-cosmos

In [0]:
#Connection Parameters
server_name = "jdbc:sqlserver://<your-server-name>.database.windows.net" #Add <your-server-name>
database_name = "AdventureWorks2017"
url = server_name + ";" + "databaseName=" + database_name + ";"

username = "<your-user-name>" # Add <your-user-name>
password = "<your-password>" # Add <your-password>

In [0]:
#table_name = "dbo.Product"

query = """
(
	SELECT  so.SalesOrderId AS id
	       ,so.CustomerId AS CustomerId
	       ,so.OrderDate AS OrderDate
	       ,so.ShipDate AS ShipDate
	       ,(
	SELECT  sod.SalesOrderDetailId AS SalesOrderDetailId
	       ,sod.Sku AS Sku
	       ,sod.Name AS Name
	       ,sod.Price AS Price
	       ,sod.Quantity AS Quantity
	FROM SalesOrderDetail sod
	WHERE so.SalesOrderId = sod.SalesOrderId for json auto) AS OrderDetails 
	FROM SalesOrder so
) AS query"""

In [0]:
jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", query) \
        .option("user", username) \
        .option("password", password).load()

In [0]:
from pyspark.sql.types import *

orderDetailsSchema = ArrayType(StructType([
    StructField("SalesOrderDetailId",StringType()),
    StructField("Sku",StringType()),
    StructField("Name",StringType()),
    StructField("Price",DoubleType()),
    StructField("Quantity",IntegerType())
]))

In [0]:
from pyspark.sql import functions as F

writeToCosmosDF = jdbcDF.select(
  F.col("id").cast("string"),
  F.col("CustomerId").cast("string"),
  F.col("OrderDate").cast("timestamp"),
  F.col("ShipDate").cast("timestamp"),
  F.from_json(F.col("OrderDetails"), orderDetailsSchema).alias("OrderDetails")
)

In [0]:
cosmosEndpoint = "https://<your-cosmos-server-name>.documents.azure.com:443/" #Add <your-cosmos-server-name>
cosmosMasterKey = "" #Add PrimaryKey for authorization
cosmosDatabaseName = "AnalyticsStore"
cosmosContainerName = "OrderDetailEmbedd"

cfg = {
  "spark.cosmos.accountEndpoint" : cosmosEndpoint,
  "spark.cosmos.accountKey" : cosmosMasterKey,
  "spark.cosmos.database" : cosmosDatabaseName,
  "spark.cosmos.container" : cosmosContainerName,
}



In [0]:
# Configure Catalog Api to be used
spark.conf.set("spark.sql.catalog.cosmosCatalog", "com.azure.cosmos.spark.CosmosCatalog")
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountEndpoint", cosmosEndpoint)
spark.conf.set("spark.sql.catalog.cosmosCatalog.spark.cosmos.accountKey", cosmosMasterKey)

# create a cosmos database using catalog api
spark.sql("CREATE DATABASE IF NOT EXISTS cosmosCatalog.{};".format(cosmosDatabaseName))

# create a cosmos container using catalog api
spark.sql("CREATE TABLE IF NOT EXISTS cosmosCatalog.{}.{} using cosmos.oltp TBLPROPERTIES(partitionKeyPath = '/id', manualThroughput = '1100')".format(cosmosDatabaseName, cosmosContainerName))

In [0]:
(writeToCosmosDF.write\
   .format("cosmos.oltp")\
   .options(**cfg)\
   .mode("APPEND")\
   .save())