# Pipeline AWS

## Camada Bronze

In [2]:
import pandas as pd
import requests
import os
import boto3
from io import BytesIO
import warnings
warnings.filterwarnings('ignore')

### Extraindo os arquivos

In [3]:
def extract_data(url, local_file, timeout=60):
    """Download ``url`` and write the raw response body to ``local_file``.

    Parent directories of ``local_file`` are created when needed. Failures are
    reported with ``print`` instead of being raised, so one bad download does
    not abort the loop over all the yearly files.

    Args:
        url: HTTP(S) address of the file to download.
        local_file: destination path; may be a bare file name (current directory).
        timeout: seconds to wait for the server, forwarded to ``requests.get``.
            Without a timeout a stalled server blocks the kernel indefinitely.

    Returns:
        True if the file was saved, False if any error occurred.
    """
    headers = {
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/91.0.4472.124 Safari/537.36'
    }
    try:
        response = requests.get(url, headers=headers, timeout=timeout)
        response.raise_for_status()
        directory = os.path.dirname(local_file)
        # os.makedirs('') raises FileNotFoundError, so only create a directory
        # when the destination path actually contains one.
        if directory:
            os.makedirs(directory, exist_ok=True)
        with open(local_file, 'wb') as file:
            file.write(response.content)
        print(f'Data saved to {local_file}')
        return True
    except requests.exceptions.HTTPError as e:
        print(f'HTTP Error: {e}')
    except Exception as e:
        print(f'Error: {e}')
    return False

In [4]:
# (download URL, local destination) for each yearly export of the Boston 311
# service-request data published on data.boston.gov
urls = [
    (
        'https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/ea2e4696-4a2d-429c-9807-d02eb92e0222/download/tmpcje3ep_w.csv',
        'data/dados_2019.csv',
    ),
    (
        'https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/6ff6a6fd-3141-4440-a880-6f60a37fe789/download/tmpcv_10m2s.csv',
        'data/dados_2020.csv',
    ),
    (
        'https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/f53ebccd-bc61-49f9-83db-625f209c95f5/download/tmp88p9g82n.csv',
        'data/dados_2021.csv',
    ),
    (
        'https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/81a7b022-f8fc-4da5-80e4-b160058ca207/download/tmpfm8veglw.csv',
        'data/dados_2022.csv',
    ),
    (
        'https://data.boston.gov/dataset/8048697b-ad64-4bfc-b090-ee00169f2323/resource/e6013a93-1321-4f2a-bf91-8d8a02f1e62f/download/tmpwbgyud93.csv',
        'data/dados_2023.csv',
    ),
]

# Download the years one after another; extract_data prints the outcome of each
for source_url, destination in urls:
    extract_data(source_url, destination)

Data saved to data/dados_2019.csv
Data saved to data/dados_2020.csv
Data saved to data/dados_2021.csv
Data saved to data/dados_2022.csv
Data saved to data/dados_2023.csv


In [None]:
# Read back exactly the files the download cell wrote. Taking the paths from
# `urls` keeps both cells in sync (the downloads go to data/dados_<ano>.csv;
# a separately typed list pointing at data/raw/ broke Restart & Run All).
arquivos = [local_file for _, local_file in urls]

dfs = {}

for arquivo in arquivos:
    # 'data/dados_2019.csv' -> '2019'
    ano = arquivo.split('_')[-1].split('.')[0]
    # low_memory=False infers each column's dtype from the whole file. With
    # chunked inference a column can end up holding mixed Python types, which
    # makes the later to_parquet conversion fail. The warning pandas emits for
    # that case is hidden by the notebook's global warnings filter.
    dfs[ano] = pd.read_csv(arquivo, low_memory=False)

dfs['2019'].head()

