# Εξαμηνιαία Εργασία

### Νικόλαος Καρακώστας 03120138
### Μιχαήλ Δημητρόπουλος 03120119

## Ερώτημα 1

### RDD APIs

In [3]:
from pyspark.sql import SparkSession
import time
import csv

# Initialize SparkSession and SparkContext
# Configure SparkSession with 4 executors
spark = SparkSession.builder \
    .appName("Query 1 RDD") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

sc = spark.sparkContext

# Start timing
start_time = time.time()

# Load and process the data for 2010s
crime_data_10s = sc.textFile("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv")
header_10s = crime_data_10s.first()
crime_data_10s = crime_data_10s.filter(lambda x: x != header_10s)
crime_data_10s = crime_data_10s.map(lambda line: list(csv.reader([line]))[0])
filtered_10s = crime_data_10s.filter(lambda x: "AGGRAVATED ASSAULT" in x[9])
useful_10s = filtered_10s.map(lambda x: [x[11], [x[0]]])

# Load and process the data for 2020s
crime_data_20s = sc.textFile("s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv")
header_20s = crime_data_20s.first()
crime_data_20s = crime_data_20s.filter(lambda x: x != header_20s)
crime_data_20s = crime_data_20s.map(lambda line: list(csv.reader([line]))[0])
filtered_20s = crime_data_20s.filter(lambda x: "AGGRAVATED ASSAULT" in x[9])
useful_20s = filtered_20s.map(lambda x: [x[11], [x[0]]])

# Combine the two RDDs
combined_useful = useful_10s.union(useful_20s)

# Function to categorize age into groups
def categorize_age(age):
    try:
        age = int(age)  # Convert age to integer
        if age < 18:
            return "Younger than 18"
        elif 18 <= age <= 24:
            return "Between 18 and 24"
        elif 25 <= age <= 64:
            return "Between 25 and 64"
        else:
            return "Older than 64"
    except ValueError:
        return "Unknown"  # Handle cases where age is not a valid integer

# Group and count by age categories
grouped_data = combined_useful.map(lambda x: (categorize_age(x[0]), 1)) \
                              .reduceByKey(lambda a, b: a + b)

# Sort the groups by count in descending order
sorted_groups = grouped_data.sortBy(lambda x: x[1], ascending=False)

# Collect and print the results
results = sorted_groups.collect()
for group, count in results:
    print(f"{group}: {count}")

# End timing
end_time = time.time()
print(f"Execution time: {end_time - start_time} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Between 25 and 64: 121093
Between 18 and 24: 33605
Younger than 18: 15928
Older than 64: 5985
Execution time: 7.186655759811401 seconds

### DataFrame

In [6]:
from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import col, when, count

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Query 1 DataFrame") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

# Start timing
start_time = time.time()

# Load and process the data for 2010s
crime_data_10s = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True,
    inferSchema=True
)

# Filter for the required description and select necessary columns
useful_10s = crime_data_10s.filter(
    col("Crm Cd Desc").contains("AGGRAVATED ASSAULT")
).select(
    col("Vict Age").alias("age"),
    col("DR_NO").alias("id")
)

# Load and process the data for 2020s
crime_data_20s = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True,
    inferSchema=True
)

# Filter for the required description and select necessary columns
useful_20s = crime_data_20s.filter(
    col("Crm Cd Desc").contains("AGGRAVATED ASSAULT")
).select(
    col("Vict Age").alias("age"),
    col("DR_NO").alias("id")
)

# Combine the two DataFrames
combined_useful = useful_10s.union(useful_20s)

# Categorize age into groups
categorized = combined_useful.withColumn(
    "age_group",
    when(col("age") < 18, "Younger than 18")
    .when((col("age") >= 18) & (col("age") <= 24), "Between 18 and 24")
    .when((col("age") >= 25) & (col("age") <= 64), "Between 25 and 64")
    .when(col("age") > 64, "Older than 64")
    .otherwise("Unknown")
)

# Group by age group and count
grouped_data = categorized.groupBy("age_group").agg(count("*").alias("count"))

# Sort the groups by count in descending order
sorted_groups = grouped_data.orderBy(col("count").desc())

# Collect and print results
results = sorted_groups.collect()
for row in results:
    print(f"{row['age_group']}: {row['count']}")

