# Health Violations 

This notebook contains an end-to-end implementation of a `PySpark` application in an AWS EMR cluster. It is mostly based in the [Getting started with Amazon EMR](https://docs.aws.amazon.com/emr/latest/ManagementGuide/emr-gs.html) tutorial.

## Plan and configure AWS EMR cluster

### Prepare storage

Create bucket

In [1]:
!aws s3api create-bucket \
--bucket emr20220705-bucket \
--region us-east-1

{
    "Location": "/emr20220705-bucket"
}


Prepare application with input data for EMR

In [2]:
%%writefile health_violations.py
import argparse

from pyspark.sql import SparkSession

def calculate_red_violations(data_source, output_uri):
    """
    Processes sample food establishment inspection data and queries the data to find the top 10 establishments
    with the most Red violations from 2006 to 2020.

    :param data_source: The URI of your food establishment data CSV, such as 's3://DOC-EXAMPLE-BUCKET/food-establishment-data.csv'.
    :param output_uri: The URI where output is written, such as 's3://DOC-EXAMPLE-BUCKET/restaurant_violation_results'.
    """
    with SparkSession.builder.appName("Calculate Red Health Violations").getOrCreate() as spark:
        # Load the restaurant violation CSV data
        if data_source is not None:
            restaurants_df = spark.read.option("header", "true").csv(data_source)

        # Create an in-memory DataFrame to query
        restaurants_df.createOrReplaceTempView("restaurant_violations")

        # Create a DataFrame of the top 10 restaurants with the most Red violations
        top_red_violation_restaurants = spark.sql("""SELECT name, count(*) AS total_red_violations 
          FROM restaurant_violations 
          WHERE violation_type = 'RED' 
          GROUP BY name 
          ORDER BY total_red_violations DESC LIMIT 10""")

        # Write the results to the specified output URI
        top_red_violation_restaurants.write.option("header", "true").mode("overwrite").csv(output_uri)

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument(
        '--data_source', help="The URI for you CSV restaurant data, like an S3 bucket location.")
    parser.add_argument(
        '--output_uri', help="The URI where output is saved, like an S3 bucket location.")
    args = parser.parse_args()

    calculate_red_violations(args.data_source, args.output_uri)


Overwriting health_violations.py


Load script to S3

In [11]:
!aws s3api put-object \
--bucket emr20220705-bucket \
--key health_violations.py \
--body health_violations.py

{
    "ETag": "\"d41d8cd98f00b204e9800998ecf8427e\""
}


Download and unzip data

In [4]:
!curl -O https://docs.aws.amazon.com/emr/latest/ManagementGuide/samples/food_establishment_data.zip

  % Total    % Received % Xferd  Average Speed   Time    Time     Time  Current
                                 Dload  Upload   Total   Spent    Left  Speed
100  567k  100  567k    0     0   649k      0 --:--:-- --:--:-- --:--:--  649k


In [5]:
!unzip food_establishment_data.zip

Archive:  food_establishment_data.zip
  inflating: food_establishment_data.csv  


Load data to S3

In [5]:
!aws s3api put-object \
--bucket emr20220705-bucket \
--key food_establishment_data.csv

{
    "ETag": "\"d41d8cd98f00b204e9800998ecf8427e\""
}


In [17]:
!aws s3api list-objects --bucket emr20220705-bucket | jq .Contents[]

[1;39m{
  [0m[34;1m"Key"[0m[1;39m: [0m[0;32m"food_establishment_data.csv"[0m[1;39m,
  [0m[34;1m"LastModified"[0m[1;39m: [0m[0;32m"2022-07-06T16:09:26+00:00"[0m[1;39m,
  [0m[34;1m"ETag"[0m[1;39m: [0m[0;32m"\"bac70373588490a8c1be0a71be46df85\""[0m[1;39m,
  [0m[34;1m"Size"[0m[1;39m: [0m[0;39m11879954[0m[1;39m,
  [0m[34;1m"StorageClass"[0m[1;39m: [0m[0;32m"STANDARD"[0m[1;39m,
  [0m[34;1m"Owner"[0m[1;39m: [0m[1;39m{
    [0m[34;1m"DisplayName"[0m[1;39m: [0m[0;32m"ramirocadavid"[0m[1;39m,
    [0m[34;1m"ID"[0m[1;39m: [0m[0;32m"473deab5a66936901bdb14e2e649f82810cd9ba06781c2e41076db8af13ba357"[0m[1;39m
  [1;39m}[0m[1;39m
[1;39m}[0m
[1;39m{
  [0m[34;1m"Key"[0m[1;39m: [0m[0;32m"health_violations.py"[0m[1;39m,
  [0m[34;1m"LastModified"[0m[1;39m: [0m[0;32m"2022-07-06T16:10:50+00:00"[0m[1;39m,
  [0m[34;1m"ETag"[0m[1;39m: [0m[0;32m"\"96241d93d4b3514f913993c12b72ae41\""[0m[1;39m,
  [0m[34;1m"Size"[0m[1;39m:

### Launch EMR cluster

Create IAM [default roles](https://docs.aws.amazon.com/cli/latest/reference/emr/create-default-roles.html) that will be used to create the cluster

In [9]:
!aws emr create-default-roles

[
    {
        "Role": {
            "Path": "/",
            "RoleName": "EMR_EC2_DefaultRole",
            "RoleId": "AROATCWRPAY4I7ULTDGBO",
            "Arn": "arn:aws:iam::211966428728:role/EMR_EC2_DefaultRole",
            "CreateDate": "2022-07-05T20:04:55+00:00",
            "AssumeRolePolicyDocument": {
                "Version": "2008-10-17",
                "Statement": [
                    {
                        "Sid": "",
                        "Effect": "Allow",
                        "Principal": {
                            "Service": "ec2.amazonaws.com"
                        },
                        "Action": "sts:AssumeRole"
                    }
                ]
            }
        },
        "RolePolicy": {
            "Version": "2012-10-17",
            "Statement": [
                {
                    "Effect": "Allow",
                    "Resource": "*",
                    "Action": [
                        "cloudwatch:*",
                  

Create Spark Cluster

In [18]:
%%bash
aws emr create-cluster \
--name "food-cluster" \
--release-label emr-5.36.0 \
--applications Name=Spark \
--ec2-attributes KeyName=ec2KeyPair \
--instance-type m5.xlarge \
--instance-count 3 \
--use-default-roles

{
    "ClusterId": "j-1ETWETHEL7UQQ",
    "ClusterArn": "arn:aws:elasticmapreduce:us-east-1:211966428728:cluster/j-1ETWETHEL7UQQ"
}


Check cluster status

In [21]:
!aws emr describe-cluster --cluster-id j-1ETWETHEL7UQQ

{
    "Cluster": {
        "Id": "j-1ETWETHEL7UQQ",
        "Name": "food-cluster",
        "Status": {
            "State": "WAITING",
            "StateChangeReason": {
                "Message": "Cluster ready to run steps."
            },
            "Timeline": {
                "CreationDateTime": "2022-07-06T11:12:45.882000-05:00",
                "ReadyDateTime": "2022-07-06T11:17:46.676000-05:00"
            }
        },
        "Ec2InstanceAttributes": {
            "Ec2KeyName": "ec2KeyPair",
            "RequestedEc2SubnetIds": [],
            "Ec2AvailabilityZone": "us-east-1a",
            "RequestedEc2AvailabilityZones": [],
            "IamInstanceProfile": "EMR_EC2_DefaultRole",
            "EmrManagedMasterSecurityGroup": "sg-0f721147b6edce673",
            "EmrManagedSlaveSecurityGroup": "sg-0d673ae7607b1191c"
        },
        "InstanceCollectionType": "INSTANCE_GROUP",
        "ReleaseLabel": "emr-5.36.0",
        "AutoTerminate": false,
        "TerminationProtec

## Submit work to cluster

Submit `PySpark` script as a step

In [22]:
!aws emr add-steps \
--cluster-id j-1ETWETHEL7UQQ \
--steps Type=Spark,Name="Spark Application Step",ActionOnFailure=CONTINUE,Args=[s3://emr20220705-bucket/health_violations.py,--data_source,s3://emr20220705-bucket/food_establishment_data.csv,--output_uri,s3://emr20220705-bucket/MyOutputFolder]

{
    "StepIds": [
        "s-1BI75YE4G7C2R"
    ]
}


Query status of step

In [31]:
!aws emr describe-step \
--cluster-id j-1ETWETHEL7UQQ \
--step-id s-1BI75YE4G7C2R

{
    "Step": {
        "Id": "s-1BI75YE4G7C2R",
        "Name": "Spark Application Step",
        "Config": {
            "Jar": "command-runner.jar",
            "Properties": {},
            "Args": [
                "spark-submit",
                "s3://emr20220705-bucket/health_violations.py",
                "--data_source",
                "s3://emr20220705-bucket/food_establishment_data.csv",
                "--output_uri",
                "s3://emr20220705-bucket/MyOutputFolder"
            ]
        },
        "ActionOnFailure": "CONTINUE",
        "Status": {
            "State": "COMPLETED",
            "StateChangeReason": {},
            "Timeline": {
                "CreationDateTime": "2022-07-06T11:21:15.863000-05:00",
                "StartDateTime": "2022-07-06T11:21:29.842000-05:00",
                "EndDateTime": "2022-07-06T11:22:02.061000-05:00"
            }
        }
    }
}


View results

In [32]:
!aws s3api list-objects --bucket emr20220705-bucket

{
    "Contents": [
        {
            "Key": "MyOutputFolder/_SUCCESS",
            "LastModified": "2022-07-06T16:22:00+00:00",
            "ETag": "\"d41d8cd98f00b204e9800998ecf8427e\"",
            "Size": 0,
            "StorageClass": "STANDARD",
            "Owner": {
                "DisplayName": "ramirocadavid",
                "ID": "473deab5a66936901bdb14e2e649f82810cd9ba06781c2e41076db8af13ba357"
            }
        },
        {
            "Key": "MyOutputFolder/part-00000-c872ba62-1ed7-4986-bf83-5608524cee7f-c000.csv",
            "LastModified": "2022-07-06T16:21:59+00:00",
            "ETag": "\"e0d83228014ebdd8848d89563cd8d4f1\"",
            "Size": 219,
            "StorageClass": "STANDARD",
            "Owner": {
                "DisplayName": "ramirocadavid",
                "ID": "473deab5a66936901bdb14e2e649f82810cd9ba06781c2e41076db8af13ba357"
            }
        },
        {
            "Key": "food_establishment_data.csv",
            "LastModified": 

In [37]:
!aws s3api get-object \
--bucket emr20220705-bucket \
--key MyOutputFolder/part-00000-c872ba62-1ed7-4986-bf83-5608524cee7f-c000.csv \
output.csv

{
    "AcceptRanges": "bytes",
    "LastModified": "2022-07-06T16:21:59+00:00",
    "ContentLength": 219,
    "ETag": "\"e0d83228014ebdd8848d89563cd8d4f1\"",
    "ContentType": "binary/octet-stream",
    "Metadata": {}
}


## Clean up resources

Terminate cluster

In [38]:
!aws emr terminate-clusters --cluster-ids j-1ETWETHEL7UQQ  

In [44]:
!aws emr describe-cluster --cluster-id j-1ETWETHEL7UQQ

{
    "Cluster": {
        "Id": "j-1ETWETHEL7UQQ",
        "Name": "food-cluster",
        "Status": {
            "State": "TERMINATED",
            "StateChangeReason": {
                "Code": "USER_REQUEST",
                "Message": "Terminated by user request"
            },
            "Timeline": {
                "CreationDateTime": "2022-07-06T11:12:45.882000-05:00",
                "ReadyDateTime": "2022-07-06T11:17:46.676000-05:00",
                "EndDateTime": "2022-07-06T11:28:37.614000-05:00"
            }
        },
        "Ec2InstanceAttributes": {
            "Ec2KeyName": "ec2KeyPair",
            "RequestedEc2SubnetIds": [],
            "Ec2AvailabilityZone": "us-east-1a",
            "RequestedEc2AvailabilityZones": [],
            "IamInstanceProfile": "EMR_EC2_DefaultRole",
            "EmrManagedMasterSecurityGroup": "sg-0f721147b6edce673",
            "EmrManagedSlaveSecurityGroup": "sg-0d673ae7607b1191c"
        },
        "InstanceCollectionType": "INST

Delete S3 bucket

In [42]:
!aws s3api list-objects --bucket emr20220705-bucket | jq .Contents[].Key

[0;32m"MyOutputFolder/_SUCCESS"[0m
[0;32m"MyOutputFolder/part-00000-c872ba62-1ed7-4986-bf83-5608524cee7f-c000.csv"[0m
[0;32m"food_establishment_data.csv"[0m
[0;32m"health_violations.py"[0m


In [43]:
!aws s3api delete-objects \
--bucket emr20220705-bucket \
--delete Objects=[{Key=MyOutputFolder/_SUCCESS},{Key=MyOutputFolder/part-00000-c872ba62-1ed7-4986-bf83-5608524cee7f-c000.csv},{Key=food_establishment_data.csv},{Key=health_violations.py}]

{
    "Deleted": [
        {
            "Key": "food_establishment_data.csv"
        },
        {
            "Key": "MyOutputFolder/_SUCCESS"
        },
        {
            "Key": "health_violations.py"
        },
        {
            "Key": "MyOutputFolder/part-00000-c872ba62-1ed7-4986-bf83-5608524cee7f-c000.csv"
        }
    ]
}


In [45]:
!aws s3api delete-bucket --bucket emr20220705-bucket

Check that bucket was deleted

In [46]:
!aws s3api list-buckets | jq .Buckets[].Name

[0;32m"aws-athena-query-results-211966428728-us-east-1"[0m
