In [1]:
import os

# Find the latest version of spark 3.x  from http://www.apache.org/dist/spark/ and enter as the spark version
# For example:
# spark_version = 'spark-3.4.0'
spark_version = 'spark-3.4.1'
os.environ['SPARK_VERSION']=spark_version

# Install Spark and Java
!apt-get update
!apt-get install openjdk-11-jdk-headless -qq > /dev/null
!wget -q http://www.apache.org/dist/spark/$SPARK_VERSION/$SPARK_VERSION-bin-hadoop3.tgz
!tar xf $SPARK_VERSION-bin-hadoop3.tgz
!pip install -q findspark

# Set Environment Variables
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-11-openjdk-amd64"
os.environ["SPARK_HOME"] = f"/content/{spark_version}-bin-hadoop3"

# Start a SparkSession
import findspark
findspark.init()

0% [Working]            Get:1 http://security.ubuntu.com/ubuntu jammy-security InRelease [110 kB]
0% [Connecting to archive.ubuntu.com (91.189.91.81)] [1 InRelease 12.7 kB/110 k                                                                               Get:2 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
0% [Waiting for headers] [1 InRelease 17.1 kB/110 kB 15%] [2 InRelease 0 B/3,620% [Waiting for headers] [1 InRelease 34.4 kB/110 kB 31%] [Connecting to ppa.la                                                                               Hit:3 http://archive.ubuntu.com/ubuntu jammy InRelease
0% [Waiting for headers] [1 InRelease 43.1 kB/110 kB 39%] [Connecting to ppa.la                                                                               Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease
Get:5 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [119 kB]
Get:6 http://archive.ubuntu.com/



# 1. Read in the AWS S3 bucket into a DataFrame

### Importing Necessary Packages
First, we import the essential packages needed for the task.

```python
from pyspark.sql import SparkSession
import time
```

### Creating a SparkSession
SparkSession is the entry point to any Spark functionality. When you create a SparkSession, it initiates a Spark Application that all the code for that Session will run on.

```python
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
spark
```

### Defining the S3 Bucket URL
This URL points to the CSV file in the S3 bucket that you want to read into a DataFrame.

```python
url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"
```

### Adding the URL to Spark Context
Spark context is the client-side object that represents the connection to a Spark cluster and is used to configure and connect to your Spark cluster.

```python
spark.sparkContext.addFile(url)
```

### Reading the CSV file into a DataFrame
Spark DataFrame is a distributed collection of data organized into named columns. Here, we're reading the CSV file from the Spark context, specifying the separator as "," and treating the first row as headers.

```python
df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep = "," , header = True)
```

### Showing the DataFrame
This will print the first 20 rows of the DataFrame in the console.

```python
df.show()
```


In [2]:
# Import packages
from pyspark import SparkFiles
from pyspark.sql import SparkSession
import time

# Create a SparkSession
spark = SparkSession.builder.appName("SparkSQL").getOrCreate()
spark

In [3]:
# 1. Read in the AWS S3 bucket into a DataFrame.

url = "https://2u-data-curriculum-team.s3.amazonaws.com/dataviz-classroom/v1.2/22-big-data/home_sales_revised.csv"

spark.sparkContext.addFile(url)
df = spark.read.csv(SparkFiles.get("home_sales_revised.csv"), sep = "," , header = True)
df.show()

+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|                  id|      date|date_built| price|bedrooms|bathrooms|sqft_living|sqft_lot|floors|waterfront|view|
+--------------------+----------+----------+------+--------+---------+-----------+--------+------+----------+----+
|f8a53099-ba1c-47d...|2022-04-08|      2016|936923|       4|        3|       3167|   11733|     2|         1|  76|
|7530a2d8-1ae3-451...|2021-06-13|      2013|379628|       2|        2|       2235|   14384|     1|         0|  23|
|43de979c-0bf0-4c9...|2019-04-12|      2014|417866|       2|        2|       2127|   10575|     2|         0|   0|
|b672c137-b88c-48b...|2019-10-16|      2016|239895|       2|        2|       1631|   11149|     2|         0|   0|
|e0726d4d-d595-407...|2022-01-08|      2017|424418|       3|        2|       2249|   13878|     2|         0|   4|
|5aa00529-0533-46b...|2019-01-30|      2017|218712|       2|        3|       196

In [4]:
# 2. Create a temporary view of the DataFrame.
df.createOrReplaceTempView("sales")


Apache Spark is an open-source distributed computing system, and `spark.sql` is a part of Spark's SQL module. It allows you to run SQL queries programmatically, returning the result as a DataFrame. This can be quite powerful for data manipulation and querying in a big data context.

Here's a brief breakdown:

- `spark`: When you're working with Spark, you typically start by creating a SparkSession. This is usually stored in a variable named `spark`, and it's the entry point to any functionality in Spark.

- `sql`: This method on the SparkSession allows you to run SQL queries just as you would against a traditional database. You pass in your SQL query as a string, and it returns a DataFrame with the results.

So when you see `spark.sql('SELECT * FROM table')`, this is running a SQL query to select everything from the specified table and returning it as a DataFrame, allowing you to manipulate and analyze the data using Spark's distributed computing capabilities.

It's a handy tool to have, especially if you're already comfortable with SQL, as it allows you to leverage those skills in a distributed computing environment like Spark. It can be particularly useful in data analysis tasks like the ones you're learning in your bootcamp.

## **What is the average price for a four-bedroom house sold in each year rounded to two decimal places?**

In [5]:
# 3. What is the average price for a four bedroom house sold in each year rounded to two decimal places?
spark.sql('''
SELECT
  YEAR(date) AS year,
  ROUND(AVG(price),2 ) AS average_price
FROM
  sales
WHERE
  bedrooms = 4
GROUP BY
  YEAR(date)

''').show()

+----+-------------+
|year|average_price|
+----+-------------+
|2022|    296363.88|
|2019|     300263.7|
|2020|    298353.78|
|2021|    301819.44|
+----+-------------+



## **What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors, and are greater than or equal to 2,000 square feet rounded to two decimal places?**

In [6]:
# 4. What is the average price of a home for each year the home was built that have 3 bedrooms and 3 bathrooms rounded to two decimal
spark.sql('''
SELECT
  YEAR(date) AS year,
  ROUND(AVG(price), 2) AS average_price
FROM
  sales
WHERE
  bedrooms = 4 AND bathrooms = 3
GROUP BY
  YEAR(date)
''').show()

+----+-------------+
|year|average_price|
+----+-------------+
|2022|    290780.14|
|2019|    292438.24|
|2020|    289659.97|
|2021|    292627.68|
+----+-------------+



## **What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than or equal to $350,000? Although this is a small dataset, determine the run time for this query.**

In [7]:
# 5. What is the average price of a home for each year built that have 3 bedrooms, 3 bathrooms, with two floors,
# and are greater than or equal to 2,000 square feet rounded to two decimal places?

spark.sql('''
SELECT
  YEAR(date) AS year,
  ROUND(AVG(price), 2) AS average_price
FROM
  sales
WHERE
  bedrooms = 3 AND bathrooms = 3 AND floors = 2 AND sqft_living >= 2000
GROUP BY
  YEAR(date)
''').show()

+----+-------------+
|year|average_price|
+----+-------------+
|2022|    290242.99|
|2019|    289859.14|
|2020|    292289.09|
|2021|    296330.96|
+----+-------------+



In [8]:
# 6. What is the "view" rating for the average price of a home, rounded to two decimal places, where the homes are greater than
# or equal to $350,000? Although this is a small dataset, determine the run time for this query.

start_time = time.time()
spark.sql('''
SELECT
  view as rating,
  ROUND(AVG(price), 2) AS average_price
FROM
  sales
GROUP BY
  view
HAVING
  AVG(price) >= 350000
ORDER BY
  view desc

''').show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+
|rating|average_price|
+------+-------------+
|    99|   1061201.42|
|    98|   1053739.33|
|    97|   1129040.15|
|    96|   1017815.92|
|    95|    1054325.6|
|    94|    1033536.2|
|    93|   1026006.06|
|    92|    970402.55|
|    91|   1137372.73|
|    90|   1062654.16|
|    89|   1107839.15|
|    88|   1031719.35|
|    87|    1072285.2|
|    86|   1070444.25|
|    85|   1056336.74|
|    84|   1117233.13|
|    83|   1033965.93|
|    82|    1063498.0|
|    81|   1053472.79|
|    80|    991767.38|
+------+-------------+
only showing top 20 rows

--- 0.9553377628326416 seconds ---


In [9]:
# 7. Cache the the temporary table home_sales.
spark.catalog.cacheTable("sales")

**Caching** is a technique used in computing to store data in a place where it can be accessed more quickly. It's like keeping your favorite snacks in a nearby drawer instead of in the kitchen – they're easier to grab when you want them.


**What is Cached in spark:** In the context of Spark, caching means storing the result of a DataFrame or temporary view in memory. This allows Spark to access the data more quickly than if it had to read it from disk every time.

In [10]:
# 8. Check if the table is cached.
spark.catalog.isCached('sales')

True

In [11]:
# 9. Using the cached data, run the query that filters out the view ratings with average price
#  greater than or equal to $350,000. Determine the runtime and compare it to uncached runtime.

start_time = time.time()
spark.sql('''
SELECT
  view as rating,
  ROUND(AVG(price), 2) AS average_price
FROM
  sales
GROUP BY
  view
HAVING
  AVG(price) >= 350000
ORDER BY
  view desc


''').show()

print("--- %s seconds ---" % (time.time() - start_time))


+------+-------------+
|rating|average_price|
+------+-------------+
|    99|   1061201.42|
|    98|   1053739.33|
|    97|   1129040.15|
|    96|   1017815.92|
|    95|    1054325.6|
|    94|    1033536.2|
|    93|   1026006.06|
|    92|    970402.55|
|    91|   1137372.73|
|    90|   1062654.16|
|    89|   1107839.15|
|    88|   1031719.35|
|    87|    1072285.2|
|    86|   1070444.25|
|    85|   1056336.74|
|    84|   1117233.13|
|    83|   1033965.93|
|    82|    1063498.0|
|    81|   1053472.79|
|    80|    991767.38|
+------+-------------+
only showing top 20 rows

--- 1.73494553565979 seconds ---




### 10. Partition the Data by "date_built"
Here, we are writing the DataFrame `df` to a parquet file and partitioning it by the "date_built" field. Partitioning the data this way can optimize read performance for queries involving this field.
```python
df.write.partitionBy("date_built").mode("overwrite").parquet("parqet_sales")
```

### 11. Read the Parquet Formatted Data
Next, we read the parquet file that was just written and assign it to a new DataFrame. We can then display the first few rows with the `head()` method to quickly check the data.
```python
parqet_sales_df = spark.read.parquet("parqet_sales")
parqet_sales_df.head()
```

### 12. Create a Temporary Table for the Parquet Data
Finally, we create a temporary table view of the parquet data. This allows us to query the DataFrame using SQL syntax, offering more flexibility and convenience for analysis.
```python
parqet_sales_df.createOrReplaceTempView('parqet_sales')


In [12]:
# 10. Partition by the "date_built" field on the formatted parquet home sales data
df.write.partitionBy("date_built").mode("overwrite").parquet("parqet_sales")

In [13]:
# 11. Read the parquet formatted data.
parqet_sales_df = spark.read.parquet("parqet_sales")
parqet_sales_df.head()

Row(id='2ed8d509-7372-46d5-a9dd-9281a95467d4', date='2021-08-06', price='258710', bedrooms='3', bathrooms='3', sqft_living='1918', sqft_lot='9666', floors='1', waterfront='0', view='25', date_built=2015)

In [14]:
# 12. Create a temporary table for the parquet data.
parqet_sales_df.createOrReplaceTempView('parqet_sales')

In [15]:
# 13. Run the query that filters out the view ratings with average price of greater than or equal to $350,000
# with the parquet DataFrame. Round your average to two decimal places.
# Determine the runtime and compare it to the cached version.
start_time = time.time()
spark.sql('''
SELECT
  view as rating,
  ROUND(AVG(price), 2) AS average_price
FROM
  sales
GROUP BY
  view
HAVING
  AVG(price) >= 350000
ORDER BY
  view desc

''').show()

print("--- %s seconds ---" % (time.time() - start_time))

+------+-------------+
|rating|average_price|
+------+-------------+
|    99|   1061201.42|
|    98|   1053739.33|
|    97|   1129040.15|
|    96|   1017815.92|
|    95|    1054325.6|
|    94|    1033536.2|
|    93|   1026006.06|
|    92|    970402.55|
|    91|   1137372.73|
|    90|   1062654.16|
|    89|   1107839.15|
|    88|   1031719.35|
|    87|    1072285.2|
|    86|   1070444.25|
|    85|   1056336.74|
|    84|   1117233.13|
|    83|   1033965.93|
|    82|    1063498.0|
|    81|   1053472.79|
|    80|    991767.38|
+------+-------------+
only showing top 20 rows

--- 0.3086867332458496 seconds ---



### 14. Uncache the parqet_sales Temporary Table
Here, we use a SQL command to uncache the temporary table `parqet_sales`. Uncaching a table removes it from memory, which can free up resources.
```python
spark.sql("uncache table parqet_sales")
```

### 15. Check if the parqet_sales Table is No Longer Cached
We then check whether the `parqet_sales` table is still cached or not. If the table is still cached, it will print a warning message. If not, it will print "all clear," confirming that the resources have been freed.
```python
if spark.catalog.isCached("parqet_sales"):
  print("a table is still cached")
else:
  print("all clear")
```


In [18]:
# 14. Uncache the home_sales temporary table.
spark.sql("uncache table parqet_sales")

DataFrame[]

In [19]:
# 15. Check if the home_sales is no longer cached
if spark.catalog.isCached("parqet_sales"):
  print("a table is till cached")
else:
  print("all clear")


all clear
