In [None]:
#!pip install pyspark
#!pip install folium

In [None]:
# disable warnings
import warnings
warnings.filterwarnings('ignore')


In [2]:
from pyspark.sql.session import SparkSession
from pyspark.sql.functions import udf, dense_rank, count, when, isnan, col, avg, row_number, monotonically_increasing_id,udf
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
from pyspark.ml.feature import VectorAssembler, MinMaxScaler
from pyspark.ml.clustering import KMeans
from pyspark.sql.window import Window
import folium
from pyspark.sql import functions as F
from pyspark.ml.linalg import Vectors, VectorUDT

In [3]:
session=SparkSession.builder.appName("DC_crime").master("local[3]").getOrCreate()

24/03/10 19:29:09 WARN Utils: Your hostname, M2-Piton.local resolves to a loopback address: 127.0.0.1; using 10.216.194.237 instead (on interface en0)
24/03/10 19:29:09 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/10 19:29:10 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Define schema

In [4]:
schema_bnb = "id INT, log_price DOUBLE, property_type STRING, room_type STRING, accommodates INT, bathrooms DOUBLE, bed_type STRING, cancellation_policy STRING, cleaning_fee BOOLEAN, city STRING, description STRING, first_review STRING, host_has_profile_pic BOOLEAN, host_identity_verified BOOLEAN, host_response_rate STRING, host_since STRING, instant_bookable BOOLEAN, last_review STRING, latitude FLOAT, longitude FLOAT, name STRING, neighbourhood STRING, number_of_reviews INT, review_scores_rating DOUBLE, thumbnail_url STRING, zipcode STRING, bedrooms DOUBLE, beds DOUBLE"
schema_crime = StructType([
    StructField("offense_group", StringType(), True),
    StructField("census_tract", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("end_date", StringType(), True),
    StructField("offense_text", StringType(), True),
    StructField("shift", StringType(), True),
    StructField("district", DoubleType(), True),
    StructField("yblock", DoubleType(), True),
    StructField("ward", DoubleType(), True),
    StructField("year", IntegerType(), True),
    StructField("offensekey", StringType(), True),
    StructField("bid", StringType(), True),
    StructField("sector", StringType(), True),
    StructField("psa", IntegerType(), True),
    StructField("ucr_rank", IntegerType(), True),
    StructField("block_group", StringType(), True),
    StructField("voting_precinct", StringType(), True),
    StructField("xblock", DoubleType(), True),
    StructField("block", StringType(), True),
    StructField("start_date", TimestampType(), True),
    StructField("ccn", IntegerType(), True),
    StructField("offense", StringType(), True),
    StructField("anc", StringType(), True),
    StructField("report_date", TimestampType(), True),
    StructField("method", StringType(), True),
    StructField("location", StringType(), True),
    StructField("latitude", DoubleType(), True)
])

Import data

In [5]:
dc_bnb = session.read.csv("DC_Airbnb_Data2.csv", header=True,schema=schema_bnb, sep=";")
crime_data = session.read.csv("crime_dc2.csv", header=True, schema=schema_crime, sep=";")

Explore data

In [6]:
dc_bnb.show(truncate=False)

print((dc_bnb.count(), len(dc_bnb.columns)))

24/03/10 19:29:16 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+--------+------------------+-------------+---------------+------------+---------+--------+-------------------+------------+----+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------

In [7]:
crime_data.show()

# print number of rows and columns
print((crime_data.count(), len(crime_data.columns)))

+-------------+------------+--------------+-------------------+--------------------+--------+--------+---------------+----+----+--------------------+--------------------+------+----+--------+-----------+---------------+--------------+--------------------+-------------------+--------+--------------------+---+-------------------+------+--------------------+-------------+
|offense_group|census_tract|     longitude|           end_date|        offense_text|   shift|district|         yblock|ward|year|          offensekey|                 bid|sector| psa|ucr_rank|block_group|voting_precinct|        xblock|               block|         start_date|     ccn|             offense|anc|        report_date|method|            location|     latitude|
+-------------+------------+--------------+-------------------+--------------------+--------+--------+---------------+----+----+--------------------+--------------------+------+----+--------+-----------+---------------+--------------+--------------------+-

# Create a neighborhood map

In [8]:
# how many unique values are in the column "neighbourhood"
dc_bnb.select("neighbourhood").distinct().count()

146

We need to re-define how neighborhoods are created. 
We will use clustering based on the latitude and longitude of the houses to define the neighborhoods.

In [10]:
# print null values in the column "latitude" and "longitude"
columns = ["latitude", "longitude", "neighbourhood"]

dc_bnb.select([count(when(isnan(c) | col(c).isNull(), c)).alias(c) for c in columns]).show()

# remove null values in the column "latitude" and "longitude" and "neighbourhood"
dc_bnb = dc_bnb.na.drop(subset=["latitude", "longitude", "neighbourhood"])

+--------+---------+-------------+
|latitude|longitude|neighbourhood|
+--------+---------+-------------+
|      29|       11|          789|
+--------+---------+-------------+



In [11]:
# cluster the neighbourhoods based on the latitude and longitude
vec_assembler = VectorAssembler(inputCols=["latitude", "longitude"], outputCol="features")
vec_df = vec_assembler.transform(dc_bnb)

kmeans = KMeans(k=30, seed=2)               # 30 clusters
model = kmeans.fit(vec_df.select("features"))

fitted = model.transform(vec_df)

centers = model.clusterCenters()

# assign the cluster number to each neighbourhood in the dataframe
fitted = fitted.withColumnRenamed("prediction", "cluster")

# join the original dataframe with the cluster number
dc_bnb = dc_bnb.join(fitted.select("id", "cluster"), "id")

24/03/10 19:30:03 WARN InstanceBuilder: Failed to load implementation from:dev.ludovic.netlib.blas.JNIBLAS


Now define a new neighborhood column as the most common neighborhood in the given cluster.

In [12]:
# Group by cluster and neighborhood, count occurrences
cluster_neighborhood_counts = dc_bnb.groupBy("cluster", "neighbourhood").agg(count("*").alias("count"))

# Rank neighborhoods within each cluster based on count
window_spec = Window.partitionBy("cluster").orderBy(col("count").desc())
ranked_neighborhoods = cluster_neighborhood_counts.withColumn("rank", dense_rank().over(window_spec))

# Filter to get the most common neighborhood for each cluster
most_common_neighborhoods = ranked_neighborhoods.filter(col("rank") == 1)

# Create a dictionary to map cluster to most common neighborhood
cluster_to_neighborhood_map = most_common_neighborhoods.select("cluster", "neighbourhood").rdd.collectAsMap()

# Define a UDF to map cluster to most common neighborhood
def map_cluster_to_neighborhood(cluster):
    return cluster_to_neighborhood_map.get(cluster, None)

# Apply the UDF to create a new column "new_neighborhood" based on the most common neighborhood in each cluster
map_cluster_to_neighborhood_udf = udf(map_cluster_to_neighborhood, StringType())
dc_bnb = dc_bnb.withColumn("new_neighborhood", map_cluster_to_neighborhood_udf(col("cluster")))
dc_bnb.show()

[Stage 75:>                                                         (0 + 1) / 1]

+--------+------------------+---------------+---------------+------------+---------+-------------+-------------------+------------+----+--------------------+------------+--------------------+----------------------+------------------+----------+----------------+-----------+---------+----------+--------------------+--------------------+-----------------+--------------------+--------------------+-------+--------+----+-------+--------------------+
|      id|         log_price|  property_type|      room_type|accommodates|bathrooms|     bed_type|cancellation_policy|cleaning_fee|city|         description|first_review|host_has_profile_pic|host_identity_verified|host_response_rate|host_since|instant_bookable|last_review| latitude| longitude|                name|       neighbourhood|number_of_reviews|review_scores_rating|       thumbnail_url|zipcode|bedrooms|beds|cluster|    new_neighborhood|
+--------+------------------+---------------+---------------+------------+---------+-------------+------

                                                                                

In [13]:
# show the count of observations in each new neighborhood
dc_bnb.groupBy("new_neighborhood").count().show()

+--------------------+-----+
|    new_neighborhood|count|
+--------------------+-----+
|          Georgetown|  224|
|           Anacostia|   55|
|        Adams Morgan|  301|
|    Columbia Heights|  212|
|        Kingman Park|  111|
|American Universi...|   58|
|        Bloomingdale|  292|
|                Shaw|  233|
|   U Street Corridor|  226|
|         Glover Park|  151|
|            Fairlawn|   71|
|           Brookland|  104|
|       Michigan Park|   61|
|            Petworth|  136|
|         Chevy Chase|   79|
|Southwest Waterfront|  135|
|        Logan Circle|  307|
|Near Northeast/H ...|  453|
| 16th Street Heights|  122|
|        Capitol Hill|  488|
+--------------------+-----+
only showing top 20 rows



Now we have to assign the neighborhood to crime data.

In [14]:
# for each new neighborhood, compute the center of the cluster based on the latitude and longitude
neighborhood_center_coordinates = dc_bnb.groupBy("new_neighborhood").agg(avg("latitude").alias("latitude"), avg("longitude").alias("longitude"))
neighborhood_center_coordinates.show()

+--------------------+------------------+------------------+
|    new_neighborhood|          latitude|         longitude|
+--------------------+------------------+------------------+
|          Georgetown| 38.90388653959547|-77.05444312095642|
|           Anacostia|38.852218281139024|-76.99205391623757|
|        Adams Morgan|38.927533007143346|  -77.039559069662|
|    Columbia Heights|38.930930767419206|-77.02662881815209|
|        Kingman Park| 38.89536790590029|-76.97802418201893|
|American Universi...|38.933861370744374|-77.09414883317619|
|        Bloomingdale| 38.91581259688286| -77.0079067439249|
|                Shaw| 38.91303068169197|-77.02005754724593|
|   U Street Corridor| 38.91938486352431|-77.03117985008038|
|         Glover Park|  38.9179904255646|-77.07475048343078|
|            Fairlawn| 38.86675154994911|-76.96971635415521|
|           Brookland|38.928054075974686|-76.98877972822923|
|       Michigan Park| 38.94807390306817|-76.99399191434266|
|            Petworth| 3

In [15]:
# Add a unique index to the dataset
crime_data = crime_data.withColumn("crime_id", monotonically_increasing_id())

Cluster the crime data based on the distance (in terms of latitude and longitude) from the center of the neighborhood.


In [16]:
vec_assembler = VectorAssembler(inputCols=["latitude", "longitude"], outputCol="features")
vec_df = vec_assembler.transform(crime_data)

kmeans = KMeans(k=30, seed=2)
model = kmeans.fit(vec_df.select("features"))

fitted = model.transform(vec_df)

# print the cluster centers
centers = model.clusterCenters()
# assign the cluster number to each neighbourhood in the dataframe
fitted = fitted.withColumnRenamed("prediction", "cluster")

# join the original dataframe with the cluster number
crime_data = crime_data.join(fitted.select("crime_id", "cluster"), "crime_id")

In [17]:
# for each cluster in the crime data, compute the center of the cluster based on the latitude and longitude
crime_center_coordinates = crime_data.groupBy("cluster").agg(avg("latitude").alias("latitude_clu"), avg("longitude").alias("longitude_clu"))
crime_center_coordinates.show()

+-------+------------------+------------------+
|cluster|      latitude_clu|     longitude_clu|
+-------+------------------+------------------+
|     28| 38.85250677095168| -76.9878469243565|
|     27|38.954918065109915|-77.02657264882089|
|     26| 38.90335079418653|-76.98059528752184|
|     12| 38.88458040934941|-76.98865126578049|
|     22| 38.97393770317896| -77.0247385184189|
|      1|38.868630951655994|-76.95615755703474|
|     13| 38.92033886500375|-77.04307064795735|
|     16|38.862529700608626|-76.97233763706753|
|      6| 38.83106525440506|-77.00217776837682|
|      3| 38.89848481491638|-76.99563641180454|
|     20|38.925792903061144|-76.99341110993473|
|      5| 38.89957718336734|-76.92701846201275|
|     19|  38.9360899713567|-77.02381512057306|
|     15|38.909001243251616|-77.00565358007367|
|      9| 38.92493707872363|-76.96730975463903|
|     17| 38.87675184598807|  -77.005565934766|
|      4| 38.95616168489807|-77.07313133666398|
|      8| 38.90482569280678|-77.04542952

Use euclidean distance to calculate the distance between the crime and the center of the neighborhood.

In [18]:
# Define the distance calculation function
def calculate_distance(lat1, lon1, lat2, lon2):
    return ((lat1 - lat2) ** 2 + (lon1 - lon2) ** 2) ** 0.5

# Register the function as a UDF
distance_udf = udf(calculate_distance, DoubleType())

# Cross join the two dataframes to calculate the distance between each neighborhood center and each crime cluster center
distance_df = crime_center_coordinates.crossJoin(neighborhood_center_coordinates)

# Calculate the distance between each neighborhood center and each crime cluster center
distance_df = distance_df.withColumn("distance", distance_udf(col("latitude"), col("longitude"), col("latitude_clu"), col("longitude_clu")))

# For each crime cluster, find the closest neighborhood center
window_spec = Window.partitionBy("cluster").orderBy(col("distance"))
closest_neighborhood = distance_df.withColumn("row_number", row_number().over(window_spec)).filter(col("row_number") == 1).drop("row_number", "latitude_clu", "longitude_clu", "distance")
closest_neighborhood.show()

+-------+--------------------+------------------+------------------+
|cluster|    new_neighborhood|          latitude|         longitude|
+-------+--------------------+------------------+------------------+
|      0|       Michigan Park| 38.94807390306817|-76.99399191434266|
|      1|            Fairlawn| 38.86675154994911|-76.96971635415521|
|      2| Mount Vernon Square|38.901467135974336|-77.01809903553554|
|      3|Near Northeast/H ...| 38.90098940615622| -76.9943667784432|
|      4|         Chevy Chase|38.955296625064896| -77.0708621061301|
|      5|    Marshall Heights| 38.89174458517957|-76.93714244330107|
|      6|           Anacostia|38.852218281139024|-76.99205391623757|
|      7|   U Street Corridor| 38.91938486352431|-77.03117985008038|
|      8|          Georgetown| 38.90388653959547|-77.05444312095642|
|      9|           Woodridge| 38.93119123870251|-76.96762414072074|
|     10|    Marshall Heights| 38.89174458517957|-76.93714244330107|
|     11|American Universi...|38.9

In [19]:
# add to crime_data the closest neighborhood center for each crime cluster
crime_data = crime_data.join(closest_neighborhood, "cluster")
crime_data.show()

                                                                                

+-------+--------+-------------+------------+--------------+-------------------+--------------------+--------+--------+-----------------+----+----+--------------------+---------+------+----+--------+-----------+---------------+-----------------+--------------------+-------------------+--------+--------------------+---+-------------------+------+--------------------+-------------+----------------+------------------+------------------+
|cluster|crime_id|offense_group|census_tract|     longitude|           end_date|        offense_text|   shift|district|           yblock|ward|year|          offensekey|      bid|sector| psa|ucr_rank|block_group|voting_precinct|           xblock|               block|         start_date|     ccn|             offense|anc|        report_date|method|            location|     latitude|new_neighborhood|          latitude|         longitude|
+-------+--------+-------------+------------+--------------+-------------------+--------------------+--------+--------+-----

Clean columns that are not needed.

In [20]:
columns_to_drop_crime = ["cluster", "neighborhood_cluster", "census_tract", 'longitude', 'end_date', 'district', 'yblock', 'ward', 'year', 'bid', 'sector', 'psa', 'block_group', 'voting_precinct', 'xblock', 'block', 'start_date', 'ccn', 'anc', 'report_date', 'location','latitude', 'longitude']
crime_data = crime_data.drop(*columns_to_drop_crime)

In [21]:
columns_to_drop_bnb = ["thumbnail_url", "zipcode", "cluster", 'neighbourhood']
dc_bnb = dc_bnb.drop(*columns_to_drop_bnb)

In [22]:
# print dimesion of the data
print((dc_bnb.count(), len(dc_bnb.columns)))
print((crime_data.count(), len(crime_data.columns)))

(4870, 26)
(26091, 9)


Take the exponential of the log price to get the original price.

In [23]:
dc_bnb = dc_bnb.withColumn("price", when(col("log_price") > 0, 2.718 ** col("log_price")).otherwise(0))

Price distribution across neighborhoods

In [24]:
# plot the average price for each neighborhood
dc_bnb.groupBy("new_neighborhood").agg(avg("price").alias("avg_price")).orderBy("avg_price").show()

+--------------------+------------------+
|    new_neighborhood|         avg_price|
+--------------------+------------------+
|          Manor Park|120.02171402513397|
|    Marshall Heights|120.74042241670718|
|       Michigan Park|  125.932614051981|
|            Petworth|138.43042695625576|
|            Fairlawn|159.66697643286494|
|           Brookland| 165.2592827902537|
|       Shepherd Park| 171.5478247922781|
|    Columbia Heights|172.45226232830908|
|        Bloomingdale| 172.4909208492714|
|           Anacostia| 175.3850905272893|
|           Woodridge| 186.4942256435799|
|         Chevy Chase|188.91464030155802|
|American Universi...|189.81965475194588|
|                Shaw|198.08953713339665|
|        Adams Morgan|198.46705661750568|
|        Kingman Park|199.76182887011595|
|       Dupont Circle|207.20646672525297|
|      Cleveland Park|211.17781530870298|
| 16th Street Heights|213.91819934970385|
|          Georgetown|218.18933817250868|
+--------------------+------------

# Final dataset for the dashboard

Now it's time to create the final dataset for the dashboard by aggregating housing data and crime data for each neighborhood.

Rearrange house data by neighborhood.

In [25]:
airbnb = dc_bnb.groupBy("new_neighborhood") \
    .agg(
        F.count("*").alias("total_listings"),
        F.count(F.when(F.col("property_type") == "Apartment", 1)).alias("apartments"),
        F.count(F.when(F.col("property_type") == "House", 1)).alias("houses"),
        F.count(F.when(F.col("property_type") == "Condominium", 1)).alias("condos"),
        F.count(F.when(F.col("property_type") == "Loft", 1)).alias("lofts"),
        F.count(F.when(F.col("room_type") == "Entire home/apt", 1)).alias("entire_home_apt"),
        F.count(F.when(F.col("room_type") == "Private room", 1)).alias("private_room"),
        F.count(F.when(F.col("room_type") == "Shared room", 1)).alias("shared_room"),
        F.round(F.avg("accommodates"), 1).alias("avg_accommodates"),
        F.round(F.avg("bedrooms"), 1).alias("avg_bedrooms"),
        F.count(F.when(F.col("cancellation_policy") == "moderate", 1)).alias("moderate"),
        F.count(F.when(F.col("cancellation_policy") == "flexible", 1)).alias("flexible"),
        F.count(F.when(F.col("cancellation_policy") == "strict", 1)).alias("strict"),
        F.round(F.avg("number_of_reviews"), 2).alias("avg_reviews"),
        F.round(F.avg("review_scores_rating"), 2).alias("avg_review_score"),
        F.round(F.avg("price"), 2).alias("avg_price"),
        F.round(F.min("price"), 2).alias("min_price"),
        F.round(F.max("price"), 2).alias("max_price")
    )

airbnb.show()

+--------------------+--------------+----------+------+------+-----+---------------+------------+-----------+----------------+------------+--------+--------+------+-----------+----------------+---------+---------+---------+
|    new_neighborhood|total_listings|apartments|houses|condos|lofts|entire_home_apt|private_room|shared_room|avg_accommodates|avg_bedrooms|moderate|flexible|strict|avg_reviews|avg_review_score|avg_price|min_price|max_price|
+--------------------+--------------+----------+------+------+-----+---------------+------------+-----------+----------------+------------+--------+--------+------+-----------+----------------+---------+---------+---------+
|          Georgetown|           224|       153|    35|    15|    1|            180|          36|          8|             3.1|         1.1|      51|      72|    81|      19.13|           93.32|   218.19|     10.0|  1948.47|
|           Anacostia|            55|         8|    34|     6|    1|             21|          34|       

Do the same for crime data.

In [26]:
crime = crime_data.groupBy("new_neighborhood") \
    .agg(
        F.count("*").alias("total_crimes"),
        F.count(F.when(F.col("offense_group") == "property", 1)).alias("property_crimes"),
        F.count(F.when(F.col("offense_group") == "violent", 1)).alias("violent_crimes"),
        F.count(F.when(F.col("shift") == "day", 1)).alias("day_shift"),
        F.count(F.when(F.col("shift") == "evening", 1)).alias("evening_shift"),
        F.count(F.when(F.col("shift") == "midnight", 1)).alias("midnight_shift"),
        F.count(F.when(F.col("offense_text") == "ASSAULT W/DANGEROUS WEAPON", 1)).alias("assault"),
        F.count(F.when(F.col("offense_text") == "THEFT F/AUTO", 1)).alias("theft_auto"),
        F.count(F.when(F.col("offense_text") == "THEFT/OTHER", 1)).alias("theft_other"),
        F.count(F.when(F.col("offense_text") == "ROBBERY", 1)).alias("robbery"),
        F.count(F.when(F.col("offense_text") == "BURGLARY", 1)).alias("burglary"),
        F.count(F.when(F.col("offense_text") == "MOTOR VEHICLE THEFT", 1)).alias("motor_vehicle_theft"),
        F.count(F.when(F.col("offense_text") == "HOMICIDE", 1)).alias("homicide"),
        F.count(F.when(F.col("offense_text") == "SEX ABUSE", 1)).alias("sex_abuse"),
        F.count(F.when(F.col("method") == "gun", 1)).alias("guns")
    )

crime.show()

+--------------------+------------+---------------+--------------+---------+-------------+--------------+-------+----------+-----------+-------+--------+-------------------+--------+---------+----+
|    new_neighborhood|total_crimes|property_crimes|violent_crimes|day_shift|evening_shift|midnight_shift|assault|theft_auto|theft_other|robbery|burglary|motor_vehicle_theft|homicide|sex_abuse|guns|
+--------------------+------------+---------------+--------------+---------+-------------+--------------+-------+----------+-----------+-------+--------+-------------------+--------+---------+----+
|          Georgetown|        1321|           1208|           113|      513|          504|           304|     30|       211|        849|     76|      40|                108|       1|        6|  67|
|           Anacostia|        1910|           1319|           591|      598|          720|           592|    262|       255|        454|    267|      61|                548|      50|       12| 418|
|        A

Join data in one master dataset.

In [27]:
# join the two dataframes
complete_neigh = airbnb.join(crime, "new_neighborhood")
# join with the neighborhood center coordinates
complete_neigh = complete_neigh.join(neighborhood_center_coordinates, "new_neighborhood")

# Indicators

Since our dashboard needs to be intuitive, we will create some indicators to help the user understand the data.
Also we have many columns in our dataset, so it's a good idea to group them into summary indicators.

In [28]:
complete_neigh.show()

+--------------------+--------------+----------+------+------+-----+---------------+------------+-----------+----------------+------------+--------+--------+------+-----------+----------------+---------+---------+---------+------------+---------------+--------------+---------+-------------+--------------+-------+----------+-----------+-------+--------+-------------------+--------+---------+----+------------------+------------------+
|    new_neighborhood|total_listings|apartments|houses|condos|lofts|entire_home_apt|private_room|shared_room|avg_accommodates|avg_bedrooms|moderate|flexible|strict|avg_reviews|avg_review_score|avg_price|min_price|max_price|total_crimes|property_crimes|violent_crimes|day_shift|evening_shift|midnight_shift|assault|theft_auto|theft_other|robbery|burglary|motor_vehicle_theft|homicide|sex_abuse|guns|          latitude|         longitude|
+--------------------+--------------+----------+------+------+-----+---------------+------------+-----------+----------------+

1. Danger Score: summarizes violent and property crimes.

In [29]:
crime_weights = {
    'violent_crimes': 5,
    'assault': 5,
    'homicide': 7,
    'sex_abuse': 5,
    'theft_auto': 3,
    'theft_other': 2,
    'robbery': 3,
    'burglary': 3,
    'motor_vehicle_theft': 2,
    'guns': 4
}


complete_neigh = complete_neigh.withColumn("Danger Score", col("violent_crimes") * crime_weights['violent_crimes'] +
                                             col("assault") * crime_weights['assault'] +
                                             col("homicide") * crime_weights['homicide'] +
                                             col('sex_abuse') * crime_weights['sex_abuse'] +
                                                col('theft_auto') * crime_weights['theft_auto'] +
                                                col('theft_other') * crime_weights['theft_other'] +
                                                col('robbery') * crime_weights['robbery'] +
                                                col('burglary') * crime_weights['burglary'] +
                                                col('motor_vehicle_theft') * crime_weights['motor_vehicle_theft'] +
                                                col('guns') * crime_weights['guns'])

2. Police Surveillance: shows how well the neighborhood is monitored by the police.

In [30]:
complete_neigh = complete_neigh.withColumn(
    'Police Surveillance',
    col('day_shift') + col('evening_shift') + col('midnight_shift')
)

3. Theft Score: summarizes the thefts in the neighborhood.

In [31]:
crime_weights_2 = {
    'theft_auto': 3,
    'theft_other': 2,
    'robbery': 4,
    'burglary': 5,
    'motor_vehicle_theft': 4,
}

# Calculate Danger Score
complete_neigh = complete_neigh.withColumn('Theft Score', col('theft_auto') * crime_weights_2['theft_auto'] +
                                                col('theft_other') * crime_weights_2['theft_other'] +
                                                col('robbery') * crime_weights_2['robbery'] +
                                                col('burglary') * crime_weights_2['burglary'] +
                                                col('motor_vehicle_theft') * crime_weights_2['motor_vehicle_theft'])

4. Property Crime: summarizes the property crimes in the neighborhood.

In [32]:
complete_neigh = complete_neigh.withColumn(
    'Property Crime Score',
    col('property_crimes') + col('motor_vehicle_theft') + col('theft_auto'))

5. Total Crimes/Total Houses: shows the crime rate in the neighborhood.

In [33]:
complete_neigh = complete_neigh.withColumn(
    'Crime Rate',
    col('total_crimes') / col('total_listings')
)

Scale all indicators from 0 to 1

In [34]:
# Define a user-defined function (UDF) to convert a scalar value to a vector
scalar_to_vector_udf = udf(lambda x: Vectors.dense([x]), VectorUDT())

# Convert scalar columns to vector columns
complete_neigh = complete_neigh.withColumn("danger_score_vector", scalar_to_vector_udf(col("Danger Score")))
complete_neigh = complete_neigh.withColumn("police_surveillance_vector", scalar_to_vector_udf(col("Police Surveillance")))
complete_neigh = complete_neigh.withColumn("theft_score_vector", scalar_to_vector_udf(col("Theft Score")))
complete_neigh = complete_neigh.withColumn("property_crime_vector", scalar_to_vector_udf(col("Property Crime Score")))
complete_neigh = complete_neigh.withColumn("crime_rate_vector", scalar_to_vector_udf(col("Crime Rate")))


scaler = MinMaxScaler(inputCol="danger_score_vector", outputCol="danger_score_final")
scaler_model = scaler.fit(complete_neigh)
complete_neigh = scaler_model.transform(complete_neigh)

scaler = MinMaxScaler(inputCol="police_surveillance_vector", outputCol="police_surveillance_final")
scaler_model = scaler.fit(complete_neigh)
complete_neigh = scaler_model.transform(complete_neigh)

scaler = MinMaxScaler(inputCol="theft_score_vector", outputCol="theft_score_final")
scaler_model = scaler.fit(complete_neigh)
complete_neigh = scaler_model.transform(complete_neigh)

scaler = MinMaxScaler(inputCol="property_crime_vector", outputCol="property_crime_final")
scaler_model = scaler.fit(complete_neigh)
complete_neigh = scaler_model.transform(complete_neigh)

scaler = MinMaxScaler(inputCol="crime_rate_vector", outputCol="crime_rate_final")
scaler_model = scaler.fit(complete_neigh)
complete_neigh = scaler_model.transform(complete_neigh)

# drop the original columns
columns_to_drop = ["Danger Score", "Police Surveillance", "Theft Score", "Property Crime Score", "Crime Rate", "danger_score_vector", "police_surveillance_vector", "theft_score_vector", "property_crime_vector", "crime_rate_vector"]
complete_neigh = complete_neigh.drop(*columns_to_drop)

complete_neigh.show()


+--------------------+--------------+----------+------+------+-----+---------------+------------+-----------+----------------+------------+--------+--------+------+-----------+----------------+---------+---------+---------+------------+---------------+--------------+---------+-------------+--------------+-------+----------+-----------+-------+--------+-------------------+--------+---------+----+------------------+------------------+--------------------+-------------------------+--------------------+--------------------+--------------------+
|    new_neighborhood|total_listings|apartments|houses|condos|lofts|entire_home_apt|private_room|shared_room|avg_accommodates|avg_bedrooms|moderate|flexible|strict|avg_reviews|avg_review_score|avg_price|min_price|max_price|total_crimes|property_crimes|violent_crimes|day_shift|evening_shift|midnight_shift|assault|theft_auto|theft_other|robbery|burglary|motor_vehicle_theft|homicide|sex_abuse|guns|          latitude|         longitude|  danger_score_fi

# Interactive map

In [35]:
map_dc = folium.Map(location=[38.89511, -77.03637], zoom_start=14)

In [36]:
# for each neighborhood, add a marker to the map
for row in complete_neigh.collect():
    # Extracting and rounding the normalized indicators
    police_surveillance_norm = int(round(row["police_surveillance_final"][0] * 100))
    theft_score_norm = int(round(row["theft_score_final"][0] * 100))
    property_crime_norm = int(round(row["property_crime_final"][0] * 100))
    crime_rate_norm = int(round(row["crime_rate_final"][0] * 100))

    
    # Constructing the popup content with a bar graph
    popup_content = f'<div style="width: 200px; height: 200px;">' \
                    f'<b>{row["new_neighborhood"]}</b><br>' \
                    f'Total Listings: {row["total_listings"]}<br>' \
                    f'Average Price: {row["avg_price"]}<br>' \
                    f'Average Review Score: {row["avg_review_score"]}<br><br>' \
                    f'<div style="padding-bottom: 5px;">' \
                    f'Police Surveillance: <b>{police_surveillance_norm}%</b> <div class="indicator-bar" style="width: {police_surveillance_norm}%; background-color: green;"></div></div>' \
                    f'<div style="padding-bottom: 5px;">' \
                    f'Theft Score: <b>{theft_score_norm}%</b> <div class="indicator-bar" style="width: {theft_score_norm}%; background-color: red;"></div></div>' \
                    f'<div style="padding-bottom: 5px;">' \
                    f'Property Crime Score: <b>{property_crime_norm}%</b> <div class="indicator-bar" style="width: {property_crime_norm}%; background-color: red;"></div></div>' \
                    f'<div>' \
                    f'Crime Rate: <b>{crime_rate_norm}%</b> <div class="indicator-bar" style="width: {crime_rate_norm}%; background-color: red;"></div></div>' \
                    f'<style>.indicator-bar {{ height: 10px; }}</style>' \
                    f'</div>'
    
    # Add circle marker for danger score
    danger_score = round(row["danger_score_final"][0], 2)
    danger_circle = folium.Circle(
        location=[row["latitude"], row["longitude"]],
        radius=danger_score * 1000,  
        color='red',
        fill=True,
        fill_color='red',
        fill_opacity=0.5,  
        popup=f'{"Danger Score"}: {danger_score}',
    )
    danger_circle.add_to(map_dc)
    
    folium.Marker([row["latitude"], row["longitude"]], 
                  popup=folium.Popup(popup_content, max_width=600, max_height=600, keep_in_front=True),  
                  tooltip=row["new_neighborhood"],
                  icon=folium.Icon(color='blue', icon='info-sign')
                 ).add_to(map_dc)


In [37]:
# save the map
map_dc.save("dc_map.html")
map_dc