In [None]:
import pyspark.sql.types
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, StringType, StructField, StructType, TimestampType

In [None]:
%%pyspark
# Reading data from Customer table, select columns that we will use.
CustomerDF = spark.sql("SELECT CustomerId, CustomerEstablishedDate, CustomerTypeId, LedgerId  FROM `WWI_Hack`.`Customer` ")
CustomerDF.show(10)

In [None]:
# Load the LegalEntityCustomer table, keeping the CustomerId join key and the
# three legal-entity attributes that end up in CustomerDim.
LECustomerDF = spark.sql(
    "SELECT CustomerId,LegalEntityName,LegalEntityDateOfEstablishment,StateOfLegalEntityEstablishment FROM `WWI_Hack`.`LegalEntityCustomer` "
)
LECustomerDF.show(n=10)

In [None]:
# Inner-join customers to their legal-entity details on CustomerId, keeping every
# Customer column ("a.*") followed by three LegalEntityCustomer columns.
inner_join = (
    CustomerDF.alias("a")
    .join(
        LECustomerDF.alias("b"),
        on=CustomerDF.CustomerId == LECustomerDF.CustomerId,
        how="inner",
    )
    .select(
        "a.*",
        "b.LegalEntityName",
        "b.LegalEntityDateOfEstablishment",
        "b.StateOfLegalEntityEstablishment",
    )
)

In [None]:
# Check schema for mismatch with CustomerDim table: print the joined frame's
# column names, types and nullability so they can be compared with the types
# the CustomerDim table expects.
inner_join.printSchema()

In [None]:
# Cast the two establishment-date columns to TimestampType so they match the
# TimestampType columns of the CustomerDim table. Re-running this cell is harmless:
# casting a timestamp column to TimestampType again leaves it unchanged.
inner_join = (
    inner_join
    .withColumn("CustomerEstablishedDate", col("CustomerEstablishedDate").cast(TimestampType()))
    .withColumn("LegalEntityDateOfEstablishment", col("LegalEntityDateOfEstablishment").cast(TimestampType()))
)

In [None]:
# Create a schema to match the CustomerDim table.
# NOTE(review): createDataFrame applies these fields to each Row of `inner_join.rdd`
# by position, so this order must match inner_join's column order (the Customer
# columns followed by the three legal-entity columns). The source column types are
# presumably compatible with these types; confirm against the printed schema.
CustomerDimSchema = [
    StructField('CustomerId', IntegerType(), False),
    StructField('CustomerEstablishedDate', TimestampType(), True),
    StructField('CustomerTypeId', IntegerType(), True),
    StructField('LedgerId', IntegerType(), True),
    StructField('LegalEntityName', StringType(), True),
    StructField('LegalEntityDateOfEstablishment', TimestampType(), True),
    StructField('StateOfLegalEntityEstablishment', StringType(), True),
]
# Build through the SparkSession (`spark`, the same entry point the read cells use)
# rather than the deprecated SQLContext. Rebuilding from the RDD applies the schema's
# nullability, so CustomerId is declared non-nullable in CustomerDim.
CustomerDim = spark.createDataFrame(inner_join.rdd, StructType(CustomerDimSchema))

In [None]:
# Preview the first 10 rows of the schema-conformed CustomerDim frame.
CustomerDim.show(n=10)

In [None]:
# Write CustomerDim to an internal table in the Synapse Dedicated SQL Pool using AAD auth.
# Add required imports
import com.microsoft.spark.sqlanalytics
from com.microsoft.spark.sqlanalytics.Constants import Constants

# Write target configuration: edit these values rather than the write call below.
# Synapse Dedicated SQL endpoint. If `Constants.SERVER` is not provided, the
# `<database_name>` from the three-part table name passed to `synapsesql` is used
# to infer the endpoint.
SYNAPSE_SERVER = "synapsedwhdemo.sql.azuresynapse.net"
# Required: three-part table name (<database>.<schema>.<table>) to write to.
TARGET_TABLE = "SqlPool01.WWI.CustomerDim"
# Save mode: "error" or "errorifexists" (default), "overwrite", "append", "ignore".
# Refer to https://spark.apache.org/docs/latest/sql-data-sources-load-save-functions.html#save-modes
WRITE_MODE = "overwrite"

# Configure and submit the request to write to the Synapse Dedicated SQL Pool.
# This uses the AAD-based authentication approach (no SQL credentials are passed);
# see the connector's further examples to use SQL Basic auth instead.
(CustomerDim.write
 .option(Constants.SERVER, SYNAPSE_SERVER)
 .mode(WRITE_MODE)
 .synapsesql(TARGET_TABLE))

