####  Set up and start your interactive session


In [4]:
%idle_timeout 2880
%glue_version 3.0
%worker_type G.1X
%number_of_workers 5

import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from pyspark.sql import functions as F
from awsglue.context import GlueContext
from awsglue.job import Job
  
# Reuse the SparkContext of the running session, or create one if none exists yet.
sc = SparkContext.getOrCreate()
# Glue wrapper around the SparkContext; later cells use it for catalog reads and the S3 sink.
glueContext = GlueContext(sc)
# Job handle bound to the same GlueContext.
job = Job(glueContext)
# SparkSession exposed by the GlueContext.
spark = glueContext.spark_session

You are already connected to a glueetl session 65479d7d-faf3-46a6-8ca4-62a3767725a7.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Current idle_timeout is 2800 minutes.
idle_timeout has been set to 2880 minutes.


You are already connected to a glueetl session 65479d7d-faf3-46a6-8ca4-62a3767725a7.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Setting Glue version to: 3.0


You are already connected to a glueetl session 65479d7d-faf3-46a6-8ca4-62a3767725a7.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous worker type: G.1X
Setting new worker type to: G.1X


You are already connected to a glueetl session 65479d7d-faf3-46a6-8ca4-62a3767725a7.

No change will be made to the current session that is set as glueetl. The session configuration change will apply to newly created sessions.


Previous number of workers: 5
Setting new number of workers to: 5



#### Create a DynamicFrame from a table in the AWS Glue Data Catalog and display its schema


In [3]:
# Load the source table through the Glue Data Catalog as a DynamicFrame.
dyf = glueContext.create_dynamic_frame.from_catalog(
    database='chicago_crime_cleanet_parquet_database',
    table_name='chicago_crime_cleaned_parquet',
    useCatalogSchema=True,
)
# Print the schema to check column names and types before transforming.
dyf.printSchema()

root
|-- id: long
|-- case_number: string
|-- date: timestamp
|-- block: string
|-- iucr: string
|-- primary_type: string
|-- description: string
|-- location_description: string
|-- arrest: boolean
|-- domestic: boolean
|-- beat: long
|-- district: long
|-- ward: double
|-- community_area: double
|-- fbi_code: string
|-- x_coordinate: double
|-- y_coordinate: double
|-- year: long
|-- updated_on: timestamp
|-- latitude: double
|-- longitude: double
|-- location: string
|-- __index_level_0__: long


#### Convert the DynamicFrame to a Spark DataFrame


In [4]:
# Convert the Glue DynamicFrame into a Spark DataFrame; the cells below use the DataFrame API.
df = dyf.toDF()




#### Filter, aggregate, and rank crimes per 7-day window and community_area

In [5]:
from pyspark.sql.window import Window
from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType

# Keep crimes from 2020-01-01 up to (but not including) 2023-01-01.
# 'date' is a timestamp column, so a date-only string bound is compared as midnight of
# that day. The earlier lower bound `> '2019-12-31'` therefore also admitted every crime
# recorded on 2019-12-31 after 00:00:00; `>= '2020-01-01'` starts at the first instant of 2020.
df_filtered = df.filter((F.col('date') >= '2020-01-01') & (F.col('date') < '2023-01-01'))

# Latest timestamp in the filtered data; every 7-day window is anchored on its calendar day.
last_date = df_filtered.agg(F.max("date")).collect()[0][0]

# Whole days between each crime and the anchor day (0 for crimes on the anchor day).
df_filtered = df_filtered.withColumn("days_from_last", F.datediff(F.lit(last_date), "date"))

# Bucket rows into 7-day windows counted backwards from the anchor day.
# Because the offsets count backwards, the *smaller* offset ("window_start_days_from_last")
# marks the most recent day of the window and the larger one (+6) marks its oldest day.
df_filtered = df_filtered.withColumn(
    "window_start_days_from_last",
    (F.floor(F.col("days_from_last") / 7) * 7).cast(IntegerType()),
)
df_filtered = df_filtered.withColumn(
    "window_end_days_from_last",
    (F.col("window_start_days_from_last") + 6).cast(IntegerType()),
)

# Turn the backward offsets into calendar dates: window_end is the most recent day of the
# window, window_start is six days before it.
df_filtered = df_filtered.withColumn("window_end", F.expr(f"date_add(to_date('{last_date}'), -window_start_days_from_last)"))
df_filtered = df_filtered.withColumn("window_start", F.expr(f"date_add(to_date('{last_date}'), -window_end_days_from_last)"))

# Count crimes per 7-day window and community area.
df_aggregated = df_filtered.groupBy("window_start", "window_end", "community_area") \
                           .agg(F.count("*").alias("num_of_crimes"))

# Within each window, order community areas from most to fewest crimes.
windowSpec = Window.partitionBy("window_start", "window_end").orderBy(F.desc("num_of_crimes"))

# rank() gives tied community areas the same rank and leaves a gap after the tie.
df_ranked = df_aggregated.withColumn("rank", F.rank().over(windowSpec))

# Preview the result, most recent windows first.
df_ranked.sort("window_end", ascending=False).show()


