Query 1
---
Να ταξινομημηθούν, σε φθίνουσα σειρά, οι ηλικιακές ομάδες των θυμάτων σε περιστατικά που περιλαμβάνουν οποιαδήποτε μορφή “βαριάς σωματικής βλάβης”. Θεωρείστε τις εξής ηλικιακές ομάδες:
- **Παιδιά:** < 18
- **Νεαροί ενήλικοι:** 18 – 24
- **Ενήλικοι:** 25 – 64
- **Ηλικιωμένοι:** >64
---
Question 1
---
Να υλοποιηθεί το Query 1 χρησιμοποιώντας τα DataFrame και RDD APIs. Να εκτελέσετε και
τις δύο υλοποιήσεις με 4 Spark executors. Υπάρχει διαφορά στην επίδοση μεταξύ των δύο APIs;
Αιτιολογήσετε την απάντησή σας. 
---

In [1]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "4"
    }
}

ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3839,application_1732639283265_3779,pyspark,idle,Link,Link,,
3840,application_1732639283265_3780,pyspark,idle,Link,Link,,
3843,application_1732639283265_3783,pyspark,idle,Link,Link,,
3850,application_1732639283265_3790,pyspark,idle,Link,Link,,
3855,application_1732639283265_3795,pyspark,idle,Link,Link,,
3872,application_1732639283265_3812,pyspark,idle,Link,Link,,
3873,application_1732639283265_3813,pyspark,idle,Link,Link,,
3874,application_1732639283265_3814,pyspark,busy,Link,Link,,


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, when
# To log our application's execution time:
import time

# Load crime data from S3
crime_df = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv", header=True, inferSchema=True)
crime_df2 = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv", header=True, inferSchema=True)
crime_data = crime_df.union(crime_df2)

crime_data = crime_data.filter((col("LON") != 0) & (col("LAT") != 0))

start_time = time.time()
# Filter records with "aggravated assault" in the description
agg_assault = crime_data.filter(col("Crm Cd Desc").contains("AGGRAVATED ASSAULT")) \
    .filter(col("Vict Age") >= 0)

# Categorize ages into groups
age_groups = agg_assault.select(
    when(col("Vict Age") < 18, "Children") \
    .when((col("Vict Age") >= 18) & (col("Vict Age") <= 24), "Young adults") \
    .when((col("Vict Age") >= 25) & (col("Vict Age") <= 64), "Adults") \
    .otherwise("Elderly").alias("Age_Group")
)

# Count and sort age groups
age_group_counts = age_groups.groupBy("Age_Group").count() \
                              .orderBy(col("count").desc())

# Show results
age_group_counts.show()

# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for Dataframe API: {elapsed_time:.2f} seconds")

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3875,application_1732639283265_3815,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+------------+------+
|   Age_Group| count|
+------------+------+
|      Adults|121052|
|Young adults| 33588|
|    Children| 15918|
|     Elderly|  5985|
+------------+------+

Time taken for Dataframe API: 4.43 seconds

In [3]:
start_time = time.time()

# Convert to RDD and perform operations
rdd = crime_data.rdd.map(lambda row: (row["Crm Cd Desc"], row["Vict Age"])) \
                    .filter(lambda x: "AGGRAVATED ASSAULT" in x[0]) \
                    .filter(lambda x: x[1] >= 0) \
                    .map(lambda x: ("Children" if x[1] < 18 else
                                    "Young adults" if 18 <= x[1] <= 24 else
                                    "Adults" if 25 <= x[1] <= 64 else
                                    "Elderly")) \
                    .countByValue()

# Sort and print
for group, count in sorted(rdd.items(), key=lambda x: -x[1]):
    print(f"{group}: {count}")

# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for RDD API: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Adults: 121052
Young adults: 33588
Children: 15918
Elderly: 5985
Time taken for RDD API: 15.38 seconds

Query 2
---

Να βρεθούν, για κάθε έτος, τα 3 Αστυνομικά Τμήματα με το υψηλότερο ποσοστό κλεισμένων (περατωμένων) υποθέσεων. Να τυπωθούν το έτος, τα ονόματα (τοποθεσίες) των τμημάτων, τα ποσοστά τους καθώς και οι αριθμοί του ranking τους στην ετήσια κατάταξη. Τα αποτελέσματα να δοθούν σε σειρά αύξουσα ως προς το έτος και το ranking (δείτε παράδειγμα στον Πίνακα 2).

---
Question 2
---
- Να υλοποιηθεί το Query 2 χρησιμοποιώντας τα DataFrame και SQL APIs. Να αναφέρετε και
να συγκρίνετε τους χρόνους εκτέλεσης μεταξύ των δύο υλοποιήσεων.
- Να γράψετε κώδικα Spark που μετατρέπει το κυρίως data set σε parquet2 file format και
αποθηκεύει ένα μοναδικό .parquet αρχείο στο S3 bucket της ομάδας σας. Επιλέξτε μία από
τις δύο υλοποιήσεις του υποερωτήματος α) (DataFrame ή SQL) και συγκρίνετε τους χρόνους
εκτέλεσης της εφαρμογής σας όταν τα δεδομένα εισάγονται σαν .csv και σαν .parquet.
---

In [4]:
from pyspark.sql.window import Window
from pyspark.sql.functions import rank, col, year, to_date
import pyspark.sql.functions as f
# Correct the column name "AREA " to "AREA" if necessary
crime_data = crime_data.withColumn("YEAR", year(to_date(col("DATE OCC"), "MM/dd/yyyy hh:mm:ss a")))

