In [1]:
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
from awsglue.dynamicframe import DynamicFrame
import boto3
from pyspark.sql import functions as F
from pyspark.sql.types import FloatType, IntegerType
from pyspark.sql import DataFrame

In [2]:
def get_files_from_s3(bucket_name, s3, logger=None):
    """Retrieve the keys of all objects in an S3 bucket.

    ``list_objects_v2`` returns results one page at a time, so this function
    keeps requesting pages (via the continuation token) until the response is
    no longer marked as truncated.

    Args:
        bucket_name (str): The name of the S3 bucket.
        s3 (boto3.client): The S3 client to use for accessing the bucket.
        logger (Logger, optional): Logger for logging messages. Defaults to None,
            in which case errors are printed instead.
    Returns:
        list: Object keys in the bucket. An empty list is returned when the
            bucket is empty or when any listing request fails.
    """
    files = []
    request = {'Bucket': bucket_name}
    while True:
        try:
            # List (the next page of) objects in the specified S3 bucket
            response = s3.list_objects_v2(**request)
        except Exception as e:
            if logger:
                logger.error(f"Error accessing bucket {bucket_name}: {e}")
            else:
                # If no logger is provided, print the error
                print(f"Error accessing bucket {bucket_name}: {e}")
            # A partial listing would be silently incomplete, so report nothing.
            return []
        # 'Contents' is absent when the page holds no objects.
        files.extend(item['Key'] for item in response.get('Contents', []))
        if not response.get('IsTruncated'):
            break
        request['ContinuationToken'] = response['NextContinuationToken']
    return files

def partition_files_by_extension(files):
    """Split file names into CSV and XLSX groups based on their suffix.

    Names that end with neither '.csv' nor '.xlsx' are dropped. Matching is
    case-sensitive and the input order is preserved within each group.

    Args:
        files (list): List of file names.
    Returns:
        tuple: ``(csv_files, xlsx_files)`` — two lists of file names.
    """
    grouped = {'.csv': [], '.xlsx': []}
    for name in files:
        # Send the name to the first (and only possible) matching suffix group.
        for suffix, members in grouped.items():
            if name.endswith(suffix):
                members.append(name)
                break
    return grouped['.csv'], grouped['.xlsx']

def load_to_dyf(files, glueContext, s3_bucket, logger=None):
    """Create one DynamicFrame per CSV file in an S3 bucket.

    Files whose DynamicFrame cannot be created are reported and skipped; the
    remaining files are still processed.

    Args:
        files (list): List of file names (object keys) to load.
        glueContext (GlueContext): The Glue context for creating DynamicFrames.
        s3_bucket (str): Name of the bucket the files live in; each file is
            read from ``s3://{s3_bucket}/{file}``.
        logger (Logger, optional): Logger for logging messages. Defaults to None,
            in which case messages are printed instead.
    Returns:
        list: List of DynamicFrames created from the files.
    Raises:
        ValueError: If no files are loaded into DynamicFrames.
    """
    dyf_list = []
    for file in files:
        try:
            dyf = glueContext.create_dynamic_frame.from_options(
                connection_type="s3",
                format="csv",
                connection_options={"paths": [f"s3://{s3_bucket}/{file}"]},
                format_options={"withHeader": True}
            )
        except Exception as e:
            if logger:
                logger.error(f"Error loading file {file}: {e}")
            else:
                print(f"Error loading file {file}: {e}")
            continue

        dyf_list.append(dyf)
        # Reporting success happens outside the try block and honours
        # logger=None, so a missing logger is never mistaken for a load failure.
        if logger:
            logger.info(f"Loaded file {file} into DynamicFrame")
        else:
            print(f"Loaded file {file} into DynamicFrame")

    if not dyf_list:
        if logger:
            logger.error("No files were loaded into DynamicFrames.")
        else:
            print("No files were loaded into DynamicFrames.")
        raise ValueError("No files were loaded into DynamicFrames.")
    return dyf_list

def consistent_schema(dyf_list):
    """Report whether every DynamicFrame shares the schema of the first one.

    Args:
        dyf_list (list): List of DynamicFrames to check.
    Returns:
        bool: True when the list is empty or all schemas match the first
            frame's schema, False as soon as one differs.
    """
    if not dyf_list:
        # Nothing to compare, so there is no inconsistency.
        return True
    reference = dyf_list[0].schema()
    return not any(frame.schema() != reference for frame in dyf_list[1:])

In [3]:
import sys
from dotenv import load_dotenv
import os

# Load local settings (such as BUCKET_NAME) from a .env file into the environment.
load_dotenv()
BUCKET_NAME = os.getenv('BUCKET_NAME')

# Inject --BUCKET_NAME only when it is not already on the command line. This
# keeps the cell idempotent (re-running it no longer appends duplicates) and
# avoids appending an environment value after one supplied by the job runner.
_bucket_arg_present = any(
    arg == '--BUCKET_NAME' or arg.startswith('--BUCKET_NAME=') for arg in sys.argv
)
if not _bucket_arg_present:
    if not BUCKET_NAME:
        # Formatting an unset variable would pass the literal string "None"
        # as the bucket name, so fail early with a clear message instead.
        raise ValueError(
            "BUCKET_NAME is not set: define it in the environment/.env file "
            "or pass --BUCKET_NAME on the command line."
        )
    sys.argv.extend(["--BUCKET_NAME", BUCKET_NAME])
print(sys.argv)

