# Introdução ao ETL em Python

In [1]:
# Import the required library
import requests

path = 'https://github.com/viniciusperdigao/Engenharia-de-Dados/raw/main/ETL%20em%20Python/base/PPR-2021.zip'
# Get the zip file
response = requests.get(path)

# Print the status code
print(response.status_code)

#Save the file locally 
local_path = "./PPR-ALL.zip"
with open(local_path, "wb") as f:
    f.write(response.content)

200


Acabamos de baixar a fonte dos dados e armazenamos em um diretório local. Mas o que realmente precisamos é do arquivo que ele contém: um CSV contendo dados sobre as transações imobiliárias e suas características.

In [29]:
# Extraindo arquivos de dentro do .zip
from zipfile import ZipFile

path = './PPR-ALL.zip'

with ZipFile(path, mode="r") as f:
  	# Get the list of files and print it
    file_names = f.namelist()
    print(file_names)
    # Extract the CSV file
    csv_file_path = f.extract(file_names[0])
    print(csv_file_path)

['PPR-2021.csv']
/home/vinicius/my_github/Engenharia-de-Dados/ETL em Python/PPR-2021.csv


Alterando cabeçalho e salvando em um novo arquivo .csv

In [43]:
fieldnames = {
    "Date of Sale (dd/mm/yyyy)": "date_of_sale",
    "Address": "address",
    "Postal Code": "postal_code",
    "County": "county",
    "Price (€)": "price",
    "Description of Property": "description",
}

In [46]:
import csv

# Nome do arquivo de entrada e de saída
input_file = "PPR-2021.csv"
output_file = "PPR-2021-Dublin-new-headers.csv"

# Abre o arquivo de entrada para leitura e o arquivo de saída para escrita
with open(input_file, "r", newline="", encoding="windows-1252") as csv_input_file, open(output_file, "w", newline="") as csv_output_file:
    reader = csv.DictReader(csv_input_file)
    writer = csv.DictWriter(csv_output_file, fieldnames.values())

    # Escreve o novo cabeçalho no arquivo de saída
    writer.writeheader()

    # Copia os dados do arquivo de entrada para o arquivo de saída, substituindo os nomes das colunas
    for row in reader:
        new_row = {fieldnames[key]: value for key, value in row.items()}
        writer.writerow(new_row)

print("Cabeçalho alterado com sucesso.")

KeyError: 'Not Full Market Price'

In [47]:
import os

os.makedirs(es)

# Extract

Baixando o novo arquivo de conjunto de dados da Web
Até agora, você baixou um arquivo, descompactou-o, leu e gravou em um CSV. Você fez tudo isso em scripts independentes e independentes. Agora, você vai agrupar tudo em funções para projetar a etapa Extract do seu pipeline ETL.

Agora você tem acesso a um IDE (ambiente de desenvolvimento integrado). Nada a temer: em vez de ter apenas um script, agora você tem todo um diretório ao redor do qual pode navegar para gerenciar seus scripts e dados. Esse diretório crescerá ao longo do curso à medida que você escrever mais scripts para o pipeline ETL.

Mas vamos começar pelo começo. Você vai definir uma primeira função, create_directory_if_not_exists(), para criar o diretório local onde os dados devem ser salvos.

Em seguida, você definirá uma segunda função, download_snapshot(), para baixar o arquivo compactado contendo as informações da transação da casa. Você obterá o arquivo de um URL externo salvo em uma variável chamada source_url. Você salvará o arquivo compactado localmente no local especificado pela variável source_path, aproveitando create_directory_if_not_exists().

Você pode ver as definições source_urldas source_pathvariáveis ​​e no script. Você também recebe uma base_pathvariável, que se refere ao diretório de trabalho atual ( /home/repl/workspace).

In [48]:
# Vamos fazer agora com funções. 

import os
import requests

# Paths
base_path = "."
source_url = "https://assets.datacamp.com/production/repositories/5899/datasets/66691278303f789ca4acd3c6406baa5fc6adaf28/PPR-ALL.zip"
source_path =  f"{base_path}/data/source/downloaded_at=2021-01-01/ppr-all.zip"

# Create a directory at the `path` passed as an argument
def create_directory_if_not_exists(path):
    """
    Create a new directory if it doesn't exists
    """
    # os.path.dirname() returns up to the directory path.
    # In this case it is: f"{base_path}/downloaded_at=2021-01-01"
    # "ppr-all.zip" is excluded
    os.makedirs(os.path.dirname(path), exist_ok=True)

# Write the file obtained to the specified directory
def download_snapshot():
    """
    Download the new dataset from the source
    """
    create_directory_if_not_exists(source_path)
    # Open the .zip file in binary mode
    with open(source_path, "wb") as source_ppr:
        # 'verify=False' skips the verification the SSL certificate
        response = requests.get(source_url, verify=False)
        source_ppr.write(response.content)

# Download the new dataset
download_snapshot()



![image.png](attachment:image.png)

Estrutura de pastas

In [None]:
import os
import csv
import tempfile
from zipfile import ZipFile