# End timing
end_time = time.time()
print(f"Execution time: {end_time - start_time} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Between 25 and 64: 121093
Between 18 and 24: 33605
Younger than 18: 15928
Older than 64: 5985
Execution time: 3.670485734939575 seconds

## Ερώτημα 2

### DataFrame

In [12]:
from pyspark.sql import SparkSession, Window
from pyspark.sql.functions import col, count, when, desc, rank
import time

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Query 2 DataFrame") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

# Start timing
start_time = time.time()

# Load data for the years 2010-2019
crime_data_10s = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True,
    inferSchema=True
)

# Load data for the years 2020-present
crime_data_20s = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True,
    inferSchema=True
)

# Combine both datasets
combined_data = crime_data_10s.union(crime_data_20s)

# Add a "year" column based on the "DATE OCC" column
combined_data = combined_data.withColumn("year", col("DATE OCC").substr(7, 4).cast("int"))

# Filter out rows where "year" is null or invalid
combined_data = combined_data.filter(col("year").isNotNull())

# Calculate total cases per year and precinct
total_cases = combined_data.groupBy("year", "AREA NAME").agg(
    count("*").alias("total_cases")
)

# Filter completed cases (closed cases have a "STATUS" not equal to "IC")
completed_cases = combined_data.filter(col("Status Desc") != "UNK")
completed_cases = combined_data.filter(col("Status Desc") != "Invest Cont")

# Calculate closed cases per year and precinct
closed_cases = completed_cases.groupBy("year", "AREA NAME").agg(
    count("*").alias("closed_cases")
)

# Join total cases and closed cases
case_rates = total_cases.join(
    closed_cases,
    on=["year", "AREA NAME"],
    how="left"
).withColumn(
    "closed_case_rate", (col("closed_cases") / col("total_cases")) * 100
)

# Rank precincts by closed case rate per year
window_spec = Window.partitionBy("year").orderBy(desc("closed_case_rate"))
ranked_data = case_rates.withColumn("rank", rank().over(window_spec))

# Select top 3 precincts per year
top_3_precincts = ranked_data.filter(col("rank") <= 3)

# Sort results by year and rank
result = top_3_precincts.orderBy("year", "rank")

# Show final result
result.select("year", "AREA NAME", "closed_case_rate", "rank").show(60)

# End timing
end_time = time.time()
print(f"Execution time: {end_time - start_time} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------------+----+
|year|  AREA NAME|  closed_case_rate|rank|
+----+-----------+------------------+----+
|2010|    Rampart| 32.84713448949121|   1|
|2010|    Olympic|31.515289821999087|   2|
|2010|     Harbor| 29.36028339237341|   3|
|2011|    Olympic|35.040060090135206|   1|
|2011|    Rampart|  32.4964471814306|   2|
|2011|     Harbor| 28.51336246316431|   3|
|2012|    Olympic| 34.29708533302119|   1|
|2012|    Rampart| 32.46000463714352|   2|
|2012|     Harbor|29.509585848956675|   3|
|2013|    Olympic| 33.58217940999398|   1|
|2013|    Rampart|  32.1060382916053|   2|
|2013|     Harbor|29.735499940695053|   3|
|2014|   Van Nuys|  32.0215235281705|   1|
|2014|West Valley| 31.49754809505847|   2|
|2014|    Mission|31.224939855653567|   3|
|2015|   Van Nuys|32.265140677157845|   1|
|2015|    Mission|30.463762673676303|   2|
|2015|   Foothill|30.353001803658852|   3|
|2016|   Van Nuys|32.194518462124094|   1|
|2016|West Valley| 31.40146437042384|   2|
|2016|   Fo

### SQL APIs

In [13]:
from pyspark.sql import SparkSession
import time

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Query 2 SQL API") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

# Start timing
start_time = time.time()

# Load data for the years 2010-2019
crime_data_10s = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True,
    inferSchema=True
)

# Load data for the years 2020-present
crime_data_20s = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True,
    inferSchema=True
)

# Combine both datasets
combined_data1 = crime_data_10s.union(crime_data_20s)

# Add a "year" column based on the "DATE OCC" column
combined_data = combined_data1.withColumn("year", combined_data1["DATE OCC"].substr(7, 4).cast("int"))

# Filter out rows where "year" is null or invalid
combined_data = combined_data.filter(combined_data["year"].isNotNull())

# Register combined_data as a temporary view
combined_data.createOrReplaceTempView("crime_data")