+------------+----------+--------------+-------------+----+
|window_start|window_end|community_area|num_of_crimes|rank|
+------------+----------+--------------+-------------+----+
|  2022-12-25|2022-12-31|          19.0|           64|  19|
|  2022-12-25|2022-12-31|          23.0|           83|  10|
|  2022-12-25|2022-12-31|           6.0|           67|  18|
|  2022-12-25|2022-12-31|          71.0|           93|   6|
|  2022-12-25|2022-12-31|          49.0|           83|  10|
|  2022-12-25|2022-12-31|          66.0|           76|  14|
|  2022-12-25|2022-12-31|          67.0|           71|  17|
|  2022-12-25|2022-12-31|           8.0|          149|   2|
|  2022-12-25|2022-12-31|          44.0|           96|   5|
|  2022-12-25|2022-12-31|          46.0|           88|   8|
|  2022-12-25|2022-12-31|          68.0|           84|   9|
|  2022-12-25|2022-12-31|          69.0|           81|  12|
|  2022-12-25|2022-12-31|          29.0|           78|  13|
|  2022-12-25|2022-12-31|          24.0|

In [6]:
# Store community_area as an integer instead of the floating-point value shown in the previous preview.
df_ranked = df_ranked.withColumn(
    "community_area",
    df_ranked["community_area"].cast("int"),
)

# Preview with the most recent windows first.
df_ranked.orderBy(F.desc("window_start")).show()

+------------+----------+--------------+-------------+----+
|window_start|window_end|community_area|num_of_crimes|rank|
+------------+----------+--------------+-------------+----+
|  2022-12-25|2022-12-31|            19|           64|  19|
|  2022-12-25|2022-12-31|            49|           83|  10|
|  2022-12-25|2022-12-31|             6|           67|  18|
|  2022-12-25|2022-12-31|            71|           93|   6|
|  2022-12-25|2022-12-31|            23|           83|  10|
|  2022-12-25|2022-12-31|            66|           76|  14|
|  2022-12-25|2022-12-31|            67|           71|  17|
|  2022-12-25|2022-12-31|             8|          149|   2|
|  2022-12-25|2022-12-31|            44|           96|   5|
|  2022-12-25|2022-12-31|            46|           88|   8|
|  2022-12-25|2022-12-31|            68|           84|   9|
|  2022-12-25|2022-12-31|            69|           81|  12|
|  2022-12-25|2022-12-31|            29|           78|  13|
|  2022-12-25|2022-12-31|            24|

In [7]:
# Print the schema of the ranked DataFrame to confirm column types after the cast above.
df_ranked.printSchema()

root
 |-- window_start: date (nullable = true)
 |-- window_end: date (nullable = true)
 |-- community_area: integer (nullable = true)
 |-- num_of_crimes: long (nullable = false)
 |-- rank: integer (nullable = true)


#### Delete any previous output from S3, then write the ranked data as a DynamicFrame to S3

In [2]:
import boto3

def delete_objects_with_prefix(bucket_name, prefix):
    """Delete every S3 object in ``bucket_name`` whose key starts with ``prefix``.

    Keys are listed page by page and each non-empty page is removed with a single
    batch ``delete_objects`` request.

    Args:
        bucket_name: Name of the S3 bucket to clean.
        prefix: Key prefix selecting the objects to delete.

    Raises:
        RuntimeError: If the ``delete_objects`` response lists any key under
            ``Errors``. Per-key failures are reported in the response body rather
            than as an exception, so they were previously ignored.
    """
    s3 = boto3.client('s3')

    # The paginator follows continuation tokens for us, one listing page at a time.
    paginator = s3.get_paginator('list_objects_v2')

    for page in paginator.paginate(Bucket=bucket_name, Prefix=prefix):
        # 'Contents' is absent when a page holds no matching keys.
        keys = [{'Key': obj['Key']} for obj in page.get('Contents', [])]
        if not keys:
            continue

        response = s3.delete_objects(Bucket=bucket_name, Delete={'Objects': keys})

        # Surface per-key failures instead of silently leaving objects behind.
        errors = response.get('Errors') or []
        if errors:
            details = ', '.join(
                f"{err.get('Key')}: {err.get('Code')} ({err.get('Message')})"
                for err in errors
            )
            raise RuntimeError(
                f"Failed to delete {len(errors)} object(s) from bucket "
                f"'{bucket_name}': {details}"
            )

# Remove whatever is currently stored under the rank output prefix.
bucket_name, folder_path = (
    'chicagocrimedata-chalenge',
    'data/chicago_crime_rank_database/chicago_crime_rank_parquet/',
)

delete_objects_with_prefix(bucket_name=bucket_name, prefix=folder_path)




In [7]:
from awsglue.dynamicframe import DynamicFrame

# Wrap the ranked Spark DataFrame in a DynamicFrame so it can be written through a Glue sink.
dyf = DynamicFrame.fromDF(df_ranked, glueContext, "chicago_crime_rank_dynamic_frame")

dyf.printSchema()

# S3 sink with Data Catalog updates enabled.
s3output = glueContext.getSink(
    connection_type="s3",
    path="s3://chicagocrimedata-chalenge/data/chicago_crime_rank_database/chicago_crime_rank_parquet/",
    partitionKeys=["community_area", "window_start"],
    compression="snappy",
    enableUpdateCatalog=True,
    updateBehavior="UPDATE_IN_DATABASE",
    transformation_ctx="s3output",
)

# Catalog database and table that the sink should register the output under.
s3output.setCatalogInfo(
    catalogDatabase="chicago_crime_rank_parquet_database",
    catalogTableName="chicago_crime_rank",
)

# Write the frame using the Glue parquet writer.
s3output.setFormat("glueparquet")
s3output.writeFrame(dyf)

root
|-- community_area: double
|-- num_of_crimes: long
|-- rank: int
|-- window_start: timestamp
|-- window_end: timestamp

<awsglue.dynamicframe.DynamicFrame object at 0x7f4bf4530e50>