start_time = time.time()
dept_closed = crime_data.select("YEAR", "AREA NAME", "Status") \
                   .groupBy("YEAR", "AREA NAME") \
                   .agg(
                        f.sum(f.when((col("Status") != "IC") & (col("Status") != "UNK"), 1)).alias("closed_cases"),
                        f.count("*").alias("total_cases")
                    ) \
                    .withColumn(
                        "closed_case_rate",
                        (col("closed_cases") / col("total_cases")) * 100
                    )

windowSpec = Window.partitionBy("YEAR").orderBy(col("closed_case_rate").desc())

result = dept_closed.select("*", rank().over(windowSpec).alias("rank")) \
                    .filter(col("rank") <= 3) \
                    .orderBy(col("YEAR").asc(), col("rank").asc())

result.show()

# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for Dataframe API: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------+-----------+------------------+----+
|YEAR|  AREA NAME|closed_cases|total_cases|  closed_case_rate|rank|
+----+-----------+------------+-----------+------------------+----+
|2010|    Rampart|        2860|       8706|    32.85090742017|   1|
|2010|    Olympic|        2762|       8764|31.515289821999087|   2|
|2010|     Harbor|        2818|       9598| 29.36028339237341|   3|
|2011|    Olympic|        2798|       7987| 35.03192688118192|   1|
|2011|    Rampart|        2744|       8443|32.500296103280824|   2|
|2011|     Harbor|        2806|       9840|28.516260162601625|   3|
|2012|    Olympic|        2923|       8523|34.295435879385195|   1|
|2012|    Rampart|        2791|       8598|32.461037450569904|   2|
|2012|     Harbor|        2781|       9416|29.534834324553948|   3|
|2013|    Olympic|        2789|       8305| 33.58217940999398|   1|
|2013|    Rampart|        2616|       8148|  32.1060382916053|   2|
|2013|     Harbor|        2504|       8429| 29.7

In [5]:
# Register temp views
crime_data.createOrReplaceTempView("crime")

start_time = time.time()
# SQL Query
sql_result = spark.sql("""
WITH crime_data_year AS (
    SELECT
        `YEAR`,
        `AREA NAME`,
        `Status`
    FROM crime
),
dept_closed AS (
    SELECT
        `YEAR`,
        `AREA NAME`,
        SUM(CASE WHEN `Status` NOT IN ('IC', 'UNK') THEN 1 ELSE 0 END) AS closed_cases,
        COUNT(*) AS total_cases,
        (SUM(CASE WHEN `Status` NOT IN ('IC', 'UNK') THEN 1 ELSE 0 END) * 100.0 / COUNT(*)) AS closed_case_rate
    FROM crime_data_year
    GROUP BY `YEAR`, `AREA NAME`
),
ranked_data AS (
    SELECT
        `YEAR`,
        `AREA NAME`,
        closed_cases,
        total_cases,
        closed_case_rate,
        RANK() OVER (PARTITION BY `YEAR` ORDER BY closed_case_rate DESC) AS rank
    FROM dept_closed
)
SELECT
    `YEAR`,
    `AREA NAME`,
    closed_cases,
    total_cases,
    closed_case_rate,
    rank
FROM ranked_data
WHERE rank <= 3
ORDER BY `YEAR` ASC, rank ASC;
""")

sql_result.show()

# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for SQL API: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------+-----------+-----------------+----+
|YEAR|  AREA NAME|closed_cases|total_cases| closed_case_rate|rank|
+----+-----------+------------+-----------+-----------------+----+
|2010|    Rampart|        2860|       8706|32.85090742017000|   1|
|2010|    Olympic|        2762|       8764|31.51528982199909|   2|
|2010|     Harbor|        2818|       9598|29.36028339237341|   3|
|2011|    Olympic|        2798|       7987|35.03192688118192|   1|
|2011|    Rampart|        2744|       8443|32.50029610328082|   2|
|2011|     Harbor|        2806|       9840|28.51626016260163|   3|
|2012|    Olympic|        2923|       8523|34.29543587938519|   1|
|2012|    Rampart|        2791|       8598|32.46103745056990|   2|
|2012|     Harbor|        2781|       9416|29.53483432455395|   3|
|2013|    Olympic|        2789|       8305|33.58217940999398|   1|
|2013|    Rampart|        2616|       8148|32.10603829160530|   2|
|2013|     Harbor|        2504|       8429|29.70696405267529| 

