In [1]:
# Import necessary libraries
import time
from pyspark.sql import SparkSession

# Create Spark session
spark = SparkSession.builder.appName("Query_2").getOrCreate()

# Load crime data from S3
crime_data_path = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/"





Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,User,Current session?
1831,application_1732639283265_1792,pyspark,idle,Link,Link,,✔


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

SparkSession available as 'spark'.


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [9]:
from pyspark.sql.functions import count, year, desc, to_timestamp, col
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import time  # Importing time for tracking execution duration

# Start timer to measure execution time
start_time_df = time.time()

# Load the crime data into a Spark DataFrame
crime_df = spark.read.csv(crime_data_path, header=True, inferSchema=True)

# Filter out rows with invalid coordinates (LAT = 0 or LON = 0)
crime_df = crime_df.filter((col("LAT") != 0) & (col("LON") != 0))

# Convert the "DATE OCC" column to a timestamp and extract the year into a new column 'YEAR'
crime_df = crime_df.withColumn('YEAR', to_timestamp(crime_df['DATE OCC'], 'MM/dd/yyyy hh:mm:ss a'))

# Filter the dataset to include only closed cases (not under investigation or unknown)
# Create new columns 'year' and 'precinct' for grouping
# Group by year and precinct and calculate the count of closed cases
closed_cases_df = crime_df.filter((crime_df['Status Desc'] != 'Invest Cont') & (crime_df['Status Desc'] != 'UNK')) \
    .withColumn('year', year(crime_df['YEAR'])) \
    .withColumn('precinct', crime_df['AREA NAME']) \
    .groupBy('year', 'precinct') \
    .agg(count('*').alias('closed_cases'))

# Group the dataset by year and precinct and calculate the total number of cases
total_cases_per_year_df = crime_df.withColumn('year', year(crime_df['YEAR'])) \
    .withColumn('precinct', crime_df['AREA NAME']) \
    .groupBy('year', 'precinct') \
    .agg(count('*').alias('total_cases'))

# Join the closed cases DataFrame with the total cases DataFrame on year and precinct
joined_df = closed_cases_df.join(total_cases_per_year_df, on=['year', 'precinct'])

# Calculate the closed case rate for each year and precinct
joined_df = joined_df.withColumn('closed_case_rate', col('closed_cases') / col('total_cases'))

# Define a window specification to rank precincts by closed case rate within each year
window_spec = Window.partitionBy('year').orderBy(F.desc('closed_case_rate'))

# Add a rank column to the DataFrame based on the closed case rate
ranked_df = joined_df.withColumn('#', F.rank().over(window_spec))

# Filter the DataFrame to include only the top 3 precincts by closed case rate for each year
# Select relevant columns for the final output
top_3_departments_df = ranked_df.filter(ranked_df['#'] <= 3).select('year', 'precinct', 'closed_case_rate', '#')

# Display the top 3 precincts for each year
top_3_departments_df.show()

