# DataFrame - LA - Query_3

## Import Data from csv files

In [1]:
from sedona.spark import *
from pyspark.sql.functions import col
from pyspark.sql import SparkSession
from pyspark.sql.functions import sum as _sum
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("GeoJSON read") \
    .getOrCreate()

# Create sedona context
sedona = SedonaContext.create(spark)
# Read the file from s3
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
blocks_df = sedona.read.format("geojson") \
            .option("multiLine", "true").load(geojson_path) \
            .selectExpr("explode(features) as features") \
            .select("features.*")
# Formatting magic
flattened_df = blocks_df.select( \
                [col(f"properties.{col_name}").alias(col_name) for col_name in \
                blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]) \
            .drop("properties") \
            .drop("type")
# Print schema
flattened_df.printSchema()


Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
3478,application_1732639283265_3434,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- BG10: string (nullable = true)
 |-- BG10FIP10: string (nullable = true)
 |-- BG12: string (nullable = true)
 |-- CB10: string (nullable = true)
 |-- CEN_FIP13: string (nullable = true)
 |-- CITY: string (nullable = true)
 |-- CITYCOM: string (nullable = true)
 |-- COMM: string (nullable = true)
 |-- CT10: string (nullable = true)
 |-- CT12: string (nullable = true)
 |-- CTCB10: string (nullable = true)
 |-- HD_2012: long (nullable = true)
 |-- HD_NAME: string (nullable = true)
 |-- HOUSING10: long (nullable = true)
 |-- LA_FIP10: string (nullable = true)
 |-- OBJECTID: long (nullable = true)
 |-- POP_2010: long (nullable = true)
 |-- PUMA10: string (nullable = true)
 |-- SPA_2012: long (nullable = true)
 |-- SPA_NAME: string (nullable = true)
 |-- SUP_DIST: string (nullable = true)
 |-- SUP_LABEL: string (nullable = true)
 |-- ShapeSTArea: double (nullable = true)
 |-- ShapeSTLength: double (nullable = true)
 |-- ZCTA10: string (nullable = true)
 |-- geometry: geometry (nulla

In [2]:
#load data from crimes
data_path = 's3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv'

df = spark.read.csv(data_path, header=True, inferSchema=True)

data2_path = 's3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv'
df2 = spark.read.csv(data2_path,header = True, inferSchema = True)

df_combined = df.union(df2)
df_combined.columns
df_combined.count()
df_combined.printSchema()



FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- DR_NO: integer (nullable = true)
 |-- Date Rptd: string (nullable = true)
 |-- DATE OCC: string (nullable = true)
 |-- TIME OCC: integer (nullable = true)
 |-- AREA : integer (nullable = true)
 |-- AREA NAME: string (nullable = true)
 |-- Rpt Dist No: integer (nullable = true)
 |-- Part 1-2: integer (nullable = true)
 |-- Crm Cd: integer (nullable = true)
 |-- Crm Cd Desc: string (nullable = true)
 |-- Mocodes: string (nullable = true)
 |-- Vict Age: integer (nullable = true)
 |-- Vict Sex: string (nullable = true)
 |-- Vict Descent: string (nullable = true)
 |-- Premis Cd: integer (nullable = true)
 |-- Premis Desc: string (nullable = true)
 |-- Weapon Used Cd: integer (nullable = true)
 |-- Weapon Desc: string (nullable = true)
 |-- Status: string (nullable = true)
 |-- Status Desc: string (nullable = true)
 |-- Crm Cd 1: integer (nullable = true)
 |-- Crm Cd 2: integer (nullable = true)
 |-- Crm Cd 3: integer (nullable = true)
 |-- Crm Cd 4: integer (nullable = true)
 |-- 

In [3]:
#load data from LA income
data3_path = 's3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv'
df3 = spark.read.csv(data3_path,header = True, inferSchema = True)
df3.columns
df3.count()
df3.printSchema()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- Zip Code: integer (nullable = true)
 |-- Community: string (nullable = true)
 |-- Estimated Median Income: string (nullable = true)

## The average annual income per person for each area of Los Angeles 

In [4]:
from pyspark.sql import functions as F

# Join με την βάση df3
joined_df = flattened_df.join(df3, flattened_df["COMM"] == df3["Community"], "inner")

# Καθαρισμός της στήλης "Estimated Median Income"
joined_df = joined_df.withColumn(
    "Cleaned Estimated Median Income",
    F.regexp_replace(F.col("Estimated Median Income"), "[$,]", "").cast("double")
)
joined_df.select("Community","Cleaned Estimated Median Income","POP_2010","HOUSING10")
# Ομαδοποίηση κατά την περιοχή "Community" και υπολογισμός των απαιτούμενων τιμών
grouped_df = joined_df.groupBy("Community").agg(
    F.sum("POP_2010").alias("Total Population"),
    F.sum("HOUSING10").alias("Total Households"),
    F.avg("Cleaned Estimated Median Income").alias("Avg Estimated Median Income")
)

# Υπολογισμός της στήλης "Income per Individual"
grouped_df = grouped_df.withColumn(
    "Income per Individual",
    (F.col("Total Households") * F.col("Avg Estimated Median Income")) / F.col("Total Population")
)

# Δημιουργία της στήλης με το δολάριο για τα αποτελέσματα
grouped_df = grouped_df.withColumn(
    "Estimated Median Income per Household",
    F.concat(F.lit("$"), F.format_number(F.col("Avg Estimated Median Income"), 2))
)

grouped_df = grouped_df.withColumn(
    "Income per Individual",
    F.concat(F.lit("$"), F.format_number(F.col("Income per Individual"), 2))
)

# Επιλογή των στηλών για εμφάνιση
result_df = grouped_df.select(
    "Community", 
    "Total Population", 
    "Total Households", 
    "Estimated Median Income per Household",
    "Income per Individual"
)

# Εμφάνιση του αποτελέσματος
result_df.show(truncate=False,n=500)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------------+----------------+-------------------------------------+---------------------+
|Community           |Total Population|Total Households|Estimated Median Income per Household|Income per Individual|
+--------------------+----------------+----------------+-------------------------------------+---------------------+
|Culver City         |77766           |34982           |$75,913.50                           |$34,148.68           |
|Pico Rivera         |62942           |17109           |$55,758.00                           |$15,156.23           |
|Malibu              |12645           |6864            |$123,681.00                          |$67,136.92           |
|Hacienda Heights    |53594           |16524           |$78,000.00                           |$24,048.81           |
|Montebello          |62500           |19768           |$45,898.00                           |$14,516.99           |
|Hawaiian Gardens    |14254           |3703            |$37,543.

## The ratio of total number of crimes per person for each Los Angeles area

In [5]:
# Βήμα 1: Προσθήκη στήλης για γεωμετρικά σημεία στα δεδομένα εγκλημάτων
df_combined = df_combined.withColumn("geom", ST_Point("LON", "LAT"))

# Βήμα 2: Εντοπισμός αν τα σημεία ανήκουν σε κάποιο πολύγωνο
df_joined = flattened_df.join(
    df_combined,
    ST_Contains(flattened_df["geometry"], df_combined["geom"]),
    how="inner"
)

df_zip = df_joined.join(df3, df_joined["COMM"] == df3["Community"], "inner")
df_zip.select("Community","geometry")
# Βήμα 3: Προσθήκη στήλης "Sum of Crimes"
df_aggregated = df_zip.groupBy("Community").agg(
    _sum(col("geometry").isNotNull().cast("int")).alias("Sum_of_Crimes")
)


df_final=result_df.join(df_aggregated, result_df['Community'] == df_aggregated['Community'], "left")

# Βήμα 4: Υπολογισμός της στήλης "Ratio_of_Crimes_Per_Person"
df_final = df_final.withColumn(
    "Ratio_of_Crimes_Per_Person", 
    col("Sum_of_Crimes") / col("Total Population")
)

# Βήμα 5: Εμφάνιση των αποτελεσμάτων
df_final = df_final.select(result_df['Community'],"Total Population","Sum_of_Crimes","Ratio_of_Crimes_Per_Person")

# Προβολή των αποτελεσμάτων
df_final.show(truncate=False,n=500)


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+----------------+-------------+--------------------------+
|Community           |Total Population|Sum_of_Crimes|Ratio_of_Crimes_Per_Person|
+--------------------+----------------+-------------+--------------------------+
|Culver City         |77766           |2780         |0.03574827045238279       |
|Pico Rivera         |62942           |2            |3.177528518318452E-5      |
|Malibu              |12645           |1            |7.908264136022143E-5      |
|Hacienda Heights    |53594           |NULL         |NULL                      |
|Montebello          |62500           |6            |9.6E-5                    |
|Hawaiian Gardens    |14254           |NULL         |NULL                      |
|Westlake Village    |16540           |2            |1.2091898428053205E-4     |
|Carson              |183428          |862          |0.004699391586889679      |
|Glendale            |1150314         |816          |7.093715281218867E-4      |
|Signal Hill         |11016 

### Εxplain

In [6]:
print("Explain for first task")
result_df.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Explain for first task
== Physical Plan ==
AdaptiveSparkPlan (15)
+- Project (14)
   +- HashAggregate (13)
      +- Exchange (12)
         +- HashAggregate (11)
            +- Project (10)
               +- BroadcastHashJoin Inner BuildRight (9)
                  :- Project (5)
                  :  +- Filter (4)
                  :     +- Generate (3)
                  :        +- Filter (2)
                  :           +- Scan geojson  (1)
                  +- BroadcastExchange (8)
                     +- Filter (7)
                        +- Scan csv  (6)


(1) Scan geojson 
Output [1]: [features#25]
Batched: false
Location: InMemoryFileIndex [s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson]
PushedFilters: [IsNotNull(features)]
ReadSchema: struct<features:array<struct<geometry:binary,properties:struct<BG10:string,BG10FIP10:string,BG12:string,CB10:string,CEN_FIP13:string,CITY:string,CITYCOM:string,COMM:string,CT10:string,CT12:string,CTCB10:string,HD_20

In [7]:
print("Explain for second task")
df_final.explain(mode="formatted")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Explain for second task
== Physical Plan ==
AdaptiveSparkPlan (40)
+- Project (39)
   +- SortMergeJoin LeftOuter (38)
      :- Sort (14)
      :  +- HashAggregate (13)
      :     +- Exchange (12)
      :        +- HashAggregate (11)
      :           +- Project (10)
      :              +- BroadcastHashJoin Inner BuildRight (9)
      :                 :- Project (5)
      :                 :  +- Filter (4)
      :                 :     +- Generate (3)
      :                 :        +- Filter (2)
      :                 :           +- Scan geojson  (1)
      :                 +- BroadcastExchange (8)
      :                    +- Filter (7)
      :                       +- Scan csv  (6)
      +- Sort (37)
         +- HashAggregate (36)
            +- Exchange (35)
               +- HashAggregate (34)
                  +- Project (33)
                     +- BroadcastHashJoin Inner BuildRight (32)
                        :- Project (28)
                        :  +- RangeJoin (27)
   

#### Conclutions

In [8]:
print("Task 1:")
print("In the first task, we perform 1 join:")
print("  joined_df = flattened_df.join(df3, flattened_df['ZCTA10'] == df3['Zip Code'], 'inner')")
print(
    "This join uses the BroadcastHashJoin strategy. "
    "It connects the 2010_Census_Blocks.geojson data with LA_income_2015.csv, "
    "linking income data to the population per Community of L.A."
)

print("\nTask 2:")
print("In the second task, we perform 3 joins:")
print("  1. df_joined = flattened_df.join(df_combined, ST_Contains(flattened_df['geometry'], df_combined['geom']), 'inner')")
print("     - Strategy: RangeJoin")
print("  2. df_zip = df_joined.join(df3, df_joined['ZCTA10'] == df3['Zip Code'], 'inner')")
print("     - Strategy: BroadcastHashJoin")
print("  3. df_final = result_df.join(df_aggregated, result_df['Community'] == df_aggregated['Community'], 'left')")
print("     - Strategy: SortMergeJoin")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Task 1:
In the first task, we perform 1 join:
  joined_df = flattened_df.join(df3, flattened_df['ZCTA10'] == df3['Zip Code'], 'inner')
This join uses the BroadcastHashJoin strategy. It connects the 2010_Census_Blocks.geojson data with LA_income_2015.csv, linking income data to the population per Community of L.A.

Task 2:
In the second task, we perform 3 joins:
  1. df_joined = flattened_df.join(df_combined, ST_Contains(flattened_df['geometry'], df_combined['geom']), 'inner')
     - Strategy: RangeJoin
  2. df_zip = df_joined.join(df3, df_joined['ZCTA10'] == df3['Zip Code'], 'inner')
     - Strategy: BroadcastHashJoin
  3. df_final = result_df.join(df_aggregated, result_df['Community'] == df_aggregated['Community'], 'left')
     - Strategy: SortMergeJoin