<a id="1"></a>
## <font color= #0cc754> 1. Introduction </font>


This notebook is a project based on what I learned about the AWS cloud in an Alura course. It uses several cloud services, such as AWS Glue, AWS S3, AWS EMR and others. Since a picture is worth a thousand words, the pipeline we will be using is summarized in the following diagram (when viewing it in the GitHub preview, use the dark theme):


<p align="center">
    <img src="images/boston_diagram.png" alt="boston_architecture">
</p>


<a id="2"></a>
## <font color= #0cc754> 2. Library Imports </font>


In [26]:
import urllib.request
import os
import pandas as pd
import boto3
from dotenv import load_dotenv
from io import BytesIO
import shutil
import sys
sys.path.append('../')
import utils
from utils import *
import pyarrow.parquet as pq


<a id="3"></a>
## <font color= #0cc754> 3. Data extraction and upload into S3: Bronze layer </font>


First, create a directory called `data` in the same directory as this notebook.

In [1]:
!mkdir data

Now we create a function to extract data from the Boston Dataset https://data.boston.gov/dataset/311-service-requests

In [2]:
def extract_data(url, filename):
  try:
    urllib.request.urlretrieve(url, filename)
  except Exception as e:
    print(e)
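
Because `extract_data` only prints the exception, a failed download is easy to miss. Below is a minimal sketch of a variant that reports success. The name `extract_data_checked` is hypothetical and not part of the original notebook.

```python
import os
import urllib.request


def extract_data_checked(url, filename):
    """Download url to filename; return True only if a non-empty file was written.

    Hypothetical variant of extract_data, shown for illustration.
    """
    try:
        urllib.request.urlretrieve(url, filename)
    except Exception as e:  # same broad catch as the original function
        print(f"Download failed for {url}: {e}")
        return False
    return os.path.exists(filename) and os.path.getsize(filename) > 0
```

The boolean return value makes it possible to collect the years that failed and retry only those.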


Now we extract the data from that dataset, years 2015-2020.

In [5]:
extract_data("https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/c9509ab4-6f6d-4b97-979a-0cf2a10c922b/download/311_service_requests_2015.csv", "data/data_2015.csv")
extract_data("https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/b7ea6b1b-3ca4-4c5b-9713-6dc1db52379a/download/311_service_requests_2016.csv", "data/data_2016.csv")
extract_data("https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/30022137-709d-465e-baae-ca155b51927d/download/311_service_requests_2017.csv", "data/data_2017.csv")
extract_data("https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/2be28d90-3a90-4af1-a3f6-f28c1e25880a/download/311_service_requests_2018.csv", "data/data_2018.csv")
extract_data("https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/ea2e4696-4a2d-429c-9807-d02eb92e0222/download/311_service_requests_2019.csv", "data/data_2019.csv")
extract_data("https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/6ff6a6fd-3141-4440-a880-6f60a37fe789/download/script_105774672_20210108153400_combine.csv", "data/data_2020.csv")


We verify that our files were downloaded

In [3]:
file_names = os.listdir('./data')
print(file_names)

['data_2015.csv', 'data_2016.csv', 'data_2017.csv', 'data_2018.csv', 'data_2019.csv', 'data_2020.csv']


We create a dictionary of dataframes, containing the dataframes we downloaded previously

In [4]:
dict_dfs = {}
for file in file_names:
    year = file.split("_")[-1].split(".")[0]
    dict_dfs[year] = pd.read_csv("data/" + file)
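
The dictionary key comes from the file name alone. Here is a small sketch of that parsing step, assuming the `data_<year>.csv` naming used above. The helper name `year_from_filename` and the stray `.ipynb_checkpoints` entry are illustrative assumptions.

```python
import os


def year_from_filename(file_name):
    """Return the year token of a name like 'data_2015.csv' (here '2015')."""
    stem, _ext = os.path.splitext(file_name)  # 'data_2015'
    return stem.split("_")[-1]                # '2015'


# Keep only CSV files so stray entries in ./data are not passed to pd.read_csv
names = ["data_2015.csv", "data_2016.csv", ".ipynb_checkpoints"]
years = [year_from_filename(n) for n in names if n.endswith(".csv")]
print(years)  # ['2015', '2016']
```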


We can take a look at our dataset to get an idea of what it looks like. For example, for 2015 we have

In [9]:
dict_dfs['2015'].head()

| | case_enquiry_id | open_dt | target_dt | closed_dt | ontime | case_status | closure_reason | case_title | subject | reason | ... | police_district | neighborhood | neighborhood_services_district | ward | precinct | location_street_name | location_zipcode | latitude | longitude | source |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 0 | 101001240734 | 2015-01-01 00:20:09 | | 2015-01-02 09:08:42 | ONTIME | Closed | Case Closed Case Noted | Other | Mayor's 24 Hour Hotline | Notification | ... | A1 | Downtown / Financial District | 14 | Ward 5 | 503 | 33 Beacon St | 2108.0 | 42.3574 | -71.0648 | Constituent Call |
| 1 | 101001240735 | 2015-01-01 00:25:23 | 2015-01-16 08:30:00 | 2015-01-02 16:17:43 | ONTIME | Closed | Case Closed Duplicate of Existing Case duplica... | PRINTED | Public Works Department | Street Lights | ... | D4 | Fenway / Kenmore / Audubon Circle / Longwood | 14 | Ward 5 | 502 | 145-151 Ipswich St | 2215.0 | 42.3474 | -71.0944 | Citizens Connect App |
| 2 | 101001240736 | 2015-01-01 00:25:36 | 2015-01-16 08:30:00 | 2015-04-28 08:49:50 | OVERDUE | Closed | Case Closed Case Resolved Rebuilt luminaires a... | PRINTED | Public Works Department | Street Lights | ... | D4 | Fenway / Kenmore / Audubon Circle / Longwood | 14 | Ward 5 | 502 | 145-151 Ipswich St | 2215.0 | 42.3474 | -71.0944 | Citizens Connect App |
| 3 | 101001240740 | 2015-01-01 01:45:37 | | | ONTIME | Open | | Animal Found | Animal Control | Animal Issues | ... | E5 | West Roxbury | 12 | Ward 20 | 2010 | 4833 Washington St | 2132.0 | 42.2711 | -71.1464 | Constituent Call |
| 4 | 101001240742 | 2015-01-01 02:24:59 | | 2015-01-02 09:09:00 | ONTIME | Closed | Case Closed Case Noted | Fire | Mayor's 24 Hour Hotline | Notification | ... | C6 | Dorchester | 5 | Ward 7 | 708 | 10 Washburn St | 2125.0 | 42.3254 | -71.0574 | Constituent Call |


Again for 2015, we can use our functions from `utils.py` to describe the numeric and object columns.

In [15]:
numeric_summary(dict_dfs['2015'])

1. Data shape (rows, columns): (210084, 29)
2. Number of duplicate rows: 0


| Column | Dtype | Counts | Nulls | Min | Max | Uniques | UniqueValues |
|---|---|---|---|---|---|---|---|
| case_enquiry_id | int64 | 210084 | 0 | 101001200000.0 | 101001700000.0 | 210084 | Too many |
| location_zipcode | float64 | 161353 | 48731 | 2108.0 | 2467.0 | 35 | Too many |
| latitude | float64 | 210084 | 0 | 42.2321 | 42.3952 | 1612 | Too many |
| longitude | float64 | 210084 | 0 | -71.1854 | -70.9957 | 1788 | Too many |


In [17]:
object_summary(dict_dfs['2015'])

1. Data shape (rows, columns): (210084, 29)
2. Number of duplicate rows: 0


| Column | Dtype | Counts | Nulls | Top | Frequency | Uniques | UniqueValues |
|---|---|---|---|---|---|---|---|
| open_dt | object | 210084 | 0 | 2015-07-23 10:51:00 | 17 | 192517 | Too many |
| target_dt | object | 172896 | 37188 | 2015-02-19 08:30:00 | 4186 | 111667 | Too many |
| closed_dt | object | 194390 | 15694 | 2015-02-20 00:56:13 | 17 | 179380 | Too many |
| ontime | object | 209925 | 159 | ONTIME | 146332 | 3 | [ONTIME, OVERDUE, nan] |
| case_status | object | 210084 | 0 | Closed | 194390 | 2 | [Closed, Open] |
| closure_reason | object | 210084 | 0 | | 15632 | 131641 | Too many |
| case_title | object | 209893 | 191 | Request for Snow Plowing | 30325 | 3717 | Too many |
| subject | object | 210084 | 0 | Public Works Department | 132347 | 14 | Too many |
| reason | object | 210084 | 0 | Street Cleaning | 51483 | 50 | Too many |
| type | object | 210084 | 0 | Request for Snow Plowing | 30358 | 184 | Too many |


We can already see some things we should do. For example, `open_dt` should have a timestamp data type. At this stage, however, we are only concerned with the raw data, so no transformation will occur.
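
For reference, the kind of conversion deferred to the later layers looks like this in plain Python. This is only a sketch: the format string is inferred from the sample rows above, and the helper `parse_dt` is hypothetical. It assumes missing values arrive as `None` or an empty string.

```python
from datetime import datetime

OPEN_DT_FORMAT = "%Y-%m-%d %H:%M:%S"  # format seen in the sample rows above


def parse_dt(value):
    """Parse a timestamp string, returning None for missing values."""
    if value is None or value == "":
        return None
    return datetime.strptime(value, OPEN_DT_FORMAT)


opened = parse_dt("2015-01-01 00:20:09")
print(opened.year, opened.month)  # 2015 1
```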

Next we will access our AWS credentials from our .env file. This, of course, will not be available on GitHub.

In [2]:
# Load environment variables from .env file
load_dotenv('.env')

# Access the variables
aws_access_key_id = os.getenv('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.getenv('AWS_SECRET_ACCESS_KEY')
region_name = os.getenv('REGION_NAME')
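
`os.getenv` returns `None` when a variable is absent, so a typo in the `.env` file only shows up later as an authentication error. A minimal sketch of an early check follows. The helper `missing_env_vars` is hypothetical, and the variable names are the ones read above.

```python
import os


def missing_env_vars(names):
    """Return the names from `names` that are unset or empty in the environment."""
    return [name for name in names if not os.getenv(name)]


REQUIRED = ["AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY", "REGION_NAME"]
missing = missing_env_vars(REQUIRED)
if missing:
    print(f"Missing variables in .env: {missing}")
```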

We start a boto3 client.

In [3]:
boto3.setup_default_session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name
)

s3 = boto3.client('s3')

Just to see which buckets we have

In [10]:
s3.list_buckets()['Buckets']

[{'Name': 'project-bostondata',
  'CreationDate': datetime.datetime(2024, 10, 9, 17, 59, 18, tzinfo=tzutc())}]

Now we convert our .csv files into Parquet (for compression purposes) and upload them into the `bronze/` folder of our S3 bucket. This step can take some time.

In [11]:
for year, df in dict_dfs.items():
    parquet_buffer = BytesIO()
    df.to_parquet(parquet_buffer)
    s3.put_object(
        Bucket="project-bostondata",
        Key=f"bronze/data_{year}.parquet",
        Body=parquet_buffer.getvalue(),
    )

To ensure our files are uploaded, we list the objects of our Bucket.

In [12]:
response = s3.list_objects(Bucket="project-bostondata")
print( [obj['Key'] for obj in response['Contents']] )

['bronze/', 'bronze/data_2015.parquet', 'bronze/data_2016.parquet', 'bronze/data_2017.parquet', 'bronze/data_2018.parquet', 'bronze/data_2019.parquet', 'bronze/data_2020.parquet', 'silver/']
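
A single listing call returns at most 1,000 keys, which is plenty here but not for a larger bucket. The sketch below uses the boto3 paginator for `list_objects_v2` instead. The helper name `list_keys` is an assumption, and the client is passed in so the function does not depend on a global `s3`.

```python
def list_keys(s3_client, bucket, prefix=""):
    """Return every object key under `prefix`, following pagination."""
    paginator = s3_client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # 'Contents' is absent from a page when nothing matches the prefix
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys
```

With the client created earlier, this would be called as `list_keys(s3, "project-bostondata", "bronze/")`.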


Now we delete our local `data` folder, since the data has already been uploaded to the S3 bucket.

In [13]:
# Delete the ./data folder and its contents
shutil.rmtree('./data')

<a id="4"></a>
## <font color= #0cc754> 4. Transformation of the raw data: Silver layer </font>

Our next step is to transform the raw data into a more useful form, which becomes the S3 silver layer. These are the modifications we are going to make:

| Column Name | Data Type | Transformation |
|------|-------------|-------|
| closure_reason | string | Drop column |
| closure_reason_normalized | string | New column: take `closure_reason` and use a regex to extract the text that follows the `Case Closed. Closed date : <timestamp>` prefix |
| open_year | int | New column: extract year from `open_dt` column |
| open_month | int | New column: extract month from `open_dt` column |
| duration_hours | double | New column: duration of the case, in hours |


To do these transformations we are going to use AWS Glue. This next part takes place in the AWS console. I provide some information about the procedure in this notebook, but many details are not presented here, so some knowledge of the AWS environment is required.

For example, one needs to create a data lake using AWS Lake Formation, along with Data Catalog tables for the bronze layer (using an AWS Glue crawler) and for the silver layer, on which your user has the correct permissions. The JSON schema of the silver layer table is provided:

```json 
[
    {"Name": "case_enquiry_id", "Type": "bigint"},
    {"Name": "open_dt", "Type": "timestamp"},
    {"Name": "target_dt", "Type": "timestamp"},
    {"Name": "closed_dt", "Type": "timestamp"},
    {"Name": "ontime", "Type": "string"},
    {"Name": "case_status", "Type": "string"},
    {"Name": "case_title", "Type": "string"},
    {"Name": "subject", "Type": "string"},
    {"Name": "reason", "Type": "string"},
    {"Name": "type", "Type": "string"},
    {"Name": "queue", "Type": "string"},
    {"Name": "department", "Type": "string"},
    {"Name": "submittedphoto", "Type": "string"},
    {"Name": "closedphoto", "Type": "string"},
    {"Name": "location", "Type": "string"},
    {"Name": "fire_district", "Type": "string"},
    {"Name": "pwd_district", "Type": "string"},
    {"Name": "city_council_district", "Type": "string"},
    {"Name": "police_district", "Type": "string"},
    {"Name": "neighborhood", "Type": "string"},
    {"Name": "neighborhood_services_district", "Type": "string"},
    {"Name": "ward", "Type": "string"},
    {"Name": "precinct", "Type": "string"},
    {"Name": "location_street_name", "Type": "string"},
    {"Name": "location_zipcode", "Type": "string"},
    {"Name": "latitude", "Type": "string"},
    {"Name": "longitude", "Type": "string"},
    {"Name": "source", "Type": "string"},
    {"Name": "closure_reason_normalized", "Type": "string"},
    {"Name": "duration_hours", "Type": "double"}
]
```


Next, in an AWS Glue job, we use the following script:

```python
# This is the code taken from the script generated by AWS Glue
import sys
from awsglue.transforms import *
from awsglue.utils import getResolvedOptions
from pyspark.context import SparkContext
from awsglue.context import GlueContext
from awsglue.job import Job
import gs_regex_extract
from awsglue import DynamicFrame
import gs_derived

def sparkSqlQuery(glueContext, query, mapping, transformation_ctx) -> DynamicFrame:
    for alias, frame in mapping.items():
        frame.toDF().createOrReplaceTempView(alias)
    result = spark.sql(query)
    return DynamicFrame.fromDF(result, glueContext, transformation_ctx)
args = getResolvedOptions(sys.argv, ['JOB_NAME'])
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args)

# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1729000689671 = glueContext.create_dynamic_frame.from_catalog(database="datalake-aws-boston", table_name="bronzebronze", transformation_ctx="AWSGlueDataCatalog_node1729000689671")

# Script generated for node Change Schema
ChangeSchema_node1729000760008 = ApplyMapping.apply(frame=AWSGlueDataCatalog_node1729000689671, mappings=[("case_enquiry_id", "long", "case_enquiry_id", "bigint"), ("open_dt", "string", "open_dt", "timestamp"), ("target_dt", "string", "target_dt", "timestamp"), ("closed_dt", "string", "closed_dt", "timestamp"), ("ontime", "string", "ontime", "string"), ("case_status", "string", "case_status", "string"), ("closure_reason", "string", "closure_reason", "string"), ("case_title", "string", "case_title", "string"), ("subject", "string", "subject", "string"), ("reason", "string", "reason", "string"), ("type", "string", "type", "string"), ("queue", "string", "queue", "string"), ("department", "string", "department", "string"), ("submittedphoto", "string", "submittedphoto", "string"), ("closedphoto", "string", "closedphoto", "string"), ("location", "string", "location", "string"), ("fire_district", "string", "fire_district", "string"), ("pwd_district", "string", "pwd_district", "string"), ("city_council_district", "string", "city_council_district", "string"), ("police_district", "string", "police_district", "string"), ("neighborhood", "string", "neighborhood", "string"), ("neighborhood_services_district", "string", "neighborhood_services_district", "string"), ("ward", "string", "ward", "string"), ("precinct", "string", "precinct", "string"), ("location_street_name", "string", "location_street_name", "string"), ("location_zipcode", "double", "location_zipcode", "string"), ("latitude", "double", "latitude", "string"), ("longitude", "double", "longitude", "string"), ("source", "string", "source", "string")], transformation_ctx="ChangeSchema_node1729000760008")

# Script generated for node Regex Extractor
RegexExtractor_node1729000885816 = ChangeSchema_node1729000760008.gs_regex_extract(colName="closure_reason", regex="^Case Closed\. Closed date : \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+ (.*)", newCols="closure_reason_normalized")

# Script generated for node Derived Column Year
DerivedColumnYear_node1729001020396 = RegexExtractor_node1729000885816.gs_derived(colName="open_year", expr="YEAR(open_dt) as open_year")

# Script generated for node Derived Column Month
DerivedColumnMonth_node1729001089494 = DerivedColumnYear_node1729001020396.gs_derived(colName="open_month", expr="MONTH(open_dt) as open_month")

# Script generated for node SQL Query
SqlQuery0 = '''
SELECT tb_1.*, ROUND((unix_timestamp(closed_dt)-unix_timestamp(open_dt))/3600,0) AS duration_hours from tb_1
'''
SQLQuery_node1729001146254 = sparkSqlQuery(glueContext, query = SqlQuery0, mapping = {"tb_1":DerivedColumnMonth_node1729001089494}, transformation_ctx = "SQLQuery_node1729001146254")

# Script generated for node Change Schema
ChangeSchema_node1729002886435 = ApplyMapping.apply(frame=SQLQuery_node1729001146254, mappings=[("case_enquiry_id", "long", "case_enquiry_id", "long"), ("open_dt", "timestamp", "open_dt", "timestamp"), ("target_dt", "timestamp", "target_dt", "timestamp"), ("closed_dt", "timestamp", "closed_dt", "timestamp"), ("ontime", "string", "ontime", "string"), ("case_status", "string", "case_status", "string"), ("case_title", "string", "case_title", "string"), ("subject", "string", "subject", "string"), ("reason", "string", "reason", "string"), ("type", "string", "type", "string"), ("queue", "string", "queue", "string"), ("department", "string", "department", "string"), ("submittedphoto", "string", "submittedphoto", "string"), ("closedphoto", "string", "closedphoto", "string"), ("location", "string", "location", "string"), ("fire_district", "string", "fire_district", "string"), ("pwd_district", "string", "pwd_district", "string"), ("city_council_district", "string", "city_council_district", "string"), ("police_district", "string", "police_district", "string"), ("neighborhood", "string", "neighborhood", "string"), ("neighborhood_services_district", "string", "neighborhood_services_district", "string"), ("ward", "string", "ward", "string"), ("precinct", "string", "precinct", "string"), ("location_street_name", "string", "location_street_name", "string"), ("location_zipcode", "string", "location_zipcode", "string"), ("latitude", "string", "latitude", "string"), ("longitude", "string", "longitude", "string"), ("source", "string", "source", "string"), ("closure_reason_normalized", "string", "closure_reason_normalized", "string"), ("duration_hours", "double", "duration_hours", "double"), ("open_year", "int", "open_year", "int"), ("open_month", "int", "open_month", "int")], transformation_ctx="ChangeSchema_node1729002886435")

# Script generated for node AWS Glue Data Catalog
AWSGlueDataCatalog_node1729003470610 = glueContext.write_dynamic_frame.from_catalog(frame=ChangeSchema_node1729002886435, database="datalake-aws-boston", table_name="silver", additional_options={"partitionKeys": ["open_year", "open_month"]}, transformation_ctx="AWSGlueDataCatalog_node1729003470610")

job.commit()
```
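
To make the two less obvious nodes concrete, here is a minimal local sketch in plain Python (not Glue code). The sample `closure_reason` string is an assumption about the raw format implied by the regex, the function names are hypothetical, and returning `None` for non-matching rows is my choice rather than documented `gs_regex_extract` behavior. The duration is the difference between the two timestamps in seconds, divided by 3600.

```python
import re
from datetime import datetime

# Same pattern as the Regex Extractor node; group 1 is the text after the timestamp
CLOSURE_PATTERN = re.compile(
    r"^Case Closed\. Closed date : \d{4}-\d{2}-\d{2} \d{2}:\d{2}:\d{2}\.\d+ (.*)"
)
FMT = "%Y-%m-%d %H:%M:%S"


def normalize_closure_reason(text):
    """Return the text after the closed-date prefix, or None if there is no match."""
    match = CLOSURE_PATTERN.match(text)
    return match.group(1) if match else None


def duration_hours(open_dt, closed_dt):
    """Return (closed_dt - open_dt) in whole hours, rounded."""
    delta = datetime.strptime(closed_dt, FMT) - datetime.strptime(open_dt, FMT)
    return round(delta.total_seconds() / 3600)


# Assumed raw value, built to match the pattern above
sample = "Case Closed. Closed date : 2015-01-02 09:08:42.0 Case Noted"
print(normalize_closure_reason(sample))                              # Case Noted
print(duration_hours("2015-01-01 00:20:09", "2015-01-02 09:08:42"))  # 33
```

One difference to keep in mind: Python's `round` rounds exact halves to the nearest even integer, while Spark SQL's `ROUND` rounds them away from zero, so the two can disagree on values such as 2.5.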

In the Glue visual ETL editor, the same job looks like this:
<p align="center">
    <img src="images/1.PNG" alt="1">
    <img src="images/2.PNG" alt="2">
</p>

<a id="5"></a>
## <font color= #0cc754> 5. Data processing: Gold layer </font>

Recall which columns we have in our dataframe:

In [5]:
dict_dfs['2015'].columns

Index(['case_enquiry_id', 'open_dt', 'target_dt', 'closed_dt', 'ontime',
       'case_status', 'closure_reason', 'case_title', 'subject', 'reason',
       'type', 'queue', 'department', 'submittedphoto', 'closedphoto',
       'location', 'fire_district', 'pwd_district', 'city_council_district',
       'police_district', 'neighborhood', 'neighborhood_services_district',
       'ward', 'precinct', 'location_street_name', 'location_zipcode',
       'latitude', 'longitude', 'source'],
      dtype='object')

The plan for the S3 gold layer is as follows:

| Column Name | Data Type | Transformation |
|------|-------|--------|
| open_dt | string | Transform to timestamp |
| closed_dt | string | Transform to timestamp |
| target_dt | string | Transform to timestamp |
| delay_days | double | New column: closed_dt - target_dt |
| city_council_district | string | Drop column |
| closedphoto | string | Drop column |
| department | string | Drop column |
| fire_district | string | Drop column |
| location | string | Drop column |
| neighborhood_services_district | string | Drop column |
| police_district | string | Drop column |
| precinct | string | Drop column |
| pwd_district | string | Drop column |
| queue | string | Drop column |
| submittedphoto | string | Drop column |
| type | string | Drop column |
| ward | string | Drop column |
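
The `delay_days` column is the only computed one. The sketch below shows the intended arithmetic in plain Python, using timestamps from the 2015 sample rows shown earlier. It is not the code in `main.py` (which is not reproduced here), and the helper name and the handling of missing values are assumptions.

```python
from datetime import datetime

FMT = "%Y-%m-%d %H:%M:%S"


def delay_days(closed_dt, target_dt):
    """Return closed_dt - target_dt in days, or None if either value is missing."""
    if not closed_dt or not target_dt:
        return None
    delta = datetime.strptime(closed_dt, FMT) - datetime.strptime(target_dt, FMT)
    return delta.total_seconds() / 86400


# Rows 1 and 2 of the 2015 sample share the same target date
print(delay_days("2015-01-02 16:17:43", "2015-01-16 08:30:00") < 0)  # True: closed early
print(delay_days("2015-04-28 08:49:50", "2015-01-16 08:30:00") > 0)  # True: closed late
```

A negative value therefore means the case was closed before its target date, which matches the `ONTIME` flag on the first of those rows.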


After creating an S3 folder for the gold layer, we need to create its table. The JSON schema for the gold layer table is provided:

```json
[
  {
    "Name": "case_enquiry_id",
    "Type": "long",
    "Comment": "Unique identifier for each case enquiry"
  },
  {
    "Name": "open_dt",
    "Type": "timestamp",
    "Comment": "Date and time the case was opened"
  },
  {
    "Name": "closed_dt",
    "Type": "timestamp",
    "Comment": "Date and time the case was closed"
  },
  {
    "Name": "target_dt",
    "Type": "timestamp",
    "Comment": "SLA target date and time"
  },
  {
    "Name": "case_status",
    "Type": "string",
    "Comment": "Case status"
  },
  {
    "Name": "ontime",
    "Type": "string",
    "Comment": "Indicates whether the case was resolved on time"
  },
  {
    "Name": "closure_reason",
    "Type": "string",
    "Comment": "Reason the case was closed"
  },
  {
    "Name": "case_title",
    "Type": "string",
    "Comment": "Case title"
  },
  {
    "Name": "subject",
    "Type": "string",
    "Comment": "Case subject"
  },
  {
    "Name": "reason",
    "Type": "string",
    "Comment": "Case reason"
  },
  {
    "Name": "neighborhood",
    "Type": "string",
    "Comment": "Neighborhood"
  },
  {
    "Name": "location_street_name",
    "Type": "string",
    "Comment": "Street name of the location"
  },
  {
    "Name": "location_zipcode",
    "Type": "integer",
    "Comment": "ZIP code of the location"
  },
  {
    "Name": "latitude",
    "Type": "double",
    "Comment": "Latitude of the location"
  },
  {
    "Name": "longitude",
    "Type": "double",
    "Comment": "Longitude of the location"
  },
  {
    "Name": "source",
    "Type": "string",
    "Comment": "Case source"
  },
  {
    "Name": "delay_days",
    "Type": "double",
    "Comment": "Days of delay"
  }
]
```

We can check how many rows each yearly file has on average:

In [10]:
total_rows = sum(df.shape[0] for df in dict_dfs.values())
average_size = total_rows / len(dict_dfs)
print(f"Average number of rows is {average_size}")

Average number of rows is 242030.0


With this many rows (about 1.45 million across the six files), it is worth considering PySpark. Since we will be using AWS EMR, we need a Spark script, which is available in `main.py`. We upload it to the S3 bucket called `datalake-boston-emr`.

In [23]:
# to see the list of buckets
buckets = s3.list_buckets()
print([bucket['Name'] for bucket in buckets['Buckets']])

['aws-glue-assets-140023372298-us-east-1', 'datalake-boston-emr', 'project-bostondata']


In [5]:
# we upload the main.py file to the bucket
s3.upload_file('main.py', 'datalake-boston-emr', 'main.py')

If you need to look up the names of your database and tables, you can start a boto3 Glue client and use the following code to see information about your database (changing the code where necessary).

In [25]:
# Initialize a Glue client
glue = boto3.client('glue', 
                    aws_access_key_id=aws_access_key_id, 
                    aws_secret_access_key=aws_secret_access_key, 
                    region_name=region_name)

# List databases
databases = glue.get_databases()
print("Databases:")
for db in databases['DatabaseList']:
    print(f"- {db['Name']}")

# List tables in a specific database
database_name = 'datalake-aws-boston'  # Replace with your database name
tables = glue.get_tables(DatabaseName=database_name)
print(f"\nTables in database '{database_name}':")
for table in tables['TableList']:
    print(f"- {table['Name']}")

# Get schema for a specific table
table_name = 'bronzebronze'  # Replace with your table name
table = glue.get_table(DatabaseName=database_name, Name=table_name)
print(f"\nSchema for table '{table_name}':")
for column in table['Table']['StorageDescriptor']['Columns']:
    print(f"- {column['Name']} ({column['Type']})")

Databases:
- datalake-aws-boston

Tables in database 'datalake-aws-boston':
- bronzebronze
- gold
- silver

Schema for table 'bronzebronze':
- case_enquiry_id (bigint)
- open_dt (string)
- target_dt (string)
- closed_dt (string)
- ontime (string)
- case_status (string)
- closure_reason (string)
- case_title (string)
- subject (string)
- reason (string)
- type (string)
- queue (string)
- department (string)
- submittedphoto (string)
- closedphoto (string)
- location (string)
- fire_district (string)
- pwd_district (string)
- city_council_district (string)
- police_district (string)
- neighborhood (string)
- neighborhood_services_district (string)
- ward (string)
- precinct (string)
- location_street_name (string)
- location_zipcode (double)
- latitude (double)
- longitude (double)
- source (string)


Now, in AWS EMR, we create a cluster and, once it has been created, provide the location of our `main.py` in the Steps section.

In the `Arguments - optional` field we need to pass `--database datalake-aws-boston --table_source bronzebronze --table_target gold`. These values are specific to my setup: the first is the name of the database you created, the second is the table for the bronze layer (I made a slight mistake when creating it, so it ended up named `bronzebronze`), and the third is the name of the target table, which I chose to be `gold`.
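
For readers who have not opened `main.py`, here is one way a script could read those three flags with `argparse`. This is a hypothetical sketch and not the contents of `main.py`. Only the flag names are taken from the arguments above.

```python
import argparse


def parse_args(argv=None):
    """Parse the three step arguments; hypothetical, main.py may differ."""
    parser = argparse.ArgumentParser(description="Bronze-to-gold Spark step")
    parser.add_argument("--database", required=True, help="Glue database name")
    parser.add_argument("--table_source", required=True, help="Source (bronze) table")
    parser.add_argument("--table_target", required=True, help="Target (gold) table")
    return parser.parse_args(argv)


args = parse_args(
    ["--database", "datalake-aws-boston",
     "--table_source", "bronzebronze",
     "--table_target", "gold"]
)
print(args.database, args.table_source, args.table_target)
```

Marking each flag as required makes the EMR step fail immediately with a usage message if one of them is left out.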

Now we can verify that the gold layer was indeed saved in our s3 bucket.

In [29]:
# Specify the bucket and the folder path within it
bucket_name = "project-bostondata"  # Replace with your bucket name
folder_path = "gold/"  # Replace with your folder path

# List objects in the specified folder within the bucket
response = s3.list_objects_v2(Bucket=bucket_name, Prefix=folder_path)

# Print the keys of the objects in the folder
if 'Contents' in response:
    for obj in response['Contents']:
        print(f"Key: {obj['Key']}, Size: {obj['Size']} bytes, LastModified: {obj['LastModified']}")
else:
    print("No objects found in the folder.")

Key: gold/, Size: 0 bytes, LastModified: 2024-10-15 21:32:31+00:00
Key: gold/_SUCCESS, Size: 0 bytes, LastModified: 2024-10-18 01:04:02+00:00
Key: gold/part-00000-f55ae4f1-f979-4409-be59-674f76e92e9f-c000.snappy.parquet, Size: 17694904 bytes, LastModified: 2024-10-18 01:04:02+00:00


<a id="6"></a>
## <font color= #0cc754> 6. Data insights and Dashboard with AWS QuickSight </font>


We download the .snappy.parquet file created in the previous section into a local `data` folder and convert it into a .csv file, so that we can use the newly created .csv file in QuickSight.

In [27]:
# Define the bucket name and the file key
bucket_name = 'project-bostondata'
file_key = 'gold/part-00000-f55ae4f1-f979-4409-be59-674f76e92e9f-c000.snappy.parquet'
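download_path = './data/BostonData.snappy.parquet'

# Recreate the local data folder, which was deleted at the end of section 3
os.makedirs('./data', exist_ok=True)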

# Download the file
s3.download_file(bucket_name, file_key, download_path)

# Read the parquet file
table = pq.read_table(download_path)

# Remove the file, since we are not going to use it anymore
os.remove(download_path)

# Convert to pandas DataFrame
df = table.to_pandas()

# Save as CSV
csv_path = download_path.replace('.snappy.parquet', '.csv')
df.to_csv(csv_path, index=False)