# Write SQL query for total cases per year and precinct
spark.sql("""
    CREATE OR REPLACE TEMP VIEW total_cases AS
    SELECT
        year,
        `AREA NAME` AS precinct,
        COUNT(*) AS total_cases
    FROM crime_data
    GROUP BY year, `AREA NAME`
""")

# Write SQL query for closed cases per year and precinct
spark.sql("""
    CREATE OR REPLACE TEMP VIEW closed_cases AS
    SELECT
        year,
        `AREA NAME` AS precinct,
        COUNT(*) AS closed_cases
    FROM crime_data
    WHERE `Status Desc` != 'Invest Cont' AND `Status Desc` != 'UNK'
    GROUP BY year, `AREA NAME`
""")


# Join total_cases and closed_cases to calculate closed_case_rate
spark.sql("""
    CREATE OR REPLACE TEMP VIEW case_rates AS
    SELECT
        t.year,
        t.precinct,
        t.total_cases,
        c.closed_cases,
        (c.closed_cases / t.total_cases) * 100 AS closed_case_rate
    FROM total_cases t
    LEFT JOIN closed_cases c
    ON t.year = c.year AND t.precinct = c.precinct
""")

# Rank precincts by closed case rate per year
spark.sql("""
    CREATE OR REPLACE TEMP VIEW ranked_data AS
    SELECT
        year,
        precinct,
        total_cases,
        closed_cases,
        closed_case_rate,
        RANK() OVER (PARTITION BY year ORDER BY closed_case_rate DESC) AS rank
    FROM case_rates
""")

# Select top 3 precincts per year
top_3_precincts = spark.sql("""
    SELECT
        year,
        precinct,
        closed_case_rate,
        rank
    FROM ranked_data
    WHERE rank <= 3
    ORDER BY year, rank
""")

# Show final result
top_3_precincts.show(60)