In [6]:
# Write results to S3 -> 
#    1. create the output directory in your S3 bucket
#    2. change your group number below 
#    3. and uncomment
group_number = "14"
s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/crime_data/"
crime_data.write.mode("overwrite").parquet(s3_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [7]:
crime_data_parquet = spark.read.parquet(s3_path)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [8]:
start_time = time.time()

dept_closed_parquet = crime_data_parquet.select("YEAR", "AREA NAME", "Status") \
                   .groupBy("YEAR", "AREA NAME") \
                   .agg(
                        f.sum(f.when((col("Status") != "IC") & (col("Status") != "UNK"), 1)).alias("closed_cases"),
                        f.count("*").alias("total_cases")
                    ) \
                    .withColumn(
                        "closed_case_rate",
                        (col("closed_cases") / col("total_cases")) * 100
                    )

windowSpec = Window.partitionBy("YEAR").orderBy(col("closed_case_rate").desc())

result_parquet = dept_closed_parquet.select("*", rank().over(windowSpec).alias("rank")) \
                    .filter(col("rank") <= 3) \
                    .orderBy(col("YEAR").asc(), col("rank").asc())

result_parquet.show()

# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for Dataframe API using Parquet: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------+-----------+------------------+----+
|YEAR|  AREA NAME|closed_cases|total_cases|  closed_case_rate|rank|
+----+-----------+------------+-----------+------------------+----+
|2010|    Rampart|        2860|       8706|    32.85090742017|   1|
|2010|    Olympic|        2762|       8764|31.515289821999087|   2|
|2010|     Harbor|        2818|       9598| 29.36028339237341|   3|
|2011|    Olympic|        2798|       7987| 35.03192688118192|   1|
|2011|    Rampart|        2744|       8443|32.500296103280824|   2|
|2011|     Harbor|        2806|       9840|28.516260162601625|   3|
|2012|    Olympic|        2923|       8523|34.295435879385195|   1|
|2012|    Rampart|        2791|       8598|32.461037450569904|   2|
|2012|     Harbor|        2781|       9416|29.534834324553948|   3|
|2013|    Olympic|        2789|       8305| 33.58217940999398|   1|
|2013|    Rampart|        2616|       8148|  32.1060382916053|   2|
|2013|     Harbor|        2504|       8429| 29.7

Query 3
---

Χρησιμοποιώντας ως αναφορά τα δεδομένα της απογραφής 2010 για τον πληθυσμό και εκείνα της απογραφής του 2015 για το εισόδημα ανα νοικοκυριό, να υπολογίσετε για κάθε περιοχή του Los Angeles τα παρακάτω:
- Το μέσο ετήσιο εισόδημα ανά άτομο
- Την αναλογία συνολικού αριθμού εγκλημάτων ανά άτομο.
Τα αποτελέσματα να συγκεντρωθούν σε ένα πίνακα.

---
Question 3
---

Να υλοποιηθεί το Query 3 χρησιμοποιώντας DataFrame ή SQL API. Χρησιμοποιήστε τις μεθόδους hint & explain για να βρείτε ποιες στρατηγικές join χρησιμοποιεί ο catalyst optimizer.
Πειραματιστείτε αναγκάζοντας το Spark να χρησιμοποιήσει διαφορετικές στρατηγικές (μεταξύ των BROADCAST, MERGE, SHUFFLE_HASH, SHUFFLE_REPLICATE_NL) και σχολιάστε τα αποτελέσματα
που παρατηρείτε. Ποιά (ή ποιές) από τις διαθέσιμες στρατηγικές join του Spark είναι καταλληλότερη(ες) και γιατί;

---

In [23]:
from sedona.spark import *
from pyspark.sql.functions import col, regexp_replace, trim, format_number, round
import pyspark.sql.functions as f
from pyspark.sql import SparkSession

# Create sedona context
sedona = SedonaContext.create(spark)
crime_data = crime_data.withColumn("geom", ST_Point("LON", "LAT"))

start_time = time.time()

# Read the file from s3

geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

# Formatting magic
flattened_df = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type") \
            .filter(col("CITY") == "Los Angeles")

LA_areas = flattened_df.select("ZCTA10","COMM","POP_2010","HOUSING10","geometry") \
                .filter(((col("POP_2010")>0) & (col("HOUSING10")>0)) & (trim(col("COMM"))!="")) \
                .groupBy(["ZCTA10","COMM"]) \
                .agg(
                    ST_Union_Aggr("geometry").alias("geometry"),
                    f.sum("HOUSING10").alias("Households"),
                    f.sum("POP_2010").alias("Population")
                )

income = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True)
income = income.withColumn("Estimated Median Income", regexp_replace(col("Estimated Median Income"), r"[\$,]", "").cast("int")) \
    .select(
        col("Zip Code").alias("ZIP"),
        col("Estimated Median Income").alias("Median_Income")
    )

joined_income = LA_areas.join(income, LA_areas["ZCTA10"] == income["ZIP"], "inner") \
    .drop("ZCTA10", "ZIP") \
    .withColumn("Total_Income", col("Households") * col("Median_Income")) \
    .drop("Median_Income") \
    .groupBy("COMM") \
    .agg(
        f.sum("Population").alias("Population"),
        f.sum("Total_Income").alias("Total_Income"),
        ST_Union_Aggr("geometry").alias("geometry")
)

population_data = joined_income.join(crime_data, ST_Within(crime_data.geom, joined_income.geometry), "inner") \
    .groupBy("COMM","Population","Total_Income") \
    .agg(
        f.count(col("*")).alias("Total Crimes")
    ) \
    .withColumn("Crime Rate",format_number((col("Total Crimes")/col("Population")),4)) \
    .withColumn("Estimated_Median_Income", (col("Total_Income") / col("Population"))) \
    .drop("Total_Income","Population","Total crimes") \
    .orderBy(col("Estimated_Median_Income").desc())

