Output URI
s3://groups-bucket-dblab-905418150721/group23/outputs/query4/

In [13]:
%%configure -f
{
    "conf": {
        "spark.executor.instances": "2",
        "spark.executor.cores": "1",
        "spark.executor.memory": "2g",
        "spark.driver.memory": "2g"
    }
}

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
14,application_1738075734771_0015,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
4,application_1738075734771_0005,pyspark,idle,Link,Link,,
7,application_1738075734771_0008,pyspark,idle,Link,Link,,
10,application_1738075734771_0011,pyspark,idle,Link,Link,,
11,application_1738075734771_0012,pyspark,busy,Link,Link,,
14,application_1738075734771_0015,pyspark,idle,Link,Link,,✔


## Πρώτος τρόπος
Ακολουθούμε την εξής μεθοδολογία:
- Φορτώνουμε το census και φιλτράρουμε ο,τι δεν έχει geometry
- Κρατάμε μόνο τις στήλες που χρειαζόμαστε για το average income per person
- Κάνουμε group by ZCTA10, COMM κρατώντας άθροισμα housing και population
- Ανοίγουμε το αρχείο με το household income per zip code και μετατρέπουμε το income σε double
- Κάνουμε join τον πίνακα με (zcta10,comm) με τον πίνακα Inocme βάσει zip code
- Κάνουμε group by comm για να βρούμε το average income per person per community
- Κάνουμε order by average income είτε descending για τα top είτε ascending για τα bottom και κρατάμε μόνο τα πρώτα 3
Πλέον έχουμε βρει τα communities που μας νοιάζουν και κάνουμε τις ίδιες πράξεις για τα top και bottom οπότε τα αναφέρουμε μαζί:
- Ανοίγουμε το αρχείο με τα εγκλήματα, κάνουμε filter όσα ήταν εκτός του 15 και όσα δεν έχουν γεωμετρία.
- Ενώνουμε τον πίνακα με τα blocks για να βρούμε σε ποιο community είναι κάθε έγκλημα
- Ενώνουμε τον νέο πίνακα με τα αποτελέσματα από το Income για να βρούμε μόνο τα εγκλήματα που μας ενδιαφέρουν.
- Κάνουμε group by Vict Descent και μετράμε rows
- Ανοίγουμε το αρχείο με το race description και το κάνουμε join με τα αποτελέσματά μας για να πάρουμε την περιγραφή του race
- Κάνουμε order by # desc για να τυπώσουμε με φθίνουσα σειρά εγκλημάτων.

In [9]:
from pyspark.sql import SparkSession, functions as F
from sedona.spark import *
from pyspark.sql.functions import col
import time
from pyspark.sql.functions import monotonically_increasing_id, row_number, expr
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    col, substring, rank, count, desc, sum as spark_sum
)



# Create spark Session
spark = SparkSession \
    .builder \
    .appName("QUERY 4") \
    .getOrCreate()

# Create sedona context
sedona = SedonaContext.create(spark)


crime19path = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
income_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv"
census_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
re_codes_path = "s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv"

start_time = time.time()

