# PySpark and Spark SQL with Tables

In this notebook we will use the Boston dataset and store it the spark datawarehouse. We will then use the datawarehouse to access this data using spark sql.

In [1]:
import findspark
findspark.init()

from pyspark.sql import SparkSession;

spark = SparkSession.builder.master("local[4]").appName("ISM6562 Spark App01").enableHiveSupport().getOrCreate();

# Let's get the SparkContext object. It's the entry point to the Spark API. It's created when you create a sparksession
sc = spark.sparkContext  

# note: If you have multiple spark sessions running (like from a previous notebook you've run), 
# this spark session webUI will be on a different port than the default (4040). One way to 
# identify this part is with the following line. If there was only one spark session running, 
# this will be 4040. If it's higher, it means there are still other spark sesssions still running.
spark_session_port = spark.sparkContext.uiWebUrl.split(":")[-1]
print("Spark Session WebUI Port: " + spark_session_port)

23/04/09 19:16:09 WARN Utils: Your hostname, MBP-SMITH515.local resolves to a loopback address: 127.0.0.1; using 192.168.4.81 instead (on interface en0)
23/04/09 19:16:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/04/09 19:16:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


'4040'

In [None]:
# this will set the log level to ERROR. This will hide the INFO or WARNING messages that are printed out by default. If you want to see them, set this to INFO or WARN.
sc.setLogLevel("ERROR") 

Display the spark object - this provides the link to the Spark UI

In [2]:
spark

List the Spark datawarehouse. It should show the default database.

In [3]:
df=spark.sql("show databases")
df.show()

+---------+
|namespace|
+---------+
|  default|
+---------+