population_data.show()
# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for Dataframe API: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-------------------+----------+-----------------------+
|               COMM|Crime Rate|Estimated_Median_Income|
+-------------------+----------+-----------------------+
|  Pacific Palisades|    0.3797|      70673.55587323125|
|Palisades Highlands|    0.1878|      66867.43986433603|
|   Marina Peninsula|    0.6000|      65235.69402813004|
|            Bel Air|    0.3992|      63041.33809466166|
|      Beverly Crest|    0.3690|      60947.49019768682|
|          Brentwood|    0.4059|      60846.85469997952|
|  Mandeville Canyon|    0.2611|      55572.10949582431|
|        Playa Vista|    0.5004|      50264.47187990141|
|            Carthay|    0.7629|     49848.737445871004|
|             Venice|    1.0404|     47625.344329326406|
|       Century City|    0.6330|      45617.76047098402|
|      Playa Del Rey|    0.7426|        45522.596580114|
|        Studio City|    0.7834|     44638.477679882526|
|    Hollywood Hills|    0.7476|      43023.69992828971|
|      South Carthay|    0.7232

In [24]:
population_data.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

== Physical Plan ==
AdaptiveSparkPlan (33)
+- Sort (32)
   +- Exchange (31)
      +- HashAggregate (30)
         +- Exchange (29)
            +- HashAggregate (28)
               +- Project (27)
                  +- RangeJoin (26)
                     :- Filter (18)
                     :  +- ObjectHashAggregate (17)
                     :     +- Exchange (16)
                     :        +- ObjectHashAggregate (15)
                     :           +- Project (14)
                     :              +- BroadcastHashJoin Inner BuildRight (13)
                     :                 :- ObjectHashAggregate (8)
                     :                 :  +- Exchange (7)
                     :                 :     +- ObjectHashAggregate (6)
                     :                 :        +- Project (5)
                     :                 :           +- Filter (4)
                     :                 :              +- Generate (3)
                     :                 :                 

In [25]:
benchmark = []

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [43]:
from pyspark.sql import Row
import time

strategy_1 = "SHUFFLE_HASH"
strategy_2 = "SHUFFLE_HASH"
# We apply the hint to set the join strategy used
# the code executes the same query as presented above
spark.catalog.clearCache()
start_time = time.time()

joined_income = LA_areas.hint(strategy_1).join(income.hint(strategy_1), LA_areas["ZCTA10"] == income["ZIP"], "inner") \
    .drop("ZCTA10", "ZIP") \
    .withColumn("Total_Income", col("Households") * col("Median_Income")) \
    .drop("Median_Income") \
    .groupBy("COMM") \
    .agg(
        f.sum("Population").alias("Population"),
        f.sum("Total_Income").alias("Total_Income"),
        ST_Union_Aggr("geometry").alias("geometry")
)

population_data = joined_income.hint(strategy_2).join(crime_data.hint(strategy_2), ST_Within(crime_data.geom, joined_income.geometry), "inner") \
    .groupBy("COMM","Population","Total_Income") \
    .agg(
        f.count(col("*")).alias("Total Crimes")
    ) \
    .withColumn("Crime Rate",format_number((col("Total Crimes")/col("Population")),4)) \
    .withColumn("Estimated_Median_Income", (col("Total_Income") / col("Population"))) \
    .drop("Total_Income","Population","Total crimes") \
    .orderBy(col("Estimated_Median_Income").desc())

population_data.collect()

# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
benchmark.append((strategy_1, strategy_2, elapsed_time))

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [44]:
for res in benchmark:
    print(res)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

('SHUFFLE_REPLICATE_NL', 'SHUFFLE_REPLICATE_NL', 46.515050411224365)
('SHUFFLE_REPLICATE_NL', 'MERGE', 26.257415533065796)
('SHUFFLE_REPLICATE_NL', 'BROADCAST', 26.699812412261963)
('SHUFFLE_REPLICATE_NL', 'SHUFFLE_HASH', 177.74399709701538)
('MERGE', 'SHUFFLE_REPLICATE_NL', 41.28805613517761)
('MERGE', 'MERGE', 27.015713453292847)
('MERGE', 'BROADCAST', 25.8994722366333)
('MERGE', 'SHUFFLE_HASH', 29.319011926651)
('BROADCAST', 'SHUFFLE_REPLICATE_NL', 31.50220274925232)
('BROADCAST', 'MERGE', 24.24892282485962)
('BROADCAST', 'BROADCAST', 29.62976360321045)
('BROADCAST', 'SHUFFLE_HASH', 30.067917346954346)
('SHUFFLE_HASH', 'SHUFFLE_REPLICATE_NL', 17.198349714279175)
('SHUFFLE_HASH', 'MERGE', 23.190125703811646)
('SHUFFLE_HASH', 'BROADCAST', 20.927560567855835)
('SHUFFLE_HASH', 'SHUFFLE_HASH', 17.760035753250122)

Query 4
---

Να βρεθεί το φυλετικό προφίλ των καταγεγραμμένων θυμάτων εγκλημάτων (Vict Descent) στο Los Angeles για το έτος 2015 στις 3 περιοχές με το υψηλότερο κατά κεφαλήν εισόδημα. Να γίνει το ίδιο για τις 3 περιοχές με το χαμηλότερο εισόδημα. Να χρησιμοποιήσετε την αντιστοίχιση των κωδικών
καταγωγής με την πλήρη περιγραφή από το σύνολο δεδομενων Race and Ethnicity codes. Τα αποτελέσματα να τυπωθούν σε δύο ξεχωριστούς πίνακες από το υψηλότερο στο χαμηλότερο αριθμό θυμάτων ανά φυλετικό γκρουπ (δείτε παράδειγμα αποτελέσματος στον Πίνακα 3).

---
Question 4
---

