In [1]:
import findspark
findspark.init()

In [4]:
from pyspark.sql import SparkSession
import time

spark = SparkSession.builder.appName("RealEstateAnalysis").getOrCreate()

In [8]:
csv_file_path = '/sparkbook/project1/realtor-data.csv'

homesales_df = spark.read.format("csv").option("header", "true").option("inferSchema", "true").load(csv_file_path)

                                                                                

In [17]:
# Display total number of records in dataframe
homesales_df.count()

1401066

In [16]:
homesales_df.show()

+--------+---+----+--------+-------------+-----------+--------+----------+--------------+--------+
|  status|bed|bath|acre_lot|         city|      state|zip_code|house_size|prev_sold_date|   price|
+--------+---+----+--------+-------------+-----------+--------+----------+--------------+--------+
|for_sale|3.0| 2.0|    0.12|     Adjuntas|Puerto Rico|   601.0|     920.0|          NULL|105000.0|
|for_sale|4.0| 2.0|    0.08|     Adjuntas|Puerto Rico|   601.0|    1527.0|          NULL| 80000.0|
|for_sale|2.0| 1.0|    0.15|   Juana Diaz|Puerto Rico|   795.0|     748.0|          NULL| 67000.0|
|for_sale|4.0| 2.0|     0.1|        Ponce|Puerto Rico|   731.0|    1800.0|          NULL|145000.0|
|for_sale|6.0| 2.0|    0.05|     Mayaguez|Puerto Rico|   680.0|      NULL|          NULL| 65000.0|
|for_sale|4.0| 3.0|    0.46|San Sebastian|Puerto Rico|   612.0|    2520.0|          NULL|179000.0|
|for_sale|3.0| 1.0|     0.2|       Ciales|Puerto Rico|   639.0|    2040.0|          NULL| 50000.0|
|for_sale|

In [20]:
#Creating a temporary view of the DataFrame

homesales_df.createOrReplaceTempView('property_sales')

In [36]:
# INSIGHT ANALYSIS

# 1. Price Distribution by Property Status and Bedroom Count:

spark.sql("""
SELECT status, bed, format_number(AVG(price), 0) AS avg_price, format_number(MIN(price), 2) AS min_price, format_number(MAX(price), 2) AS max_price
FROM property_sales
GROUP BY status, bed
ORDER BY status, bed;
""").show()

+--------+----+---------+----------+--------------+
|  status| bed|avg_price| min_price|     max_price|
+--------+----+---------+----------+--------------+
|for_sale|NULL|  396,401|      0.00| 75,000,000.00|
|for_sale| 1.0|  541,886|      1.00| 79,000,000.00|
|for_sale| 2.0|  665,548|      1.00| 16,750,000.00|
|for_sale| 3.0|  667,687|      1.00| 35,000,000.00|
|for_sale| 4.0|  911,782|      0.00| 66,000,000.00|
|for_sale| 5.0|1,519,866|    500.00|135,000,000.00|
|for_sale| 6.0|1,607,437|    900.00|169,000,000.00|
|for_sale| 7.0|2,185,907|  1,000.00| 59,995,000.00|
|for_sale| 8.0|1,864,235| 19,900.00| 69,000,000.00|
|for_sale| 9.0|2,647,265| 44,900.00|875,000,000.00|
|for_sale|10.0|2,400,350| 75,000.00|100,000,000.00|
|for_sale|11.0|2,284,349| 65,000.00| 44,950,000.00|
|for_sale|12.0|2,445,001| 49,900.00| 69,950,000.00|
|for_sale|13.0|2,856,889| 75,900.00| 40,000,000.00|
|for_sale|14.0|4,321,260|195,000.00| 78,000,000.00|
|for_sale|15.0|2,614,437|299,000.00| 15,500,000.00|
|for_sale|16

                                                                                

In [38]:
# 2. Average Lot Size and House Size by City and State:

spark.sql("""
SELECT city, state, ROUND(AVG(acre_lot), 2) AS avg_acre_lot, ROUND(AVG(house_size), 2) AS avg_house_size
FROM property_sales
GROUP BY city, state
ORDER BY avg_acre_lot DESC, avg_house_size DESC;""").show(10)

[Stage 76:>                                                         (0 + 8) / 8]

+-------------+-----------+------------+--------------+
|         city|      state|avg_acre_lot|avg_house_size|
+-------------+-----------+------------+--------------+
|  Fultonville|   New York|     5446.15|       1948.36|
|      Crystal|      Maine|      3900.0|          NULL|
|  T13 R8 Wels|      Maine|      3669.0|          NULL|
|    Whitehall|   New York|     2989.45|       1647.09|
|    Kingsbury|   New York|     2847.45|       1924.84|
|      Whiting| New Jersey|     2411.32|        1328.2|
|     Simsbury|Connecticut|      2179.8|       2215.14|
|     Hato Rey|Puerto Rico|      1700.0|        1700.0|
|Scotch Plains| New Jersey|     1619.94|       3523.22|
|   T3 R1 Nbpp|      Maine|      1503.0|          NULL|
+-------------+-----------+------------+--------------+
only showing top 10 rows



                                                                                

