In [7]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as func
import matplotlib.pyplot as plt
import seaborn as sns
import os
import sys
from pyspark.sql import functions as F  #filtering
import geopandas as gpd
import folium
import pandas as pd
import matplotlib.pyplot as plt

In [8]:
# starting a Spark session
spark = (
    SparkSession.builder.appName('Parkres Further Analysis')
    .config("spark.sql.repl.eagerEval.enabled", True)
    .config("spark.sql.session.timeZone", "Etc/UTC")
    .config("spark.driver.memory", "2g")
    .config("spark.executor.memory", "4g")
    .getOrCreate()
)

# Read the dataset from a CSV file using PySpark
parkres = spark.read.csv('../data/curated/parkres/parkres.csv', header=True, inferSchema=True)

# Drop the extra index column (_c0) if it exists
parkres = parkres.drop('_c0')

# Show the first few rows of the dataset to confirm
parkres.show(5)

                                                                                

+--------------------+--------------------+--------+--------------------+
|                name|            sa2_name|postcode|            geometry|
+--------------------+--------------------+--------+--------------------+
|Lilydale-Warburto...|        Yarra Valley|    3139|POLYGON ((1034153...|
|Nangana Bushland ...|        Yarra Valley|    3139|POLYGON ((1022203...|
|Nillumbik G139 Bu...|Wattle Glen - Dia...|    3089|POLYGON ((989912....|
|Lilydale-Warburto...|Lilydale - Coldst...|    3140|POLYGON ((1005216...|
|Plenty Gorge Park...|  Plenty - Yarrambat|    3088|POLYGON ((983018....|
+--------------------+--------------------+--------+--------------------+
only showing top 5 rows



In [9]:
# read the domain parquet dataset
domain = spark.read.parquet('../data/landing/domain_data/domain.parquet')
domain.limit(5)

url,price,address,property_type,latitude,longitude,Beds,Baths,Parking,bond,extracted_price
https://www.domai...,"$1,400.00","10 Allara Court, ...",Townhouse,-37.77427300000001,145.1811258,4.0,3.0,2.0,9125.0,1400.0
https://www.domai...,$750 per week,"7 Pine Ridge, Don...",House,-37.7912513,145.1756489,4.0,2.0,0.0,3259.0,750.0
https://www.domai...,$1300 per week,"20 Mulsanne Way, ...",House,-37.7972323,145.1812636,5.0,2.0,2.0,5649.0,1300.0
https://www.domai...,$825pw / $3585pcm,3 Monterey Cresce...,House,-37.792402,145.1743233,3.0,1.0,5.0,3585.0,825.0
https://www.domai...,$680.00,3/49 Leslie Stree...,Townhouse,-37.7810117,145.180705,3.0,2.0,2.0,2955.0,680.0


In [12]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from math import radians, sin, cos, sqrt, atan2

# First, let's inspect our data
print("parkres schema:")
parkres.printSchema()
print("\nSample parkres data:")
parkres.show(5, truncate=False)

print("\ndomain schema:")
domain.printSchema()
print("\nSample domain data:")
domain.show(5, truncate=False)

# Assuming the geometry is in the format "POINT(longitude latitude)"
parkres = parkres.withColumn("coordinates", F.split(F.regexp_extract(F.col("geometry"), r"POINT\((.*?)\)", 1), " "))
parkres = parkres.withColumn("park_long", F.col("coordinates")[0].cast(DoubleType()))
parkres = parkres.withColumn("park_lat", F.col("coordinates")[1].cast(DoubleType()))

# Check the results of our geometry parsing
print("\nparkres after parsing geometry:")
parkres.select("name", "geometry", "park_long", "park_lat").show(5, truncate=False)

# Haversine distance function with null checks
def haversine_distance(lat1, lon1, lat2, lon2):
    if lat1 is None or lon1 is None or lat2 is None or lon2 is None:
        return None
    
    R = 6371  # Earth's radius in kilometers

    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1

    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    distance = R * c

    return distance

# Register UDF
distance_udf = F.udf(haversine_distance, DoubleType())

# Cross join domain and parkres to calculate distances
crossed = domain.crossJoin(parkres)

# Calculate distances
with_distances = crossed.withColumn("distance_to_park", 
    distance_udf(F.col("latitude"), F.col("longitude"), 
                 F.col("park_lat"), F.col("park_long")))

# Check the results of our distance calculation
print("\nSample of distance calculations:")
with_distances.select("url", "name", "latitude", "longitude", "park_lat", "park_long", "distance_to_park").show(5, truncate=False)

# Find nearest park for each property
nearest_park = with_distances.groupBy("url") \
    .agg(F.min("distance_to_park").alias("nearest_park_distance"))

# Join back to get full property data with nearest park distance
result = domain.join(nearest_park, on="url")

# Group by distance ranges and calculate average price
analysis = result.groupBy(F.round(F.col("nearest_park_distance")).alias("distance_km")) \
    .agg(F.avg("extracted_price").alias("avg_price"), 
         F.count("url").alias("property_count")) \
    .orderBy("distance_km")

print("\nFinal analysis:")
analysis.show()

parkres schema:
root
 |-- name: string (nullable = true)
 |-- sa2_name: string (nullable = true)
 |-- postcode: integer (nullable = true)
 |-- geometry: string (nullable = true)
 |-- park_lat: double (nullable = true)
 |-- park_long: double (nullable = true)
 |-- coordinates: array (nullable = true)
 |    |-- element: string (containsNull = false)


Sample parkres data:
+-------------------------------+---------------------------+--------+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

[Stage 28:>                                                         (0 + 1) / 1]

+-----------+-----------------+--------------+
|distance_km|        avg_price|property_count|
+-----------+-----------------+--------------+
|       NULL|679.4782640182082|          9828|
+-----------+-----------------+--------------+



                                                                                

In [10]:
from pyspark.sql import functions as F
from pyspark.sql.types import DoubleType
from math import radians, sin, cos, sqrt, atan2

# Convert parkres geometry to lat/long
# This is a placeholder - you'll need to implement the actual conversion
parkres = parkres.withColumn("park_lat", F.udf(lambda x: extract_lat(x), DoubleType())("geometry")) \
                 .withColumn("park_long", F.udf(lambda x: extract_long(x), DoubleType())("geometry"))

# Haversine distance function
def haversine_distance(lat1, lon1, lat2, lon2):
    R = 6371  # Earth's radius in kilometers

    lat1, lon1, lat2, lon2 = map(radians, [lat1, lon1, lat2, lon2])
    dlat = lat2 - lat1
    dlon = lon2 - lon1

    a = sin(dlat/2)**2 + cos(lat1) * cos(lat2) * sin(dlon/2)**2
    c = 2 * atan2(sqrt(a), sqrt(1-a))
    distance = R * c

    return distance

# Register UDF
distance_udf = F.udf(haversine_distance, DoubleType())

# Cross join domain and parkres to calculate distances
crossed = domain.crossJoin(parkres)

# Calculate distances
with_distances = crossed.withColumn("distance_to_park", 
    distance_udf(F.col("latitude"), F.col("longitude"), 
                 F.col("park_lat"), F.col("park_long")))

# Find nearest park for each property
nearest_park = with_distances.groupBy("url") \
    .agg(F.min("distance_to_park").alias("nearest_park_distance"))

# Join back to get full property data with nearest park distance
result = domain.join(nearest_park, on="url")

# Group by distance ranges and calculate average price
analysis = result.groupBy(F.round(F.col("nearest_park_distance")).alias("distance_km")) \
    .agg(F.avg("extracted_price").alias("avg_price"), 
         F.count("url").alias("property_count")) \
    .orderBy("distance_km")

analysis.show()

# Optional: Perform more advanced statistical analysis

24/09/19 12:36:18 ERROR PythonUDFRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/home/franklin/.local/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 1225, in main
    eval_type = read_int(infile)
  File "/home/franklin/.local/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 596, in read_int
    raise EOFError
EOFError

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:572)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:94)
	at org.apache.spark.sql.execution.python.BasePythonUDFRunner$$anon$1.read(PythonUDFRunner.scala:75)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:525)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collec

PythonException: 
  An exception was thrown from the Python worker. Please see the stack trace below.
Traceback (most recent call last):
  File "/tmp/ipykernel_40549/747276159.py", line 7, in <lambda>
NameError: name 'extract_lat' is not defined