Να υλοποιηθεί το Query 4 χρησιμοποιώντας το DataFrame ή SQL API. Να εκτελέσετε την υλοποίησή σας εφαρμόζοντας κλιμάκωση στο σύνολο των υπολογιστικών πόρων που θα χρησιμοποιήσετε: Συγκεκριμένα, καλείστε να εκτελέστε την υλοποίησή σας σε 2 executors με τα ακόλουθα configurations:
- 1 core/2 GB memory
- 2 cores/4GB memory
- 4 cores/8GB memory

---

In [59]:
from pyspark.sql.functions import substring

spark = SparkSession \
    .builder \
    .appName("Question 4") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

start_time = time.time()

race_codes = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv", header=True, inferSchema=True)
# Create sedona context
sedona = SedonaContext.create(spark)
# Read the file from s3

geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

# Formatting magic
flattened_df = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type") \
            .filter(col("CITY") == "Los Angeles")

LA_areas = flattened_df.select("ZCTA10","COMM","POP_2010","HOUSING10","geometry") \
                .filter(((col("POP_2010")>0) & (col("HOUSING10")>0)) & (trim(col("COMM"))!="")) \
                .groupBy(["ZCTA10","COMM"]) \
                .agg(
                    ST_Union_Aggr("geometry").alias("geometry"),
                    f.sum("HOUSING10").alias("Households"),
                    f.sum("POP_2010").alias("Population")
                )

income = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True)
income = income.withColumn("Estimated Median Income", regexp_replace(col("Estimated Median Income"), r"[\$,]", "").cast("int")) \
    .select(
        col("Zip Code").alias("ZIP"),
        col("Estimated Median Income").alias("Median_Income")
    )

joined_income = LA_areas.join(income, LA_areas["ZCTA10"] == income["ZIP"], "inner") \
    .drop("ZCTA10", "ZIP") \
    .withColumn("Total_Income", col("Households") * col("Median_Income")) \
    .drop("Median_Income") \
    .groupBy("COMM") \
    .agg(
        f.sum("Population").alias("Population"),
        f.sum("Total_Income").alias("Total_Income"),
        ST_Union_Aggr("geometry").alias("geometry")
    ) \
    .withColumn("Income_Per_Capita", (col("Total_Income") / col("Population"))) \
    .select("COMM", "Income_Per_Capita", "geometry")

first_3 = joined_income.orderBy(col("Income_Per_Capita").desc()).limit(3)

last_3 = joined_income.orderBy(col("Income_Per_Capita").asc()).limit(3)

crime_data_2015 = crime_data.filter((substring(col("DATE OCC"), 7, 4) == "2015") & (col("Vict Descent") != "")) \
    .select("DATE OCC", "geom", "Vict Descent")

result_top_3 = first_3.join(crime_data_2015, ST_Within(crime_data_2015.geom, first_3.geometry), "inner") \
    .groupBy("Vict Descent") \
    .agg(
        f.count("*").alias("#")
    )

final_res_top_3 = result_top_3.join(race_codes, result_top_3["`Vict Descent`"] == race_codes["`Vict Descent`"], "inner") \
    .select(race_codes["`Vict Descent Full`"].alias("Victim Descent"), "#") \
    .orderBy(col("#").desc())

final_res_top_3.show()


result_last_3 = last_3.join(crime_data_2015, ST_Within(crime_data_2015.geom, last_3.geometry), "inner") \
    .groupBy("Vict Descent") \
    .agg(
        f.count("*").alias("#")
    )

final_res_last_3 = result_last_3.join(race_codes, result_last_3["`Vict Descent`"] == race_codes["`Vict Descent`"], "inner") \
    .select(race_codes["`Vict Descent Full`"].alias("Victim Descent"), "#") \
    .orderBy(col("#").desc())

final_res_last_3.show()

# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for Dataframe API using 1 core/2 GB memory: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---+
|      Victim Descent|  #|
+--------------------+---+
|               White|649|
|               Other| 72|
|Hispanic/Latin/Me...| 66|
|             Unknown| 38|
|               Black| 37|
|         Other Asian| 21|
|American Indian/A...|  1|
|             Chinese|  1|
+--------------------+---+

+--------------------+----+
|      Victim Descent|   #|
+--------------------+----+
|Hispanic/Latin/Me...|2815|
|               Black| 761|
|               White| 330|
|               Other| 187|
|         Other Asian| 113|
|             Unknown|  22|
|American Indian/A...|  21|
|              Korean|   5|
|             Chinese|   3|
|         AsianIndian|   1|
|            Filipino|   1|
+--------------------+----+

Time taken for Dataframe API using 1 core/2 GB memory: 46.36 seconds

In [60]:
spark = SparkSession \
    .builder \
    .appName("Question 4") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

start_time = time.time()

race_codes = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv", header=True, inferSchema=True)
# Create sedona context
sedona = SedonaContext.create(spark)
# Read the file from s3

geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

# Formatting magic
flattened_df = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type") \
            .filter(col("CITY") == "Los Angeles")

LA_areas = flattened_df.select("ZCTA10","COMM","POP_2010","HOUSING10","geometry") \
                .filter(((col("POP_2010")>0) & (col("HOUSING10")>0)) & (trim(col("COMM"))!="")) \
                .groupBy(["ZCTA10","COMM"]) \
                .agg(
                    ST_Union_Aggr("geometry").alias("geometry"),
                    f.sum("HOUSING10").alias("Households"),
                    f.sum("POP_2010").alias("Population")
                )

income = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True)
income = income.withColumn("Estimated Median Income", regexp_replace(col("Estimated Median Income"), r"[\$,]", "").cast("int")) \
    .select(
        col("Zip Code").alias("ZIP"),
        col("Estimated Median Income").alias("Median_Income")
    )

joined_income = LA_areas.join(income, LA_areas["ZCTA10"] == income["ZIP"], "inner") \
    .drop("ZCTA10", "ZIP") \
    .withColumn("Total_Income", col("Households") * col("Median_Income")) \
    .drop("Median_Income") \
    .groupBy("COMM") \
    .agg(
        f.sum("Population").alias("Population"),
        f.sum("Total_Income").alias("Total_Income"),
        ST_Union_Aggr("geometry").alias("geometry")
    ) \
    .withColumn("Income_Per_Capita", (col("Total_Income") / col("Population"))) \
    .select("COMM", "Income_Per_Capita", "geometry")

first_3 = joined_income.orderBy(col("Income_Per_Capita").desc()).limit(3)

last_3 = joined_income.orderBy(col("Income_Per_Capita").asc()).limit(3)

crime_data_2015 = crime_data.filter((substring(col("DATE OCC"), 7, 4) == "2015") & (col("Vict Descent") != "")) \
    .select("DATE OCC", "geom", "Vict Descent")

result_top_3 = first_3.join(crime_data_2015, ST_Within(crime_data_2015.geom, first_3.geometry), "inner") \
    .groupBy("Vict Descent") \
    .agg(
        f.count("*").alias("#")
    )

final_res_top_3 = result_top_3.join(race_codes, result_top_3["`Vict Descent`"] == race_codes["`Vict Descent`"], "inner") \
    .select(race_codes["`Vict Descent Full`"].alias("Victim Descent"), "#") \
    .orderBy(col("#").desc())

final_res_top_3.show()


result_last_3 = last_3.join(crime_data_2015, ST_Within(crime_data_2015.geom, last_3.geometry), "inner") \
    .groupBy("Vict Descent") \
    .agg(
        f.count("*").alias("#")
    )

final_res_last_3 = result_last_3.join(race_codes, result_last_3["`Vict Descent`"] == race_codes["`Vict Descent`"], "inner") \
    .select(race_codes["`Vict Descent Full`"].alias("Victim Descent"), "#") \
    .orderBy(col("#").desc())

final_res_last_3.show()

# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for Dataframe API using 2 core/4 GB memory: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---+
|      Victim Descent|  #|
+--------------------+---+
|               White|649|
|               Other| 72|
|Hispanic/Latin/Me...| 66|
|             Unknown| 38|
|               Black| 37|
|         Other Asian| 21|
|             Chinese|  1|
|American Indian/A...|  1|
+--------------------+---+

+--------------------+----+
|      Victim Descent|   #|
+--------------------+----+
|Hispanic/Latin/Me...|2815|
|               Black| 761|
|               White| 330|
|               Other| 187|
|         Other Asian| 113|
|             Unknown|  22|
|American Indian/A...|  21|
|              Korean|   5|
|             Chinese|   3|
|         AsianIndian|   1|
|            Filipino|   1|
+--------------------+----+

Time taken for Dataframe API using 2 core/4 GB memory: 47.82 seconds

In [61]:
spark = SparkSession \
    .builder \
    .appName("Question 4") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

race_codes = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv", header=True, inferSchema=True)
# Create sedona context
sedona = SedonaContext.create(spark)
# Read the file from s3

geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")

# Formatting magic
flattened_df = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type") \
            .filter(col("CITY") == "Los Angeles")

LA_areas = flattened_df.select("ZCTA10","COMM","POP_2010","HOUSING10","geometry") \
                .filter(((col("POP_2010")>0) & (col("HOUSING10")>0)) & (trim(col("COMM"))!="")) \
                .groupBy(["ZCTA10","COMM"]) \
                .agg(
                    ST_Union_Aggr("geometry").alias("geometry"),
                    f.sum("HOUSING10").alias("Households"),
                    f.sum("POP_2010").alias("Population")
                )

income = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv", header=True)
income = income.withColumn("Estimated Median Income", regexp_replace(col("Estimated Median Income"), r"[\$,]", "").cast("int")) \
    .select(
        col("Zip Code").alias("ZIP"),
        col("Estimated Median Income").alias("Median_Income")
    )

joined_income = LA_areas.join(income, LA_areas["ZCTA10"] == income["ZIP"], "inner") \
    .drop("ZCTA10", "ZIP") \
    .withColumn("Total_Income", col("Households") * col("Median_Income")) \
    .drop("Median_Income") \
    .groupBy("COMM") \
    .agg(
        f.sum("Population").alias("Population"),
        f.sum("Total_Income").alias("Total_Income"),
        ST_Union_Aggr("geometry").alias("geometry")
    ) \
    .withColumn("Income_Per_Capita", (col("Total_Income") / col("Population"))) \
    .select("COMM", "Income_Per_Capita", "geometry")

first_3 = joined_income.orderBy(col("Income_Per_Capita").desc()).limit(3)

last_3 = joined_income.orderBy(col("Income_Per_Capita").asc()).limit(3)

crime_data_2015 = crime_data.filter((substring(col("DATE OCC"), 7, 4) == "2015") & (col("Vict Descent") != "")) \
    .select("DATE OCC", "geom", "Vict Descent")

result_top_3 = first_3.join(crime_data_2015, ST_Within(crime_data_2015.geom, first_3.geometry), "inner") \
    .groupBy("Vict Descent") \
    .agg(
        f.count("*").alias("#")
    )

