# Read and write from Spark to SQL using the MSSQL Spark Connector
A typical big data scenario a key usage pattern is high volume, velocity and variety data processing in Spark followed with batch or streaming writes to SQL for access to LOB applications. These usage patterns greatly benefit from a connector that utilizes key SQL optimizations and provides an efficient write to SQLServer master instance and SQL Server data pool in Big Data Clusters.

MSSQL Spark connector provides an efficient write SQLServer master instance and SQL Server data pool in Big Data Clusters.

Usage
----
- Familiar Spark DataSource V1 interface
- Referenced by the name "mssql" or fully qualified "com.microsoft.sqlserver.jdbc.spark"
- Use from supported Spark language bindings ( Python, Scala, Java, R)
- Optionally pass Bulk Copy parameters 

** Note : The image here may not be visible dues to markdown bug. Please change path here to full path to view the image.
<img src =
"../data-virtualization/MSSQL_Spark_Connector2.jpg" style="float: center;" alt="drawing" width="900">

More details
-----------

MSSQL Spark connector, uses [SQL Server Bulk copy APIS](https://docs.microsoft.com/en-us/sql/connect/jdbc/using-bulk-copy-with-the-jdbc-driver?view=sql-server-2017#sqlserverbulkcopyoptions) to implement an efficient write to SQL Server. The connector is based on Spark Data source APIs and provides a familiar JDBC interface for access

The Sample
---------

The following sample shows MSSQL JDBC Connector for read/write QLServer master instance and SQL Server data pool in Big Data Clusters. The sample is divided into 2 parts. The first part shows read/write to SQL Master instance and Part 2 shows read/write to Data Pools in Big Data Cluster. 

In the sample we' ll 
- Read a file from HDFS and do some basic processing 
- In Part 1, we'll write the dataframe to SQL server table and then read the table to a dataframe .
- In Part 2, we'll write the dataframe to SQL Server data pool external table and then read it back to a spark data frame. 

PreReq: 
- The sample uses a SQL database named "MyTestDatabase". Create this before you run this sample. The database can be created as follows
    ``` sql
    Create DATABASE MyTestDatabase
    GO 
    ``` 
- Download [AdultCensusIncome.csv]( https://amldockerdatasets.azureedge.net/AdultCensusIncome.csv ) to your local machine.  Create a hdfs folder named spark_data and upload the file there. 
- [For CTP2.5] Configure the spark session to use the MSSQL Connector jar. The jar can be found at /jar/spark-mssql-connector-assembly-1.0.0.jar post deployment of Big Data Cluster.

``` sh
    %%configure -f
    {"conf": {"spark.jars": "/jar/spark-mssql-connector-assembly-1.0.0.jar"}}
```


    

# Read CSV into a data frame
In this step we read the CSV into a data frame and do some basic cleanup steps. 




In [3]:
#spark = SparkSession.builder.getOrCreate()
sc.setLogLevel("INFO")

#Read a file and then write it to the SQL table
datafile = "/spark_data/AdultCensusIncome.csv"
df = spark.read.format('csv').options(header='true', inferSchema='true', ignoreLeadingWhiteSpace='true', ignoreTrailingWhiteSpace='true').load(datafile)
df.show(5)


#Process this data. Very simple data cleanup steps. Replacing "-" with "_" in column names
columns_new = [col.replace("-", "_") for col in df.columns]
df = df.toDF(*columns_new)
df.show(5)


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
0,application_1561072870175_0001,pyspark3,idle,Link,Link,✔


SparkSession available as 'spark'.


+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|education|education-num|    marital-status|       occupation| relationship| race|   sex|capital-gain|capital-loss|hours-per-week|native-country|income|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 39|       State-gov| 77516|Bachelors|           13|     Never-married|     Adm-clerical|Not-in-family|White|  Male|        2174|           0|            40| United-States| <=50K|
| 50|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|           0|           0|            13| United-States| <=50K|
| 38|         Private|215646|  HS-grad|            9|          Divorced|Handlers-cleaners|Not-i

# (PART 1) Write and READ to SQL Table
- Write dataframe to SQL table to SQL Master
- Read SQL Table to Spark dataframe

In [5]:
#Write from Spark to SQL table using MSSQL Spark Connector
print("Use MSSQL connector to write to master SQL instance ")

servername = "jdbc:sqlserver://master-0.master-svc"
dbname = "MyTestDatabase"
url = servername + ";" + "databaseName=" + dbname + ";"

dbtable = "AdultCensus_test"
user = "*****"
password = "*****" # Please specify password here

#com.microsoft.sqlserver.jdbc.spark

try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("MSSQL Connector write failed", error)

print("MSSQL Connector write(overwrite) succeeded  ")




Use MSSQL connector to write to master SQL instance 
MSSQL Connector write(overwrite) succeeded

In [6]:
#Use mode as append
try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", dbtable) \
    .option("user", user) \
    .option("password", password) \
    .save()
except ValueError as error :
    print("MSSQL Connector write failed", error)

print("MSSQL Connector write(append) succeeded  ")

MSSQL Connector write(append) succeeded

In [7]:
#Read from SQL table using MSSQ Connector
print("read data from SQL server table  ")
jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", dbtable) \
        .option("user", user) \
        .option("password", password).load()

jdbcDF.show(5)

read data from SQL server table  
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt|education|education_num|    marital_status|       occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+----------------+------+---------+-------------+------------------+-----------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 39|       State-gov| 77516|Bachelors|           13|     Never-married|     Adm-clerical|Not-in-family|White|  Male|        2174|           0|            40| United-States| <=50K|
| 50|Self-emp-not-inc| 83311|Bachelors|           13|Married-civ-spouse|  Exec-managerial|      Husband|White|  Male|           0|           0|            13| United-States| <=50K|
| 38|         Private|215646|  HS-grad|            9|        

# (PART 2) Write and READ to Data Pool external Tables in Big Data Cluster
- Write dataframe to SQL external table in Data Pools in Big Data Cluste
- Read SQL external Table to Spark dataframe

In [8]:
#Write from Spark to SQL table using MSSQL Spark Connector
print("Use MSSQL connector to write to master SQL instance ")

datapool_table = "AdultCensus_DataPoolTable"
datasource_name = "test_data_src"


try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("overwrite") \
    .option("url", url) \
    .option("dbtable", datapool_table) \
    .option("user", user) \
    .option("password", password) \
    .option("dataPoolDataSource",datasource_name)\
    .save()
except ValueError as error :
    print("MSSQL Connector write failed", error)

print("MSSQL Connector write(overwrite) to data pool external table succeeded")


Use MSSQL connector to write to master SQL instance 
MSSQL Connector write(overwrite) to data pool external table succeeded

In [9]:
try:
  df.write \
    .format("com.microsoft.sqlserver.jdbc.spark") \
    .mode("append") \
    .option("url", url) \
    .option("dbtable", datapool_table) \
    .option("user", user) \
    .option("password", password) \
    .option("dataPoolDataSource",datasource_name)\
    .save()
except ValueError as error :
    print("MSSQL Connector write failed", error)

print("MSSQL Connector write(append) to data pool external table succeeded")

MSSQL Connector write(append) to data pool external table succeeded

In [10]:
#Read from SQL table using MSSQ Connector
print("read data from SQL server table  ")
jdbcDF = spark.read \
        .format("com.microsoft.sqlserver.jdbc.spark") \
        .option("url", url) \
        .option("dbtable", datapool_table) \
        .option("user", user) \
        .option("password", password)\
        .load()

jdbcDF.show(5)

print("MSSQL Connector read from data pool external table succeeded")

read data from SQL server table  
+---+----------------+------+----------+-------------+------------------+---------------+-------------+-----+------+------------+------------+--------------+--------------+------+
|age|       workclass|fnlwgt| education|education_num|    marital_status|     occupation| relationship| race|   sex|capital_gain|capital_loss|hours_per_week|native_country|income|
+---+----------------+------+----------+-------------+------------------+---------------+-------------+-----+------+------------+------------+--------------+--------------+------+
| 46|Self-emp-not-inc|277946|Assoc-acdm|           12|         Separated|   Craft-repair|Not-in-family|White|  Male|           0|           0|            40| United-States| <=50K|
| 38|         Private| 91039| Bachelors|           13|Married-civ-spouse|          Sales|      Husband|White|  Male|       15024|           0|            60| United-States|  >50K|
| 18|         Private|156764|      11th|            7|     Never-m