In [0]:
from pyspark.sql import SparkSession, Row
from pyspark.sql.types import StructType, StructField, StringType, DoubleType
from pyspark.sql import functions as F
from pyspark.sql.functions import udf
import random
import math

spark = SparkSession.builder.appName("DemoCityData").getOrCreate()

# Seed the generator so that "Restart Kernel -> Run All" always rebuilds the
# same demo data set (and therefore the same tables further down).
RANDOM_SEED = 42
random.seed(RANDOM_SEED)

# Creating the demo data
municipalities = ["Tampere", "Helsinki", "Espoo", "Turku"]
# The Finnish street names contain "ä"; the previous literals were mis-decoded
# (mojibake) and showed "채" instead.
streets = ["Lastutie", "Urheilutie", "Peltotie", "Metsätie", "Sepäntie", "Korpilontie", "Vanha Härkätie"]
postal_codes = ["31400", "00100", "02100", "20100"]
building_uses = ["1", "2"]

data = []
# adding 200 buildings to the data
# Every attribute is drawn independently, so municipality, street and postal
# code are not correlated with each other or with the coordinates.
for i in range(1, 201):
    muni = random.choice(municipalities)
    street = random.choice(streets)
    postal = random.choice(postal_codes)
    # Coordinates fall in latitude [60, 61) and longitude [22, 23), 6 decimals.
    lat = round(60 + random.random(), 6)
    lon = round(22 + random.random(), 6)
    house_number = str(random.randint(1, 50))
    # Zero-padded running id: B000001 ... B000200
    building_id = f"B{i:06d}"
    use = random.choice(building_uses)
    # The region column is the constant "02" for every generated building.
    data.append((building_id, "02", muni, street, house_number, postal, lat, lon, use))

# Column order must match the tuple order used in data.append(...) above.
schema = StructType([
    StructField("building_id", StringType(), True),
    StructField("region", StringType(), True),
    StructField("municipality", StringType(), True),
    StructField("street", StringType(), True),
    StructField("house_number", StringType(), True),
    StructField("postal_code", StringType(), True),
    StructField("latitude_wgs84", DoubleType(), True),
    StructField("longitude_wgs84", DoubleType(), True),
    StructField("building_use", StringType(), True)
])

# adding kampusareena to the data
# Fixed (non-random) reference row; its building id is what the later cells
# look up, and it is the only row with postal code "33720".
data.append(("101060573F", "02", "Tampere", "Kampusareenakatu", "1", "33720", 60.623, 23.510, "1"))

locationDF = spark.createDataFrame(data, schema=schema)



In [0]:
# Building id of the Kampusareena reference row; it is used below to look up
# the coordinates that all distances are measured against.
kampusareenaBuildingId: str = "101060573F"

# Haversine function for calculating distance between two points
def haversine(lat1: float, lon1: float, lat2: float, lon2: float) -> float:
    """Return the great-circle distance in kilometers between two points.

    Uses the haversine formula
        a = sin^2(dphi / 2) + cos(phi1) * cos(phi2) * sin^2(dlambda / 2)
        d = 2 * R * atan2(sqrt(a), sqrt(1 - a))

    Args:
        lat1: Latitude of the first point, in decimal degrees.
        lon1: Longitude of the first point, in decimal degrees.
        lat2: Latitude of the second point, in decimal degrees.
        lon2: Longitude of the second point, in decimal degrees.

    Returns:
        The distance along the sphere's surface, in kilometers.
    """
    # Radius of Earth in kilometers. 6378.1 is approximately the equatorial
    # radius; it is kept unchanged so existing results stay comparable.
    R: float = 6378.1
    phi1 = math.radians(lat1)
    phi2 = math.radians(lat2)
    deltaPhi = math.radians(lat2 - lat1)
    deltaLambda = math.radians(lon2 - lon1)

    # The haversine terms are the SQUARE of the sine of the HALF angle.
    # The previous sin(delta * delta / 4) only matched this for very small
    # angles and gave wrong distances for far-apart points.
    a = (
        math.sin(deltaPhi / 2.0) ** 2 +
        math.cos(phi1) * math.cos(phi2) * math.sin(deltaLambda / 2.0) ** 2
    )
    # Rounding can push a marginally outside [0, 1] (e.g. antipodal points),
    # which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, max(0.0, a))
    return 2 * R * math.atan2(math.sqrt(a), math.sqrt(1 - a))

# Number of distinct postal-code areas in each municipality.
areas = (
    locationDF
    .groupBy("municipality")
    .agg(F.countDistinct("postal_code").alias("areas"))
)

# Number of distinct streets and distinct buildings in each municipality.
buildings = (
    locationDF
    .groupBy("municipality")
    .agg(
        F.countDistinct("street").alias("streets"),
        F.countDistinct("building_id").alias("buildings"),
    )
)

# Join both metric sets on the municipality name and derive the density:
# buildings per postal-code area, rounded to one decimal.
municipalityDF = (
    areas
    .join(buildings, on="municipality")
    .withColumn(
        "buildings_per_area",
        F.round(F.col("buildings") / F.col("areas"), 1),
    )
)

# Fetch the reference building's row by its id (None when nothing matches).
kampusareena = locationDF.where(F.col("building_id") == kampusareenaBuildingId).first()

# Stop right away if the reference building is missing from the data.
if kampusareena is None:
    raise ValueError("Kampusareena not found")

# Reference coordinates that the distance UDF below measures against.
kampus_lat, kampus_lon = kampusareena["latitude_wgs84"], kampusareena["longitude_wgs84"]