# End timing
end_time = time.time()
print(f"Execution time: {end_time - start_time} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------------+----+
|year|   precinct|  closed_case_rate|rank|
+----+-----------+------------------+----+
|2010|    Rampart| 32.84713448949121|   1|
|2010|    Olympic|31.515289821999087|   2|
|2010|     Harbor| 29.36028339237341|   3|
|2011|    Olympic|35.040060090135206|   1|
|2011|    Rampart|  32.4964471814306|   2|
|2011|     Harbor| 28.51336246316431|   3|
|2012|    Olympic| 34.29708533302119|   1|
|2012|    Rampart| 32.46000463714352|   2|
|2012|     Harbor|29.509585848956675|   3|
|2013|    Olympic| 33.58217940999398|   1|
|2013|    Rampart|  32.1060382916053|   2|
|2013|     Harbor|29.723638951488557|   3|
|2014|   Van Nuys|  32.0215235281705|   1|
|2014|West Valley| 31.49754809505847|   2|
|2014|    Mission|31.224939855653567|   3|
|2015|   Van Nuys|32.265140677157845|   1|
|2015|    Mission|30.463762673676303|   2|
|2015|   Foothill|30.353001803658852|   3|
|2016|   Van Nuys|32.194518462124094|   1|
|2016|West Valley| 31.40146437042384|   2|
|2016|   Fo

### Parquet Creation

In [14]:
group_number = "30"
s3_path = "s3://groups-bucket-dblab-905418150721/group"+group_number+"/closed_cases/"
combined_data1.coalesce(1).write.mode("overwrite").parquet(s3_path)
combined_data1_again = spark.read.parquet(s3_path)
combined_data1_again.show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+---------+--------------------+--------------------+--------+-----+---------+-----------+--------+------+--------------------+--------------+--------+--------+------------+---------+--------------------+--------------+--------------------+------+------------+--------+--------+--------+--------+--------------------+--------------------+-------+---------+
|    DR_NO|           Date Rptd|            DATE OCC|TIME OCC|AREA |AREA NAME|Rpt Dist No|Part 1-2|Crm Cd|         Crm Cd Desc|       Mocodes|Vict Age|Vict Sex|Vict Descent|Premis Cd|         Premis Desc|Weapon Used Cd|         Weapon Desc|Status| Status Desc|Crm Cd 1|Crm Cd 2|Crm Cd 3|Crm Cd 4|            LOCATION|        Cross Street|    LAT|      LON|
+---------+--------------------+--------------------+--------+-----+---------+-----------+--------+------+--------------------+--------------+--------+--------+------------+---------+--------------------+--------------+--------------------+------+------------+--------+--------+--------

### SQL API with parquet

In [20]:
from pyspark.sql import SparkSession
import time

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("Query 2 SQL API") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

# Start timing
start_time = time.time()

# Load the Parquet file
s3_path = "s3://groups-bucket-dblab-905418150721/group30/closed_cases/part-00000-b293af4c-cae0-4a69-b32c-1d18c84d36a0-c000.snappy.parquet"
combined_data = spark.read.parquet(s3_path)

# Add a "year" column if not already present
if "year" not in combined_data.columns:
    combined_data = combined_data.withColumn("year", combined_data["DATE OCC"].substr(7, 4).cast("int"))

# Filter out rows where "year" is null or invalid
combined_data = combined_data.filter(combined_data["year"].isNotNull())

# Register combined_data as a temporary view
combined_data.createOrReplaceTempView("crime_data")

# Write SQL query for total cases per year and precinct
spark.sql("""
    CREATE OR REPLACE TEMP VIEW total_cases AS
    SELECT
        year,
        `AREA NAME` AS precinct,
        COUNT(*) AS total_cases
    FROM crime_data
    GROUP BY year, `AREA NAME`
""")

# Write SQL query for closed cases per year and precinct
spark.sql("""
    CREATE OR REPLACE TEMP VIEW closed_cases AS
    SELECT
        year,
        `AREA NAME` AS precinct,
        COUNT(*) AS closed_cases
    FROM crime_data
    WHERE STATUS != 'IC'
    GROUP BY year, `AREA NAME`
""")

# Join total_cases and closed_cases to calculate closed_case_rate
spark.sql("""
    CREATE OR REPLACE TEMP VIEW case_rates AS
    SELECT
        t.year,
        t.precinct,
        t.total_cases,
        c.closed_cases,
        (c.closed_cases / t.total_cases) * 100 AS closed_case_rate
    FROM total_cases t
    LEFT JOIN closed_cases c
    ON t.year = c.year AND t.precinct = c.precinct
""")

# Rank precincts by closed case rate per year
spark.sql("""
    CREATE OR REPLACE TEMP VIEW ranked_data AS
    SELECT
        year,
        precinct,
        total_cases,
        closed_cases,
        closed_case_rate,
        RANK() OVER (PARTITION BY year ORDER BY closed_case_rate DESC) AS rank
    FROM case_rates
""")

# Select top 3 precincts per year
top_3_precincts = spark.sql("""
    SELECT
        year,
        precinct,
        closed_case_rate,
        rank
    FROM ranked_data
    WHERE rank <= 3
    ORDER BY year, rank
""")

# Show final result
top_3_precincts.show(60)

# End timing
end_time = time.time()
print(f"Execution time: {end_time - start_time} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------------+----+
|year|   precinct|  closed_case_rate|rank|
+----+-----------+------------------+----+
|2010|    Rampart| 32.84713448949121|   1|
|2010|    Olympic|31.515289821999087|   2|
|2010|     Harbor| 29.36028339237341|   3|
|2011|    Olympic|35.040060090135206|   1|
|2011|    Rampart|  32.4964471814306|   2|
|2011|     Harbor| 28.51336246316431|   3|
|2012|    Olympic| 34.29708533302119|   1|
|2012|    Rampart| 32.46000463714352|   2|
|2012|     Harbor|29.509585848956675|   3|
|2013|    Olympic| 33.58217940999398|   1|
|2013|    Rampart|  32.1060382916053|   2|
|2013|     Harbor|29.723638951488557|   3|
|2014|   Van Nuys|  32.0215235281705|   1|
|2014|West Valley| 31.49754809505847|   2|
|2014|    Mission|31.224939855653567|   3|
|2015|   Van Nuys|32.265140677157845|   1|
|2015|    Mission|30.463762673676303|   2|
|2015|   Foothill|30.353001803658852|   3|
|2016|   Van Nuys|32.194518462124094|   1|
|2016|West Valley| 31.40146437042384|   2|
|2016|   Fo

## Ερώτημα 3

### DataFrame

In [93]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, sum, first, regexp_replace, concat, lit, lower
from sedona.spark import *
import time

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("Query 3") \
    .config("spark.executor.instances", "4") \
    .getOrCreate()

# Create Sedona Context
sedona = SedonaContext.create(spark)

geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
income_data_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_income_2015.csv"
crime_data_10s_path = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
crime_data_20s_path = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv"

income_data_df = spark.read.csv(income_data_path,header=True,inferSchema=True)
crime_data_10s = spark.read.csv(crime_data_10s_path,header=True,inferSchema=True)
crime_data_20s = spark.read.csv(crime_data_20s_path,header=True,inferSchema=True)
crime_data = crime_data_10s.union(crime_data_20s)

# Load GeoJSON Data (2010 Census Blocks) from S3
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
census_blocks_df = sedona.read.format("geojson") \
    .option("multiLine", "true").load(geojson_path) \
    .selectExpr("explode(features) as features") \
    .select("features.*")

# Flatten GeoJSON properties
flattened_census_blocks_df = census_blocks_df.select(
    [col(f"properties.{col_name}").alias(col_name) for col_name in census_blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]
).drop("properties").drop("type")

census_geometry = flattened_census_blocks_df.filter(col("CITY")== "Los Angeles").select("COMM", "HOUSING10", "POP_2010", "ZCTA10", "geometry")\
    .groupBy("COMM", "ZCTA10")\
    .agg(ST_Union_Aggr("geometry").alias("geometry"), 
    sum("POP_2010").alias("Total_POP_2010"),
    sum("HOUSING10").alias("Total_HOUSING10")
        )

#census_selected_columns_df.orderBy("COMM").show()
#census_selected_columns_df.count()

# Remove the first character of the "Estimated Median Income" $
income_data_df = income_data_df.withColumn(
    "Estimated Median Income",
    col("Estimated Median Income").substr(2, 100) # Keep characters starting from position 2
)
income_data_df = income_data_df.withColumn(
    "Estimated Median Income",
    regexp_replace(col("Estimated Median Income"), ",", "")
)

# Filter and clean Income Data (ensure ZCTA10 and COMM_income are not null)
income_data_df = income_data_df.filter(
    (col("Zip Code").isNotNull()) & (col("Community").isNotNull()) & (col("Estimated Median Income").isNotNull())
)

crime_points = crime_data.filter(col("LAT").isNotNull() & col("LON").isNotNull())\
    .withColumn("geometry",ST_Point(col("LON"), col("LAT"))) \
    .select("geometry")

# Perform spatial join: Find crimes inside polygons
community_crimes = crime_points.join(
    census_geometry.hint("BROADCAST"),
    ST_Within(crime_points["geometry"], census_geometry["geometry"]),
    "inner"
).groupBy("COMM").agg(
    count("*").alias("Total Crimes")
)

result_df = (
    census_geometry.join(
        income_data_df.hint("BROADCAST"),
        census_geometry["ZCTA10"] == income_data_df["Zip Code"],
        "inner"
    ).select("COMM", "ZCTA10", "Total_POP_2010", "Total_HOUSING10","Estimated Median Income")
    .filter(
        (col("Total_POP_2010") != 0) & (col("Total_HOUSING10") != 0)
    )
)

# Add "Income per Person" column with at most 2 decimals
result_df = result_df.withColumn(
    "Income_Per_Person",
    round((col("Estimated Median Income") * col("Total_HOUSING10")) / col("Total_POP_2010"), 2) 
)
#result_df.orderBy("COMM").show()

total_population = result_df.groupBy("COMM").agg(sum("Total_POP_2010").alias("Community_Population"))

#total_population.orderBy("COMM").show()

community_income = result_df.join(
    total_population.hint("BROADCAST"),
    on="COMM",
    how="left"
)

community_income = community_income.withColumn("Community_Income",
                                 round((col("Total_POP_2010") * col("Income_Per_Person") 
            /col("Community_Population")),2))

community_income = (
    community_income
    .groupBy("COMM")
    .agg(
        sum("Total_POP_2010").alias("Community_Population"),
        round(sum("Community_Income"), 2).alias("Community_Income")
    )
)

#community_income.orderBy("COMM").show()

result_df = (
    community_income.join(
        community_crimes.hint("BROADCAST"),
        on="COMM",
        how="inner"  
    )
    .select(
        col("COMM").alias("Community"),
        col("Community_Population"),
        col("Community_Income").alias("Income_Per_Person"),
        col("Total Crimes"),
    )
)

#result_df.orderBy("COMM").show()

# Now calculate "Crimes_Per_Person" for each community
result_df = result_df.withColumn(
    "Crimes_Per_Person",
    round(col("Total Crimes") / col("Community_Population"), 2)
)

final_df = result_df.select(
    col("Community"),
    col("Income_Per_Person"),
    col("Crimes_Per_Person")
)

final_df.orderBy("Community").show()
final_df.count()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+-----------------+-----------------+
|           Community|Income_Per_Person|Crimes_Per_Person|
+--------------------+-----------------+-----------------+
|     Adams-Normandie|          8791.46|             0.74|
|              Alsace|          11239.5|             0.55|
|Angeles National ...|         28117.65|            11.35|
|    Angelino Heights|         18411.55|             0.63|
|              Arleta|         12110.78|             0.44|
|     Atwater Village|          28477.2|             0.69|
|       Baldwin Hills|          17302.7|             1.19|
|             Bel Air|         63041.34|             0.43|
|       Beverly Crest|         60947.49|             0.37|
|         Beverlywood|         29267.82|             0.52|
|       Boyle Heights|          8434.46|             0.72|
|           Brentwood|         60840.62|              0.5|
|           Brookside|         18138.62|             0.89|
|    Cadillac-Corning|         19572.78|             0.6

## Ερώτημα 4

In [94]:
result_df.orderBy("Community").show()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+--------------------+-----------------+------------+-----------------+
|           Community|Community_Population|Income_Per_Person|Total Crimes|Crimes_Per_Person|
+--------------------+--------------------+-----------------+------------+-----------------+
|     Adams-Normandie|                7842|          8791.46|        5779|             0.74|
|              Alsace|               11728|          11239.5|        6466|             0.55|
|Angeles National ...|                  20|         28117.65|         227|            11.35|
|    Angelino Heights|                2376|         18411.55|        1497|             0.63|
|              Arleta|               32876|         12110.78|       14542|             0.44|
|     Atwater Village|               14101|          28477.2|        9696|             0.69|
|       Baldwin Hills|               28637|          17302.7|       34194|             1.19|
|             Bel Air|                8261|         63041.34|        3

In [120]:
from pyspark.sql.functions import desc, col, when

from pyspark.sql import SparkSession

import time

spark = SparkSession.builder \
    .appName("Query4_4Cores_8GB") \
    .config("spark.executor.instances", "2") \
    .config("spark.executor.cores", "4") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

# Load GeoJSON Data (2010 Census Blocks) from S3
geojson_path = "s3://initial-notebook-data-bucket-dblab-905418150721/2010_Census_Blocks.geojson"
census_blocks_df = sedona.read.format("geojson") \
    .option("multiLine", "true").load(geojson_path) \
    .selectExpr("explode(features) as features") \
    .select("features.*")

# Flatten GeoJSON properties
flattened_census_blocks_df = census_blocks_df.select(
    [col(f"properties.{col_name}").alias(col_name) for col_name in census_blocks_df.schema["properties"].dataType.fieldNames()] + ["geometry"]
).drop("properties").drop("type")


census_geometry = flattened_census_blocks_df.filter(col("CITY")== "Los Angeles").select("COMM", "ZCTA10", "geometry")\
    .groupBy("COMM")\
    .agg(ST_Union_Aggr("geometry").alias("geometry"))
         
#census_geometry.orderBy("COMM").show()

# Get the 3 highest incomes per person
highest_income = final_df.orderBy(desc("Community_Income")).limit(3)
#highest_income.show()

# Get the 3 lowest incomes per person
lowest_income = final_df.orderBy("Community_Income").limit(3)
#lowest_income.show()

# Select the geometry for the 3 communities with the highest income
highest_income_geometry = (
    highest_income
    .join(census_geometry, final_df["Community"] == census_geometry["COMM"])
    .select(final_df["Community"], "Income_Per_Person", census_geometry["geometry"])
)

#highest_income_geometry.show()

# Select the geometry for the 3 communities with the lowest income
lowest_income_geometry = (
    lowest_income
    .join(census_geometry, final_df["Community"] == census_geometry["COMM"])
    .select(final_df["Community"], "Income_Per_Person", census_geometry["geometry"])
)

#lowest_income_geometry.show()

# Load and process the data for 2010s
crime_data_10s_path =  "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv"
crime_data_10s = spark.read.csv(crime_data_10s_path
   ,header=True,
    inferSchema=True
)


crime_with_year = (
    crime_data_10s
    .withColumn("year", col("DATE OCC").substr(7, 4).cast("int"))
    .filter((col("year") == 2015) & (col("Vict Descent").isNotNull()) & (col("LAT").isNotNull()) & (col("LON").isNotNull()))
    .select(
        col("DR_NO"),
        col("Vict Descent"),
        col("LAT"),
        col("LON"),
        col("year")
    )
)

#crime_with_year.show()

crime_2015_points = crime_with_year.withColumn("point", ST_Point(col("LON"), col("LAT")))

#crime_2015_points.show()



  # Perform the spatial join using ST_Within
crimes_in_high_income = crime_2015_points.join(
    highest_income_geometry,
    ST_Within(crime_2015_points["point"] ,highest_income_geometry["geometry"]),
    "inner"
).select(
    crime_2015_points["DR_NO"],
    crime_2015_points["Vict Descent"],
    crime_2015_points["LAT"],
    crime_2015_points["LON"],
    highest_income_geometry["Community"],
    highest_income_geometry["Income_Per_Person"]
)

crimes_in_low_income = crime_2015_points.join(
    lowest_income_geometry,  
    ST_Within(crime_2015_points["point"], lowest_income_geometry["geometry"]),
    "inner"
).select(
    crime_2015_points["DR_NO"],
    crime_2015_points["Vict Descent"],
    crime_2015_points["LAT"],
    crime_2015_points["LON"],
    lowest_income_geometry["Community"],  
    lowest_income_geometry["Income_Per_Person"]  
)

descent_mapping = {
    "A": "Other Asian",
    "B": "Black",
    "C": "Chinese",
    "D": "Cambodian",
    "F": "Filipino",
    "G": "Guamanian",
    "H": "Hispanic/Latin/Mexican",
    "I": "American Indian/Alaskan Native",
    "J": "Japanese",
    "K": "Korean",
    "L": "Laotian",
    "O": "Other",
    "P": "Pacific Islander",
    "S": "Samoan",
    "U": "Hawaiian",
    "V": "Vietnamese",
    "W": "White",
    "X": "Unknown",
    "Z": "Asian Indian"
}

# Build the when condition for all mappings
descent_column = when(col("Vict Descent") == "A", "Other Asian")
for code, description in descent_mapping.items():
    descent_column = descent_column.when(col("Vict Descent") == code, description)

# Replace codes with descriptions in the DataFrame
highest_vict_descent = crimes_in_high_income.groupBy("Vict Descent").count().withColumn(
    "Victim Descent", descent_column
).select(
    col("Victim Descent"),
    col("count").alias("#")
).orderBy(col("#").desc())

# Replace codes with descriptions in the DataFrame for lowest income communities
lowest_vict_descent = crimes_in_low_income.groupBy("Vict Descent").count().withColumn(
    "Victim Descent", descent_column
).select(
    col("Victim Descent"),
    col("count").alias("#")
).orderBy(col("#").desc())

highest_vict_descent.show()
lowest_vict_descent.show()                                                            
                                                            
                

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+--------------------+---+
|      Victim Descent|  #|
+--------------------+---+
|               White|695|
|               Other| 86|
|Hispanic/Latin/Me...| 77|
|             Unknown| 49|
|               Black| 43|
|         Other Asian| 22|
|             Chinese|  1|
|American Indian/A...|  1|
+--------------------+---+

+--------------------+----+
|      Victim Descent|   #|
+--------------------+----+
|Hispanic/Latin/Me...|3342|
|               Black|1127|
|               White| 428|
|               Other| 252|
|         Other Asian| 138|
|             Unknown|  30|
|American Indian/A...|  23|
|              Korean|   4|
|            Filipino|   3|
|             Chinese|   3|
|           Guamanian|   1|
|        Asian Indian|   1|
+--------------------+----+

## Ερώτημα 5

In [69]:
from sedona.spark import *
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, count, avg, expr, first, desc
from sedona.register import SedonaRegistrator
from sedona.sql.types import GeometryType
import time

# Initialize Spark session and Sedona context
spark = SparkSession.builder \
    .appName("CrimeDataAnalysis") \
    .config("spark.executor.instances", "8") \
    .config("spark.executor.cores", "1") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()
sedona = SedonaContext.create(spark)
SedonaRegistrator.registerAll(spark)

# Start timing
start_time = time.time()

# Load police stations data
police_stations_path = "s3://initial-notebook-data-bucket-dblab-905418150721/LA_Police_Stations.csv"
police_stations_df = spark.read.csv(
    police_stations_path,
    header=True,
    inferSchema=True
)

# Load and process the crime data for 2010s
crime_data_10s = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2010_to_2019_20241101.csv",
    header=True,
    inferSchema=True
)

# Load and process the crime data for 2020s
crime_data_20s = spark.read.csv(
    "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/Crime_Data_from_2020_to_Present_20241101.csv",
    header=True,
    inferSchema=True
)

# Filter for the required description and select necessary columns
useful_10s = crime_data_10s.select(
    col("DR_NO").alias("Crime ID"),
    col("AREA ").alias("Area"),
    col("AREA NAME").alias("Area Name"),
    col("LAT").alias("Latitude"),
    col("LON").alias("Longtitude")
)

useful_20s = crime_data_20s.select(
    col("DR_NO").alias("Crime ID"),
    col("AREA").alias("Area"),
    col("AREA NAME").alias("Area Name"),
    col("LAT").alias("Latitude"),
    col("LON").alias("Longtitude")
)

# Combine the two DataFrames
combined_useful = useful_10s.union(useful_20s)

# Filter out Null Island records
filtered_crimes = combined_useful.filter(
    (col("Latitude") != 0) & (col("Longtitude") != 0)
)

# Create geospatial points for crimes
crime_points = filtered_crimes.withColumn(
    "crime_geom", ST_Point(col("Longtitude"), col("Latitude"))
)

# Create geospatial points for police stations
station_points = police_stations_df.withColumn(
    "station_geom", ST_Point(col("X"), col("Y"))
)

# Perform a Cartesian join to calculate distances between each crime and police station

# Earth's radius in kilometers
EARTH_RADIUS_KM = 6371.0
# Perform a Cartesian join to calculate distances in kilometers
crime_station_distances = crime_points.crossJoin(station_points).withColumn(
    "distance", 
    ST_Distance(col("crime_geom"), col("station_geom")) * (3.141592653589793 / 180) * EARTH_RADIUS_KM
)
#crime_station_distances.show()

# Find the minimum distance for each crime and keep Area and Area Name
min_distances = crime_station_distances.groupBy("Crime ID").agg(
    expr("min(distance)").alias("min_distance"),
    first("Area").alias("Closest Area"),
    first("Area Name").alias("Closest Area Name")
)

min_distances = min_distances.orderBy("Closest Area")
# Show the result
#min_distances.show()
#min_distances.count()

# Step 1: Count of tuples for each closest area name
area_counts = min_distances.groupBy("Closest Area Name").agg(
    count("Crime ID").alias("Number of Closest Crimes")
)
#area_counts.show(50)

# Step 2: Median distance for each closest area name
area_median_distances = min_distances.groupBy("Closest Area Name").agg(
    expr("percentile_approx(min_distance, 0.5)").alias("Median Distance")
)

# Step 3: Join counts and medians, and order by Number of Closest Crimes in descending order
final_table = area_counts.join(
    area_median_distances,
    on="Closest Area Name"
).orderBy(desc("Number of Closest Crimes"))

final_table.show(21)

# End timing
end_time = time.time()
print(f"Execution time: {end_time - start_time} seconds")

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+-----------------+------------------------+------------------+
|Closest Area Name|Number of Closest Crimes|   Median Distance|
+-----------------+------------------------+------------------+
|      77th Street|                  206784| 2.292726866601563|
|        Southwest|                  192226|2.3723678063581852|
|          Pacific|                  170903| 4.342278455155561|
|          Central|                  166698| 1.093428009365451|
|      N Hollywood|                  164532|2.6969736366261494|
|        Southeast|                  161051|1.8476966788287785|
|        Hollywood|                  150663| 1.439317682870132|
|           Newton|                  148757|1.7463468819264603|
|          Olympic|                  144962|1.9791002660074106|
|          Mission|                  143600| 4.228196896970822|
|        Northeast|                  142732| 3.972583911782645|
|         Van Nuys|                  142194|2.5715909394184853|
|          Topanga|                  138