## JDBC - SQL Server - pyspark DF

In [2]:

# SQL Server connection settings.
jdbcHostname = "some_server"
jdbcDatabase = "db"
jdbcPort = 1433
MSSQLdriver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"

# Credentials come from a Databricks secret scope.
# TODO: fill in the secret scope and key names -- the empty strings are placeholders.
jdbcUsername = dbutils.secrets.get(scope="", key="")
jdbcPassword = dbutils.secrets.get(scope="", key="")

jdbcUrl = f"jdbc:sqlserver://{jdbcHostname}:{jdbcPort};database={jdbcDatabase}"

# Passed as `properties=` to spark.read.jdbc() / DataFrameWriter.jdbc().
connectionProperties = dict(
    user=jdbcUsername,
    password=jdbcPassword,
    driver=MSSQLdriver,
)

### Prefer the `load()`/`save()` methods over the `jdbc()` method — they are easier to read

### READ TO DF
  - https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameReader

In [5]:
# READ TO DF


# --- jdbc() method ---
# The `table` argument may be a parenthesised sub-query. Spark uses it in a FROM clause,
# so the sub-query MUST carry an alias (here: some_alias).
pushdown_query = "(select top 1 * FROM DBtest.dbo.test_table) some_alias"
df = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)
display(df)

# jdbc(url, table, column=None, lowerBound=None, upperBound=None, numPartitions=None, predicates=None, properties=None)
# Partitioned read: Spark splits `column` into numPartitions ranges between the bounds.
# `properties` is required here as well -- without it no user/password/driver is sent.
# NOTE: numPartitions is also the max number of concurrent connections; 100 may overload the server.
df = spark.read.jdbc(url=jdbcUrl, table="employees", column="emp_no", lowerBound=1, upperBound=100000,
                     numPartitions=100, properties=connectionProperties)
display(df)


# OR

# --- load() method ---
# NOTE the "fetchsize" option (rows fetched per round trip)!
# OTHER OPTION THAT CAN IMPACT SPEED IS "numPartitions" (for both READ and WRITE) -- this can cause issues on the server if used inappropriately!!

# Placeholders for the partitioned read below -- replace with real values.
# lowerBound/upperBound are typically SELECT MIN(col) / MAX(col) of the partition column.
# They only set the partition stride; they do NOT filter rows.
partitionColumn = "id"   # numeric, date or timestamp column to split on
lowerBound = 1
upperBound = 100000
numPartitions = 8

# NOTE: no trailing or leading spaces are needed when splitting a long line with "\",
# but EVERY continued line must end with "\" -- a missing one is a SyntaxError.
df = spark.read\
.format("jdbc")\
.option("url", jdbcUrl)\
.option("driver", MSSQLdriver)\
.option("dbtable", "schema.tablename")\
.option("user", jdbcUsername)\
.option("password", jdbcPassword)\
.option("customSchema", "id DECIMAL(38, 0), name STRING")\
.option("lowerBound", lowerBound)\
.option("upperBound", upperBound)\
.option("numPartitions", numPartitions)\
.option("partitionColumn", partitionColumn)\
.option("fetchsize", 10000)\
.load()

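## Understanding partition bounds for parallel loads

`lowerBound` and `upperBound` do not filter rows. Together with `numPartitions`, they only decide the stride used to split `partitionColumn` into ranges. Rows outside the bounds are still read; they land in the first or last partition.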

In [6]:
# EXAMPLE USING "query" INSTEAD OF "dbtable"
# The "query" option takes a plain SELECT. Spark wraps it in a sub-query itself, so no alias is needed
# (unlike a sub-query passed via "dbtable").
# NOTE: "query" cannot be combined with "partitionColumn"; use "dbtable" with an aliased sub-query for partitioned reads.

queryREAD =  """
SELECT 
     col1
     ,col2
     ,col3
  FROM DBtest.dbo.test_table
  where col3 = 'test'
"""

# Compact style: "\" directly after each call, no indentation on continued lines.
df = spark.read\
.format("jdbc")\
.option("url", jdbcUrl)\
.option("driver", MSSQLdriver)\
.option("query", queryREAD)\
.option("user", jdbcUsername)\
.option("password", jdbcPassword)\
.option("fetchsize", 10000).load()
  
display(df)

## Same read, spaced style: one space before each "\" and two leading spaces on every continued line.
## (Comments cannot go between continued lines -- a comment line would end the statement.)

df = spark.read \
  .format("jdbc") \
  .option("url", jdbcUrl) \
  .option("driver", MSSQLdriver) \
  .option("query", queryREAD) \
  .option("user", jdbcUsername) \
  .option("password", jdbcPassword) \
  .option("fetchsize", 10000) \
  .load()
  
display(df)



### WRITE FROM DF  
-  https://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.DataFrameWriter

In [8]:
# WRITE FROM DF
#
# With mode("overwrite"):
#   truncate=True  -> the existing table is emptied with TRUNCATE and its definition (schema) is kept.
#   truncate=False -> the table is DROPPED and RE-CREATED from the DataFrame schema; the original definition is lost.
# NOTE: TRUNCATE behaviour differs between databases; see the Spark JDBC docs for caveats.
(
    someDF.write
    .format("jdbc")
    .mode("overwrite")
    .options(
        truncate=True,
        url=jdbcUrl,
        dbtable="dbo.test_tab2",
        user=jdbcUsername,
        password=jdbcPassword,
        driver=MSSQLdriver,
        batchsize=10000,  # rows per INSERT batch
    )
    .save()
)


# OTHER OPTIONS
#   .option("createTableColumnTypes", "col1 INT, col2 VARCHAR(128), col3 VARCHAR(128)")

# jdbc method
#   jdbc(url, table, mode=None, properties=None)

### REFERENCE:
   - https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html
   -  https://github.com/apache/spark/blob/master/examples/src/main/python/sql/datasource.py
    
### OVERWRITE/Truncate  
 -    https://github.com/apache/spark/pull/14086
  
    - MODE
      - mode – specifies the behavior of the save operation when data already exists.
          - append: Append contents of this DataFrame to existing data.
          - overwrite: Overwrite existing data.
          - ignore: Silently ignore this operation if data already exists.
          - error or errorifexists (default case): Throw an exception if data already exists.

  
### All JDBC data source options — read this documentation
  -  SEE: https://spark.apache.org/docs/latest/sql-data-sources-jdbc.html

In [10]:
# PUSH DOWN OPTIMIZATIONS
# SEE: https://docs.databricks.com/spark/latest/data-sources/sql-databases.html#establish-connectivity-to-sql-server
#
# select()/where() on a JDBC source can be pushed down to the database (column pruning + filters).
# In Python the connection properties MUST be passed by keyword: the third positional parameter of
# DataFrameReader.jdbc() is `column` (the partition column), not `properties`.
df = (
    spark.read.jdbc(jdbcUrl, "diamonds", properties=connectionProperties)
    .select("carat", "cut", "price")
    .where("cut = 'Good'")
)

# explain() prints the plan and returns None, so call it on its own rather than assigning its result.
# Use the Python boolean True for the extended plan; check the JDBC scan node for PushedFilters / pruned columns.
df.explain(True)

## PostgreSQL 

In [None]:
# PostgreSQL connection settings.
# Either the server IP or its FQDN works in the JDBC URL; both variants are built below.

jdbc_server_IP = 'xxx.xx.xx.x'
jdbcHostname = "xxxxxx.postgres.database.azure.com"
jdbcDatabase = "xxxxx"
jdbcPort = 5432
Postgres_driver = "org.postgresql.Driver"

# Credentials come from the "azure-kv" secret scope (replace the placeholder keys).
jdbcUsername = dbutils.secrets.get(scope="azure-kv", key="xxxxx")
jdbcPassword = dbutils.secrets.get(scope="azure-kv", key="xxxxx")

jdbcUrl = f"jdbc:postgresql://{jdbcHostname}:{jdbcPort}/{jdbcDatabase}"
jdbcUrl_IP = f"jdbc:postgresql://{jdbc_server_IP}:{jdbcPort}/{jdbcDatabase}"

connectionProperties = dict(
    user=jdbcUsername,
    password=jdbcPassword,
    driver=Postgres_driver,
)

In [None]:
# Read from PostgreSQL with a custom SELECT.
# NOTE(review): the FROM clause uses SQL Server-style naming (database.schema.table, schema "dbo").
# PostgreSQL does not support cross-database references, so this only works if "DBtest" is the
# database in jdbcUrl -- usually you want schema.table (e.g. public.test_table). Confirm before use.
query =  """
SELECT 
     col1
     ,col2
     ,col3
  FROM DBtest.dbo.test_table
  where col3 = 'test'
"""

# A plain SELECT must go in the "query" option. "dbtable" expects a table name or a parenthesised,
# aliased sub-query, so passing this text to "dbtable" generates invalid SQL.
# NOTE: numPartitions has no effect on this read. Without partitionColumn/lowerBound/upperBound, Spark
# reads through a single partition, and "query" cannot be combined with partitionColumn anyway.
df1 = spark.read\
.format("jdbc")\
.option("url", jdbcUrl)\
.option("driver", Postgres_driver)\
.option("query", query)\
.option("user", jdbcUsername)\
.option("password", jdbcPassword)\
.option("fetchsize", 10000)\
.option("numPartitions", 8)\
.load()