# UDF to calculate distance to Kampusareena
def _distance_to_kampus(lat, lon):
    """Return the haversine distance (km) from (lat, lon) to Kampusareena.

    Returns None (SQL NULL) when either coordinate is missing.
    """
    # Both coordinate columns are declared nullable. Without this guard a
    # single NULL would raise a TypeError inside the worker and fail the
    # whole Spark job.
    if lat is None or lon is None:
        return None
    return haversine(lat, lon, kampus_lat, kampus_lon)


kampus_distance = udf(_distance_to_kampus, DoubleType())

# Add distance column to all buildings
locationDF = locationDF.withColumn("distance_to_kampus", kampus_distance("latitude_wgs84", "longitude_wgs84"))

# Find minimum distance to Kampusareena per municipality
# (F.min skips NULL distances, so rows without coordinates are ignored.)
min_distance = locationDF.groupBy("municipality").agg(
    F.round(F.min("distance_to_kampus"), 1).alias("min_distance")
)

# Final result: top 10 municipalities by building density
# The municipality name is a secondary sort key so that ties in
# buildings_per_area are broken the same way on every run.
municipalityDF = (
    municipalityDF.join(min_distance, on="municipality")
    .orderBy(F.desc("buildings_per_area"), F.asc("municipality"))
    .limit(10)
)

In [0]:
# Print a heading, then the ranked table; truncate=False keeps long cell
# values from being cut off in the output.
print("The 10 municipalities with the highest buildings per area (postal code) ratio:")
municipalityDF.show(truncate=False)

The 10 municipalities with the highest buildings per area (postal code) ratio:
+------------+-----+-------+---------+------------------+------------+
|municipality|areas|streets|buildings|buildings_per_area|min_distance|
+------------+-----+-------+---------+------------------+------------+
|Helsinki    |4    |7      |50       |12.5              |34.8        |
|Espoo       |4    |7      |49       |12.3              |31.6        |
|Turku       |4    |7      |46       |11.5              |35.2        |
|Tampere     |5    |8      |56       |11.2              |0.0         |
+------------+-----+-------+---------+------------------+------------+



In [0]:
# Finds the building closest to the average location of all buildings whose `filter_col` equals `filter_val` (e.g. a municipality or a postal code).
def closest_to_average(df, filter_col: str, filter_val) -> dict:
    """Find the building closest to the average location of a group of buildings.

    The group consists of the rows of ``df`` whose ``filter_col`` equals
    ``filter_val``. The "average location" is the arithmetic mean of the
    group's latitudes and longitudes.

    Args:
        df: DataFrame with at least the columns ``building_id``, ``street``,
            ``house_number``, ``latitude_wgs84`` and ``longitude_wgs84``.
        filter_col: Name of the column to filter on.
        filter_val: Value that ``filter_col`` must equal.

    Returns:
        A dict with two keys: ``"address"`` (street and house number joined
        by a space) and ``"distance"`` (distance from that building to the
        average location, in km, rounded to 3 decimals).

    Raises:
        ValueError: If no row with usable coordinates matches the filter.
    """
    # Filter relevant rows. Rows without complete coordinates are dropped
    # first: they cannot be placed on the map and would make the distance
    # UDF fail. Then keep one row per building.
    base = (
        df.filter(F.col(filter_col) == filter_val)
        .filter(F.col("latitude_wgs84").isNotNull() & F.col("longitude_wgs84").isNotNull())
        .dropDuplicates(["building_id"])
    )

    # Compute average latitude and longitude
    centroid = base.agg(
        F.avg("latitude_wgs84").alias("avg_lat"),
        F.avg("longitude_wgs84").alias("avg_lon")
    ).first()

    # Ensure that valid data exists (the averages are NULL for an empty group)
    if (
        centroid is None or
        centroid["avg_lat"] is None or
        centroid["avg_lon"] is None
    ):
        raise ValueError(f"No valid data for {filter_val}")

    # Plain floats keep the UDF closure small; it no longer has to carry
    # the whole Row object to the workers.
    avg_lat = float(centroid["avg_lat"])
    avg_lon = float(centroid["avg_lon"])

    # Create UDF for calculating distance
    avg_udf = F.udf(
        lambda lat, lon: haversine(lat, lon, avg_lat, avg_lon),
        DoubleType()
    )

    # Calculate distance for each building
    with_dist = base.withColumn(
        "dist_to_avg",
        avg_udf("latitude_wgs84", "longitude_wgs84")
    )

    # Select the building closest to the average; building_id breaks ties so
    # the same building is returned on every run.
    closest = with_dist.orderBy("dist_to_avg", "building_id").first()
    if closest is None:
        raise ValueError(f"No valid data for {filter_val}")

    # Return address and distance as a dictionary
    return {
        "address": f"{closest['street']} {closest['house_number']}",
        "distance": round(closest["dist_to_avg"], 3)
    }

# Building nearest to the average location of all buildings in Tampere.
tampere = closest_to_average(locationDF, filter_col="municipality", filter_val="Tampere")
tampereAddress, tampereDistance = tampere["address"], tampere["distance"]

# Same question for postal code 33720 (presumably the Hervanta area, going
# by the variable names).
hervanta = closest_to_average(locationDF, filter_col="postal_code", filter_val="33720")
hervantaAddress, hervantaDistance = hervanta["address"], hervanta["distance"]


In [0]:
# Report, for each group, the address nearest to the group's average
# location together with its distance from that average in km.
print(f"The address closest to the average location in Tampere: '{tampereAddress}' at ({tampereDistance} km)")
print(f"The address closest to the average location in Hervanta: '{hervantaAddress}' at ({hervantaDistance} km)")

The address closest to the average location in Tampere: 'Lastutie 6' at (7.63 km)
The address closest to the average location in Hervanta: 'Kampusareenakatu 1' at (0.0 km)