# load blocks
census = sedona.read.format('geojson').option("multiLine", "true").load(census_path).selectExpr("explode(features) as features").select("features.*")
census = census.select([col(f"properties.{col_name}").alias(col_name) for col_name in census.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")

census = census.filter(col("geometry").isNotNull())



# group by ZCTA10, COMM
# GET POP 2010 and HOUSING 10 per zcta10, comm
blocks = (
    census.filter((col("CITY") == "Los Angeles"))
    .select(
        "ZCTA10",
        "COMM",
        "POP_2010",
        "HOUSING10"
    )
)

print("blocks:")
blocks.show(10)

zip_pop_all = (
    blocks
    .groupBy("ZCTA10", "COMM")
    .agg(
        F.sum("POP_2010").alias("population"),
        F.sum("HOUSING10").alias("houses_per_zip_code_in_community")
    )
)

print("zip_pop_all")
zip_pop_all.show(10)

# load income
income = spark.read.format('csv').options(header='true').load(income_path)
# normalise income

norm_inc = (
    income
    .select(
        F.col("Zip Code").alias("zip_code"),
        F.regexp_replace(
            F.regexp_replace(F.col("Estimated Median Income"), "\\$", ""),
            ",", ""
        ).cast("int").alias("med_inc")
    )
)

print("norm_inc")
norm_inc.show(10)

# join the tables to find money per zcta10, comm
income_data = (
        zip_pop_all
        .join(
            norm_inc,
            zip_pop_all.ZCTA10 == norm_inc.zip_code,
            "inner"
        )
    )


# group by comm and find average income per person
income_result = income_data.groupBy("COMM").agg(
    F.expr("CAST(SUM(med_inc * houses_per_zip_code_in_community) / SUM(population) AS INT)").alias("average_income")
)
print("income result:")
income_result.show(10)

# keep only top 3 and only the comm row A
top_3 = income_result.orderBy(F.desc("average_income")).limit(3).select("COMM")

print("top3 bottom3")
# also keep only the bottom 3 and only comm row B
bottom_3 = income_result.orderBy("average_income").limit(3).select("COMM")

top_3.show(10)
bottom_3.show(10)


# load crimes filter 2015
crime19 = spark.read.format('csv').options(header='true').load(crime19path)
crime19 = crime19.filter((col("LAT").isNotNull()) & (col("LAT") != 0) & (col("LON").isNotNull()) & (col("LON") != 0))
crime19 = crime19.withColumn("point", expr("ST_Point(LON, LAT)"))
crime_2015_df = crime19.withColumn(
    "Year",
    substring(col("Date Rptd"), 7, 4)
).filter(col("Year") == "2015")

print("crimes 2015")
crime_2015_df.show(10)

# join crimes with blocks on inside polygon to get the comm
crimes_in_block = (
        census
        .join(
            crime_2015_df,
            # The condition for your spatial join
            ST_Within(F.col("point"), F.col("geometry")),
            "inner"
        )
    )

print(f"crimes in block {crimes_in_block.count()} rows:")
crimes_in_block.show(10)

# keep only victim descent, comm
crimes_in_block_filtered = crimes_in_block.select("Vict Descent", "COMM")

# join with A 
joined_top_3 = crimes_in_block_filtered.join(top_3, on="COMM", how="inner")

# join with B
joined_bottom_3 = crimes_in_block_filtered.join(bottom_3, on="COMM", how="inner")

# aggregate on victim descent with count A and B
result_top = joined_top_3.groupBy("Vict Descent").agg(
    F.count("*").alias("#")
)

result_bottom = joined_bottom_3.groupBy("Vict Descent").agg(
    F.count("*").alias("#")
)

# Open Vict Desc
re_codes_df = spark.read.csv(re_codes_path, header=True, inferSchema=True)

# join to rename columns

renamed_top = result_top.join(
    re_codes_df,
    result_top["Vict Descent"] == re_codes_df["Vict Descent"],
    how="inner"
).select(
    re_codes_df["Vict Descent Full"].alias("Victim Descent"),
    result_top["#"]
)

renamed_bottom = result_bottom.join(
    re_codes_df,
    result_bottom["Vict Descent"] == re_codes_df["Vict Descent"],
    how="inner"
).select(
    re_codes_df["Vict Descent Full"].alias("Victim Descent"),
    result_bottom["#"]
)

# order in descending
ordered_top = renamed_top.orderBy(F.desc("#"))
ordered_bottom = renamed_bottom.orderBy(F.desc("#"))


ordered_top.show(100)
ordered_bottom.show(100)

elapsed_time = time.time() - start_time

print(f"Calculated in {elapsed_time} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Interrupted by user


## Δεύτερος Τρόπος
Αρχικά θέλουμε να βρούμε τα 3 communities με το μεγαλύτερο και τα 3 communities με το μικρότερο εισόδημα:

- Φορτώνουμε το csv με τα blocks.
- Φιλτράρουμε όσα δεν έχουν geometry και κρατάμε μόνο τα columns ZCTA10, COMM, POP_2010, HOUSING10, geometry.
- Φορτώνουμε το dataset με το μέσο household income per zipcode και κανονικοποιούμε το average income σε double.
- Κάνουμε join τα blocks με το income για να βρούμε το average household income για κάθε block.
- Υπολογίζουμε το average income per person για κάθε community κάνοντας aggregation στο COMΜ
- Στο aggregation κατά COMM κάνουμε και union στα blocks με την εντολή ST_Union_Aggr.
- Κάνουμε orderBy average income και κρατάμε 3 rows (για top 3 descending για bottom 3 ascending)
Πλέον έχουμε τους πίνακες με τα top και bottom 3 communitites και τις γεωμετρίες μέσα στις οποίες αν είναι τα εγκλήματα τα κρατάμε. Κάνουμε την ίδια δουλειά για τα top και bottom 3 ανεξάρτητα οπότε αναφέρουμε μόνο για το top 3.

- Φορτώνουμε τον πίνακα με τα εγκλήματα από το 2015 μέρχρι το 2019, φιλτράρουμε όσα δεν έχουν συντεταγμένες και κρατάμε μόνο αυτά που έγιναν το 2015. Επιπλέον φτιάχνουμε ST_Point με τις συντεταγμένες του κάθε ενός
- Κάνουμε join τον πίνακα με τις γεωμετρίες των top 3 communities με τον πίνακα των εγκλημάτων με join condition να είναι το έγκλημα μέσα σε κάποιο από τα communities. (θα μπορούσαμε να κάνουμε άλλο ένα union και να συγκρίνουμε μόνο με μια τιμή)
- Για κάθε έγκλημα κρατάμε μόνο το Vict Descent και το Comm (το comm δεν χρειαζόταν αλλά θέλαμε να τυπώσουμε κάποια ενδιάμεσα αποτελέσματα).
- Κάνουμε τα δεδομένα group By Vict Descent και μετράμε rows.
- Ανοίγουμε το αρχείο με τα race codes.
- Κάνουμε join τα αποτελέσματά μας με το αρχείο race codes για να πάρουμε τα ονόματα των races
- Κάνουμε order by # descending για να τυπώσουμε τα αποτελέσματα με φθίνουσα σειρά.


In [14]:
from pyspark.sql import SparkSession, functions as F
from sedona.spark import *
from pyspark.sql.functions import col
import time
from pyspark.sql.functions import monotonically_increasing_id, row_number, expr, year
from pyspark.sql.window import Window
from pyspark.sql.functions import (
    col, substring, rank, count, desc, sum as spark_sum, to_date
)



# Create spark Session
spark = SparkSession \
    .builder \
    .appName("QUERY 4") \
    .getOrCreate()

# Create sedona context
sedona = SedonaContext.create(spark)


crime19path = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
income_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv"
census_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
re_codes_path = "s3://initial-notebook-data-bucket-dblab-905418150721/RE_codes.csv"

start_time = time.time()

# load blocks
census = sedona.read.format('geojson').option("multiLine", "true").load(census_path).selectExpr("explode(features) as features").select("features.*")
census = census.select([col(f"properties.{col_name}").alias(col_name) for col_name in census.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")

census = census.filter(col("geometry").isNotNull())



# group by ZCTA10, COMM
# GET POP 2010 and HOUSING 10 per zcta10, comm
blocks = (
    census.filter((col("CITY") == "Los Angeles"))
    .select(
        "ZCTA10",
        "COMM",
        "POP_2010",
        "HOUSING10",
        "geometry"
    )
)

# print("blocks:")
# blocks.show(10)


# load income
income = spark.read.format('csv').options(header='true').load(income_path)
# normalise income

norm_inc = (
    income
    .select(
        F.col("Zip Code").alias("zip_code"),
        F.regexp_replace(
            F.regexp_replace(F.col("Estimated Median Income"), "\\$", ""),
            ",", ""
        ).cast("double").alias("med_inc")
    )
)

# print("norm_inc")
# norm_inc.show(10)

# join the tables to find money per zcta10, comm
income_data = (
        blocks
        .join(
            norm_inc,
            blocks.ZCTA10 == norm_inc.zip_code,
            "inner"
        )
    )


# group by comm and find average income per person
income_result = income_data.groupBy("COMM").agg(
    F.expr("CAST(SUM(med_inc * HOUSING10) / SUM(POP_2010) AS INT)").alias("average_income"),
    ST_Union_Aggr("geometry").alias("geometry")
)
# print("income result:")
# income_result.show(10)

# keep only top 3 and only the comm row A
top_3 = income_result.orderBy(F.desc("average_income")).limit(3).select("COMM", "geometry")

print("top3 bottom3")
# also keep only the bottom 3 and only comm row B
bottom_3 = income_result.orderBy("average_income").limit(3).select("COMM", "geometry")

# top_3.show(10)
# bottom_3.show(10)


# load crimes filter 2015
crime19 = spark.read.format('csv').options(header='true').load(crime19path)
crime19 = crime19.filter((col("LAT").isNotNull()) & (col("LAT") != 0) & (col("LON").isNotNull()) & (col("LON") != 0))
crime19 = crime19.withColumn("point", expr("ST_Point(LON, LAT)"))
crime_2015_df = crime19.withColumn("Date", to_date(col("DATE OCC"), "MM/dd/yyyy hh:mm:ss a")).filter(year(col("Date")) == 2015)

# print("crimes 2015")
# crime_2015_df.show(10)

# join crimes with blocks on inside polygon to get the comm
crimes_in_top = (
        top_3
        .join(
            crime_2015_df,
            # The condition for your spatial join
            ST_Within(F.col("point"), F.col("geometry")),
            "inner"
        )
    )


crimes_in_bottom = (
        bottom_3
        .join(
            crime_2015_df,
            # The condition for your spatial join
            ST_Within(F.col("point"), F.col("geometry")),
            "inner"
        )
    )



# keep only victim descent, comm
joined_top_3 = crimes_in_top.select("Vict Descent", "COMM")
joined_bottom_3 = crimes_in_bottom.select("Vict Descent", "COMM")


# aggregate on victim descent with count A and B
result_top = joined_top_3.groupBy("Vict Descent").agg(
    F.count("*").alias("#")
)

result_bottom = joined_bottom_3.groupBy("Vict Descent").agg(
    F.count("*").alias("#")
)

# Open Vict Desc
re_codes_df = spark.read.csv(re_codes_path, header=True, inferSchema=True)

# join to rename columns

renamed_top = result_top.join(
    re_codes_df,
    result_top["Vict Descent"] == re_codes_df["Vict Descent"],
    how="inner"
).select(
    re_codes_df["Vict Descent Full"].alias("Victim Descent"),
    result_top["#"]
)

renamed_bottom = result_bottom.join(
    re_codes_df,
    result_bottom["Vict Descent"] == re_codes_df["Vict Descent"],
    how="inner"
).select(
    re_codes_df["Vict Descent Full"].alias("Victim Descent"),
    result_bottom["#"]
)

# order in descending
ordered_top = renamed_top.orderBy(F.desc("#"))
ordered_bottom = renamed_bottom.orderBy(F.desc("#"))


ordered_top.show(100)
ordered_bottom.show(100)

elapsed_time = time.time() - start_time

print(f"Calculated in {elapsed_time} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

top3 bottom3
+--------------------+---+
|      Victim Descent|  #|
+--------------------+---+
|               White|695|
|               Other| 86|
|Hispanic/Latin/Me...| 77|
|             Unknown| 49|
|               Black| 43|
|         Other Asian| 22|
|             Chinese|  1|
|American Indian/A...|  1|
+--------------------+---+

+--------------------+----+
|      Victim Descent|   #|
+--------------------+----+
|Hispanic/Latin/Me...|3342|
|               Black|1127|
|               White| 428|
|               Other| 252|
|         Other Asian| 138|
|             Unknown|  30|
|American Indian/A...|  23|
|              Korean|   4|
|            Filipino|   3|
|             Chinese|   3|
|         AsianIndian|   1|
|           Guamanian|   1|
+--------------------+----+

Calculated in 91.66757154464722 seconds