Unnamed: 0,case_enquiry_id,open_dt,sla_target_dt,closed_dt,on_time,case_status,closure_reason,case_title,subject,reason,...,neighborhood,neighborhood_services_district,ward,precinct,location_street_name,location_zipcode,latitude,longitude,geom_4326,source
0,101002767874,2019-01-01 00:03:00,2019-01-03 03:30:00,2019-01-01 02:59:43,ONTIME,Closed,Case Closed. Closed date : 2019-01-01 07:59:43...,PRINTED : Knocked down/Eric Huynh,Public Works Department,Street Lights,...,Roxbury,13,Ward 14,1401.0,192 Magnolia St,2121.0,42.31199,-71.073181,0101000020E610000065AD9DFDAEC451C0051BFB45EF27...,Constituent Call
1,101002767875,2019-01-01 00:17:00,,2019-11-24 08:48:34,ONTIME,Closed,Case Closed. Closed date : 2019-11-24 13:48:34...,Fire Hydrant,Boston Water & Sewer Commission,Fire Hydrant,...,Back Bay,14,5,505.0,INTERSECTION Beaver St & Beacon St,,42.355555,-71.072049,0101000020E61000001A659F739CC451C0516121D7822D...,Constituent Call
2,101002767877,2019-01-01 00:31:51,2019-01-03 03:30:00,2019-01-01 21:27:40,ONTIME,Closed,Case Closed. Closed date : 2019-01-02 02:27:40...,Parking Enforcement,Transportation - Traffic Division,Enforcement & Abandoned Vehicles,...,Boston,3,03,,35 Fruit St,2114.0,42.362755,-71.069185,0101000020E61000009E8A6A866DC451C0F2243BBF6E2E...,Citizens Connect App
3,101002767878,2019-01-01 00:42:00,,2019-05-17 08:33:13,ONTIME,Closed,Case Closed. Closed date : 2019-05-17 12:33:13...,Police: Full Notifications,Mayor's 24 Hour Hotline,Notification,...,Dorchester,8,Ward 15,1503.0,35-37 Clarkson St,2125.0,42.30893,-71.066961,0101000020E61000007301041549C451C010DDC5008B27...,Constituent Call
4,101002767879,2019-01-01 01:09:12,2019-01-03 03:30:00,2019-01-01 21:28:11,ONTIME,Closed,Case Closed. Closed date : 2019-01-02 02:28:11...,Parking Enforcement,Transportation - Traffic Division,Enforcement & Abandoned Vehicles,...,East Boston,1,Ward 1,109.0,196 Trenton St,2128.0,42.380799,-71.03197,0101000020E6100000DAA350CD0BC251C0A6639A09BE30...,Citizens Connect App


### Configurando as credenciais de acesso ao bucket S3 e testando a conexão

In [None]:
# Credentials are taken from the environment instead of being hard-coded in the
# notebook, because anything written here leaks with the .ipynb file. When a
# variable is unset its value is None, and boto3 then falls back to its
# default credential/region resolution (shared config files, AWS_PROFILE,
# instance/role credentials, ...).
# NOTE(review): if a real key was ever pasted into this cell, rotate it.
aws_access_key_id = os.environ.get('AWS_ACCESS_KEY_ID')
aws_secret_access_key = os.environ.get('AWS_SECRET_ACCESS_KEY')
region_name = os.environ.get('AWS_DEFAULT_REGION')
# Default session: boto3.client() calls made afterwards use these settings
boto3.setup_default_session(
    aws_access_key_id=aws_access_key_id,
    aws_secret_access_key=aws_secret_access_key,
    region_name=region_name,
)
# S3 client used by the following cells
s3 = boto3.client('s3')

In [7]:
# Tiny throwaway payload used only to smoke-test the connection to S3
content = '''
    Hello, s3
'''
# 'w+' creates or truncates the file, so re-running the cell rewrites it
with open('hello_s3.txt', 'w+') as smoke_file:
    smoke_file.write(content)

In [8]:
# Upload the local smoke-test file; success here means credentials and bucket access work.
# NOTE(review): this leaves a `bronze/hello_s3` object inside the bronze layer;
# consider deleting it once the check passes.
s3.upload_file(
    Filename='hello_s3.txt',
    Bucket='aws-datalake-datalake',
    Key='bronze/hello_s3',
)

### Salvando os arquivos em parquet no S3

In [9]:
# Bronze layer: store each year's DataFrame as parquet under bronze/ in the
# bucket. The parquet bytes are built in an in-memory buffer, so nothing is
# written to local disk.
for ano, df in dfs.items():
    parquet_buffer = BytesIO()
    df.to_parquet(parquet_buffer)
    parquet_bytes = parquet_buffer.getvalue()
    s3.put_object(
        Bucket='aws-datalake-datalake',
        Key=f'bronze/dados_{ano}.parquet',  # e.g. bronze/dados_2019.parquet
        Body=parquet_bytes,
    )

In [10]:
# List every object in the bucket. One list call returns at most 1,000 keys, so
# the results of all pages are collected with the ListObjectsV2 paginator.
# They are merged under the same 'Contents' key the next cell reads; an empty
# bucket yields an empty list rather than a missing key.
paginator = s3.get_paginator('list_objects_v2')
response = {
    'Contents': [
        obj
        for page in paginator.paginate(Bucket='aws-datalake-datalake')
        for obj in page.get('Contents', [])
    ]
}

