# Prepare driver

In [1]:
! wget -P /home/jovyan https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.5/postgresql-42.2.5.jar

--2019-07-21 14:39:45--  https://repo1.maven.org/maven2/org/postgresql/postgresql/42.2.5/postgresql-42.2.5.jar
Resolving repo1.maven.org (repo1.maven.org)... 151.101.84.209
Connecting to repo1.maven.org (repo1.maven.org)|151.101.84.209|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 825943 (807K) [application/java-archive]
Saving to: ‘/home/jovyan/postgresql-42.2.5.jar’


2019-07-21 14:39:46 (1.36 MB/s) - ‘/home/jovyan/postgresql-42.2.5.jar’ saved [825943/825943]



# Start spark application

In [2]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--driver-class-path /home/jovyan/postgresql-42.2.5.jar --jars /home/jovyan/postgresql-42.2.5.jar pyspark-shell'

import pyspark

spark = pyspark.sql.SparkSession.builder \
        .master("local[1]") \
        .appName("snapshot") \
        .getOrCreate()

print("Application started")

Application started


# Warm-up spark

In [3]:
spark.sparkContext.range(1000).sum()
print("Spark application is ready for work")

Spark application is ready for work


# Read some PostgreSQL Data

In [4]:
customers = spark.read.format('jdbc').options(
        url = "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres&currentSchema=inventory",
        database='postgres',
        dbtable='customers'
    ).load()

products = spark.read.format('jdbc').options(
        url = "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres&currentSchema=inventory",
        database='postgres',
        dbtable='products'
    ).load()

orders = spark.read.format('jdbc').options(
        url = "jdbc:postgresql://postgres:5432/postgres?user=postgres&password=postgres&currentSchema=inventory",
        database='postgres',
        dbtable='orders'
    ).load()

customers.registerTempTable("customers")
products.registerTempTable("products")
orders.registerTempTable("orders")

print("Customers table")
customers.show(5)
print("Orders table")
orders.show(5)
print("Products table")
products.show(5)


Customers table
+----+----------+---------+--------------------+
|  id|first_name|last_name|               email|
+----+----------+---------+--------------------+
|1437|    Alexis|  Mendoza|   omiller@yahoo.com|
|1149|   William|    Baker|kennedychristina@...|
|1004|      Anne|Kretchmar|  annek@noanswer.org|
|1438|      Beth|    Smith|christinalandry@g...|
|1439|    Denise|     Page| debra41@hotmail.com|
+----+----------+---------+--------------------+
only showing top 5 rows

Orders table
+-----+----------+---------+--------+----------+
|   id|order_date|purchaser|quantity|product_id|
+-----+----------+---------+--------+----------+
|10001|2016-01-16|     1001|       1|       102|
|10002|2016-01-17|     1002|       2|       105|
|10003|2016-02-19|     1002|       2|       106|
|10004|2016-02-21|     1003|       1|       107|
+-----+----------+---------+--------+----------+

Products table
+---+------------------+--------------------+------+
| id|              name|         description

# Read and join the data

In [30]:
# please write your query here
query = """
    select 
        Customers.id, 
        Customers.first_name, 
        Customers.last_name, 
        sum(Products.weight) total_weight,
        select *, current_timestamp() as load_dttm
    from 
        Customers
    full join
        Orders on 
        Orders.purchaser = Customers.id
        full join
            Products on
            Products.id = Orders.product_id
    where 
        Customers.id < 1006
    group by
        Customers.id, Customers.first_name, Customers.last_name) tmp
        """
result = spark.sql(query)
result.write.format("parquet").mode('overwrite').save("/home/jovyan/weight_report")

ParseException: "\nmismatched input '(' expecting <EOF>(line 3, pos 4)\n\n== SQL ==\n\n    select *, current_timestamp() as load_dttm\n    (select \n----^^^\n        Customers.id, \n        Customers.first_name, \n        Customers.last_name, \n        sum(Products.weight) total_weight\n    from \n        Customers\n    full join\n        Orders on \n        Orders.purchaser = Customers.id\n        full join\n            Products on\n            Products.id = Orders.product_id\n    where \n        Customers.id < 1006\n    group by\n        Customers.id, Customers.first_name, Customers.last_name) tmp\n        \n"

In [27]:
spark.read \
    .format("parquet").load("/home/jovyan/weight_report").show()

+----+----------+---------+------------+
|  id|first_name|last_name|total_weight|
+----+----------+---------+------------+
|1003|    Edward|   Walker|         5.3|
|1002|    George|   Bailey|       1.875|
|1001|     Sally|   Thomas|         8.1|
|1004|      Anne|Kretchmar|        null|
+----+----------+---------+------------+