In [48]:
# 3. Price Per Square Foot Analysis by Zip Code:

spark.sql("""
SELECT int(zip_code), format_number(AVG(price/house_size), 2) AS avg_price_per_sqft
FROM property_sales
WHERE house_size > 0
GROUP BY zip_code
ORDER BY avg_price_per_sqft DESC;
""").show(10)

+--------+------------------+
|zip_code|avg_price_per_sqft|
+--------+------------------+
|    2215|            997.29|
|   10027|            995.18|
|    5842|             99.98|
|   13440|             99.66|
|     795|             99.57|
|   13144|             99.49|
|   13114|             99.45|
|    4357|             99.39|
|    8609|             99.26|
|   14218|             99.26|
+--------+------------------+
only showing top 10 rows



                                                                                

In [53]:
# 4. Year-over-Year Price Growth by State:

spark.sql("""
SELECT state, YEAR(prev_sold_date) AS year, format_number(AVG(price), 2) AS avg_price
FROM property_sales
where YEAR(prev_sold_date) is not null
GROUP BY state, year
ORDER BY state, year;""").show(10)



+-----------+----+------------+
|      state|year|   avg_price|
+-----------+----+------------+
|Connecticut|1961|2,100,000.00|
|Connecticut|1963|  339,000.00|
|Connecticut|1965|  313,083.33|
|Connecticut|1966|  328,185.71|
|Connecticut|1967|  544,366.67|
|Connecticut|1968|  184,900.00|
|Connecticut|1970|2,850,000.00|
|Connecticut|1971|  303,999.40|
|Connecticut|1972|  379,000.00|
|Connecticut|1973|  534,764.00|
+-----------+----+------------+
only showing top 10 rows



                                                                                

In [56]:
# 5. Market Velocity: Time Between Listings and Previous Sale:

spark.sql("""
SELECT AVG(DATEDIFF(current_date, prev_sold_date)) AS avg_days_on_market
FROM property_sales
WHERE prev_sold_date IS NOT NULL;""").show()

+------------------+
|avg_days_on_market|
+------------------+
| 5338.398466366245|
+------------------+



In [62]:
# 6. Impact of Bathrooms to Bedrooms Ratio on Price:

spark.sql("""
SELECT ROUND(bath/bed, 2) AS bath_bed_ratio, format_number(AVG(price), 2) AS avg_price
FROM property_sales
WHERE bed > 0 AND bath > 0
GROUP BY bath_bed_ratio
ORDER BY avg_price DESC;
""").show(10)

+--------------+------------+
|bath_bed_ratio|   avg_price|
+--------------+------------+
|          0.26|  985,000.00|
|          0.42|  981,799.99|
|          0.64|  975,482.14|
|          3.33|   95,000.00|
|          0.68|9,950,000.00|
|          1.89|9,875,000.00|
|          1.29|9,569,547.88|
|          1.88|9,500,000.00|
|          1.26|9,385,000.00|
|          0.65|9,162,166.67|
+--------------+------------+
only showing top 10 rows



                                                                                

In [66]:
# 7. Frequency of Properties for Sale vs. Ready to Build by Location
# and  determinining the run time for this query:
start_time = time.time()

spark.sql("""
SELECT city, state, status, COUNT(*) AS count
FROM property_sales
where city is not NULL and status is not NULL
GROUP BY city, state, status
ORDER BY city, state, status;
""").show()

print("Time taken is: %s seconds" %(time.time() - start_time))



+------------------+-------------+--------+-----+
|              city|        state|  status|count|
+------------------+-------------+--------+-----+
|             Abbot|        Maine|for_sale|   15|
|          Aberdeen|   New Jersey|for_sale|  210|
|          Abington|Massachusetts|for_sale|  166|
|          Abington| Pennsylvania|for_sale|   13|
|           Absecon|   New Jersey|for_sale|  353|
| Absecon Highlands|   New Jersey|for_sale|    4|
|            Accord|     New York|for_sale|  362|
|              Acra|     New York|for_sale|   50|
|             Acton|        Maine|for_sale|   53|
|             Acton|Massachusetts|for_sale|  798|
|          Acushnet|Massachusetts|for_sale|   97|
|           Acworth|New Hampshire|for_sale|  131|
|             Adams|Massachusetts|for_sale|  935|
|             Adams|     New York|for_sale|  124|
|      Adams Center|     New York|for_sale|   42|
|Adamstown Township|        Maine|for_sale|    7|
|   Addisleigh Park|     New York|for_sale|   53|


                                                                                

In [67]:
# 8. Cache the the temporary table property_sales.

spark.sql('CACHE TABLE property_sales')

                                                                                

DataFrame[]

In [69]:
# 9. Running the query 7 using the cached data, calculate runtime and compare it to uncached runtime.

start_time = time.time()

spark.sql("""
SELECT city, state, status, COUNT(*) AS count
FROM property_sales
where city is not NULL and status is not NULL
GROUP BY city, state, status
ORDER BY city, state, status;
""").show(10)