List the tables in the database. If this is a new install (you haven't run this notebook before), there shouldn't be any tables.

In [4]:
tables = spark.sql("show tables").show()

+---------+---------+-----------+
|namespace|tableName|isTemporary|
+---------+---------+-----------+
|  default|   boston|      false|
|  default|  boston3|      false|
+---------+---------+-----------+



Now, let's load the Boston dataset into the datawarehouse. We will use the spark dataframe API to load the data. We will then use the spark sql API to create a table from the dataframe.

In [5]:
boston = spark.read.csv('BostonHousing.csv', header=True, inferSchema=True);

# display the first 5 rows of the dataframe
boston.show(5);

+-------+----+-----+----+-----+-----+----+------+---+---+-------+-----+----+---------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM| AGE|   DIS|RAD|TAX|PTRATIO|LSTAT|MEDV|CAT. MEDV|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+-----+----+---------+
|0.00632|18.0| 2.31|   0|0.538|6.575|65.2|  4.09|  1|296|   15.3| 4.98|24.0|        0|
|0.02731| 0.0| 7.07|   0|0.469|6.421|78.9|4.9671|  2|242|   17.8| 9.14|21.6|        0|
|0.02729| 0.0| 7.07|   0|0.469|7.185|61.1|4.9671|  2|242|   17.8| 4.03|34.7|        1|
|0.03237| 0.0| 2.18|   0|0.458|6.998|45.8|6.0622|  3|222|   18.7| 2.94|33.4|        1|
|0.06905| 0.0| 2.18|   0|0.458|7.147|54.2|6.0622|  3|222|   18.7| 5.33|36.2|        1|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+-----+----+---------+
only showing top 5 rows



DataFrames can also be saved as persistent tables into Hive metastore using the saveAsTable command. Notice that an existing Hive deployment is not necessary to use this feature. Spark will create a default local Hive metastore (using Derby) for you. Unlike the createOrReplaceTempView command, saveAsTable will materialize the contents of the DataFrame and create a pointer to the data in the Hive metastore. Persistent tables will still exist even after your Spark program has restarted, as long as you maintain your connection to the same metastore. A DataFrame for a persistent table can be created by calling the table method on a SparkSession with the name of the table.

see https://spark.apache.org/docs/2.2.0/sql-programming-guide.html#saving-to-persistent-tables

We can create a table from a dataframe using the spark sql API. This will create a table in memory. The table will be lost when the spark session is terminated.

In [6]:
boston.createOrReplaceTempView("boston_tmp_view")

We can use SQL to query the table. The result is a dataframe.

In [7]:
df = spark.sql("select * from boston_tmp_view")
df.show(5)

+-------+----+-----+----+-----+-----+----+------+---+---+-------+-----+----+---------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM| AGE|   DIS|RAD|TAX|PTRATIO|LSTAT|MEDV|CAT. MEDV|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+-----+----+---------+
|0.00632|18.0| 2.31|   0|0.538|6.575|65.2|  4.09|  1|296|   15.3| 4.98|24.0|        0|
|0.02731| 0.0| 7.07|   0|0.469|6.421|78.9|4.9671|  2|242|   17.8| 9.14|21.6|        0|
|0.02729| 0.0| 7.07|   0|0.469|7.185|61.1|4.9671|  2|242|   17.8| 4.03|34.7|        1|
|0.03237| 0.0| 2.18|   0|0.458|6.998|45.8|6.0622|  3|222|   18.7| 2.94|33.4|        1|
|0.06905| 0.0| 2.18|   0|0.458|7.147|54.2|6.0622|  3|222|   18.7| 5.33|36.2|        1|
+-------+----+-----+----+-----+-----+----+------+---+---+-------+-----+----+---------+
only showing top 5 rows



Now, when we list the tables, we should see the boston table (and the temp table we created earlier).

In [8]:
tables = spark.sql("show tables").show()

+---------+---------------+-----------+
|namespace|      tableName|isTemporary|
+---------+---------------+-----------+
|  default|         boston|      false|
|  default|        boston3|      false|
|         |boston_tmp_view|       true|
+---------+---------------+-----------+



We can now use the spark sql API to query the table.

In [9]:
df = spark.sql("select * from boston")
df.show()

+-------+----+-----+----+-----+-----+-----+------+---+---+-------+-----+----+---------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS|RAD|TAX|PTRATIO|LSTAT|MEDV|CAT. MEDV|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+-----+----+---------+
|0.00632|18.0| 2.31|   0|0.538|6.575| 65.2|  4.09|  1|296|   15.3| 4.98|24.0|        0|
|0.02731| 0.0| 7.07|   0|0.469|6.421| 78.9|4.9671|  2|242|   17.8| 9.14|21.6|        0|
|0.02729| 0.0| 7.07|   0|0.469|7.185| 61.1|4.9671|  2|242|   17.8| 4.03|34.7|        1|
|0.03237| 0.0| 2.18|   0|0.458|6.998| 45.8|6.0622|  3|222|   18.7| 2.94|33.4|        1|
|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 5.33|36.2|        1|
|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7| 5.21|28.7|        0|
|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|311|   15.2|12.43|22.9|        0|
|0.14455|12.5| 7.87|   0|0.524|6.172| 96.1|5.9505|  5|311|   15.2|19.15|27.1|        0|
|0.21124|12.5| 7.87|   0|0.524|5

To save the table to the spark datawarehouse, we use the saveAsTable command. This will create a table in the spark datawarehouse.

In [10]:
boston.write.saveAsTable("boston", mode='overwrite')

In [11]:
spark.catalog.listTables()

[Table(name='boston', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='boston3', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='boston_tmp_view', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [12]:
df = spark.sql("select * from boston")
df.show()

+-------+----+-----+----+-----+-----+-----+------+---+---+-------+-----+----+---------+
|   CRIM|  ZN|INDUS|CHAS|  NOX|   RM|  AGE|   DIS|RAD|TAX|PTRATIO|LSTAT|MEDV|CAT. MEDV|
+-------+----+-----+----+-----+-----+-----+------+---+---+-------+-----+----+---------+
|0.00632|18.0| 2.31|   0|0.538|6.575| 65.2|  4.09|  1|296|   15.3| 4.98|24.0|        0|
|0.02731| 0.0| 7.07|   0|0.469|6.421| 78.9|4.9671|  2|242|   17.8| 9.14|21.6|        0|
|0.02729| 0.0| 7.07|   0|0.469|7.185| 61.1|4.9671|  2|242|   17.8| 4.03|34.7|        1|
|0.03237| 0.0| 2.18|   0|0.458|6.998| 45.8|6.0622|  3|222|   18.7| 2.94|33.4|        1|
|0.06905| 0.0| 2.18|   0|0.458|7.147| 54.2|6.0622|  3|222|   18.7| 5.33|36.2|        1|
|0.02985| 0.0| 2.18|   0|0.458| 6.43| 58.7|6.0622|  3|222|   18.7| 5.21|28.7|        0|
|0.08829|12.5| 7.87|   0|0.524|6.012| 66.6|5.5605|  5|311|   15.2|12.43|22.9|        0|
|0.14455|12.5| 7.87|   0|0.524|6.172| 96.1|5.9505|  5|311|   15.2|19.15|27.1|        0|
|0.21124|12.5| 7.87|   0|0.524|5

If we wish to drop a table from the warehouse, we can use the drop command.

In [13]:
# For now, we will keep the table and access it in another notebook.
# spark.sql("DROP TABLE boston")

DataFrame[]

In [14]:
spark.catalog.listTables()

[Table(name='boston', database='default', description=None, tableType='MANAGED', isTemporary=False),
 Table(name='boston_tmp_view', database=None, description=None, tableType='TEMPORARY', isTemporary=True)]

In [15]:
# spark.stop()