# Pivotal Greenplum-Spark Connector
## PySpark Example

----

Pivotal Greenplum-Spark Connector documentation (notes below extracted from Pivotal documentation):

https://greenplum-spark.docs.pivotal.io/110/index.html

----

Steps to launch Jupyter Notebook with the Greenplum-Spark connector available:

1. Download the Greenplum-Spark connector from Pivotal Network https://network.pivotal.io/products/pivotal-gpdb (this example uses greenplum-spark_2.11-1.1.0.jar)

2. Set environment variables so that `pyspark` launches Jupyter Notebook
```bash
# set environment variables
export PYSPARK_DRIVER_PYTHON='ipython'
export PYSPARK_DRIVER_PYTHON_OPTS='notebook --port=8888 --no-browser --ip=0.0.0.0 --notebook-dir=/notebooks'
```
3. Launch Jupyter Notebook
```bash
# Launch notebooks
# Set jar to location of greenplum-spark connector jar
pyspark --master spark://spark:7077 --jars=../spark-jars/greenplum-spark_2.11-1.1.0.jar
```

*Note - Wine data set used in example https://archive.ics.uci.edu/ml/datasets/wine*

----

#### Import Wine data set for example

In [10]:
```python
# load wine dataset for example
import psycopg2

connString = "host='gpdb' dbname='gpadmin' user='gpadmin' password='pivotal' port=5432"
conn = psycopg2.connect(connString)
conn.autocommit = True
cur = conn.cursor()

# create external web table, then copy it into a regular table
query = """
    DROP EXTERNAL TABLE IF EXISTS public.wine_external;
    CREATE EXTERNAL WEB TABLE public.wine_external (
         cultivars integer
        ,alcohol float
        ,malic_acid float
        ,ash float
        ,alcalinity_of_ash float
        ,magnesium float
        ,total_phenols float
        ,flavanoids float
        ,nonflavanoid_phenols float
        ,proanthocyanins float
        ,color_intensity float
        ,hue float
        ,od280_od315 float
        ,proline integer
    ) LOCATION ('http://archive.ics.uci.edu/ml/machine-learning-databases/wine/wine.data')
    FORMAT 'CSV';

    DROP TABLE IF EXISTS public.wine;
    CREATE TABLE public.wine AS
    SELECT *
    FROM public.wine_external
    DISTRIBUTED BY (cultivars);
"""
cur.execute(query)
```
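
After the load, a quick sanity check on the new table can be useful. The helper below is a minimal sketch, not part of the original notebook: `count_rows` is a hypothetical name, and it assumes only a DB-API 2.0 cursor such as the psycopg2 `cur` created in the cell above.

```python
import re

# Accept plain or schema-qualified identifiers only (e.g. "public.wine").
_TABLE_NAME = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*(\.[A-Za-z_][A-Za-z0-9_]*)?$")


def count_rows(cursor, table):
    """Return the number of rows in `table` using a DB-API 2.0 cursor.

    Hypothetical helper for illustration; the table name is validated
    because it is interpolated into the SQL text.
    """
    if not _TABLE_NAME.match(table):
        raise ValueError("unexpected table name: {!r}".format(table))
    cursor.execute("SELECT count(*) FROM {}".format(table))
    return cursor.fetchone()[0]
```

In the notebook this would be called as `count_rows(cur, "public.wine")`.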

#### PySpark Example

In [11]:
```python
# dependencies
import pyspark              # http://spark.apache.org/docs/latest/api/python/
#from pyspark.sql import SQLContext
```

Note that the `.load()` operation does not initiate the movement of data from Greenplum Database to Spark.
Spark evaluates transformations (such as selecting or filtering) lazily: it does not compute results until the
application performs an action on the DataFrame, such as displaying the data or counting the number of rows.

https://greenplum-spark.docs.pivotal.io/110/read_from_gpdb.html
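
The split between transformations and actions can be sketched as follows. `count_matching` is a hypothetical helper, not part of the connector or the original notebook; it assumes `df` is a Spark DataFrame and uses only `DataFrame.filter()` and `DataFrame.count()`.

```python
def count_matching(df, condition):
    """Count the rows of a Spark DataFrame that satisfy a SQL condition string."""
    # Transformation: returns a new DataFrame and only extends the query plan.
    filtered = df.filter(condition)
    # Action: this is the point where Spark actually reads and processes data.
    return filtered.count()
```

With the DataFrame loaded in this example, a call might look like `count_matching(gpdf, "alcohol > 13")`.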

Options
* **url** must have the format `jdbc:postgresql://[hostname]:[port]/[database]`
* **dbtable** table must be in the GPDB `search_path` and have a distribution column (it cannot be distributed randomly)
* **partitionColumn** must be one of the types `bigint`, `bigserial`, `integer`, `serial`
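
As an illustration of these constraints, the options can be assembled and checked before they are handed to Spark. This is a minimal sketch: `greenplum_options` and its `partition_type` argument are hypothetical names introduced here, and the caller is assumed to know the column type, since the sketch does not query the database.

```python
# Partition column types listed in the options above.
ALLOWED_PARTITION_TYPES = {"bigint", "bigserial", "integer", "serial"}


def greenplum_options(host, port, database, user, password,
                      table, partition_column, partition_type):
    """Build the option dict for the connector (hypothetical helper)."""
    if partition_type.lower() not in ALLOWED_PARTITION_TYPES:
        raise ValueError(
            "partitionColumn type {!r} is not one of {}".format(
                partition_type, sorted(ALLOWED_PARTITION_TYPES)))
    return {
        # url format: jdbc:postgresql://[hostname]:[port]/[database]
        "url": "jdbc:postgresql://{}:{}/{}".format(host, port, database),
        "user": user,
        "password": password,
        "dbtable": table,
        "partitionColumn": partition_column,
    }


opts = greenplum_options("gpdb", 5432, "gpadmin", "gpadmin", "pivotal",
                         "wine", "cultivars", "integer")
```

The resulting dict can be unpacked into the reader, for example `sqlContext.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").options(**opts).load()`.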

In [12]:
```python
# create a DataFrame that points to table 'wine' in Greenplum (no data is read yet)
#sqlContext = SQLContext(sc)
gpdf = sqlContext.read.format("io.pivotal.greenplum.spark.GreenplumRelationProvider").options(
    url="jdbc:postgresql://gpdb:5432/gpadmin",
    user="gpadmin",
    password="pivotal",
    dbtable="wine",
    partitionColumn="cultivars").load()
```


Note: By default, Spark recomputes a transformed DataFrame each time you run an action on it.
If you have a large data set on which you want to perform multiple transformations and actions, you may choose
to keep the DataFrame in memory for performance reasons. You can use the `DataFrame.persist()` method
for this purpose. Keep in mind that persisting large data sets has memory implications.

In [13]:
```python
gpdf.persist()
```

```
DataFrame[cultivars: int, alcohol: double, malic_acid: double, ash: double, alcalinity_of_ash: double, magnesium: double, total_phenols: double, flavanoids: double, nonflavanoid_phenols: double, proanthocyanins: double, color_intensity: double, hue: double, od280_od315: double, proline: int]
```
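
Because persisted data occupies memory until it is released, it can help to pair every `persist()` with an `unpersist()`. The context manager below is a minimal sketch of that idea; `persisted` is a hypothetical name, and it relies only on the `persist()` and `unpersist()` methods of a Spark DataFrame.

```python
from contextlib import contextmanager


@contextmanager
def persisted(df):
    """Keep `df` persisted for the duration of a `with` block (hypothetical helper)."""
    # Marks the DataFrame for caching; data is stored when the first action runs.
    df.persist()
    try:
        yield df
    finally:
        # Release the cached data even if the block raised an exception.
        df.unpersist()
```

Usage in the notebook might look like `with persisted(gpdf) as df: df.count(); df.describe().show()`, written on separate lines inside the `with` block.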

In [14]:
```python
# Check out data types of columns
gpdf.printSchema()
```

```
root
 |-- cultivars: integer (nullable = true)
 |-- alcohol: double (nullable = true)
 |-- malic_acid: double (nullable = true)
 |-- ash: double (nullable = true)
 |-- alcalinity_of_ash: double (nullable = true)
 |-- magnesium: double (nullable = true)
 |-- total_phenols: double (nullable = true)
 |-- flavanoids: double (nullable = true)
 |-- nonflavanoid_phenols: double (nullable = true)
 |-- proanthocyanins: double (nullable = true)
 |-- color_intensity: double (nullable = true)
 |-- hue: double (nullable = true)
 |-- od280_od315: double (nullable = true)
 |-- proline: integer (nullable = true)
```



In [15]:
```python
# Column names
gpdf.columns
```

```
['cultivars',
 'alcohol',
 'malic_acid',
 'ash',
 'alcalinity_of_ash',
 'magnesium',
 'total_phenols',
 'flavanoids',
 'nonflavanoid_phenols',
 'proanthocyanins',
 'color_intensity',
 'hue',
 'od280_od315',
 'proline']
```

In [16]:
```python
# row count
gpdf.count()
```

```
178
```

In [17]:
```python
# show first 5 rows
gpdf.show(5, truncate=True)
```

```
+---------+-------+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+-----------+-------+
|cultivars|alcohol|malic_acid| ash|alcalinity_of_ash|magnesium|total_phenols|flavanoids|nonflavanoid_phenols|proanthocyanins|color_intensity| hue|od280_od315|proline|
+---------+-------+----------+----+-----------------+---------+-------------+----------+--------------------+---------------+---------------+----+-----------+-------+
|        1|  14.23|      1.71|2.43|             15.6|    127.0|          2.8|      3.06|                0.28|           2.29|           5.64|1.04|       3.92|   1065|
|        1|   13.2|      1.78|2.14|             11.2|    100.0|         2.65|      2.76|                0.26|           1.28|           4.38|1.05|        3.4|   1050|
|        1|  13.16|      2.36|2.67|             18.6|    101.0|          2.8|      3.24|                 0.3|           2.81|           5.68|1.03|       3.17|   1185|
```

In [18]:
```python
# summary stats
# toPandas(): PySpark DataFrame -> pandas DataFrame
gpdf.describe().toPandas()
```

| | summary | cultivars | alcohol | malic_acid | ash | alcalinity_of_ash | magnesium | total_phenols | flavanoids | nonflavanoid_phenols | proanthocyanins | color_intensity | hue | od280_od315 | proline |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | count | 178.0 | 178.0 | 178.0 | 178.0 | 178.0 | 178.0 | 178.0 | 178.0 | 178.0 | 178.0 | 178.0 | 178.0 | 178.0 | 178.0 |
| 1 | mean | 1.9382022471910112 | 13.00061797752809 | 2.3363483146067416 | 2.366516853932584 | 19.49494382022472 | 99.74157303370788 | 2.295112359550562 | 2.0292696629213487 | 0.3618539325842696 | 1.5908988764044945 | 5.058089882022473 | 0.9574494382022471 | 2.611685393258427 | 746.8932584269663 |
| 2 | stddev | 0.7750349899850562 | 0.8118265380058595 | 1.1171460976144625 | 0.274344009060815 | 3.339563767173506 | 14.282483515295665 | 0.625851048833989 | 0.998858685016947 | 0.1244533402966794 | 0.5723588626747612 | 2.318285871822412 | 0.2285715658298232 | 0.7099904287650507 | 314.90747427684926 |
| 3 | min | 1.0 | 11.03 | 0.74 | 1.36 | 10.6 | 70.0 | 0.98 | 0.34 | 0.13 | 0.41 | 1.28 | 0.48 | 1.27 | 278.0 |
| 4 | max | 3.0 | 14.83 | 5.8 | 3.23 | 30.0 | 162.0 | 3.88 | 5.08 | 0.66 | 3.58 | 13.0 | 1.71 | 4.0 | 1680.0 |


In [19]:
```python
# select a subset of columns
gpdf.select(gpdf.columns[0:2]).show(5)
```

```
+---------+-------+
|cultivars|alcohol|
+---------+-------+
|        1|  14.23|
|        1|   13.2|
|        1|  13.16|
|        1|  14.37|
|        1|  13.24|
+---------+-------+
only showing top 5 rows
```



In [20]:
```python
# Select the first 5 columns, keep rows where cultivars = 1,
# and return the 5 rows with the lowest alcohol values

# select columns -> filter rows -> order by alcohol (ascending) -> limit to 5 rows
gpdf.select(gpdf.columns[0:5]).filter("cultivars = 1").orderBy("alcohol").limit(5).toPandas()
```

| | cultivars | alcohol | malic_acid | ash | alcalinity_of_ash |
|---|---|---|---|---|---|
| 0 | 1 | 12.85 | 1.6 | 2.52 | 17.8 |
| 1 | 1 | 12.93 | 3.8 | 2.65 | 18.6 |
| 2 | 1 | 13.05 | 2.05 | 3.22 | 25.0 |
| 3 | 1 | 13.05 | 1.65 | 2.55 | 18.0 |
| 4 | 1 | 13.05 | 1.77 | 2.1 | 17.0 |


**Running Spark SQL query against DataFrame**

In [21]:
```python
# Prepare a global temporary view for running SQL queries
gpdf.createGlobalTempView("wine")
```


In [22]:
```python
# Select the first 5 columns, keep rows where cultivars = 1,
# and return the 5 rows with the lowest alcohol values

# prepare query
query = """
    SELECT {}
    FROM global_temp.wine
    WHERE cultivars = 1
    ORDER BY alcohol
""".format(','.join(gpdf.columns[0:5]))

# run query
spark.sql(query).limit(5).toPandas()
```

| | cultivars | alcohol | malic_acid | ash | alcalinity_of_ash |
|---|---|---|---|---|---|
| 0 | 1 | 12.85 | 1.6 | 2.52 | 17.8 |
| 1 | 1 | 12.93 | 3.8 | 2.65 | 18.6 |
| 2 | 1 | 13.05 | 2.05 | 3.22 | 25.0 |
| 3 | 1 | 13.05 | 1.65 | 2.55 | 18.0 |
| 4 | 1 | 13.05 | 1.77 | 2.1 | 17.0 |
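
When a query string is assembled from column names, as in the cell above, it can be worth checking the names before they are interpolated. The function below is a minimal sketch under that assumption; `build_select` is a hypothetical helper, not a Spark API, and the `where` clause is passed through unchanged.

```python
import re

# Plain SQL identifiers only: letters, digits, underscores.
_IDENTIFIER = re.compile(r"^[A-Za-z_][A-Za-z0-9_]*$")


def build_select(columns, view, where=None, order_by=None, limit=None):
    """Assemble a SELECT statement from validated column names (hypothetical helper)."""
    names = list(columns) + ([order_by] if order_by else [])
    for name in names:
        if not _IDENTIFIER.match(name):
            raise ValueError("unexpected column name: {!r}".format(name))
    parts = ["SELECT " + ", ".join(columns), "FROM " + view]
    if where:
        parts.append("WHERE " + where)
    if order_by:
        parts.append("ORDER BY " + order_by)
    if limit is not None:
        parts.append("LIMIT {}".format(int(limit)))
    return "\n".join(parts)


query = build_select(["cultivars", "alcohol"], "global_temp.wine",
                     where="cultivars = 1", order_by="alcohol", limit=5)
```

The resulting string can then be passed to `spark.sql(query)` in the same way as the hand-written query.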