final_res_top_3 = result_top_3.join(race_codes, result_top_3["`Vict Descent`"] == race_codes["`Vict Descent`"], "inner") \
    .select(race_codes["`Vict Descent Full`"].alias("Victim Descent"), "#") \
    .orderBy(col("#").desc())

final_res_top_3.show()


result_last_3 = last_3.join(crime_data_2015, ST_Within(crime_data_2015.geom, last_3.geometry), "inner") \
    .groupBy("Vict Descent") \
    .agg(
        f.count("*").alias("#")
    )

final_res_last_3 = result_last_3.join(race_codes, result_last_3["`Vict Descent`"] == race_codes["`Vict Descent`"], "inner") \
    .select(race_codes["`Vict Descent Full`"].alias("Victim Descent"), "#") \
    .orderBy(col("#").desc())

final_res_last_3.show()

# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for Dataframe API using 4 core/8 GB memory: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---+
|      Victim Descent|  #|
+--------------------+---+
|               White|649|
|               Other| 72|
|Hispanic/Latin/Me...| 66|
|             Unknown| 38|
|               Black| 37|
|         Other Asian| 21|
|American Indian/A...|  1|
|             Chinese|  1|
+--------------------+---+

+--------------------+----+
|      Victim Descent|   #|
+--------------------+----+
|Hispanic/Latin/Me...|2815|
|               Black| 761|
|               White| 330|
|               Other| 187|
|         Other Asian| 113|
|             Unknown|  22|
|American Indian/A...|  21|
|              Korean|   5|
|             Chinese|   3|
|         AsianIndian|   1|
|            Filipino|   1|
+--------------------+----+

Time taken for Dataframe API using 4 core/8 GB memory: 88.80 seconds

Query 5
---

Nα υπολογιστεί, ανά αστυνομικό τμήμα, ο αριθμός εγκλημάτων που έλαβαν χώρα πλησιέστερα σε αυτό, καθώς και η μέση απόστασή του από τις τοποθεσίες όπου σημειώθηκαν τα συγκεκριμένα περιστατικά. Τα αποτελέσματα να εμφανιστούν ταξινομημένα κατά αριθμό περιστατικών, με φθίνουσα σειρά (δείτε παράδειγμα στον Πίνακα 4).

---
Question 5
---

Να υλοποιηθεί το Query 4 χρησιμοποιώντας το DataFrame ή SQL API. Να εκτελέσετε την υλοποίησή σας εφαρμόζοντας κλιμάκωση στο σύνολο των υπολογιστικών πόρων που θα χρησιμοποιήσετε: Συγκεκριμένα, καλείστε να εκτελέστε την υλοποίησή σας σε 2 executors με τα ακόλουθα configurations:
- 2 exec/4 core/8 GB memory
- 4 exec/2 cores/4GB memory
- 8 exec/1 cores/2GB memory

---

In [62]:
spark = SparkSession \
    .builder \
    .appName("Question 4") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

start_time = time.time()

police_stations = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv", header=True, inferSchema=True) \
    .withColumnRenamed("X", "LON") \
    .withColumnRenamed("Y", "LAT") \
    .withColumn("station_geom", ST_Point("LON", "LAT"))

# Cross-join police stations and crime data, calculate distances
distance_df = crime_data.crossJoin(police_stations) \
    .withColumn("distance", f.expr("ST_DistanceSphere(station_geom, geom) / 1000"))

# Find the nearest station for each crime
closest_station_df = distance_df.groupBy("DR_NO", "geom") \
    .agg(f.expr("MIN(distance)").alias("min_distance"))



# Join back with the full distance DataFrame to get station details
joined_df = distance_df.join(closest_station_df, 
                             (distance_df["DR_NO"] == closest_station_df["DR_NO"]) & 
                             (distance_df["distance"] == closest_station_df["min_distance"]),
                             "inner")

# Aggregate results for each police station
result_df = joined_df.groupBy("DIVISION") \
    .agg(
        f.avg("distance").alias("avg_distance"),
        f.count("*").alias("#"),
    ) \
    .orderBy(col("#").desc()) \
    .withColumnRenamed("DIVISION", "division")

result_df.show()
# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for Dataframe API using 2exec/4 core/8 GB memory: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+------------------+------+
|        division|      avg_distance|     #|
+----------------+------------------+------+
|       HOLLYWOOD|2.0762639601787227|224340|
|        VAN NUYS| 2.953369742819784|210134|
|       SOUTHWEST| 2.191398805780885|188901|
|        WILSHIRE|2.5926655329787778|185996|
|     77TH STREET|1.7165449719700996|171827|
|         OLYMPIC|1.7236036971780924|170897|
| NORTH HOLLYWOOD|2.6430060941415667|167854|
|         PACIFIC|3.8500706553079005|161359|
|         CENTRAL|0.9924764374568925|153871|
|         RAMPART|1.5345341879190053|152736|
|       SOUTHEAST|2.4218662158881825|152176|
|     WEST VALLEY| 3.035671216314081|138643|
|         TOPANGA|3.2969548417555536|138217|
|        FOOTHILL| 4.250921708424994|134896|
|          HARBOR|3.7025615993565055|126747|
|      HOLLENBECK| 2.680181237706821|115837|
|WEST LOS ANGELES|2.7924572890341115|115781|
|          NEWTON|1.6346357397097444|111110|
|       NORTHEAST| 3.623665524604075|108109|
|         

