# Spark SQL Analytics with the Iguazio Data Science Platform

- [Overview](#overview)
- [Set Up](#setup)
- [Initiate a Spark Session and Configure Spark](#initiate-a-spark-session-and-configure-spark)
  - [Modify the Spark Configuration (Optional)](#modify-the-spark-cfg)
- [Load Data into a Spark DataFrame](#load-data)
  - [Load Data from Amazon S3](#load-data-from-amazon-s3)
  - [Load Data from an External Table](#load-data-from-external-table)
  - [Load Data from a-Semi-Structured File](#load-data-from-semi-structured-file)
  - [Load Data from an Unstructured File](#load-data-from-unstructured-file)
  - [Overwrite the Table Schema](#overwrite-table-schema)
- [Use Spark SQL](#spark-sql)
  - [Spark SQL on an Object](#spark-sql-on-object)
  - [Spark SQL on a Table](#spark-sql-on-table)
  - [Spark SQL on Platform NoSQL Data](#spark-sql-on-platform-nosql-data)
  - [Spark SQL Join](#spark-sql-join)
  - [Spark SQL on a Parquet File](#spark-sql-on-parquet)
  - [Spark SQL on a Partitioned Table](#spark-sql-on-partitioned-table)
- [Perform Conditional Data Updates](#conditional-update)
- [Cleanup](#Cleanup)

<a id="overview"></a>
## Overview

Spark SQL is an Apache Spark module for working with structured data.
IT lets you query structured data inside Spark programs by using either SQL or a familiar DataFrame API.
DataFrames and SQL provide a common way to access a variety of data sources.

In this notebook, you'll learn how to use Spark SQL and DataFrames to access objects, tables, and unstructured data that persist in the data containers of the [Iguazio Data Science Platform](https://www.iguazio.com/) (**the platform**).

The platform's Spark drivers implement the data-source API and support predicate push down: the queries are passed to the platform's data store, which returns only the relevant data.
This allow accelerated and high-speed access from Spark to data stored in the platform.

For more, details read the [Spark SQL and DataFrames documentation](https://spark.apache.org/docs/2.3.1/sql-programming-guide.html) and the overview in platform's [Spark APIs Reference](https://www.iguazio.com/docs/reference/latest-release/api-reference/spark-apis/overview/).

<a id="setup"></a>
## Set Up

Before preparing the data, you need to define some environment variables that will be used in the following steps of this tutorial:

In [1]:
import os

## Iguazio Data Science Platform Variables
# Directory for stocks
%env DIR1 = examples

## AWS Credentials and Bucket Variables
# TODO: Replace the <...> placedholders with your specific data.
%env AWS_USER = '<your_aws_user>'
%env AWS_PASSWORD = '<your_aws_password>'
%env BUCKET = '<your_s3_bucket_name_here>'
%env ACCESSKEY = '<your_accesskey.'
%env SECRETKEY = '<your_secretkey>'

env: DIR1=examples
env: AWS_USER='<your_aws_user>'
env: AWS_PASSWORD='<your_aws_password>'
env: BUCKET='<your_s3_bucket_name_here>'
env: ACCESSKEY='<your_accesskey.'
env: SECRETKEY='<your_secretkey>'


<a id="initiate-a-spark-session-and-configure-spark"></a>
## Initiate a Spark Session and Configure Spark

Begin by initiating a new Spark session and checking the default Spark configuration for this session:

In [2]:
%%time

from pyspark.conf import SparkConf
from pyspark.sql import SparkSession
from pyspark.sql.functions import concat, col

# Initiate a new Spark Session
spark = SparkSession.builder.appName("Spark Session with Default Configurations").getOrCreate()

# Retrieve and view all the default Spark configurations:
# conf = spark.sparkContext._conf.getAll()
# print(conf)
conf = spark.sparkContext._conf

print('\n\n## Initial Spark Configuration ##\n')

print('spark.app.name = ', conf.get("spark.app.name"))
print('spark.driver.cores = ', conf.get("spark.driver.cores"))
print('spark.driver.memory = ', conf.get("spark.driver.memory"))
print('spark.executor.cores = ', conf.get("spark.executor.cores"))
print('spark.executor.memory = ', conf.get("spark.executor.memory"))
print('spark.cores.max = ', conf.get("spark.cores.max"))

print('spark.python.profile = ', conf.get("spark.python.profile"))
print('spark.pyspark.python = ', conf.get("spark.pyspark.python"))
print('spark.hadoop.fs.s3a.impl = ', conf.get("spark.hadoop.fs.s3a.impl"))



## Initial Spark Configuration ##

spark.app.name =  Spark Session with Default Configurations
spark.driver.cores =  None
spark.driver.memory =  None
spark.executor.cores =  1
spark.executor.memory =  2G
spark.cores.max =  4
spark.python.profile =  None
spark.pyspark.python =  None
spark.hadoop.fs.s3a.impl =  None
CPU times: user 189 ms, sys: 34 ms, total: 223 ms
Wall time: 3.82 s


<a id="modify-the-spark-cfg"></a>
### Modify the Spark Configuration (Optional)

You may need to modify the default Spark configuration to match your specific requirements and resources and optimize performance.
The nature of your datasets and data models, the data-access methods that you select to use, and your hardware resources are all relevant factors in selecting your configuration.
The [Test the SQL Performance on a Partitioned NoSQL Table with Different Spark Configurations](#test-sql-perf-on-partitioned-nosql-table-w-different-spark-cfgs) section of this tutorial demonstrates how to test Spark SQL performance on a partitioned NoSQL table in the platform with different Spark configurations.

The following Spark configuration properties are especially worth noting:
- `spark.driver.cores`
- `spark.driver.memory`
- `spark.executor.cores`
- `spark.executor.memory`
- `spark.cores.max`
- `spark.python.profile`
- `spark.pyspark.python`

To access data in an AWS S3 bucket, perform the following configurations; replace the `<...>` placeholders with your specific data:
- `spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem`
- `spark.hadoop.fs.s3a.access.key=<your access key>`
- `spark.hadoop.fs.s3a.secret.key=<your secret key>`
- `spark.hadoop.fs.s3a.fast.upload=true`

For detailed information about configuring Spark, read the [Spark documentation](https://spark.apache.org/docs/2.3.1/configuration.html#dynamically-loading-spark-properties).<br>
For further performance services and support, contact Iguazio's [customer-success team](https://www.iguazio.com/support/).

The following code demonstrates how to modify the Spark configuration.

In [3]:
# Modify the default Spark configurations, as needed.
# The following example uses a single m5.2xlarge application node (8 CPUs, 32 GB):
'''
conf = spark.sparkContext._conf\
    .set("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")
'''
conf = spark.sparkContext._conf\
    .setAll([('spark.app.name', 'Spark SQL for Analytics'), \
             # ('spark.driver.cores', '2'), \ # Only in cluster mode.
             ('spark.driver.memory','2g'), 
             ('spark.executor.cores', '2'), \
             ('spark.executor.memory', '4g'), \
             ('spark.cores.max', '3'), \
             ('spark.python.profile', 'true'), \
             ('spark.pyspark.python', 'true'), \
             ("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")])

# Stop the current Spark Session
spark.sparkContext.stop()

# Create a Spark Session with new configurations
spark = SparkSession.builder.config(conf=conf).appName("Spark SQL Analytics - ipynb").getOrCreate()

print('\n\n## Modified Spark Configuration ##\n')

print('spark.app.name = ', conf.get("spark.app.name"))
print('spark.driver.cores = ', conf.get("spark.driver.cores"))
print('spark.driver.memory = ', conf.get("spark.driver.memory"))
print('spark.executor.cores = ', conf.get("spark.executor.cores"))
print('spark.executor.memory = ', conf.get("spark.executor.memory"))
print('spark.cores.max = ', conf.get("spark.cores.max"))

print('spark.python.profile = ', conf.get("spark.python.profile"))
print('spark.pyspark.python = ', conf.get("spark.pyspark.python"))
print('spark.hadoop.fs.s3a.impl = ', conf.get("spark.hadoop.fs.s3a.impl"))

print('\n')



## Modified Spark Configuration ##

spark.app.name =  Spark SQL for Analytics
spark.driver.cores =  None
spark.driver.memory =  2g
spark.executor.cores =  2
spark.executor.memory =  4g
spark.cores.max =  3
spark.python.profile =  true
spark.pyspark.python =  true
spark.hadoop.fs.s3a.impl =  org.apache.hadoop.fs.s3a.S3AFileSystem




<a id="load-data"></a>
## Load Data into a Spark DataFrame

The Spark Data Sources API supports a pluggable mechanism for integration with structured data sources.  It is a unified API designed to support two major operations:

1. Loading structured data from an external data source into Spark.
2. Storing structured data from Spark into an external data source.

<a id="load-data-from-amazon-s3"></a>
### Load Data from Amazon S3

Load a file from an Amazon S3 bucket into a Spark DataFrame.<br>
The URL of the S3 file should be of the form `s3a://bucket/path/to/file`.

In [4]:
# !pip install botocore

In [None]:
import botocore.session

session = botocore.session.get_session()
credentials = session.get_credentials()

# Create a Spark Session
spark = SparkSession
    .builder
    .config(
        'spark.driver.extraClassPath', 
        '/spark/3rd_party/aws-java-sdk-1.11.335.jar:'
        '/spark/3rd_party/hadoop-aws-2.8.4.jar')
    .config('fs.s3a.access.key', credentials.access_key)
    .config('fs.s3a.secret.key', credentials.secret_key)
    .appName("Load from S3")
    .getOrCreate()
    
# S3 object 's3a://bucket/path/to/file'
s3Obj = "s3a://iguazio-sample-data.s3.amazonaws.com/2018-03-26_BINS_XETR08.csv"

# Load to Spark DF
df = spark.read \
    .format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(s3Obj)

# Stop Spark Session
spark.stop()

#### Copy a File from an AWS S3 Bucket to the Platform

Alternatively, you can first copy the data to a platform data container.

##### Create a Directory in a Platform Data Container

Create a directory (`DIR1`) in your user home directory in the "users" platform data container (`V3IO_HOME`).

In [5]:
!mkdir -p /v3io/${V3IO_HOME}/${DIR1}

##### Copy a CSV file from an AWS S3 Bucket to the Platform

Copy a CSV file from an Amazon Simple Storage (S3) bucket to a **stocks.csv** file in a platform data container.

In [6]:
!curl -L "iguazio-sample-data.s3.amazonaws.com/2018-03-26_BINS_XETR08.csv" > /v3io/${V3IO_HOME}/${DIR1}/stocks.csv

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  861k  100  861k    0     0   975k      0 --:--:-- --:--:-- --:--:--  974k


##### List Files in a Platform Data-Container Directory

In [7]:
!ls -altr /v3io/${V3IO_HOME}/${DIR1}

total 0
drwxrwxrwx 2 50 nogroup      0 Mar 24 22:05 family_tab
drwxrwxrwx 2 50 nogroup      0 Mar 24 22:06 family
drwxrwxrwx 2 50 nogroup      0 Mar 24 22:27 mytable
drwxrwxrwx 2 50 nogroup      0 Mar 24 22:29 stocks_kv
drwxr-xr-x 2 50 nogroup      0 Mar 24 22:30 stocks_parq
drwxr-xr-x 2 50 nogroup      0 Mar 24 22:31 examples
drwxrwxrwx 2 50 nogroup      0 Mar 24 22:31 weather
-rw-r--r-- 1 50 nogroup 882055 Mar 24 22:43 stocks.csv


### Define Platform File-Path Variables

In [8]:
file_path = os.path.join(os.getenv('V3IO_HOME_URL')+'/examples')
file = os.path.join(file_path+'/stocks.csv')

### Load a File from a Platform Data Container into a Spark DataFrame

Read the CSV file that you saved to the platform data container into a Spark DataFrame.<br>
The following code example uses the `inferSchema` option to automatically infer the schema of the read data (recommended).
Alternatively, you can define the schema manually:

```python
schema = StructType([
    StructField("<field name>", <field type>, <is Null>),
   ...])
df = spark.read.schema(schema)
...
```

In [9]:
%%time

df = spark.read\
    .format("csv")\
    .option("header", "true")\
    .option("inferSchema", "true")\
    .load(file)

CPU times: user 2.56 ms, sys: 123 µs, total: 2.69 ms
Wall time: 556 ms


### Print the Schema

In [10]:
df.printSchema()

root
 |-- ISIN: string (nullable = true)
 |-- Mnemonic: string (nullable = true)
 |-- SecurityDesc: string (nullable = true)
 |-- SecurityType: string (nullable = true)
 |-- Currency: string (nullable = true)
 |-- SecurityID: integer (nullable = true)
 |-- Date: timestamp (nullable = true)
 |-- Time: string (nullable = true)
 |-- StartPrice: double (nullable = true)
 |-- MaxPrice: double (nullable = true)
 |-- MinPrice: double (nullable = true)
 |-- EndPrice: double (nullable = true)
 |-- TradedVolume: integer (nullable = true)
 |-- NumberOfTrades: integer (nullable = true)



### List Columns

In [11]:
df.columns

['ISIN',
 'Mnemonic',
 'SecurityDesc',
 'SecurityType',
 'Currency',
 'SecurityID',
 'Date',
 'Time',
 'StartPrice',
 'MaxPrice',
 'MinPrice',
 'EndPrice',
 'TradedVolume',
 'NumberOfTrades']

<a id="load-data-from-external-table"></a>
### Load Data from an External Table

In this section,  let's walk through two examples:

1. Use the PyMySQL Python MySQL client library and a pandas DataFrame to load data from a MySQL database.
2. Use Spark JDBC to read a table from AWS Redshift.


For more details read [read-external-db](read-external-db.ipynb) and [Spark JDBC to Databases](SparkJDBCtoDBs.ipynb)

<a id="load-data-from-external-table-mysql"></a>
#### Use MySQL as an External Data Source

##### Create a MySQL Database Connection

Read from a MySQL database as a bulk operation using pandas DataFrames.

> **AWS Cloud Note:** If you're running the notebook code from the AWS cloud, note that AWS S3 provides **eventual consistency**.
Therefore, it takes time for users using the persisted data and software package.

In [13]:
import os
import pymysql
import pandas as pd 

conn = pymysql.connect(
    host=os.getenv('DB_HOST','mysql-rfam-public.ebi.ac.uk'),
    port=int(4497),
    user=os.getenv('DB_USER','rfamro'),
    passwd=os.getenv('DB_PASSWORD',''),
    db=os.getenv('DB_NAME','Rfam'),
    charset='utf8mb4')

pdfMySQL = pd.read_sql_query("select rfam_acc,rfam_id,auto_wiki,description,author,seed_source FROM family",
    conn) 

pdfMySQL.tail(3)

Unnamed: 0,rfam_acc,rfam_id,auto_wiki,description,author,seed_source
3013,RF03113,Poribacteria-1,2702,Poribacteria-1 RNA,Weinberg Z,Weinberg Z
3014,RF03114,RT-1,2572,RT-1 RNA,Weinberg Z,Weinberg Z
3015,RF03115,KDPG-aldolase,2703,KDPG-aldolase RNA,Weinberg Z,Weinberg Z


##### Create a Spark DataFrame from a pandas DataFrame

In [15]:
dfMySQL = spark.createDataFrame(pdfMySQL)

##### Display Table Records

Display a few records of the "family" table that was read into the `dfMySQL` DataFrame in the previous steps.

In [16]:
dfMySQL.show(5)

+--------+---------+---------+-------------------+--------------------+--------------------+
|rfam_acc|  rfam_id|auto_wiki|        description|              author|         seed_source|
+--------+---------+---------+-------------------+--------------------+--------------------+
| RF00001|  5S_rRNA|     1302|   5S ribosomal RNA|Griffiths-Jones S...|Szymanski et al, ...|
| RF00002|5_8S_rRNA|     1303| 5.8S ribosomal RNA|Griffiths-Jones S...|Wuyts et al, Euro...|
| RF00003|       U1|     1304|U1 spliceosomal RNA|Griffiths-Jones S...|Zwieb C, The uRNA...|
| RF00004|       U2|     1305|U2 spliceosomal RNA|Griffiths-Jones S...|The uRNA database...|
| RF00005|     tRNA|     1306|               tRNA|Eddy SR, Griffith...|             Eddy SR|
+--------+---------+---------+-------------------+--------------------+--------------------+
only showing top 5 rows



##### Print the Table Schema

Print the schema of the "family" table that was read into the `dfMySQL` DataFrame.

In [17]:
dfMySQL.printSchema()

root
 |-- rfam_acc: string (nullable = true)
 |-- rfam_id: string (nullable = true)
 |-- auto_wiki: long (nullable = true)
 |-- description: string (nullable = true)
 |-- author: string (nullable = true)
 |-- seed_source: string (nullable = true)



##### Register as a Table for Spark SQL Queries

Define a temporary Spark view for running Spark SQL queries on the "family" table that was read into the `dfMySQL` DataFrame.

In [18]:
dfMySQL.createOrReplaceTempView("family")

##### Count Table Records

Use Spark SQL to count the number records in the "family" table.

In [19]:
spark.sql("SELECT COUNT(*) FROM family").show()

+--------+
|count(1)|
+--------+
|    3016|
+--------+



##### Check for a Unique Key

Check whether the `auto_wiki` column can serve as a unique key (attribute) of the "family" table.

In [20]:
spark.sql("SELECT COUNT(distinct(auto_wiki)) FROM family").show()

+-------------------------+
|count(DISTINCT auto_wiki)|
+-------------------------+
|                     1345|
+-------------------------+



<a id="load-data-from-external-table-amazon-redshift"></a>
#### Use Amazon Redshift as an External Data Source

The **spark-redshift** library is a data source API for [Amazon Redshift](https://aws.amazon.com/redshift/).

**Spark driver to Redshift:** The Spark driver connects to Redshift via JDBC using a username and password.
Redshift doesn't support the use of IAM roles to authenticate this connection.

**Spark to AWS S3:** S3 acts as a middleman to store bulk data when reading from or writing to Redshift.

#####  Create an Amazon S3 Bucket

Create an Amazon S3 bucket named "redshift-spark".

In [21]:
tmpS3Dir = "s3n://redshift-spark/tmp/"

##### Set Up Your Redshift Environment

In [22]:
redshiftDBName = '<your_redshift_DB_name>'
redshiftTableName = '<your_redshift_Table_name>'
redshiftUserId = '<your_redshift_User_ID>'
redshiftPassword = '<your_redshift_Password>'
redshifturl = '<your_redshift_URL>'
jdbcURL = s"jdbc:redshift://$redshifturl/$redshiftDBName?user=$redshiftUserId&password=$redshiftPassword"

##### Load a Redshift Table into a Spark DataFrame

The `.format("com.databricks.spark.redshift")` line tells the Spark Data Sources API that you're using the **spark-redshift** package.<br>
Enable **spark-redshift** to use the **tmpS3Dir** temporary location in the S3 bucket to store temporary files generated by **spark-redshift**.

In [23]:
dfRDSHFT = spark.read
	.format("com.databricks.spark.redshift")
	.option("url",jdbcURL )
	.option("tempdir", tmpS3Dir)
	.option("dbtable", redshiftTableName)
	.load()

##### Check the Table

Print the table schema and show a few records.<br>
`spark-redshift` automatically reads the schema from the Redshift table and maps its types back to Spark SQL's types.

In [24]:
dfRDSHFT.printSchema()
dfRDSHFT.show(3)

##### Persist the Redshift Table Data into the Platform's NoSQL Store

In [25]:
dfRDSHFT = spark.write\
    .format("io.iguaz.v3io.spark.sql.kv")\
    .mode("append")\
    .option("key", key)\
    .option("sorting-key", sorting-key)\
    .option("allow-overwrite-schema", "true")\
    .save(os.path.join(os.getenv('V3IO_HOME'))+'/rdshfttable/')

<a id="load-data-from-semi-structured-file"></a>
### Load Data from a-Semi-Structured File

In [26]:
# Replace PATH_TO_A_JSON by the full URL of a JSON file, and remove the comment sign.
# dfJSON = spark.read.json("PATH_TO_A_JSON")

jsonFile = os.path.join(os.getenv('V3IO_HOME_URL')+'/examples/mLines.json')

dfJSON = spark.read \
    .option("multiline", "true") \
    .json(jsonFile )

In [27]:
dfJSON.printSchema()

root
 |-- array: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- dict: struct (nullable = true)
 |    |-- key: string (nullable = true)
 |-- int: long (nullable = true)
 |-- string: string (nullable = true)



<a id="load-data-from-unstructured-file"></a>
### Load Data from an Unstructured File

> **Note:** Beginning with version 2.4, Spark supports loading images.

In [None]:
# Replace PATH_TO_AN_IMAGE by the full URL of a JSON file, and remove the comment sign.
# dfImage = spark.read.format("image").option("dropInvalid", true).load("PATH_TO_AN_IMAGE")

imageFile = os.path.join(os.getenv('V3IO_HOME_URL')+'/examples/CoffeeTime.jpg')

dfImage = spark.read.format("image").option("dropInvalid", "true").load(imageFile)

dfImage.select("image.origin", "image.width", "image.height").show(truncate=false)

<a id="overwrite-table-schema"></a>
### Overwrite the Table Schema

The following example creates a table named mytable with AttrA and AttrB attributes of type string and an AttrC attribute of type long, and then overwrites the table schema to change the type of AttrC to double:

In [28]:
dfOWSchema = spark.createDataFrame([
    ("a", "z", 123),
    ("b", "y", 456)
], ["AttrA", "AttrB", "AttrC"])
dfOWSchema.write.format("io.iguaz.v3io.spark.sql.kv") \
    .mode("overwrite") \
    .option("key", "AttrA") \
    .save(os.path.join(file_path)+'/mytable/')
    
dfOWSchema = spark.createDataFrame([
    ("c", "x", 32.12),
    ("d", "v", 45.2)
], ["AttrA", "AttrB", "AttrC"])
dfOWSchema.write.format("io.iguaz.v3io.spark.sql.kv") \
    .mode("append") \
    .option("key", "AttrA") \
    .option("allow-overwrite-schema", "true") \
    .save(os.path.join(file_path)+'/mytable/')

<a id="spark-sql"></a>
## Use Spark SQL

Now, some Spark SQL queries to analyze the dataset that was loaded into `df` Spark DataFrame.<br>
The first SQL queries list a few lines of selected columns in the dataset and retrieve some statistics of numerical columns.

<a id="spark-sql-on-object"></a>
### Spark SQL on an Object

In [29]:
df.select("ISIN", "Mnemonic", "SecurityDesc", "SecurityType").show(3)

+------------+--------+--------------------+------------+
|        ISIN|Mnemonic|        SecurityDesc|SecurityType|
+------------+--------+--------------------+------------+
|CH0038389992|    BBZA|BB BIOTECH NAM.  ...|Common stock|
|CH0038863350|    NESR|NESTLE NAM.      ...|Common stock|
|LU0378438732|    C001|COMSTAGE-DAX UCIT...|         ETF|
+------------+--------+--------------------+------------+
only showing top 3 rows



#### Retrieve Data from the First Rows

In [30]:
df.select("ISIN", "Mnemonic", "SecurityDesc").head(3)

[Row(ISIN='CH0038389992', Mnemonic='BBZA', SecurityDesc='BB BIOTECH NAM.   SF 0,20'),
 Row(ISIN='CH0038863350', Mnemonic='NESR', SecurityDesc='NESTLE NAM.        SF-,10'),
 Row(ISIN='LU0378438732', Mnemonic='C001', SecurityDesc='COMSTAGE-DAX UCITS ETF I')]

#### Summary and Descriptive Statistics

The function `describe` returns a DataFrame containing information such as the number of non-null entries (`count`), mean, standard deviation (`stddev`), and the minimum (`min`) and maximum (`max`) values for each numerical column.

In [32]:
df.describe("TradedVolume").show()

+-------+------------------+
|summary|      TradedVolume|
+-------+------------------+
|  count|              7401|
|   mean|3035.7574652074045|
| stddev|18191.489026530675|
|    min|                 0|
|    max|            839200|
+-------+------------------+



<a id="spark-sql-on-table"></a>
### Spark SQL on a Table

#### Register a Table View for Further Analytics

In [33]:
df.createOrReplaceTempView("stock")

#### Select a Few Columns and Only Print a Few Lines

In [34]:
q = spark.sql("SELECT ISIN, SecurityDesc, SecurityID FROM stock limit 3").show()

+------------+--------------------+----------+
|        ISIN|        SecurityDesc|SecurityID|
+------------+--------------------+----------+
|CH0038389992|BB BIOTECH NAM.  ...|   2504244|
|CH0038863350|NESTLE NAM.      ...|   2504245|
|LU0378438732|COMSTAGE-DAX UCIT...|   2504271|
+------------+--------------------+----------+



#### Analyze Data to Identify Unique-Key Columns

In [35]:
q1 = spark.sql("SELECT COUNT(ISIN) FROM stock").show()
q2 = spark.sql("SELECT COUNT(DISTINCT(ISIN)) FROM stock").show()

+-----------+
|count(ISIN)|
+-----------+
|       7401|
+-----------+

+--------------------+
|count(DISTINCT ISIN)|
+--------------------+
|                 737|
+--------------------+



In [36]:
q4 = spark.sql("SELECT COUNT(SecurityID) FROM stock").show()
q5 = spark.sql("SELECT COUNT(DISTINCT(SecurityID)) FROM stock").show()

+-----------------+
|count(SecurityID)|
+-----------------+
|             7401|
+-----------------+

+--------------------------+
|count(DISTINCT SecurityID)|
+--------------------------+
|                       737|
+--------------------------+



A combination of `ISIN`, `Date`, and `Time` can serve as a unqiue key:

In [37]:
q6 = spark.sql("SELECT COUNT(DISTINCT(ISIN, Date, Time)) FROM stock").show()

+----------------------------------------------------------------+
|count(DISTINCT named_struct(ISIN, ISIN, Date, Date, Time, Time))|
+----------------------------------------------------------------+
|                                                            7401|
+----------------------------------------------------------------+



#### Concatenate Date and Time Columns

In [38]:
df.select(concat(col("Date"), col("Time"))).head(2)

[Row(concat(Date, Time)='2018-03-26 00:00:0008:00'),
 Row(concat(Date, Time)='2018-03-26 00:00:0008:00')]

In [40]:
df.withColumn("datetime", concat(col("Date"), col("Time")))\
    .select("Date", "Time", "datetime").head(3)

[Row(Date=datetime.datetime(2018, 3, 26, 0, 0), Time='08:00', datetime='2018-03-26 00:00:0008:00'),
 Row(Date=datetime.datetime(2018, 3, 26, 0, 0), Time='08:00', datetime='2018-03-26 00:00:0008:00'),
 Row(Date=datetime.datetime(2018, 3, 26, 0, 0), Time='08:00', datetime='2018-03-26 00:00:0008:00')]

#### Register Another Table with a Unique Key

In [42]:
df.withColumn("datetime", concat(df["Date"], df["Time"])).createOrReplaceTempView("stock_UUID")

#### Verify that the Key is Unique

In [43]:
q7 = spark.sql("SELECT COUNT(DISTINCT(ISIN, datetime)) FROM stock_UUID").show()

+------------------------------------------------------------+
|count(DISTINCT named_struct(ISIN, ISIN, datetime, datetime))|
+------------------------------------------------------------+
|                                                        7401|
+------------------------------------------------------------+



#### Get Distinct Values on a Column

In [44]:
%%time
q8 = spark.sql("SELECT COUNT(DISTINCT(datetime)) FROM stock_UUID").show()

+------------------------+
|count(DISTINCT datetime)|
+------------------------+
|                      60|
+------------------------+

CPU times: user 0 ns, sys: 2.05 ms, total: 2.05 ms
Wall time: 488 ms


Results show that **all data in this dataset is of the same date.**

In [45]:
%time
q9 = spark.sql("SELECT COUNT(DISTINCT(Time)) FROM stock_UUID").show()

CPU times: user 2 µs, sys: 0 ns, total: 2 µs
Wall time: 4.53 µs
+--------------------+
|count(DISTINCT Time)|
+--------------------+
|                  60|
+--------------------+



<a id="spark-sql-on-platform-nosql-data"></a>
### Spark SQL on Platform NoSQL Data

#### Persist Data from a Spark DataFrame to a Platform NoSQL Table

The following code demonstrates how to write data from a Spark DataFrame to a NoSQL table in the persistent memory in a platform data container.

Note:
- The data-source format for the platform's NoSQL data store is `io.iguaz.v3io.spark.sql.kv`.
- The path to the NoSQL table that is associated with the DataFrame should be defined as a fully qualified path of the format `v3io://<data container>/<table path>` &mdash; where `<data container>` is the name of the table's parent data container and `<table path>` is the relatve path to the data within the specified container.
- You must use the `key` option to define the table's primary key attribute (column). Note that the value of the primary-key attributes must be unique.<br>
  You can also ptionally use the platform's custom `sorting-key` option to define a sorting-key attribute for the table (which enablese performing range scans).<br>
  For more information, see the [platform documentation](https://www.iguazio.com/docs/concepts/latest-release/containers-collections-objects/#sharding-n-sorting-keys).

In [46]:
%%time

# Define thepath to your NoSQL table
kvStore = os.path.join(file_path+'/stocks_kv')

# UUID: key.sorting-key
# key: ISIN
# sorting-key: Date + Time
df.withColumn("datetime", concat(df["Date"], df["Time"]))\
    .write\
    .format("io.iguaz.v3io.spark.sql.kv")\
    .mode("append")\
    .option("key", "ISIN")\
    .option("sorting-key", "datetime")\
    .option("allow-overwrite-schema", "true")\
    .save(kvStore)

CPU times: user 867 µs, sys: 2.45 ms, total: 3.32 ms
Wall time: 1.83 s


<a id="read-data-from-nosql-table-to-spark-df"></a>
#### Load Data a NoSQL Table into a Spark DataFrame

In [47]:
df2 = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(kvStore)

In [48]:
%%time
df2.select("ISIN", "datetime").head(1)

CPU times: user 4.52 ms, sys: 71 µs, total: 4.59 ms
Wall time: 165 ms


[Row(ISIN='GB00B128C026', datetime='2018-03-26 00:00:0008:52')]

In [50]:
df2.createOrReplaceTempView("stock_kv")

In [51]:
%%time

q10 = spark.sql("SELECT ISIN, SUM(TradedVolume) FROM stock_kv GROUP BY ISIN").show(5)

+------------+-----------------+
|        ISIN|sum(TradedVolume)|
+------------+-----------------+
|DE000A0H08M3|             5394|
|LU0488317024|               35|
|DE000A1K0375|             2000|
|DE0005570808|             3350|
|FI0009000681|            51473|
+------------+-----------------+
only showing top 5 rows

CPU times: user 728 µs, sys: 1.33 ms, total: 2.06 ms
Wall time: 827 ms


<a id="write-data-to-partitioned-nosql-table"></a>
#### Persist Data to a Partitioned NoSQL Table

Partions are firstly by `Date`, and then by `Time`. <br>

In [52]:
%%time

# Set Partitioned KV store name
kvStorePartition = os.path.join(file_path+'/stocks_kv_partition')

# UUID = key.sorting-key
# key: ISIN
# partition : Date, time
df.write\
    .format("io.iguaz.v3io.spark.sql.kv")\
    .mode("append")\
    .option("key", "ISIN")\
    .option("partition", "Date, Time")\
    .save(kvStorePartition)

CPU times: user 2.24 ms, sys: 68 µs, total: 2.3 ms
Wall time: 2.09 s


<a id="read-data-from-another-nosql-table-to-spark-df"></a>
#### Load Data from Another NoSQL Table into a Spark DataFrame

In [53]:
df3 = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(kvStorePartition)

In [54]:
%%time

df3.select("ISIN", "Date", "Time").show(3)

+------------+-------------------+-----+
|        ISIN|               Date| Time|
+------------+-------------------+-----+
|DE0006202005|2018-03-26 00:00:00|08:00|
|DE0005439004|2018-03-26 00:00:00|08:00|
|DE0005419105|2018-03-26 00:00:00|08:00|
+------------+-------------------+-----+
only showing top 3 rows

CPU times: user 2.1 ms, sys: 1.06 ms, total: 3.16 ms
Wall time: 188 ms


<a id="test-sql-perf-on-partitioned-nosql-table-w-different-spark-cfgs"></a>
#### Test the SQL Performance on a Partitioned NoSQL Table with Different Spark Configurarions

Start out by running the following code to test the performance when using Spark SQL to access a partitioned NoSQL table in the platform with the default [Spark configuration](#initiate-a-spark-session-and-configure-spark).

> **Note:** The default Spark configuration doesn't provide good support for the partition data model of the test table, and therefore the test query is expected to hang.

In [55]:
df3.createOrReplaceTempView("stock_kv_partintion")

In [None]:
%%time
# 1*m5.2xlarge: Spark 4 executors, 2 cores, and 1 GB per executor; never returns results
# 1*m5.2xlarge: Spark 2 executors, 2 cores, and 4 GB per executor; never returns results
%debug
q11 = spark.sql("SELECT ISIN, SUM(TradedVolume) FROM stock_kv_partintion GROUP BY ISIN").show(5)
q11 = spark.sql("SELECT ISIN, SUM(TradedVolume) FROM stock_kv_partintion GROUP BY ISIN").explain

<a id="test-sql-perf-on-partitioned-nosql-table-spark-cfg-experiments"></a>
##### Experiment with Different Spark Configurations

The following experiments run the same query with different Spark configurations.
To perform the experiments, you need to modify your Spark configuration according to the information in the comments of each job.
For information on how to modify the Spark configuration, see the [Modify the Spark Configuration](#modify-the-spark-cfg) section of this tutorial.

In [51]:
%%time
# 1*m5.2xlarge: Spark 4 executor, 2core and 1G per executor

q12 = spark.sql("SELECT Date, SUM(TradedVolume) FROM stock_kv_partintion GROUP BY Date").show()

+-------------------+-----------------+
|               Date|sum(TradedVolume)|
+-------------------+-----------------+
|2018-03-26 00:00:00|         22467641|
+-------------------+-----------------+

CPU times: user 638 µs, sys: 1.3 ms, total: 1.93 ms
Wall time: 971 ms


In [56]:
%%time
# 1*m5.2xlarge: Spark 2 executor, 2core and 4G per executor,

q12 = spark.sql("SELECT Date, SUM(TradedVolume) FROM stock_kv_partintion GROUP BY Date").show()

+-------------------+-----------------+
|               Date|sum(TradedVolume)|
+-------------------+-----------------+
|2018-03-26 00:00:00|         22467641|
+-------------------+-----------------+

CPU times: user 1.87 ms, sys: 268 µs, total: 2.14 ms
Wall time: 1.22 s


In [57]:
%%time
# 1*m5.2xlarge: Spark 4 executor, 1core and 4G per executor,

q12 = spark.sql("SELECT Date, SUM(TradedVolume) FROM stock_kv_partintion GROUP BY Date").show()

+-------------------+-----------------+
|               Date|sum(TradedVolume)|
+-------------------+-----------------+
|2018-03-26 00:00:00|         22467641|
+-------------------+-----------------+

CPU times: user 786 µs, sys: 1.35 ms, total: 2.13 ms
Wall time: 1.17 s


In [58]:
%%time
# 1*m5.2xlarge: Spark 3 executor, 1core and 8G per executor,

q12 = spark.sql("SELECT Date, SUM(TradedVolume) FROM stock_kv_partintion GROUP BY Date").show()

+-------------------+-----------------+
|               Date|sum(TradedVolume)|
+-------------------+-----------------+
|2018-03-26 00:00:00|         22467641|
+-------------------+-----------------+

CPU times: user 2.04 ms, sys: 0 ns, total: 2.04 ms
Wall time: 716 ms


In [59]:
%%time
# 1*m5.2xlarge: Spark 6 executor, 1core and 4G per executor,

q12 = spark.sql("SELECT Date, SUM(TradedVolume) FROM stock_kv_partintion GROUP BY Date").show()

+-------------------+-----------------+
|               Date|sum(TradedVolume)|
+-------------------+-----------------+
|2018-03-26 00:00:00|         22467641|
+-------------------+-----------------+

CPU times: user 2.38 ms, sys: 0 ns, total: 2.38 ms
Wall time: 797 ms


In [60]:
%%time
# 1*m5.2xlarge: Spark 3 executor, 1core and 8G per executor,

q12 = spark.sql("SELECT Date, SUM(TradedVolume) FROM stock_kv_partintion GROUP BY Date").show()

+-------------------+-----------------+
|               Date|sum(TradedVolume)|
+-------------------+-----------------+
|2018-03-26 00:00:00|         22467641|
+-------------------+-----------------+

CPU times: user 950 µs, sys: 1.04 ms, total: 1.99 ms
Wall time: 798 ms


In [61]:
%%time
# 1*m5.2xlarge: Spark 1 executor, 1core and 20G per executor,

q12 = spark.sql("SELECT Date, SUM(TradedVolume) FROM stock_kv_partintion GROUP BY Date").show()

+-------------------+-----------------+
|               Date|sum(TradedVolume)|
+-------------------+-----------------+
|2018-03-26 00:00:00|         22467641|
+-------------------+-----------------+

CPU times: user 1.55 ms, sys: 512 µs, total: 2.06 ms
Wall time: 808 ms


<a id="spark-sql-join"></a>
### Spark SQL Join

In [62]:
dfL = spark.createDataFrame([("2504271", "LU0378438732")], ["SecurityID", "ISIN"])
dfR = spark.createDataFrame([("2504271", "JOIN in Spark SQL")], ["SecurityID", "SQL Query"])

In [63]:
dfJoin = dfL.join(dfR, dfL.SecurityID == dfR.SecurityID).show()

+----------+------------+----------+-----------------+
|SecurityID|        ISIN|SecurityID|        SQL Query|
+----------+------------+----------+-----------------+
|   2504271|LU0378438732|   2504271|JOIN in Spark SQL|
+----------+------------+----------+-----------------+



In [64]:
dfL.createOrReplaceTempView("t1")
dfR.createOrReplaceTempView("t2")

In [65]:
qJoin = spark.sql("SELECT * FROM t1, t2 where t1.SecurityID=t2.SecurityID").show()

+----------+------------+----------+-----------------+
|SecurityID|        ISIN|SecurityID|        SQL Query|
+----------+------------+----------+-----------------+
|   2504271|LU0378438732|   2504271|JOIN in Spark SQL|
+----------+------------+----------+-----------------+



<a id="spark-sql-on-parquet"></a>
### Spark SQL on a Parquet File

#### Persist Data into Iguazio Data Container in Parquet format

Use the same stock dataset to store in Parquet format.

In [66]:
%%time

parqFile = os.path.join(file_path+'/stocks_parq')

df.write\
    .mode("overwrite")\
    .parquet(parqFile)

CPU times: user 1.4 ms, sys: 463 µs, total: 1.86 ms
Wall time: 542 ms


In [67]:
dfPARQ = spark.read.parquet(parqFile)

In [68]:
dfPARQ.select("ISIN", "Date").head()

Row(ISIN='CH0038389992', Date=datetime.datetime(2018, 3, 26, 0, 0))

<a id="spark-sql-on-partitioned-table"></a>
### Spark SQL on a Partitioned Table

#### Create a Partitioned Table

This examples creates a partitioned "weather" table.  The `option("partition", "year, month, day")` write option partitions the table by the year, month, and day item attributes. As demonstrated in the following image, if you browse the container in the dashboard after running the example, you'll see that the weather directory has **year=&lt;value&gt;/month=&lt;value&gt;/day=&lt;value&gt;** partition directories that match the written items. If you select any of the nested day partition directories, you can see the written items and their attributes. For example, the first item (with attribute values 2016, 3, 25, 6, 16, 0.00, 55) is saved to a 20163256 file in a **weather/year=2016/month=3/day=25** partition directory.

In [70]:
table_path = os.path.join(os.getenv('V3IO_HOME_URL')+'/examples/weather/')

df = spark.createDataFrame([
    (2016,  3, 25, 17, 18, 0.2, 62),
    (2016,  7, 24,  7, 19, 0.0, 52),
    (2016, 12, 24,  9, 10, 0.1, 47),
    (2017,  5,  7, 14, 21, 0.0, 70),
    (2017, 11,  1, 10, 15, 0.0, 34),
    (2017, 12, 12, 16, 12, 0.0, 47),
    (2017, 12, 24, 17, 11, 1.0, 50),
    (2018,  1, 18, 17, 10, 2.0, 45),
    (2018,  5, 20, 21, 20, 0.0, 59),
    (2018, 11,  1, 11, 11, 0.1, 65)
], ["year", "month", "day", "hour", "degrees_cel", "rain_ml", "humidity_per"])

df_with_key = df.withColumn(
    "time", concat(df["year"], df["month"], df["day"], df["hour"]))

df_with_key.write.format("io.iguaz.v3io.spark.sql.kv") \
    .mode("overwrite") \
    .option("key", "time") \
    .option("partition", "year, month, day, hour") \
    .save(table_path)

#### Reading from partition table

Following is the output of the example's show commands for each read. The filtered results are gathered by scanning only the partition directories that match the filter criteria.

##### Perform A Full Table Scan

In [71]:
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path)
readDF.show()

+----+-----+---+----+-----------+-------+------------+----------+
|year|month|day|hour|degrees_cel|rain_ml|humidity_per|      time|
+----+-----+---+----+-----------+-------+------------+----------+
|2016|   12| 24|   9|         10|    0.1|          47| 201612249|
|2016|    3| 25|  17|         18|    0.2|          62| 201632517|
|2016|    7| 24|   7|         19|    0.0|          52|  20167247|
|2017|   11|  1|  10|         15|    0.0|          34| 201711110|
|2017|   12| 12|  16|         12|    0.0|          47|2017121216|
|2017|   12| 24|  17|         11|    1.0|          50|2017122417|
|2017|    5|  7|  14|         21|    0.0|          70|  20175714|
|2018|    1| 18|  17|         10|    2.0|          45| 201811817|
|2018|   11|  1|  11|         11|    0.1|          65| 201811111|
|2018|    5| 20|  21|         20|    0.0|          59| 201852021|
+----+-----+---+----+-----------+-------+------------+----------+



##### Retrieve all data in the last six months of each year:

Filter: month > 6

In [72]:
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
    .filter("month > 6").show()

+----+-----+---+----+-----------+-------+------------+----------+
|year|month|day|hour|degrees_cel|rain_ml|humidity_per|      time|
+----+-----+---+----+-----------+-------+------------+----------+
|2016|   12| 24|   9|         10|    0.1|          47| 201612249|
|2016|    7| 24|   7|         19|    0.0|          52|  20167247|
|2017|   11|  1|  10|         15|    0.0|          34| 201711110|
|2017|   12| 12|  16|         12|    0.0|          47|2017121216|
|2017|   12| 24|  17|         11|    1.0|          50|2017122417|
|2018|   11|  1|  11|         11|    0.1|          65| 201811111|
+----+-----+---+----+-----------+-------+------------+----------+



##### Retrieve all hours in Dec 24 of each year:

Filter: month == 12 AND day == 24 

In [73]:
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
    .filter("month == 12 AND day == 24") \
    .show()

+----+-----+---+----+-----------+-------+------------+----------+
|year|month|day|hour|degrees_cel|rain_ml|humidity_per|      time|
+----+-----+---+----+-----------+-------+------------+----------+
|2016|   12| 24|   9|         10|    0.1|          47| 201612249|
|2017|   12| 24|  17|         11|    1.0|          50|2017122417|
+----+-----+---+----+-----------+-------+------------+----------+



##### Retrieve data during 08:00&ndash;20:00 each day in the last six months of each year

In [74]:
readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv").load(table_path) \
    .filter("month < 7 AND hour >= 8 AND hour <= 20") \
    .show()

+----+-----+---+----+-----------+-------+------------+---------+
|year|month|day|hour|degrees_cel|rain_ml|humidity_per|     time|
+----+-----+---+----+-----------+-------+------------+---------+
|2016|    3| 25|  17|         18|    0.2|          62|201632517|
|2017|    5|  7|  14|         21|    0.0|          70| 20175714|
|2018|    1| 18|  17|         10|    2.0|          45|201811817|
+----+-----+---+----+-----------+-------+------------+---------+



<a id="conditional-update"></a>
## Perform Conditional Data Updates

This example demonstrates how to conditionally update NoSQL table items by using a conditional write option.
Each `write` command in the example is followed by matching `read` and `show` commands to read and display the value of the updated item in the target table after the write operation.

<a id="conditional-update-generate-data"></a>
### Generate Data

The first write command writes an item (row) to a "cars" table. The item's `reg_license` primary-key (identity-column) attribute is set to 7843321, the mode attribute is set to "Honda", and the odometer attribute is set to `29321`. The `overwrite` save mode is used to overwrite the table if it already exists and create it otherwise. Reading the item from the table produces this output:

In [75]:
writeDF = spark.createDataFrame([("7843321", "Honda", 29321)],
                                ["reg_license", "model", "odometer"])

writeDF.write.format("io.iguaz.v3io.spark.sql.kv") \
    .option("key", "reg_license") \
    .mode("overwrite") \
    .save(os.path.join(os.getenv('V3IO_HOME_URL'))+'/cars/')

readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
    .load(os.path.join(os.getenv('V3IO_HOME_URL'))+'/cars/') \
    .show()

+-----------+-----+--------+
|reg_license|model|odometer|
+-----------+-----+--------+
|    7843321|Honda|   29321|
+-----------+-----+--------+



<a id="conditional-update-perform-update"></a>
### Conditionally Update the Data

Update the odometer to `31718` on the condition that the new odometer value is greater than the old value.
This ensures that the `odometer` attribute (column) reflects the most updated value of the odometer.

In [76]:
writeDF = spark.createDataFrame([("7843321", "Honda", 31718)],
                                ["reg_license", "model", "odometer"])

writeDF.write.format("io.iguaz.v3io.spark.sql.kv") \
    .option("key", "reg_license") \
    .option("condition", "${odometer} > odometer") \
    .mode("append") \
    .save(os.path.join(os.getenv('V3IO_HOME_URL'))+'/cars/')

readDF = spark.read.format("io.iguaz.v3io.spark.sql.kv") \
    .load(os.path.join(os.getenv('V3IO_HOME_URL'))+'/cars/') \
    .show()

+-----------+-----+--------+
|reg_license|model|odometer|
+-----------+-----+--------+
|    7843321|Honda|   31718|
+-----------+-----+--------+



<font color=green> **Congratulations!**</font> You've completed the Spark SQL Analytics with the Iguazio Data Science Platform tutorial.

## Cleanup

Prior to exiting, let's do housekeeping to release disk space, computation and memory resources taken by this session.

### Remove Data

When you are done, uncomment the remove command in the following code to remove the example directory:

In [77]:
# Uncomment the following line to remove the examples directory:
# rm -rf /v3io/${V3IO_HOME}/examples/*

<a id="stop-spark-session"></a>
### Stop the Spark Session

Run the following command to release the computation and memory resources that are being consumed by your Spark session:

In [78]:
spark.stop()