if __name__ == "__main__":
    # JOB_NAME is optional: it is only resolved when it was actually passed.
    params = ['BUCKET_NAME']
    if '--JOB_NAME' in sys.argv:
        params.append('JOB_NAME')
    args = getResolvedOptions(sys.argv, params)

    sc = SparkContext.getOrCreate()
    glueContext = GlueContext(sc)
    spark = glueContext.spark_session
    job = Job(glueContext)

    # Fall back to a fixed job name when none was supplied.
    jobname = args.get('JOB_NAME', "police_data_job")
    job.init(jobname, args)

    # Get logger for this Glue job
    logger = glueContext.get_logger()
    logger.info(f"Job {jobname} started with args: {args}")

    # Get the S3 bucket name from the arguments
    s3_bucket = args['BUCKET_NAME']
    s3_client = boto3.client('s3')

    # Retrieve the object keys from the S3 bucket and split them by extension
    logger.info(f"Retrieving files from S3 bucket: s3://{s3_bucket}")
    files = get_files_from_s3(s3_bucket, s3_client, logger)
    csv_files, xlsx_files = partition_files_by_extension(files)

    # Load CSV files into DynamicFrames (raises ValueError if none could be loaded)
    dyf_list = load_to_dyf(csv_files, glueContext, s3_bucket, logger)
    logger.info(f"Loaded {len(dyf_list)} DynamicFrames from CSV files.")

['/usr/local/lib/python3.11/site-packages/ipykernel_launcher.py', '--f=/root/.local/share/jupyter/runtime/kernel-v337e15d0bef5fdb42276dde43a63b352e99a944d1.json', '--BUCKET_NAME', 'data-bucket-properties-5a166591']


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/08/14 19:26:20 WARN Job$: Job run ID police_data_job is either null or empty or its same as Job name. 
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.


In [5]:
# Convert the first loaded DynamicFrame into a Spark DataFrame for exploration.
df = dyf_list[0].toDF()



In [7]:
# Print the column names and types of the DataFrame.
df.printSchema()

root
 |-- uprn: string (nullable = true)
 |-- os_topo_toid: string (nullable = true)
 |-- easting: string (nullable = true)
 |-- northing: string (nullable = true)
 |-- postcode_locator: string (nullable = true)
 |-- administrative_area: string (nullable = true)
 |-- oa21cd: string (nullable = true)
 |-- lsoa21cd: string (nullable = true)
 |-- lsoa21nm: string (nullable = true)
 |-- lsoa11cd: string (nullable = true)
 |-- lsoa11nm: string (nullable = true)
 |-- ward22cd: string (nullable = true)
 |-- ward22nm: string (nullable = true)
 |-- property_type: string (nullable = true)
 |-- built_form: string (nullable = true)
 |-- property_type_built_form: string (nullable = true)
 |-- tenure: string (nullable = true)
 |-- tenure_known: string (nullable = true)
 |-- building_use: string (nullable = true)
 |-- construction_age_band: string (nullable = true)
 |-- construction_age_band_known: string (nullable = true)
 |-- epc_score: string (nullable = true)
 |-- epc_score_known: string (nullabl

In [8]:
# Count the rows where total_floor_area is null.
# NOTE(review): the columns appear to be loaded from CSV as strings, so blank
# cells may show up as empty strings rather than nulls — confirm before relying
# on a zero count here.
null_count = df.filter(F.col('total_floor_area').isNull()).count()
print(f"Null values in total_floor_area: {null_count}")


25/08/14 19:48:30 WARN SparkStringUtils: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.

Null values in column_name: 0


                                                                                

In [None]:
# Show the rows where total_floor_area_known is null.
# NOTE: despite its name, null_count here holds the filtered DataFrame (not an
# integer count), rebinding the name used for the count in the previous cell.
null_count = df.filter(F.col('total_floor_area_known').isNull())
null_count.show()

[Stage 5:>                                                          (0 + 1) / 1]

+----+------------+-------+--------+----------------+-------------------+------+--------+--------+--------+--------+--------+--------+-------------+----------+------------------------+------+------------+------------+---------------------+---------------------------+---------+---------------+----------+----------------+-------------------+-------------------------+--------------------+--------------------------+----------------------+----------------------------+----------------+----------------------+---------------------+--------------+---------+---------------+---------------+---------------------+---------+---------------+---------------+---------------------+------------+------------------+--------------+--------------------+--------------+--------------------+------------------+------------------------+-------------+-----------------------+--------+---------------------+-------------------+---------------+----------+------------+------------------+---------------------+----------

                                                                                

In [None]:
# Preview the floor-area values on rows whose total_floor_area_known flag is '0'.
# The flag is compared as a string.
(
    df.select('total_floor_area', 'total_floor_area_known')
    .filter(F.col('total_floor_area_known') == '0')
    .show(20)
)

+----------------+----------------------+
|total_floor_area|total_floor_area_known|
+----------------+----------------------+
|             138|                     0|
|             131|                     0|
|             118|                     0|
|             147|                     0|
|             137|                     0|
|             226|                     0|
|             129|                     0|
|             110|                     0|
|             125|                     0|
|             126|                     0|
|              91|                     0|
|             180|                     0|
|             118|                     0|
|             121|                     0|
|             135|                     0|
|             126|                     0|
|             129|                     0|
|             107|                     0|
|             108|                     0|
|             128|                     0|
+----------------+----------------

                                                                                