import requests

# Settings
base_path = os.path.abspath(__file__ + "/../../")


# START - Paths for new February 2021 data available

# External website file url
source_url = "https://assets.datacamp.com/production/repositories/5899/datasets/66691278303f789ca4acd3c6406baa5fc6adaf28/PPR-ALL.zip"
# Source path where we want to save the .zip file downloaded from the website
source_path = f"{base_path}/data/source/downloaded_at=2021-02-01/PPR-ALL.zip"
# Raw path where we want to extract the new .csv data
raw_path = f"{base_path}/data/raw/downloaded_at=2021-02-01/ppr-all.csv"

# END - Paths for new February 2021 data available


def create_folder_if_not_exists(path):
    """
    Create a new folder if it doesn't exists
    """
    os.makedirs(os.path.dirname(path), exist_ok=True)


def download_snapshot():
    """
    Download the new dataset from the source
    """
    create_folder_if_not_exists(source_path)
    with open(source_path, "wb") as source_ppr:
        response = requests.get(source_url, verify=False)
        source_ppr.write(response.content)


def save_new_raw_data():
    """
    Save new raw data from the source
    """

    create_folder_if_not_exists(raw_path)
    with tempfile.TemporaryDirectory() as dirpath:
        with ZipFile(
            source_path,
            "r",
        ) as zipfile:
            names_list = zipfile.namelist()
            csv_file_path = zipfile.extract(names_list[0], path=dirpath)
            # Open the CSV file in read mode
            with open(csv_file_path, mode="r", encoding="windows-1252") as csv_file:
                reader = csv.DictReader(csv_file)

                row = next(reader)  # Get first row from reader
                print("[Extract] First row example:", row)

                # Open the CSV file in write mode
                with open(
                    raw_path,
                    mode="w",
                    encoding="windows-1252"
                ) as csv_file:
                    # Rename field names so they're ready for the next step
                    fieldnames = {
                        "Date of Sale (dd/mm/yyyy)": "date_of_sale",
                        "Address": "address",
                        "Postal Code": "postal_code",
                        "County": "county",
                        "Price (€)": "price",
                        "Description of Property": "description",
                    }
                    writer = csv.DictWriter(csv_file, fieldnames=fieldnames)
                    # Write headers as first line
                    writer.writerow(fieldnames)
                    for row in reader:
                        # Write all rows in file
                        writer.writerow(row)

# Main function called inside the execute.py script
def main():
    print("[Extract] Start")
    print("[Extract] Downloading snapshot")
    ____
    print(f"[Extract] Saving data from '{source_path}' to '{raw_path}'")
    ____
    print(f"[Extract] End")

Parabéns por concluir a etapa Extract do seu pipeline ETL! Você baixou com sucesso o novo conjunto de dados, extraiu o conteúdo no formato CSV e alterou o cabeçalho para algo mais fácil de usar. E este é apenas o começo de uma jornada emocionante! Em seguida, você explorará como transformar e carregar os dados para que sejam diretamente acionáveis ​​pelos analistas. Você ainda terá a chance de usar o chapéu de Analista de Negócios e obter insights acionáveis ​​para seus acionistas no último capítulo! Por enquanto, mergulhe no próximo capítulo para transformar e limpar os dados extraídos e conclua a etapa Transform do pipeline ETL.

# Transform

In [49]:
# Import the function needed
from sqlalchemy import create_engine
from sqlalchemy.orm import Session

# Create the engine
engine = create_engine("postgresql+psycopg2://dcstudent:S3cretPassw0rd@localhost:5432/campdata-prod")

# Create the session
session = Session(engine)

ModuleNotFoundError: No module named 'sqlalchemy'

Legal! Você sabe como criar um mecanismo e usá-lo para criar uma sessão. Agora você está configurado e pronto para trabalhar e interagir com o banco de dados PostgreSQL!

In [None]:
# Imports 
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String


Base = declarative_base()

# Criando Classe com com a tabela 'ppr_raw_all' e as colunas e seus respectivos atributos. 
class PprRawAll(Base):
    __tablename__ = "ppr_raw_all"
    
    id = Column(Integer, primary_key=True)
    date_of_sale = Column(String(55))
    address = Column(String(255))
    postal_code = Column(String(55))
    county = Column(String(55))
    price = Column(String(55))
    description = Column(String(55))

In [None]:
# Import the submodule required
from datetime import datetime

def transform_case(input_string):
    """
    Lowercase string fields
    """
    # Return the string lowercase
    return input_string.lower()
  
def update_date_of_sale(date_input):
    """
    Update date format from DD/MM/YYYY to YYYY-MM-DD
    """
    # Create a datetime object
    current_format = datetime.strptime(date_input, "%d/%m/%Y")
    # Convert to the expected date format
    new_format = current_format.strftime("%Y-%m-%d")

    return new_format

Congratulations on creating your first two cleaning functions! You can now easily lower all strings and change date formats. Head over to the next exercise to expand the cleaning toolset with two additional cleaning functions.