# Calculate and print the execution time for the DataFrame API operations
execution_time_df = time.time() - start_time_df
print(f"Execution time for DataFrame API: {execution_time_df} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+-------------------+---+
|year|   precinct|   closed_case_rate|  #|
+----+-----------+-------------------+---+
|2010|    Rampart|    0.3285090742017|  1|
|2010|    Olympic| 0.3151528982199909|  2|
|2010|     Harbor| 0.2936028339237341|  3|
|2011|    Olympic| 0.3503192688118192|  1|
|2011|    Rampart|0.32500296103280824|  2|
|2011|     Harbor|0.28516260162601625|  3|
|2012|    Olympic|0.34295435879385194|  1|
|2012|    Rampart|  0.324610374505699|  2|
|2012|     Harbor| 0.2953483432455395|  3|
|2013|    Olympic| 0.3358217940999398|  1|
|2013|    Rampart|0.32106038291605304|  2|
|2013|     Harbor| 0.2970696405267529|  3|
|2014|   Van Nuys|0.32002956393200294|  1|
|2014|West Valley| 0.3151271079788573|  2|
|2014|    Mission| 0.3121740874448456|  3|
|2015|   Van Nuys|0.32265140677157844|  1|
|2015|    Mission|0.30466622852314335|  2|
|2015|   Foothill| 0.3035300180365885|  3|
|2016|   Van Nuys|  0.321880650994575|  1|
|2016|West Valley|0.31404702970297027|  2|
+----+-----

In [10]:
from pyspark.sql import SparkSession
import time
from pyspark.sql.functions import count, year, desc, to_timestamp, col
from pyspark.sql.window import Window
from pyspark.sql import functions as F

# Start a timer to measure execution time
start_time_sql = time.time()

# Load crime data from the specified path into a DataFrame
crime1 = spark.read.csv(crime_data_path, header=True, inferSchema=True)

# Filter out invalid geographical entries (where LAT or LON is 0)
crime = crime1.filter((col("LAT") != 0) & (col("LON") != 0))

# Convert the 'DATE OCC' column to a timestamp and extract the 'YEAR' field
crime = crime.withColumn('YEAR', to_timestamp(crime['DATE OCC'], 'MM/dd/yyyy hh:mm:ss a'))

# Register the DataFrame as a temporary SQL table for query execution
crime.createOrReplaceTempView('crime_data')

# SQL Query: Analyze the top 3 precincts per year based on closed case rates
sql_query = """
WITH aggregated_data AS (
    SELECT 
        YEAR(YEAR) AS year,  -- Extract the year from the timestamp
        `AREA NAME` AS precinct,  -- Use precinct name
        SUM(CASE WHEN `Status Desc` NOT IN ('Invest Cont', 'UNK') THEN 1 ELSE 0 END) AS closed_cases, -- Count closed cases
        COUNT(*) AS total_cases  -- Count total cases
    FROM crime_data
    GROUP BY YEAR(YEAR), `AREA NAME`  -- Group data by year and precinct
),
ranked_data AS (
    SELECT 
        year, 
        precinct, 
        closed_cases * 1.0 / total_cases AS closed_case_rate,  -- Calculate closure rate
        RANK() OVER (PARTITION BY year ORDER BY closed_cases * 1.0 / total_cases DESC) AS rank -- Rank precincts by closure rate
    FROM aggregated_data
)
SELECT 
    year, 
    precinct, 
    closed_case_rate, 
    rank AS `#`
FROM ranked_data
WHERE rank <= 3 -- Filter top 3 precincts per year
ORDER BY year, `#`; -- Sort by year and rank
"""

# Execute the SQL query and store the results in a DataFrame
top_3_departments_sql = spark.sql(sql_query)

# Display the top 3 precincts per year along with their closure rate and rank
top_3_departments_sql.show()

# Measure execution time for the entire process
execution_time_sql = time.time() - start_time_sql
print(f"Execution time for SQL API: {execution_time_sql} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+------------------+---+
|year|   precinct|  closed_case_rate|  #|
+----+-----------+------------------+---+
|2010|    Rampart|0.3285090742017000|  1|
|2010|    Olympic|0.3151528982199909|  2|
|2010|     Harbor|0.2936028339237341|  3|
|2011|    Olympic|0.3503192688118192|  1|
|2011|    Rampart|0.3250029610328082|  2|
|2011|     Harbor|0.2851626016260163|  3|
|2012|    Olympic|0.3429543587938519|  1|
|2012|    Rampart|0.3246103745056990|  2|
|2012|     Harbor|0.2953483432455395|  3|
|2013|    Olympic|0.3358217940999398|  1|
|2013|    Rampart|0.3210603829160530|  2|
|2013|     Harbor|0.2970696405267529|  3|
|2014|   Van Nuys|0.3200295639320030|  1|
|2014|West Valley|0.3151271079788573|  2|
|2014|    Mission|0.3121740874448456|  3|
|2015|   Van Nuys|0.3226514067715784|  1|
|2015|    Mission|0.3046662285231434|  2|
|2015|   Foothill|0.3035300180365885|  3|
|2016|   Van Nuys|0.3218806509945750|  1|
|2016|West Valley|0.3140470297029703|  2|
+----+-----------+----------------

In [11]:
s3_bucket = "s3://groups-bucket-dblab-905418150721/group19/datasheet.parquet"

crime_data_path = "s3://initial-notebook-data-bucket-dblab-905418150721/CrimeData/"
crime_df = spark.read.csv(crime_data_path, header=True, inferSchema=True)

# Coalesce the data into a single partition to create a single .parquet file
single_file_df = crime_df.coalesce(1)

# Save the DataFrame as a Parquet file to the specified S3 bucket
single_file_df.write.mode("overwrite").parquet(s3_bucket)

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [12]:
from pyspark.sql.functions import count, year, desc, to_timestamp, col
from pyspark.sql.window import Window
from pyspark.sql import functions as F
import time  # Importing the time module to track execution duration

# Start timer to measure execution time
start_time_df = time.time()

# Convert the 'DATE OCC' column to a timestamp format and add a new column 'YEAR'
single_file_df = single_file_df.withColumn('YEAR', to_timestamp(single_file_df['DATE OCC'], 'MM/dd/yyyy hh:mm:ss a'))

# Filter out records with invalid latitude and longitude (assumed as (LAT = 0) or (LON = 0))
single_file_df = single_file_df.filter((col("LAT") != 0) & (col("LON") != 0))

# Filter to include only closed cases (excluding "Invest Cont" and "UNK"), then calculate the count of closed cases
# Group by year and precinct, and create new columns 'year' and 'precinct'
closed_cases_df = single_file_df.filter((single_file_df['Status Desc'] != 'Invest Cont') & 
                                        (single_file_df['Status Desc'] != 'UNK')) \
    .withColumn('year', year(single_file_df['YEAR'])) \
    .withColumn('precinct', single_file_df['AREA NAME']) \
    .groupBy('year', 'precinct') \
    .agg(count('*').alias('closed_cases'))

# Calculate the total cases per year and precinct
total_cases_per_year_df = single_file_df.withColumn('year', year(single_file_df['YEAR'])) \
    .withColumn('precinct', single_file_df['AREA NAME']) \
    .groupBy('year', 'precinct') \
    .agg(count('*').alias('total_cases'))

# Join the DataFrame of closed cases with the total cases DataFrame on 'year' and 'precinct'
joined_df = closed_cases_df.join(total_cases_per_year_df, on=['year', 'precinct'])

# Add a new column for the closed case rate (ratio of closed cases to total cases)
joined_df = joined_df.withColumn('closed_case_rate', col('closed_cases') / col('total_cases'))

# Define a window specification to rank precincts by their closed case rate for each year
window_spec = Window.partitionBy('year').orderBy(F.desc('closed_case_rate'))

# Add a rank column to the DataFrame based on the closed case rate
ranked_df = joined_df.withColumn('#', F.rank().over(window_spec))

# Filter the DataFrame to include only the top 3 precincts for each year
# Select the relevant columns for the final output
top_3_departments_df = ranked_df.filter(ranked_df['#'] <= 3).select('year', 'precinct', 'closed_case_rate', '#')

# Display the top 3 precincts for each year along with their closed case rates
top_3_departments_df.show()

# Calculate and print the total execution time for the DataFrame API operations
execution_time_df = time.time() - start_time_df
print(f"Execution time for DataFrame API: {execution_time_df} seconds")


FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

+----+-----------+-------------------+---+
|year|   precinct|   closed_case_rate|  #|
+----+-----------+-------------------+---+
|2010|    Rampart|    0.3285090742017|  1|
|2010|    Olympic| 0.3151528982199909|  2|
|2010|     Harbor| 0.2936028339237341|  3|
|2011|    Olympic| 0.3503192688118192|  1|
|2011|    Rampart|0.32500296103280824|  2|
|2011|     Harbor|0.28516260162601625|  3|
|2012|    Olympic|0.34295435879385194|  1|
|2012|    Rampart|  0.324610374505699|  2|
|2012|     Harbor| 0.2953483432455395|  3|
|2013|    Olympic| 0.3358217940999398|  1|
|2013|    Rampart|0.32106038291605304|  2|
|2013|     Harbor| 0.2970696405267529|  3|
|2014|   Van Nuys|0.32002956393200294|  1|
|2014|West Valley| 0.3151271079788573|  2|
|2014|    Mission| 0.3121740874448456|  3|
|2015|   Van Nuys|0.32265140677157844|  1|
|2015|    Mission|0.30466622852314335|  2|
|2015|   Foothill| 0.3035300180365885|  3|
|2016|   Van Nuys|  0.321880650994575|  1|
|2016|West Valley|0.31404702970297027|  2|
+----+-----