# 3. ETL with Spark (Extract / Transform / Load)

load the libraries we'll need

In [None]:
import os
# https://search.maven.org/artifact/com.memsql/memsql-spark-connector_2.11
# https://search.maven.org/artifact/com.microsoft.azure/azure-sqldb-spark

# Assemble the value for PYSPARK_SUBMIT_ARGS: the two connector packages
# (see the Maven links above) go into a single double-quoted,
# comma-separated --packages value, followed by the pyspark-shell token.
args = ' '.join([
    '--packages',
    '"' + ','.join([
        'com.memsql:memsql-spark-connector_2.11:3.0.0-spark-2.4.4',
        'com.microsoft.azure:azure-sqldb-spark:1.0.2',
    ]) + '"',
    'pyspark-shell',
])

# Export it through the environment of this process.
os.environ['PYSPARK_SUBMIT_ARGS'] = args

get the spark context

In [None]:
!pip install findspark
import findspark
findspark.init()

import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession

sc = SparkContext(appName="ETLApp")
# or
# sc = SparkContext('local[*]')
# or
# conf = SparkConf()
# conf.setMaster("local").setAppName("SparkApp")
#sc = SparkContext.getOrCreate(conf=conf)

spark = SparkSession(sc)

## Load data from SQL Server

get the whole table

In [None]:
# Connection settings for the SQL Server source. Each value can be overridden
# through an environment variable so real credentials never have to be typed
# into the notebook; the fallbacks are the demo values this notebook shipped
# with. `os` is imported in the first code cell.
server = os.environ.get("MSSQL_SERVER", "sqlserver")
database = os.environ.get("MSSQL_DATABASE", "tpch")
table = "dbo.line_item"
user = os.environ.get("MSSQL_USER", "sa")
password = os.environ.get("MSSQL_PASSWORD", "P@SSw0rd.")

jdbcUrl = f"jdbc:sqlserver://{server}:1433;database={database}"
connectionProperties = {
  "user" : user,
  "password": password,
  "driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
# Define a DataFrame over the whole table via JDBC.
mssqldata = spark.read.jdbc(url=jdbcUrl, table=table, properties=connectionProperties)

we haven't loaded the data yet, only crafted the pipeline

let's look at the data just in case

In [None]:
# Render the DataFrame object itself, then print a two-row preview.
display(mssqldata)
first_two_rows = mssqldata.limit(2)
first_two_rows.show()

we can also run SQL queries

Parentheses around the query are required: https://docs.microsoft.com/en-us/azure/databricks/data/data-sources/sql-databases#push-down-a-query-to-the-database-engine

In [None]:
# Reuse jdbcUrl, connectionProperties and table from the "get the whole table"
# cell instead of redefining the same connection settings here.
# The subquery is wrapped in parentheses and given an alias so it can be
# passed where a table name is expected; the `top 10` is part of the query
# sent to the database.
pushdown_query = f"(SELECT top 10 * FROM {table}) line_item"
mssqldata2 = spark.read.jdbc(url=jdbcUrl, table=pushdown_query, properties=connectionProperties)

In [None]:
# Render the DataFrame object, then collect the query result and let the
# notebook echo the collected rows as the cell output.
display(mssqldata2)
pushdown_rows = mssqldata2.collect()
pushdown_rows

can filter in spark too, but now we've pulled back the whole table and then thrown data away

In [None]:
# Average extended_price per l_partkey, computed on the Spark side from the
# full-table DataFrame (no pushdown query is used here).
selectedcols = (
    mssqldata
    .select("l_partkey", "l_quantity", "extended_price")
    .groupBy("l_partkey")
    .avg("extended_price")
)
display(selectedcols)
selectedcols.collect()

## Save Data to MemSQL


let's first select some data to make sure we can connect correctly

In [None]:
# Reader settings for the MemSQL connector.
# ddlEndpoint is MemSQL's word for server name.
memsql_read_options = {
    "ddlEndpoint": "memsql",
    "user": "root",
    "password": "",
    "database": "tpch",
}
memsqldata = (
    spark.read
    .format("memsql")
    .options(**memsql_read_options)
    .load("line_item")
)

In [None]:
# Render the DataFrame object, then collect the MemSQL rows and let the
# notebook echo them as the cell output.
display(memsqldata)
memsql_rows = memsqldata.collect()
memsql_rows

now let's save all the sql server data into memsql

In [None]:
# Append the SQL Server DataFrame to the line_item table through the MemSQL
# connector. The target can be given either as "database.table" in save()
# or, as here, through option("database", ...).
# Per the original author's note, the table is created if it does not exist.
# NOTE(review): the target database is "foo", while the earlier read cell
# uses "tpch" -- confirm that this is intended.
(
    mssqldata.write
    .format("memsql")
    .option("ddlEndpoint", "memsql")
    .option("user", "root")
    .option("password", "")
    .option("database", "foo")
    .option("loadDataCompression", "LZ4")
    .option("truncate", "false")
    .mode("append")
    .save("line_item")
)

look at data in MemSQL: http://localhost:8080
```
use tpch;
select * from line_item;
```