In [1]:
import sys

from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.dynamicframe import DynamicFrame
from awsglue.job import Job



# Default job parameters for interactive (notebook) runs. They are appended only
# when the caller has not supplied them: getResolvedOptions is argparse-based and
# keeps the LAST occurrence of a repeated option, so appending unconditionally
# would silently override the parameters a real Glue job run passes in.
DEFAULT_JOB_ARGS = {
    '--DATABASE': 'default',
    '--TABLE': 'm2045_mps_antisocial_behaviour_txt',
}
for flag, default_value in DEFAULT_JOB_ARGS.items():
    # Accept both "--FLAG value" and "--FLAG=value" spellings as "already given".
    already_given = any(a == flag or a.startswith(flag + '=') for a in sys.argv)
    if not already_given:
        sys.argv.extend([flag, default_value])

params = ['DATABASE', 'TABLE']
# JOB_NAME is only resolved when it was passed on the command line.
if '--JOB_NAME' in sys.argv:
    params.append('JOB_NAME')
args = getResolvedOptions(sys.argv, params)

sc = SparkContext.getOrCreate()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)

# Fall back to a fixed name when no --JOB_NAME was supplied (interactive runs).
jobname = args.get('JOB_NAME', "police_data_job")
job.init(jobname, args)

# get logger for this glue job
logger = glueContext.get_logger()
logger.info(f"Job {jobname} started with args: {args}")



Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/10 19:41:43 WARN Job$: Job run ID police_data_job is either null or empty or its same as Job name. 
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


In [2]:
def read_csv_to_dyf(glueContext, database, table):
    """
    Load a Glue Data Catalog table into a DynamicFrame.

    Despite the function name, nothing here is CSV-specific: the table is read
    through the Data Catalog with ``create_dynamic_frame.from_catalog``.

    Args:
        glueContext (GlueContext): Glue context used to create the frame.
        database (str): Data Catalog database name.
        table (str): Table name within ``database``.

    Returns:
        DynamicFrame: The frame returned by ``from_catalog``.
    """
    frame_reader = glueContext.create_dynamic_frame
    return frame_reader.from_catalog(database=database, table_name=table)
dyf = read_csv_to_dyf(glueContext, database=args['DATABASE'], table=args['TABLE'])
dyf.printSchema()

ANTLR Tool version 4.3 used for code generation does not match the current runtime version 4.9.3
ANTLR Tool version 4.3 used for code generation does not match the current runtime version 4.9.3
[Stage 0:>                                                          (0 + 1) / 1]

root
|-- date: string
|-- hour: string
|-- op01: long
|-- opening_type_1: string
|-- op02: choice
|    |-- long
|    |-- string
|-- opening_type_2: string
|-- op03: choice
|    |-- long
|    |-- string
|-- opening_type_3: string
|-- cl01: choice
|    |-- long
|    |-- string
|-- close_type_1: string
|-- cl02: choice
|    |-- long
|    |-- string
|-- close_type_2: string
|-- cl03: string
|-- close_type_3: string
|-- resolution_type_1: string
|-- resolution_type_2: string
|-- ward: string
|-- ward_code: string
|-- response_time: long
|-- duplicate: string
|-- asbcount: long
|-- datetime: long
|-- ward_wardcode: string
|-- safer_neighborhood_team_name: string
|-- safer_neighborhood_team_code: string
|-- safer_neighborhood_team_borough_name: string
|-- safer_neighborhood_team_borough_code: string



                                                                                

In [3]:
dyf.show(20)  # preview; op02/op03/cl01/cl02 are still choice (long|string) columns here

