## Overview

This notebook shows how to create and query a table or DataFrame from a file that you uploaded to DBFS. [DBFS](https://docs.databricks.com/user-guide/dbfs-databricks-file-system.html) is the Databricks File System, which lets you store data for querying inside Databricks. The notebook assumes that the file you want to read is already in DBFS.

This notebook is written in **Python**, so the default cell type is Python. However, you can use a different language in a cell with the `%LANGUAGE` syntax. Python, Scala, SQL, and R are all supported.

In [0]:
pip install findspark

Python interpreter will be restarted.


In [0]:
pip install pyspark

Python interpreter will be restarted.


In [0]:
# Import packages
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").config("spark.driver.memory", "2g").getOrCreate()

In [0]:

# Import findspark and initialize. 
import findspark
findspark.init()

In [0]:
# Import packages
from pyspark.sql import SparkSession
import time


In [0]:
# Create a SparkSession
spark = SparkSession.builder\
    .appName("SparkSQL")\
    .config("spark.sql.debug.maxToStringFields", 2000)\
    .config("spark.driver.memory", "2g")\
    .getOrCreate()

In [0]:
#File location and type
file_location = "/FileStore/tables/home_sales_revised_.csv"
file_type = "csv"

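# CSV options
# With header = "false", Spark reads the header row as data and names the
# columns _c0 ... _c10; with inferSchema = "false", every column is a string.
infer_schema = "false"
first_row_is_header = "false"
delimiter = ","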

# The applied options are for CSV files. For other file types, these will be ignored.
df = spark.read.format(file_type) \
  .option("inferSchema", infer_schema) \
  .option("header", first_row_is_header) \
  .option("sep", delimiter) \
  .load(file_location)

display(df)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10
id,date,date_built,price,bedrooms,bathrooms,sqft_living,sqft_lot,floors,waterfront,view
f8a53099-ba1c-47d6-9c31-7398aa8f6089,08/04/2022,2016,936923,4,3,3167,11733,2,1,76
7530a2d8-1ae3-4517-9f4a-befe060c4353,13/06/2021,2013,379628,2,2,2235,14384,1,0,23
43de979c-0bf0-4c9f-85ef-96dc27b258d5,12/04/2019,2014,417866,2,2,2127,10575,2,0,0
b672c137-b88c-48bf-9f18-d0a4ac62fb8b,16/10/2019,2016,239895,2,2,1631,11149,2,0,0
e0726d4d-d595-4074-8283-4139a54d0d63,08/01/2022,2017,424418,3,2,2249,13878,2,0,4
5aa00529-0533-46ba-870c-9e881580ef35,30/01/2019,2017,218712,2,3,1965,14375,2,0,7
131492a1-72e2-4a84-bf97-0db14973bfdb,08/02/2020,2017,419199,2,3,2062,8876,2,0,6
8d54a71b-c520-44e5-8ba1-5a84be03ad35,21/07/2019,2010,323956,2,3,1506,11816,1,0,25
e81aacfe-17fe-46b1-a52a-4753d1622b4a,16/06/2020,2016,181925,3,3,2137,11709,2,0,22
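
The output above shows the header row (`id, date, ...`) as the first data row under the default names `_c0` to `_c10`, which is what `header = "false"` produces. Below is a minimal sketch of a reader that treats the first row as column names and lets Spark infer column types. The names `CSV_OPTIONS` and `read_home_sales` are hypothetical and not part of the original notebook, and the remaining cells keep using the `_c0` ... `_c10` names.

```python
# Hypothetical reader options: treat the first row as column names and
# let Spark infer numeric types instead of reading every column as a string.
CSV_OPTIONS = {"header": "true", "inferSchema": "true", "sep": ","}


def read_home_sales(spark, path="/FileStore/tables/home_sales_revised_.csv"):
    """Read the CSV with named columns (spark is an existing SparkSession)."""
    return spark.read.format("csv").options(**CSV_OPTIONS).load(path)
```

On the cluster, `read_home_sales(spark).printSchema()` would be one way to check which names and types Spark actually assigns.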


In [0]:
# Create a view or table

temp_table_name = "home_sales_revised__csv"

df.createOrReplaceTempView(temp_table_name)

display(df)

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10
id,date,date_built,price,bedrooms,bathrooms,sqft_living,sqft_lot,floors,waterfront,view
f8a53099-ba1c-47d6-9c31-7398aa8f6089,08/04/2022,2016,936923,4,3,3167,11733,2,1,76
7530a2d8-1ae3-4517-9f4a-befe060c4353,13/06/2021,2013,379628,2,2,2235,14384,1,0,23
43de979c-0bf0-4c9f-85ef-96dc27b258d5,12/04/2019,2014,417866,2,2,2127,10575,2,0,0
b672c137-b88c-48bf-9f18-d0a4ac62fb8b,16/10/2019,2016,239895,2,2,1631,11149,2,0,0
e0726d4d-d595-4074-8283-4139a54d0d63,08/01/2022,2017,424418,3,2,2249,13878,2,0,4
5aa00529-0533-46ba-870c-9e881580ef35,30/01/2019,2017,218712,2,3,1965,14375,2,0,7
131492a1-72e2-4a84-bf97-0db14973bfdb,08/02/2020,2017,419199,2,3,2062,8876,2,0,6
8d54a71b-c520-44e5-8ba1-5a84be03ad35,21/07/2019,2010,323956,2,3,1506,11816,1,0,25
e81aacfe-17fe-46b1-a52a-4753d1622b4a,16/06/2020,2016,181925,3,3,2137,11709,2,0,22


In [0]:
%sql

/* Query the created temp table in a SQL cell */

select * from `home_sales_revised__csv`

_c0,_c1,_c2,_c3,_c4,_c5,_c6,_c7,_c8,_c9,_c10
id,date,date_built,price,bedrooms,bathrooms,sqft_living,sqft_lot,floors,waterfront,view
f8a53099-ba1c-47d6-9c31-7398aa8f6089,08/04/2022,2016,936923,4,3,3167,11733,2,1,76
7530a2d8-1ae3-4517-9f4a-befe060c4353,13/06/2021,2013,379628,2,2,2235,14384,1,0,23
43de979c-0bf0-4c9f-85ef-96dc27b258d5,12/04/2019,2014,417866,2,2,2127,10575,2,0,0
b672c137-b88c-48bf-9f18-d0a4ac62fb8b,16/10/2019,2016,239895,2,2,1631,11149,2,0,0
e0726d4d-d595-4074-8283-4139a54d0d63,08/01/2022,2017,424418,3,2,2249,13878,2,0,4
5aa00529-0533-46ba-870c-9e881580ef35,30/01/2019,2017,218712,2,3,1965,14375,2,0,7
131492a1-72e2-4a84-bf97-0db14973bfdb,08/02/2020,2017,419199,2,3,2062,8876,2,0,6
8d54a71b-c520-44e5-8ba1-5a84be03ad35,21/07/2019,2010,323956,2,3,1506,11816,1,0,25
e81aacfe-17fe-46b1-a52a-4753d1622b4a,16/06/2020,2016,181925,3,3,2137,11709,2,0,22


In [0]:
# We can convert the DataFrame to a view and run SQL queries.
df.createOrReplaceTempView("home_sales")

In [0]:

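# 3. What is the average price of a four-bedroom house sold in each year, rounded to two decimal places?
# Note: _c1 holds the full sale date (dd/MM/yyyy), so this query groups by sale date, not by year.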

spark.sql("""
SELECT _c1, ROUND(avg(_c3),2)
FROM home_sales
WHERE home_sales._c4 = "4"
group by _c1;
  """).show()


+----------+------------------+
|       _c1|round(avg(_c3), 2)|
+----------+------------------+
|04/12/2020|          208269.0|
|05/11/2020|         292661.91|
|20/09/2021|         309416.67|
|25/05/2021|          328821.0|
|26/05/2019|         322000.14|
|26/09/2019|          290803.8|
|02/11/2020|          379040.7|
|19/01/2020|         324130.22|
|30/01/2019|         287003.09|
|11/06/2021|          258564.4|
|28/03/2020|         357600.56|
|23/11/2020|         281204.75|
|13/02/2019|          305581.8|
|07/03/2020|         318471.63|
|07/09/2021|          349983.4|
|25/02/2020|          260332.5|
|04/01/2020|         267569.67|
|27/10/2019|          321836.8|
|18/07/2021|         304883.05|
|08/11/2020|         266979.83|
+----------+------------------+
only showing top 20 rows
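
Question 3 asks for one average per year, but `_c1` holds the full sale date, so the result above has one row per day. Below is a sketch of grouping by sale year instead, assuming the dates are day-first strings (`dd/MM/yyyy`), as values such as `13/06/2021` suggest. `YEAR_QUERY` and `year_sold` are hypothetical names; the pure-Python helper only illustrates the parsing that `TO_DATE` is expected to perform in Spark SQL.

```python
from datetime import datetime

# Hypothetical rewrite of the question 3 query: derive the year from the
# day-first date string before grouping.
YEAR_QUERY = """
SELECT YEAR(TO_DATE(_c1, 'dd/MM/yyyy')) AS year_sold,
       ROUND(AVG(_c3), 2) AS avg_price
FROM home_sales
WHERE _c4 = '4'
GROUP BY YEAR(TO_DATE(_c1, 'dd/MM/yyyy'))
ORDER BY year_sold
"""


def year_sold(date_text):
    """Parse a dd/MM/yyyy string and return the year as an int."""
    return datetime.strptime(date_text, "%d/%m/%Y").year


print(year_sold("13/06/2021"))  # 2021
# On the cluster: spark.sql(YEAR_QUERY).show()
```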



In [0]:

%sql
SELECT _c1, ROUND(avg(_c3),2)
FROM home_sales
WHERE home_sales._c4 = "4"
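-- Note: grouping by _c3 (price) as well puts each distinct price in its own group, so each "average" is just that price.
group by _c1, _c3, _c4;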
--LIMIT 5;

_c1,"round(avg(_c3), 2)"
21/07/2020,288365.0
13/08/2021,406635.0
11/12/2020,169829.0
28/05/2020,168631.0
26/05/2020,278236.0
07/12/2019,396123.0
09/09/2019,387254.0
01/08/2019,244296.0
01/04/2022,443639.0
24/03/2019,437983.0


In [0]:
# 4. What is the average price of a home for each year built, for homes that have 3 bedrooms and 3 bathrooms, rounded to two decimal places?
spark.sql("""
SELECT _c2, ROUND(avg(_c3),2)
FROM home_sales
WHERE home_sales._c4 = "3" AND home_sales._c5 = "3"
group by _c2;
  """).show()

+----+------------------+
| _c2|round(avg(_c3), 2)|
+----+------------------+
|2016|         290555.07|
|2012|         293683.19|
|2017|         292676.79|
|2014|         290852.27|
|2013|         295962.27|
|2011|         291117.47|
|2015|          288770.3|
|2010|         292859.62|
+----+------------------+



In [0]:

%sql
SELECT _c2, ROUND(avg(_c3),2)
FROM home_sales
WHERE home_sales._c4 = "3" AND home_sales._c5 = "3"
group by _c2 ;

_c2,"round(avg(_c3), 2)"
2016,290555.07
2012,293683.19
2017,292676.79
2014,290852.27
2013,295962.27
2011,291117.47
2015,288770.3
2010,292859.62


In [0]:
# 5. What is the average price of a home for each year built, for homes that have 3 bedrooms, 3 bathrooms, two floors,
# and at least 2,000 square feet of living space, rounded to two decimal places?
# Note: the columns are strings, so _c6 >= "2000" is a text comparison, not a numeric one.

spark.sql("""
SELECT _c2, ROUND(avg(_c3),2)
FROM home_sales
WHERE home_sales._c4 = "3" AND home_sales._c5 = "3" AND home_sales._c8 = "2" AND home_sales._c6>="2000"
group by _c2;
  """).show()

+----+------------------+
| _c2|round(avg(_c3), 2)|
+----+------------------+
|2016|          293965.1|
|2012|         307539.97|
|2017|         280317.58|
|2014|         298264.72|
|2013|         303676.79|
|2011|         276553.81|
|2015|         297609.97|
|2010|         285010.22|
+----+------------------+



In [0]:
%sql
SELECT _c2, ROUND(avg(_c3),2)
FROM home_sales
WHERE home_sales._c4 = "3" AND home_sales._c5 = "3" AND home_sales._c8 = "2" AND home_sales._c6>="2000"
group by _c2;


_c2,"round(avg(_c3), 2)"
2016,293965.1
2012,307539.97
2017,280317.58
2014,298264.72
2013,303676.79
2011,276553.81
2015,297609.97
2010,285010.22


In [0]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.
# Note: _c3 is a string column, so the filter below compares text with "350,000", not numbers.

# Import packages
import time
start_time = time.time()
spark.sql("""
SELECT _c10, ROUND(avg(_c3),2) 
FROM home_sales
WHERE home_sales._c3 >="350,000" 
group by _c10 ;
  """).show()
print("--- %s run time in seconds ---" % (time.time() - start_time))

+----+------------------+
|_c10|round(avg(_c3), 2)|
+----+------------------+
|   7|         403005.77|
|  51|         788128.21|
|  15|          404673.3|
|  54|         798684.82|
|  11|         399548.12|
|  29|         397771.65|
|  69|         750537.94|
|  42|          396964.5|
|  87|         719078.95|
|  73|         752861.18|
|  64|         767036.67|
|   3|          398867.6|
|  30|          397862.0|
|  34|         401419.75|
|  59|          791453.0|
|   8|         398592.71|
|  28|         402124.62|
|  22|         402022.68|
|  85|          742853.8|
|  16|         399586.53|
+----+------------------+
only showing top 20 rows

--- 0.6105177402496338 run time in seconds ---
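
Because every column was read as a string, `_c3 >= "350,000"` compares text character by character. The sketch below uses Python string comparison as a stand-in to show how that can go wrong for prices with fewer or more than six digits, then gives a hypothetical query (`NUMERIC_QUERY`) that casts the price to an integer first. Whether the dataset actually contains such prices cannot be told from the output above.

```python
# Text comparison is decided by the first differing character,
# so the number of digits does not matter.
print("99999" >= "350,000")    # True: "9" sorts after "3"
print("1000000" >= "350,000")  # False: "1" sorts before "3"

# Numeric comparison gives the intended answers.
print(int("99999") >= 350000)    # False
print(int("1000000") >= 350000)  # True

# Hypothetical rewrite of the question 6 query with a numeric filter.
# Assumption: ANSI mode is off, so the header row's "price" value casts to
# NULL and is filtered out; with ANSI mode on, the cast would raise instead.
NUMERIC_QUERY = """
SELECT _c10 AS view_rating, ROUND(AVG(_c3), 2) AS avg_price
FROM home_sales
WHERE CAST(_c3 AS INT) >= 350000
GROUP BY _c10
"""
# On the cluster: spark.sql(NUMERIC_QUERY).show()
```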


In [0]:
%sql
SELECT _c10, ROUND(avg(_c3),2)
FROM home_sales
WHERE home_sales._c3 >="350,000" 
group by _c10;


_c10,"round(avg(_c3), 2)"
7,403005.77
51,788128.21
15,404673.3
54,798684.82
11,399548.12
29,397771.65
69,750537.94
42,396964.5
87,719078.95
73,752861.18


In [0]:
# 7. Cache the temporary table home_sales.

spark.catalog.cacheTable("home_sales")

In [0]:
# Cache the temporary table home_sales.
spark.sql("cache table home_sales")

Out[19]: DataFrame[]

In [0]:
# 8. Check if the table is cached.
spark.catalog.isCached('home_sales')

Out[20]: True
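
`isCached` reports that the table is marked for caching. `spark.catalog.cacheTable` is lazy, so the data is only loaded into memory by the first action that scans the table, whereas the SQL `CACHE TABLE` statement is eager unless `LAZY` is specified. Below is a minimal sketch that marks a table, forces it to load, and returns the flag; `cache_and_materialize` is a hypothetical helper, not part of the original notebook.

```python
def cache_and_materialize(spark, table_name):
    """Mark a table as cached, force it into memory, and report the cache flag."""
    spark.catalog.cacheTable(table_name)  # lazy: only marks the table
    spark.table(table_name).count()       # action that populates the cache
    return spark.catalog.isCached(table_name)
```

Materializing the cache before timing keeps the cost of the first scan out of the cached measurement.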

In [0]:
# 9. Using the cached data, run the query that filters out the view ratings with average price
# greater than or equal to $350,000. Determine the runtime and compare it to the uncached runtime.

import time
start_time = time.time()
spark.sql("""
SELECT _c10, ROUND(avg(_c3),2)
FROM home_sales
WHERE home_sales._c3 >="350,000" 
group by _c10;
  """).show()
print("--- %s seconds ---" % (time.time() - start_time))

+----+------------------+
|_c10|round(avg(_c3), 2)|
+----+------------------+
|   7|         403005.77|
|  51|         788128.21|
|  15|          404673.3|
|  54|         798684.82|
|  11|         399548.12|
|  29|         397771.65|
|  69|         750537.94|
|  42|          396964.5|
|  87|         719078.95|
|  73|         752861.18|
|  64|         767036.67|
|   3|          398867.6|
|  30|          397862.0|
|  34|         401419.75|
|  59|          791453.0|
|   8|         398592.71|
|  28|         402124.62|
|  22|         402022.68|
|  85|          742853.8|
|  16|         399586.53|
+----+------------------+
only showing top 20 rows

--- 0.3798329830169678 seconds ---
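
The cached run above (about 0.38 s) is faster than the uncached one (about 0.61 s), but each figure is a single measurement and sub-second timings tend to be noisy. Below is a sketch of a small helper that repeats a measurement; `time_action` is a hypothetical name, and the workload used here is a stand-in so that the block runs without a cluster.

```python
import time


def time_action(action, repeats=3):
    """Run a zero-argument callable several times and return each duration in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        action()
        durations.append(time.perf_counter() - start)
    return durations


# Stand-in workload so the sketch runs anywhere; on the cluster, pass
# lambda: spark.sql(query).show() instead.
runs = time_action(lambda: sum(range(100_000)))
print(["%.4f s" % seconds for seconds in runs])
```

Comparing the spread of several runs, rather than one number per configuration, gives a fairer picture of the cached and uncached queries.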


In [0]:

# Write out the data in parquet format
df.write.parquet('/parquet_descriptions', mode='overwrite')

In [0]:

# Read in our new parquet formatted data
p_df=spark.read.parquet('/parquet_descriptions')

In [0]:
p_df.createOrReplaceTempView('P_home_sales')

In [0]:
# Start the runtime
start_time = time.time()

# Run the same query here
spark.sql("""
SELECT _c1, ROUND(avg(_c3),2),count(_c4)
FROM P_home_sales
WHERE P_home_sales._c4 = "4"
group by _c1;
  """).show()

# Print out the runtime
print("--- %s seconds ---" % (time.time() - start_time))

+----------+------------------+----------+
|       _c1|round(avg(_c3), 2)|count(_c4)|
+----------+------------------+----------+
|04/12/2020|          208269.0|         9|
|05/11/2020|         292661.91|        11|
|20/09/2021|         309416.67|         6|
|25/05/2021|          328821.0|         9|
|26/05/2019|         322000.14|         7|
|26/09/2019|          290803.8|         5|
|02/11/2020|          379040.7|        10|
|19/01/2020|         324130.22|         9|
|30/01/2019|         287003.09|        11|
|11/06/2021|          258564.4|        10|
|28/03/2020|         357600.56|         9|
|23/11/2020|         281204.75|         4|
|13/02/2019|          305581.8|        10|
|07/03/2020|         318471.63|         8|
|07/09/2021|          349983.4|        10|
|25/02/2020|          260332.5|         8|
|04/01/2020|         267569.67|         6|
|27/10/2019|          321836.8|         5|
|18/07/2021|         304883.05|        21|
|08/11/2020|         266979.83|         6|
+----------+------------------+----------+

In [0]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data.
# Write out the parquet data, partitioning on the date_built column (_c2).
df.write.partitionBy("_c2").mode("overwrite").parquet('/P_home_sales')

In [0]:
# 11. Read the formatted parquet data.
p_df_p=spark.read.parquet('/P_home_sales')

In [0]:
# 12. Create a temporary table for the parquet data.
p_df_p.createOrReplaceTempView('P_home_sales_P')

In [0]:
# 13. Run the query that filters out the view ratings with average price of greater than or equal to $350,000
# with the parquet DataFrame. Round your average to two decimal places.
# Determine the runtime and compare it to the cached version.

import time
start_time = time.time()
spark.sql("""
SELECT _c10, ROUND(avg(_c3),2)
FROM P_home_sales_P
WHERE P_home_sales_P._c3 >="350,000" 
group by _c10;
  """).show()
print("--- %s seconds ---" % (time.time() - start_time))



+----+------------------+
|_c10|round(avg(_c3), 2)|
+----+------------------+
|   7|         403005.77|
|  51|         788128.21|
|  15|          404673.3|
|  54|         798684.82|
|  11|         399548.12|
|  29|         397771.65|
|  69|         750537.94|
|  42|          396964.5|
|  73|         752861.18|
|  87|         719078.95|
|  64|         767036.67|
|   3|          398867.6|
|  30|          397862.0|
|  34|         401419.75|
|  59|          791453.0|
|   8|         398592.71|
|  28|         402124.62|
|  22|         402022.68|
|  85|          742853.8|
|  35|         401934.21|
+----+------------------+
only showing top 20 rows

--- 1.7983012199401855 seconds ---
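
The partitioned Parquet run above is slower than the cached one (about 1.80 s versus 0.38 s in these single measurements). One plausible reason is that the query filters on price, not on the partition column `_c2`, so Spark still has to read every partition. Below is a sketch of a query that restricts the scan to one `date_built` partition. `partition_query` is a hypothetical helper; it assumes the partition column is read back as a string and that ANSI mode is off, so the header row's price casts to NULL.

```python
def partition_query(year_built):
    """Build the view-rating query restricted to a single date_built partition."""
    year = int(year_built)  # reject anything that is not a plain year
    return f"""
SELECT _c10, ROUND(AVG(_c3), 2)
FROM P_home_sales_P
WHERE _c2 = '{year}' AND CAST(_c3 AS INT) >= 350000
GROUP BY _c10
"""


print(partition_query(2016))
# On the cluster: spark.sql(partition_query(2016)).explain() is expected to
# list a partition filter on _c2 in the file scan node.
```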


In [0]:
# 14. Uncache the home_sales temporary table.
spark.sql("uncache table home_sales")

Out[30]: DataFrame[]

In [0]:
# 15. Check that home_sales is no longer cached.
spark.catalog.isCached("home_sales")

Out[31]: False