<a href="https://colab.research.google.com/github/pmsil/pipeline_data_lake_aws_Data_Boston/blob/main/Pipeline_aws_datalakealura.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **External data ingestion pipeline on AWS**

**Objective:** Build a complete pipeline for ingesting external data into a data lake on AWS.

**Services used:**

1.   IAM
2.   S3
3.   Lake Formation

**Source:** we use a dataset called **Data Boston**, which covers service requests made to the city of Boston between 2015 and 2020.

**URL:** https://data.boston.gov/dataset/311-service-requests

**Note:** This project was created from the following Alura material:

**Alura learning path (Formação):** AWS Data Lake: construindo pipelines na AWS

**Course:** AWS Data Lake: criando uma pipeline para ingestão de dados

**Instructor:** Ana Hashimoto

In [1]:
!mkdir -p data

In [29]:
import urllib.request
import pandas as pd
from google.colab import userdata
from io import BytesIO

In [16]:
pip install boto3

Collecting boto3
  Downloading boto3-1.34.108-py3-none-any.whl (139 kB)
Collecting botocore<1.35.0,>=1.34.108 (from boto3)
  Downloading botocore-1.34.108-py3-none-any.whl (12.2 MB)
Collecting jmespath<2.0.0,>=0.7.1 (from boto3)
  Downloading jmespath-1.0.1-py3-none-any.whl (20 kB)
Collecting s3transfer<0.11.0,>=0.10.0 (from boto3)
  Downloading s3transfer-0.10.1-py3-none-any.whl (82 kB)
Installing collected packages: jmespath, botocore, s3transfer, boto3
Successfully installed boto3-1.34.108 botocore-1.34.108 jmespath-1.0.1 s3transfer-0.10.1


In [17]:
import boto3

In [3]:
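def extract_data(url, filename):
  # Download the file at `url` and save it locally as `filename`.
  try:
    urllib.request.urlretrieve(url, filename)

  except Exception as e:
    # Report the failure and carry on, so one bad URL does not stop the other downloads.
    print(f"Failed to download {url}: {e}")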

In [5]:
extract_data("https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/c9509ab4-6f6d-4b97-979a-0cf2a10c922b/download/311_service_requests_2015.csv", "data/dados_2015.csv")
extract_data("https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/b7ea6b1b-3ca4-4c5b-9713-6dc1db52379a/download/311_service_requests_2016.csv", "data/dados_2016.csv")
extract_data("https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/30022137-709d-465e-baae-ca155b51927d/download/311_service_requests_2017.csv", "data/dados_2017.csv")
extract_data("https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/2be28d90-3a90-4af1-a3f6-f28c1e25880a/download/311_service_requests_2018.csv", "data/dados_2018.csv")
extract_data("https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/ea2e4696-4a2d-429c-9807-d02eb92e0222/download/311_service_requests_2019.csv", "data/dados_2019.csv")
extract_data("https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/6ff6a6fd-3141-4440-a880-6f60a37fe789/download/script_105774672_20210108153400_combine.csv", "data/dados_2020.csv")
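The six calls above differ only in the year and the resource URL, so they could also be driven by a mapping. The sketch below is one possible arrangement, not the course's code: `download_all`, `urls_by_year`, and the injectable `downloader` parameter are names introduced here for illustration.

```python
import os
import urllib.request


def download_all(urls_by_year, target_dir="data", downloader=urllib.request.urlretrieve):
    """Download one CSV per year and return (saved, failed).

    `saved` maps year -> local path; `failed` maps year -> error message.
    The function name and the `downloader` hook are illustrative assumptions.
    """
    os.makedirs(target_dir, exist_ok=True)
    saved, failed = {}, {}
    for year, url in urls_by_year.items():
        # Same naming scheme as the notebook: data/dados_<year>.csv
        path = os.path.join(target_dir, f"dados_{year}.csv")
        try:
            downloader(url, path)
            saved[year] = path
        except Exception as e:
            # Record the error and continue with the remaining years.
            failed[year] = str(e)
    return saved, failed
```

Calling it with a dict that maps `"2015"` through `"2020"` to the six URLs above should produce the same files under `data/`, and the returned `failed` dict makes it easy to see which years need a retry.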


In [10]:
arquivos = [
    "data/dados_2015.csv",
    "data/dados_2016.csv",
    "data/dados_2017.csv",
    "data/dados_2018.csv",
    "data/dados_2019.csv",
    "data/dados_2020.csv",
]

In [7]:
dfs = {}

In [11]:
for arquivo in arquivos:
  # Take the year from the file name, e.g. "data/dados_2018.csv" -> "2018"
  ano = arquivo.split("_")[-1].split(".")[0]
  dfs[ano] = pd.read_csv(arquivo)
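The loop above derives each dictionary key from the file name by splitting on `_` and `.`. A slightly more defensive variant, sketched here under the assumption that every file name ends in a four-digit year before the extension, uses `pathlib` and a regular expression. `year_from_path` is a hypothetical helper, not part of the notebook.

```python
import re
from pathlib import Path


def year_from_path(path):
    """Return the four-digit year at the end of a file stem, e.g. 'dados_2018' -> '2018'."""
    stem = Path(path).stem  # file name without directory or extension
    match = re.search(r"(\d{4})$", stem)
    if match is None:
        # Fail loudly instead of silently creating an odd dictionary key.
        raise ValueError(f"no trailing year in file name: {path!r}")
    return match.group(1)


print(year_from_path("data/dados_2018.csv"))  # prints 2018
```

Raising on an unexpected name is a design choice: it surfaces a misnamed file at load time rather than later, when `dfs["2018"]` is looked up.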

In [15]:
dfs["2018"].head()

| Unnamed: 0 | case_enquiry_id | open_dt | target_dt | closed_dt | ontime | case_status | closure_reason | case_title | subject | reason | ... | police_district | neighborhood | neighborhood_services_district | ward | precinct | location_street_name | location_zipcode | latitude | longitude | source |
| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |
| 0 | 101002296861 | 2018-01-01 00:08:00 | | 2018-03-01 15:18:12 | ONTIME | Closed | Case Closed. Closed date : 2018-03-01 15:18:12... | Fire Hydrant | Boston Water & Sewer Commission | Fire Hydrant | ... | E13 | Jamaica Plain | 11 | Ward 19 | 1903 | 413 Centre St | 2130.0 | 42.3214 | -71.1109 | Constituent Call |
| 1 | 101002296862 | 2018-01-01 00:09:46 | 2018-01-02 08:30:00 | 2018-01-02 21:15:22 | OVERDUE | Closed | Case Closed. Closed date : 2018-01-02 21:15:22... | Parking Enforcement | Transportation - Traffic Division | Enforcement & Abandoned Vehicles | ... | C6 | South Boston / South Boston Waterfront | 5 | Ward 6 | 604 | 428 W Second St | 2127.0 | 42.3373 | -71.0445 | Citizens Connect App |
| 2 | 101002296864 | 2018-01-01 00:11:24 | 2018-01-02 08:30:00 | 2018-01-02 21:15:11 | OVERDUE | Closed | Case Closed. Closed date : 2018-01-02 21:15:11... | Parking Enforcement | Transportation - Traffic Division | Enforcement & Abandoned Vehicles | ... | C6 | South Boston / South Boston Waterfront | 5 | Ward 6 | 604 | 401-403 W Second St | 2127.0 | 42.3371 | -71.0442 | Citizens Connect App |
| 3 | 101002296865 | 2018-01-01 00:19:06 | 2018-01-02 08:30:00 | 2018-01-02 21:14:48 | OVERDUE | Closed | Case Closed. Closed date : 2018-01-02 21:14:48... | Parking Enforcement | Transportation - Traffic Division | Enforcement & Abandoned Vehicles | ... | B3 | Dorchester | 9 | 17 | 1701 | INTERSECTION School St & Washington St | | 42.3594 | -71.0587 | Citizens Connect App |
| 4 | 101002296866 | 2018-01-01 00:21:25 | 2018-01-02 08:30:00 | 2018-01-02 21:14:32 | OVERDUE | Closed | Case Closed. Closed date : 2018-01-02 21:14:32... | Parking Enforcement | Transportation - Traffic Division | Enforcement & Abandoned Vehicles | ... | B3 | Dorchester | 9 | Ward 17 | 1701 | 6-8 School St | 2124.0 | 42.2965 | -71.0729 | Citizens Connect App |


Connecting to the storage account

In [34]:
# I used Google Colab secrets to keep the access credentials out of the notebook

aws_access_key_id = userdata.get('aws_access_key_id')
aws_secret_access_key = userdata.get('aws_secret_access_key')
region_name = "us-east-2"

boto3.setup_default_session(
    aws_access_key_id = aws_access_key_id,
    aws_secret_access_key = aws_secret_access_key,
    region_name = region_name,
)

s3 = boto3.client("s3")
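The cell above reads the credentials from Colab secrets, which only works inside Colab. As a rough sketch of how the same session arguments might be assembled elsewhere, the helper below reads the standard `AWS_ACCESS_KEY_ID` and `AWS_SECRET_ACCESS_KEY` environment variables instead. `session_kwargs_from_env` is a hypothetical name introduced here, and the default region simply repeats the one used in the notebook.

```python
import os


def session_kwargs_from_env(env=None, region_name="us-east-2"):
    """Build the keyword arguments passed to boto3.setup_default_session.

    Hypothetical helper: reads the credentials from environment variables
    instead of Colab secrets and fails early if one is missing.
    """
    env = os.environ if env is None else env
    required = ("AWS_ACCESS_KEY_ID", "AWS_SECRET_ACCESS_KEY")
    missing = [name for name in required if not env.get(name)]
    if missing:
        raise RuntimeError("missing credentials: " + ", ".join(missing))
    return {
        "aws_access_key_id": env["AWS_ACCESS_KEY_ID"],
        "aws_secret_access_key": env["AWS_SECRET_ACCESS_KEY"],
        "region_name": region_name,
    }
```

It would be used as `boto3.setup_default_session(**session_kwargs_from_env())`. Either way, the point is the same as with Colab secrets: the keys never appear in the notebook source.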

In [31]:
content = """
Hello, S3
"""
with open("hello-secret-s3.txt", "w") as f:
  f.write(content)

In [33]:
s3.upload_file("hello-secret-s3.txt", "alura-datalakeaws-pmsil", "bronze/hello-secret-s3")

Saving the files as Parquet

In [24]:
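for ano, df in dfs.items():
  # Serialize each DataFrame to Parquet in memory, without writing a local file
  parquet_buffer = BytesIO()
  df.to_parquet(parquet_buffer)

  # Upload the Parquet bytes under the bronze/ prefix of the bucket
  s3.put_object(
      Bucket="alura-datalakeaws-pmsil",
      Key=f"bronze/dados_{ano}.parquet",
      Body=parquet_buffer.getvalue(),
  )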

In [25]:
response = s3.list_objects(Bucket = "alura-datalakeaws-pmsil")

In [27]:
keys = [obj["Key"] for obj in response["Contents"]]
print(keys)

['bronze/', 'bronze/dados_2015.parquet', 'bronze/dados_2016.parquet', 'bronze/dados_2017.parquet', 'bronze/dados_2018.parquet', 'bronze/dados_2019.parquet', 'bronze/dados_2020.parquet', 'bronze/hello-s3', 'silver/']
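A single `list_objects` call returns at most 1,000 keys, and the response has no `"Contents"` entry when nothing matches. That is fine for the handful of objects listed above, but a larger bucket needs pagination. The sketch below shows one way to do it with boto3's `list_objects_v2` paginator. `list_keys` is an illustrative helper rather than part of the notebook, and `client` is assumed to be a boto3 S3 client such as `s3`.

```python
def list_keys(client, bucket, prefix=""):
    """Return every object key under `prefix`, following pagination.

    `client` is expected to be a boto3 S3 client; `list_keys` itself is an
    illustrative helper, not part of the notebook.
    """
    paginator = client.get_paginator("list_objects_v2")
    keys = []
    for page in paginator.paginate(Bucket=bucket, Prefix=prefix):
        # "Contents" is absent when a page (or the whole bucket) has no objects.
        keys.extend(obj["Key"] for obj in page.get("Contents", []))
    return keys
```

For example, `list_keys(s3, "alura-datalakeaws-pmsil", "bronze/")` would restrict the listing to the bronze layer.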