In [11]:
# A ListObjects response has no 'Contents' entry when the bucket is empty,
# so default to an empty list instead of raising KeyError.
keys = [obj['Key'] for obj in response.get('Contents', [])]
print(keys)

['bronze/', 'bronze/dados_2019.parquet', 'bronze/dados_2020.parquet', 'bronze/dados_2021.parquet', 'bronze/dados_2022.parquet', 'bronze/dados_2023.parquet', 'gold/', 'silver/']


## Camada Gold

In [12]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, when
from pyspark.sql.types import TimestampType
import argparse


def transform_data(database: str, table_source: str, table_target: str) -> None:
    """Build the table of closed Boston 311 requests that missed their target date.

    Reads ``database.table_source`` through the Hive-enabled catalog and casts
    the date columns to timestamps. It then computes ``delay_days``: the days
    between the target date and the closing date, or 0 when the case was not
    closed after the target. After keeping the columns of interest, it
    overwrites ``database.table_target`` (parquet) with the closed cases whose
    delay is positive, sorted by descending delay.

    The 2019+ exports (see the CSV header in the bronze cell) name the target
    and punctuality columns ``sla_target_dt`` and ``on_time``. When present,
    they are renamed to ``target_dt`` / ``ontime``, the names used in this
    job's output, so tables with either naming can be processed.

    Args:
        database: catalog database holding both tables.
        table_source: name of the source table.
        table_target: name of the table to (over)write.
    """
    spark = (
        SparkSession.builder
        .appName('Boston 311 Service Requests Analysis')
        .enableHiveSupport()
        .getOrCreate()
    )
    try:
        df = spark.read.table(f'`{database}`.`{table_source}`')
        # Normalise the newer column names to the ones used below
        for current_name, legacy_name in (('sla_target_dt', 'target_dt'), ('on_time', 'ontime')):
            if current_name in df.columns and legacy_name not in df.columns:
                df = df.withColumnRenamed(current_name, legacy_name)
        df = (
            df.withColumn('open_dt', col('open_dt').cast(TimestampType()))
            .withColumn('closed_dt', col('closed_dt').cast(TimestampType()))
            .withColumn('target_dt', col('target_dt').cast(TimestampType()))
        )
        # Difference in seconds divided by 86400 s/day. When the comparison is
        # false or null (e.g. missing dates), the delay is 0.
        df = df.withColumn(
            'delay_days',
            when(
                col('closed_dt') > col('target_dt'),
                (unix_timestamp('closed_dt') - unix_timestamp('target_dt')) / 86400,
            ).otherwise(0),
        )
        columns_to_keep = [
            'case_enquiry_id',
            'open_dt',
            'closed_dt',
            'target_dt',
            'case_status',
            'ontime',
            'closure_reason',
            'case_title',
            'subject',
            'reason',
            'neighborhood',
            'location_street_name',
            'location_zipcode',
            'latitude',
            'longitude',
            'source',
            'delay_days',
        ]
        df_selected = df.select(columns_to_keep)
        df_selected.createOrReplaceTempView('boston_311_data')
        # Single statement, no trailing ';' (spark.sql parses one statement).
        # NOTE(review): row order is generally not guaranteed when the saved
        # table is read back, so ORDER BY here mostly adds a shuffle.
        query = '''
            SELECT * FROM boston_311_data
            WHERE case_status = 'Closed' AND delay_days > 0
            ORDER BY delay_days DESC
        '''
        result_df = spark.sql(query)
        result_df.write.mode('overwrite').format('parquet').saveAsTable(f'`{database}`.`{table_target}`')
    finally:
        # Release the Spark session even when one of the steps above fails
        spark.stop()


if __name__ == '__main__':
    # Command-line entry point. All three names are required: without them the
    # job would try to read the table `None`.`None`.
    # NOTE(review): inside a Jupyter kernel __name__ is also '__main__', so this
    # block runs in the notebook and argparse sees the kernel's own argv.
    # Consider moving the job to a standalone .py file (e.g. via %%writefile).
    parser = argparse.ArgumentParser(
        description='Transformar dados de solicitações de serviço de Boston 311'
    )
    parser.add_argument('--database', type=str, required=True, help='Nome do banco de dados no Glue Data Catalog')
    parser.add_argument('--table_source', type=str, required=True, help='Nome da tabela origem no Glue Data Catalog')
    parser.add_argument('--table_target', type=str, required=True, help='Nome da tabela destino no Glue Data Catalog')

    args = parser.parse_args()
    transform_data(args.database, args.table_source, args.table_target)

ModuleNotFoundError: No module named 'pyspark'