print("Time taken with data cached is: %s seconds" %(time.time() - start_time))

+-----------------+-------------+--------+-----+
|             city|        state|  status|count|
+-----------------+-------------+--------+-----+
|            Abbot|        Maine|for_sale|   15|
|         Aberdeen|   New Jersey|for_sale|  210|
|         Abington|Massachusetts|for_sale|  166|
|         Abington| Pennsylvania|for_sale|   13|
|          Absecon|   New Jersey|for_sale|  353|
|Absecon Highlands|   New Jersey|for_sale|    4|
|           Accord|     New York|for_sale|  362|
|             Acra|     New York|for_sale|   50|
|            Acton|        Maine|for_sale|   53|
|            Acton|Massachusetts|for_sale|  798|
+-----------------+-------------+--------+-----+
only showing top 10 rows

Time taken with data cached is: 0.3111600875854492 seconds


In [70]:
# 10. Partitioning by the "state" field on the formatted parquet home sales data

homesales_df.write.partitionBy('state').parquet('parquet_property_sales', mode = 'overwrite')

                                                                                

In [73]:
# 11. Reading the formatted parquet data

parquet_df = spark.read.parquet('parquet_property_sales')

In [75]:
parquet_df.show(5)

+--------+---+----+--------+----------+--------+----------+--------------+--------+--------+
|  status|bed|bath|acre_lot|      city|zip_code|house_size|prev_sold_date|   price|   state|
+--------+---+----+--------+----------+--------+----------+--------------+--------+--------+
|for_sale|5.0| 3.0|    0.14|   Merrick| 11566.0|      NULL|    1993-02-22|894990.0|New York|
|for_sale|4.0| 3.0|    0.15|Hicksville| 11801.0|      NULL|          NULL|799999.0|New York|
|for_sale|5.0| 4.0|    0.18| Plainview| 11803.0|    2807.0|    1986-01-31|998000.0|New York|
|for_sale|4.0| 3.0|    0.18| Plainview| 11803.0|      NULL|    1986-04-04|925000.0|New York|
|for_sale|4.0| 4.0|    0.14|   Merrick| 11566.0|      NULL|          NULL|875000.0|New York|
+--------+---+----+--------+----------+--------+----------+--------------+--------+--------+
only showing top 5 rows



In [80]:
# 12. Runing the query 7 with the parquet DataFrame
# Determining the runtime and comparing it to the cached version.

start_time = time.time()

"""
SELECT city, state, status, COUNT(*) AS count
FROM parquet_df
where city is not NULL and status is not NULL
GROUP BY city, state, status
ORDER BY city, state, status;
"""

print("Time taken with parquet data is: %s seconds" %(time.time() - start_time))

# 0.00004172325 seconds

Time taken with parquet data is: 4.172325134277344e-05 seconds


In [82]:
spark.sql('UNCACHE TABLE property_sales')

DataFrame[]

In [83]:
# 13. Checking if the property_sales is no longer cached
if spark.catalog.isCached('property_sales'):
  print('home_sales is cached')
else:
  print('home_sales is not cached')

home_sales is not cached


In [86]:
# 14. #code snippet that demonstrates how to fetch executor information from the Spark REST API.

import requests
import json

def fetch_executor_details():
    # URL for the Spark REST API endpoint for executors
    spark_rest_api_url = 'http://localhost:4040/api/v1/applications'
    
    # Fetch list of applications
    apps_response = requests.get(spark_rest_api_url)
    apps = apps_response.json()
    
    if not apps:
        print("No applications found.")
        return
    
    # details from the first application
    app_id = apps[0]['id']
    
    # Construct URL to fetch executor details for the application
    executors_url = f'{spark_rest_api_url}/{app_id}/executors'
    
    # Fetch executor details
    executors_response = requests.get(executors_url)
    executors = executors_response.json()
    
    # Pretty print the executor details
    print(json.dumps(executors, indent=4))

# Call the function to fetch and print executor details
fetch_executor_details()

[
    {
        "id": "driver",
        "hostPort": "172.25.195.172:38923",
        "isActive": true,
        "rddBlocks": 0,
        "memoryUsed": 35109,
        "diskUsed": 0,
        "totalCores": 8,
        "maxTasks": 8,
        "activeTasks": 0,
        "failedTasks": 0,
        "completedTasks": 514,
        "totalTasks": 514,
        "totalDuration": 8981395,
        "totalGCTime": 2699,
        "totalInputBytes": 5298459785,
        "totalShuffleRead": 7474579,
        "totalShuffleWrite": 7474579,
        "isBlacklisted": false,
        "maxMemory": 455501414,
        "addTime": "2024-02-27T06:05:53.529GMT",
        "executorLogs": {},
        "memoryMetrics": {
            "usedOnHeapStorageMemory": 35109,
            "usedOffHeapStorageMemory": 0,
            "totalOnHeapStorageMemory": 455501414,
            "totalOffHeapStorageMemory": 0
        },
        "blacklistedInStages": [],
        "peakMemoryMetrics": {
            "JVMHeapMemory": 435197760,
            "JVMOff

In [None]:
spark.stop()