{"date": "2025-05-03", "hour": "20:00", "op01": 215, "opening_type_1": "ASB Nuisance", "op02": 202, "opening_type_2": "Rowdy Or Inconsiderate Behaviour", "op03": 612, "opening_type_3": "Mental Health", "cl01": 215, "close_type_1": "ASB Nuisance", "cl02": "   ", "close_type_2": "", "cl03": "   ", "close_type_3": "", "resolution_type_1": "Inform / Informed", "resolution_type_2": "", "ward": "EA06", "ward_code": "", "response_time": 0, "duplicate": "No", "asbcount": 1, "datetime": 20258, "ward_wardcode": "E05014058", "safer_neighborhood_team_name": "Chadwell Heath", "safer_neighborhood_team_code": "E05014058", "safer_neighborhood_team_borough_name": "Barking and Dagenham", "safer_neighborhood_team_borough_code": "KG"}
{"date": "2025-06-12", "hour": "13:00", "op01": 215, "opening_type_1": "ASB Nuisance", "op02": "   ", "opening_type_2": "", "op03": "   ", "opening_type_3": "", "cl01": 305, "close_type_1": "Civil Disputes", "cl02": "   ", "close_type_2": "", "cl03": "   ", "close_type_3": "

Example: Resolve the DynamicFrame's choice column types against the AWS Glue Data Catalog table and display the resulting schema

In [4]:
def resolve_schema(dyf, database, table):
    """
    Resolve ``choice`` column types of a DynamicFrame against a catalog table.

    Applies ``ResolveChoice`` with ``choice="match_catalog"``, which casts each
    ambiguous column to the type the given Data Catalog table declares for it.

    Args:
        dyf (DynamicFrame): Frame whose choice types should be resolved.
        database (str): Data Catalog database holding the reference table.
        table (str): Data Catalog table whose schema is the target.

    Returns:
        DynamicFrame: The frame returned by ``ResolveChoice.apply``.
    """
    catalog_target = {"database": database, "table_name": table}
    return ResolveChoice.apply(frame=dyf, choice="match_catalog", **catalog_target)
dyf = resolve_schema(dyf, database=args['DATABASE'], table=args['TABLE'])
dyf.printSchema()

root
|-- date: string
|-- hour: string
|-- op01: long
|-- opening_type_1: string
|-- op02: long
|-- opening_type_2: string
|-- op03: long
|-- opening_type_3: string
|-- cl01: long
|-- close_type_1: string
|-- cl02: long
|-- close_type_2: string
|-- cl03: string
|-- close_type_3: string
|-- resolution_type_1: string
|-- resolution_type_2: string
|-- ward: string
|-- ward_code: string
|-- response_time: long
|-- duplicate: string
|-- asbcount: long
|-- datetime: long
|-- ward_wardcode: string
|-- safer_neighborhood_team_name: string
|-- safer_neighborhood_team_code: string
|-- safer_neighborhood_team_borough_name: string
|-- safer_neighborhood_team_borough_code: string



In [5]:
dyf.show(20)  # null fields (e.g. blank cl02 after the cast to long) appear to be omitted from printed records

{"date": "2025-05-03", "hour": "20:00", "op01": 215, "opening_type_1": "ASB Nuisance", "opening_type_2": "Rowdy Or Inconsiderate Behaviour", "opening_type_3": "Mental Health", "close_type_1": "ASB Nuisance", "close_type_2": "", "cl03": "   ", "close_type_3": "", "resolution_type_1": "Inform / Informed", "resolution_type_2": "", "ward": "EA06", "ward_code": "", "response_time": 0, "duplicate": "No", "asbcount": 1, "datetime": 20258, "ward_wardcode": "E05014058", "safer_neighborhood_team_name": "Chadwell Heath", "safer_neighborhood_team_code": "E05014058", "safer_neighborhood_team_borough_name": "Barking and Dagenham", "safer_neighborhood_team_borough_code": "KG", "op02": 202, "op03": 612, "cl01": 215}
{"date": "2025-06-12", "hour": "13:00", "op01": 215, "opening_type_1": "ASB Nuisance", "opening_type_2": "", "opening_type_3": "", "close_type_1": "Civil Disputes", "close_type_2": "", "cl03": "   ", "close_type_3": "", "resolution_type_1": "Inform / Informed", "resolution_type_2": "", "wa

Example: Convert the DynamicFrame to a Spark DataFrame and display a sample of the data

In [6]:
df = dyf.toDF()
df.show(20, truncate=True)

25/08/10 19:42:01 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+----------+-----+----+--------------+----+--------------------+----+--------------------+----+--------------------+----+------------+----+------------+--------------------+--------------------+----+---------+-------------+---------+--------+--------+-------------+----------------------------+----------------------------+------------------------------------+------------------------------------+
|      date| hour|op01|opening_type_1|op02|      opening_type_2|op03|      opening_type_3|cl01|        close_type_1|cl02|close_type_2|cl03|close_type_3|   resolution_type_1|   resolution_type_2|ward|ward_code|response_time|duplicate|asbcount|datetime|ward_wardcode|safer_neighborhood_team_name|safer_neighborhood_team_code|safer_neighborhood_team_borough_name|safer_neighborhood_team_borough_code|
+----------+-----+----+--------------+----+--------------------+----+--------------------+----+--------------------+----+------------+----+------------+--------------------+--------------------+----+-----

In [7]:
# Numeric opNN / clNN columns appear to be codes for the human-readable
# opening_type_N / close_type_N columns (e.g. op01=215 <-> "ASB Nuisance" in the
# sample output), so they are dropped.
# NOTE(review): close_type_3 is also dropped, presumably because it is (almost)
# always empty — confirm.
col_to_drop = [
    'op01', 'op02', 'op03',  # opening codes
    'cl01', 'cl02', 'cl03',  # closing codes
    'close_type_3',
]
def clean_data(df, drop):
    """
    Return ``df`` without the columns named in ``drop``.

    Args:
        df (DataFrame): Spark DataFrame to clean.
        drop (Iterable[str]): Column names to remove; each name is passed to
            ``DataFrame.drop`` as a separate positional argument.

    Returns:
        DataFrame: The DataFrame returned by ``df.drop``.
    """
    return df.drop(*drop)
df = clean_data(df, drop=col_to_drop)
df.show(10, truncate=False)

+----------+-----+--------------+--------------------------------+--------------------------------+------------------------+------------+-----------------------+------------------------------+----+---------+-------------+---------+--------+--------+-------------+----------------------------+----------------------------+------------------------------------+------------------------------------+
|date      |hour |opening_type_1|opening_type_2                  |opening_type_3                  |close_type_1            |close_type_2|resolution_type_1      |resolution_type_2             |ward|ward_code|response_time|duplicate|asbcount|datetime|ward_wardcode|safer_neighborhood_team_name|safer_neighborhood_team_code|safer_neighborhood_team_borough_name|safer_neighborhood_team_borough_code|
+----------+-----+--------------+--------------------------------+--------------------------------+------------------------+------------+-----------------------+------------------------------+----+---------+-

Example: Write the data in the DynamicFrame to a location in Amazon S3 and a table for it in the AWS Glue Data Catalog

In [8]:
# Convert the cleaned Spark DataFrame back to a DynamicFrame for the Glue S3 sink.
# (DynamicFrame is imported in the top import cell rather than mid-notebook.)
cleaned_data = DynamicFrame.fromDF(df, glueContext, "cleaned_data")
cleaned_data.show()

{"date": "2025-05-03", "hour": "20:00", "opening_type_1": "ASB Nuisance", "opening_type_2": "Rowdy Or Inconsiderate Behaviour", "opening_type_3": "Mental Health", "close_type_1": "ASB Nuisance", "close_type_2": "", "resolution_type_1": "Inform / Informed", "resolution_type_2": "", "ward": "EA06", "ward_code": "", "response_time": 0, "duplicate": "No", "asbcount": 1, "datetime": 20258, "ward_wardcode": "E05014058", "safer_neighborhood_team_name": "Chadwell Heath", "safer_neighborhood_team_code": "E05014058", "safer_neighborhood_team_borough_name": "Barking and Dagenham", "safer_neighborhood_team_borough_code": "KG"}
{"date": "2025-06-12", "hour": "13:00", "opening_type_1": "ASB Nuisance", "opening_type_2": "", "opening_type_3": "", "close_type_1": "Civil Disputes", "close_type_2": "", "resolution_type_1": "Inform / Informed", "resolution_type_2": "", "ward": "SW61", "ward_code": "", "response_time": 0, "duplicate": "No", "asbcount": 1, "datetime": 20258, "ward_wardcode": "E05014012", "s

In [None]:
s3_bucket_path = 's3://uros75-police-data/cleaned/'
def write_data(glueContext, dyf, database, table, path=None, partition_keys=None):
    """
    Write a DynamicFrame to S3 and create/update its Data Catalog table.

    The frame is written through a Glue S3 sink in ``glueparquet`` format with
    snappy compression, partitioned by ``partition_keys``. The sink registers or
    updates the catalog table ``f"{table}_cleaned"`` in ``database``.

    Args:
        glueContext (GlueContext): The Glue context object.
        dyf (DynamicFrame): The DynamicFrame to write.
        database (str): The name of the DataCatalog database.
        table (str): Source table name; the output table is ``f"{table}_cleaned"``.
        path (str, optional): S3 prefix to write to. Defaults to the
            module-level ``s3_bucket_path`` (looked up at call time).
        partition_keys (list[str], optional): Columns to partition the output
            by. Defaults to ``["date"]``.

    NOTE(review): the sink writes new files on each call, so re-running this for
    the same data presumably duplicates rows in the output table — confirm.
    """
    if path is None:
        path = s3_bucket_path
    if partition_keys is None:
        # Fresh list per call: avoids a shared mutable default argument.
        partition_keys = ["date"]
    s3output = glueContext.getSink(
        path=path,
        connection_type="s3",
        updateBehavior="UPDATE_IN_DATABASE",
        partitionKeys=partition_keys,
        compression="snappy",
        enableUpdateCatalog=True,
        transformation_ctx="s3output",
    )
    s3output.setCatalogInfo(
        catalogDatabase=database, catalogTableName=f'{table}_cleaned'
    )
    s3output.setFormat("glueparquet")
    s3output.writeFrame(dyf)
write_data(glueContext, cleaned_data, args['DATABASE'], args['TABLE'])
# Commit the run started by job.init(); AWS Glue requires this at the end of a
# job script for job bookmark state to be saved.
job.commit()


[Stage 6:>                                                          (0 + 1) / 1]