In [63]:
spark = SparkSession \
    .builder \
    .appName("Question 4") \
    .config("spark.executor.instances", "4") \
    .config("spark.executor.cores", "2") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

start_time = time.time()

police_stations = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv", header=True, inferSchema=True) \
    .withColumnRenamed("X", "LON") \
    .withColumnRenamed("Y", "LAT") \
    .withColumn("station_geom", ST_Point("LON", "LAT"))

# Cross-join police stations and crime data, calculate distances
distance_df = crime_data.crossJoin(police_stations) \
    .withColumn("distance", f.expr("ST_DistanceSphere(station_geom, geom) / 1000"))

# Find the nearest station for each crime
closest_station_df = distance_df.groupBy("DR_NO", "geom") \
    .agg(f.expr("MIN(distance)").alias("min_distance"))



# Join back with the full distance DataFrame to get station details
joined_df = distance_df.join(closest_station_df, 
                             (distance_df["DR_NO"] == closest_station_df["DR_NO"]) & 
                             (distance_df["distance"] == closest_station_df["min_distance"]),
                             "inner")

# Aggregate results for each police station
result_df = joined_df.groupBy("DIVISION") \
    .agg(
        f.avg("distance").alias("avg_distance"),
        f.count("*").alias("#"),
    ) \
    .orderBy(col("#").desc()) \
    .withColumnRenamed("DIVISION", "division")

result_df.show()
# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for Dataframe API using 4exec/2 core/4 GB memory: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+------------------+------+
|        division|      avg_distance|     #|
+----------------+------------------+------+
|       HOLLYWOOD|2.0762639601787214|224340|
|        VAN NUYS|2.9533697428197825|210134|
|       SOUTHWEST|2.1913988057808855|188901|
|        WILSHIRE| 2.592665532978778|185996|
|     77TH STREET|1.7165449719701007|171827|
|         OLYMPIC|1.7236036971780937|170897|
| NORTH HOLLYWOOD| 2.643006094141568|167854|
|         PACIFIC|3.8500706553079014|161359|
|         CENTRAL|0.9924764374568922|153871|
|         RAMPART|1.5345341879190055|152736|
|       SOUTHEAST| 2.421866215888182|152176|
|     WEST VALLEY|3.0356712163140815|138643|
|         TOPANGA|3.2969548417555545|138217|
|        FOOTHILL|4.2509217084249915|134896|
|          HARBOR|3.7025615993565055|126747|
|      HOLLENBECK| 2.680181237706821|115837|
|WEST LOS ANGELES|2.7924572890341137|115781|
|          NEWTON| 1.634635739709745|111110|
|       NORTHEAST|3.6236655246040743|108109|
|         

In [64]:
spark = SparkSession \
    .builder \
    .appName("Question 4") \
    .config("spark.executor.instances", "8") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

start_time = time.time()

police_stations = spark.read.csv("s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv", header=True, inferSchema=True) \
    .withColumnRenamed("X", "LON") \
    .withColumnRenamed("Y", "LAT") \
    .withColumn("station_geom", ST_Point("LON", "LAT"))

# Cross-join police stations and crime data, calculate distances
distance_df = crime_data.crossJoin(police_stations) \
    .withColumn("distance", f.expr("ST_DistanceSphere(station_geom, geom) / 1000"))

# Find the nearest station for each crime
closest_station_df = distance_df.groupBy("DR_NO", "geom") \
    .agg(f.expr("MIN(distance)").alias("min_distance"))



# Join back with the full distance DataFrame to get station details
joined_df = distance_df.join(closest_station_df, 
                             (distance_df["DR_NO"] == closest_station_df["DR_NO"]) & 
                             (distance_df["distance"] == closest_station_df["min_distance"]),
                             "inner")

# Aggregate results for each police station
result_df = joined_df.groupBy("DIVISION") \
    .agg(
        f.avg("distance").alias("avg_distance"),
        f.count("*").alias("#"),
    ) \
    .orderBy(col("#").desc()) \
    .withColumnRenamed("DIVISION", "division")

result_df.show()
# Stop timing and print out the execution duration
end_time = time.time()
elapsed_time = end_time - start_time
print(f"Time taken for Dataframe API using 8exec/1 core/2 GB memory: {elapsed_time:.2f} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----------------+------------------+------+
|        division|      avg_distance|     #|
+----------------+------------------+------+
|       HOLLYWOOD|2.0762639601787236|224340|
|        VAN NUYS| 2.953369742819782|210134|
|       SOUTHWEST|2.1913988057808846|188901|
|        WILSHIRE|2.5926655329787778|185996|
|     77TH STREET|1.7165449719701003|171827|
|         OLYMPIC|1.7236036971780935|170897|
| NORTH HOLLYWOOD|2.6430060941415676|167854|
|         PACIFIC|3.8500706553078983|161359|
|         CENTRAL|0.9924764374568925|153871|
|         RAMPART|1.5345341879190062|152736|
|       SOUTHEAST|2.4218662158881834|152176|
|     WEST VALLEY| 3.035671216314082|138643|
|         TOPANGA|3.2969548417555554|138217|
|        FOOTHILL| 4.250921708424992|134896|
|          HARBOR|3.7025615993565077|126747|
|      HOLLENBECK|2.6801812377068197|115837|
|WEST LOS ANGELES| 2.792457289034112|115781|
|          NEWTON|1.6346357397097442|111110|
|       NORTHEAST|3.6236655246040765|108109|
|         