In [4]:
pip install numpy pandas python-dotenv rich google-api-python-client google-auth-oauthlib google-auth-httplib2 tenacity openpyxl

Collecting openpyxl
  Using cached openpyxl-3.1.5-py2.py3-none-any.whl.metadata (2.5 kB)
Collecting et-xmlfile (from openpyxl)
  Using cached et_xmlfile-2.0.0-py3-none-any.whl.metadata (2.7 kB)
Using cached openpyxl-3.1.5-py2.py3-none-any.whl (250 kB)
Using cached et_xmlfile-2.0.0-py3-none-any.whl (18 kB)
Installing collected packages: et-xmlfile, openpyxl
[2K   [38;2;114;156;31m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2/2[0m [openpyxl]━━[0m [32m1/2[0m [openpyxl]
[1A[2KSuccessfully installed et-xmlfile-2.0.0 openpyxl-3.1.5
Note: you may need to restart the kernel to use updated packages.


In [6]:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-

"""
Pipeline de Pedidos (Ubuntu): Sheets + Drive + Gmail
- Lê pedidos do Google Sheets
- Lê tabela de produtos (Drive: Google Sheet OU Excel)
- Calcula totais por cliente
- Envia e-mails personalizados via Gmail API

Segurança:
- OAuth tokens em ~/.config/pedidos-google/
- Escopos mínimos
- Logs estruturados
"""

import base64
import io
import os
import sys
import time
import locale
from dataclasses import dataclass
from email.message import EmailMessage
from typing import Optional, Tuple, List

import numpy as np
import pandas as pd
from tenacity import retry, wait_exponential, stop_after_attempt
from dotenv import load_dotenv
from rich.console import Console
from rich.table import Table

from google.oauth2.credentials import Credentials
from google_auth_oauthlib.flow import InstalledAppFlow
from googleapiclient.discovery import build
from googleapiclient.http import MediaIoBaseDownload
from google.auth.transport.requests import Request

# ---------- Config ----------
load_dotenv()
CFG_DIR = os.path.expanduser("~/.config/pedidos-google")  #caminho para a pasta onde os arquivos de segurança serão guardados
CLIENT_SECRET = os.path.join(CFG_DIR, "client_secret.json") #caminho completo para o client_secret.json
TOKEN_PATH    = os.path.join(CFG_DIR, "token.json") #caminho completo para o token.json

SCOPE_SHEETS = ["https://www.googleapis.com/auth/spreadsheets.readonly"]  #define permissão apenas para ler a planilha
SCOPE_DRIVE  = ["https://www.googleapis.com/auth/drive.readonly"] #define permissão apenas para ler o drive
SCOPE_GMAIL  = ["https://www.googleapis.com/auth/gmail.send"] #define permissão apenas para enviar e-mails


# CONFIGURAÇÕES DEFINIDAS NO ARQUIVO .ENV
SHEETS_PEDIDOS_ID   = os.getenv("SHEETS_PEDIDOS_ID") #ID da planilha onde estão os pedidos, encontrado na url da planilha
SHEETS_PEDIDOS_RANGE= os.getenv("SHEETS_PEDIDOS_RANGE", "A:Z") #define o intervalo de celulas que será lido na planilha de pedidos
DRIVE_PROD_FOLDER_ID= os.getenv("DRIVE_PRODUTOS_FOLDER_ID") #Define o ID de uma pasta dentro do drive, utilizado para aumenta a eficiencia do script,
# pois irá buscar apenas na pasta em especifico
PRODUTOS_FILENAME   = os.getenv("PRODUTOS_FILENAME", "").strip() #nome do arquivo que contem a lista de produtos
PRODUTOS_SHEET_RANGE= os.getenv("PRODUTOS_SHEET_RANGE", "A:Z") #o intervalo de celulas na planilha de produtos
SENDER_EMAIL        = os.getenv("SENDER_EMAIL") #define o email que será utlizado para realizar os envios
USE_PLUS            = os.getenv("USE_PLUS_ADDRESSING", "true").lower() == "true"

# Localização monetária (pt_BR). Se indisponível, cai no formato padrão.
try:
    locale.setlocale(locale.LC_ALL, "pt_BR.UTF-8")
except locale.Error:
    pass

console = Console()

@dataclass
class Services:
    sheets: any
    drive: any
    gmail: any

#função para obter o token de autenticação
def _ensure_creds(scopes: List[str]) -> Credentials:
    if not os.path.exists(CLIENT_SECRET): #verifica se o arquivo json, a identidade do app existe
        raise FileNotFoundError(f"Credenciais OAuth não encontradas em {CLIENT_SECRET}") #se não existir ele retorna um erro
    creds = None
    if os.path.exists(TOKEN_PATH): #verifica se o arquivo token.json existe, se existir significa que o usuario ja autorizou o app antes
        creds = Credentials.from_authorized_user_file(TOKEN_PATH, scopes) #tenta carregar as infos do arquivo e verifica se as permissoes coincidem com as solicitadas
    if not creds or not creds.valid: #verifica se o arquivo não existe ou se as credenciais ainda são invalidas
        if creds and creds.expired and creds.refresh_token: #verifica se a credencial existe, verifica se está expirada e verifica se ela possui um refresh_token.
            creds.refresh(Request())  #se todas as validações forem verdadeiras ele usa o metodo refresh, que pega o refresh_token, envia para os servidores da google, se este refresh_token for valido, recebe um novo access token 
        else: #este é o fluxo de primeira autorização
            flow = InstalledAppFlow.from_client_secrets_file(CLIENT_SECRET, scopes) #cria o objeto flow, que gerencia o processo de autenticação, usando o client_secret.json para saber qual app esta solicitando autorização
            creds = flow.run_local_server(port=0) #este é o metodo que inicia um servidor web temporario no computador, abre o navegador e solicita o acesso
        os.makedirs(CFG_DIR, exist_ok=True) #verifica se o diretorio existe antes de tentar escrever
        with open(TOKEN_PATH, "w") as f: #abre o arquivo token.json no mode de escrita
            f.write(creds.to_json()) #o metodo .to_json converte o objeto de credenciais que possuo o access token e o refresh token para um arquivo json, que é salvo no arquivo
    return creds


def get_services() -> Services: #define a função e promete retornar um objeto Services, definido anteriormente
    # É comum criar tokens separados por escopo. Aqui usamos um token abrangente único.
    scopes = list(set(SCOPE_SHEETS + SCOPE_DRIVE + SCOPE_GMAIL)) #junta todas as permissões
    creds = _ensure_creds(scopes) #obtem o objeto de credencial
    return Services( #monta o objeto de clientes da api com a função build
        sheets=build("sheets", "v4", credentials=creds), #A função build espera três argumentos, primeiro o nome da api desejada, segundo a versão da api e por ultimo as credenciais
        drive=build("drive", "v3", credentials=creds),
        gmail=build("gmail", "v1", credentials=creds),
    )

def read_sheet_df(svc_sheets, spreadsheet_id: str, rng: str) -> pd.DataFrame:
    resp = svc_sheets.spreadsheets().values().get(
        spreadsheetId=spreadsheet_id, range=rng
    ).execute() # esta é uma chamada encadeada para a api do google, .spreadsheets acessa o recurso de planilhas, .values especifica que estamos interessados nos valores das celulas,
    # .get indica que a operação que queremos fazer é de leitura,os argumentos spreadsheetId e range dizem exatamente oque ler e por fim o .execute é oque envia a requisição para o goolgle
    values = resp.get("values", []) #O resp é um dicionario que não retorna somente os valor da celula, mas retorna outras coisas tambem, então chamamos a chave values para pegar somente os valores
    if not values: # se não tiver nenhum valor ele retorna um dataframe vazio
        return pd.DataFrame()
    # Primeira linha vira cabeçalho
    df = pd.DataFrame(values[1:], columns=values[0]) # esta é um forma de criar um df onde os valores da primeira linha viram colunas e o restante completa as linhas do df
    return df

def drive_find_file(drive, name: str, parent_folder_id: Optional[str]) -> Optional[dict]: #os parametros solicitam o objeto de serviço autenticado para o google drive, o nome do arquivo e o id da pasta de forma opcional
    q = []
    if name:
        q.append(f"name = '{name.replace("'", "\\'")}'") #adiciona o criterio do nome para a busca na api, caso o nome possua '
        
    if parent_folder_id:
        q.append(f"'{parent_folder_id}' in parents")
        
    q.append("trashed = false") # Adiciona um filtro para garantir que não estamos buscando arquivos na lixeira
    query = " and ".join(q) #junta toda a consulta em uma unica string unindo os elementos da lista q com o separador and
    resp = drive.files().list(q=query, fields="files(id, name, mimeType)").execute() #Executa a busca na API do Google Drive, o fields filtra oque a api do google retorna
    files = resp.get("files", []) #extrai a lista de arquivos do dicionario resp
    return files[0] if files else None #retorna o primeiro elemento da lista e se estiver vazio retorna None

def download_drive_file(drive, file_id: str) -> bytes: #recebe a api do drive e o id do arquivo
    request = drive.files().get_media(fileId=file_id) #solicita o binario de um arquivo com o get_media
    fh = io.BytesIO() #cria um buffer de bytes na memoria
    downloader = MediaIoBaseDownload(fh, request) # MediaIoBaseDownload é uma classe da biblioteca do google criada para realizar downloads de forma eficiente, fh é o destino do download e request é a origem
    done = False
    while not done:
        status, done = downloader.next_chunk() #aqui dentro do loop o next_chunk baixa o proximo pedaço do arquivo o salva no fh, o status recebe a porcentagem do download e done fica true apos o ultimo pedaço ser baixado
    return fh.getvalue() #Este método extrai a sequência completa de bytes que foi acumulada no buffer BytesIO e a retorna

def read_produtos_df(services: Services) -> Tuple[pd.DataFrame, bool]:
    """
    Retorna (df_produtos, is_google_sheet).
    Se PRODUTOS_FILENAME estiver vazio, assumimos que Produtos também é uma aba no mesmo spreadsheet dos pedidos.
    """
    if not PRODUTOS_FILENAME:
        # Ler na mesma planilha de pedidos (aba Produtos)
        df = read_sheet_df(services.sheets, SHEETS_PEDIDOS_ID, PRODUTOS_SHEET_RANGE)
        return df, True
    # Procurar arquivo no Drive
    meta = drive_find_file(services.drive, PRODUTOS_FILENAME, DRIVE_PROD_FOLDER_ID)
    if not meta:
        raise FileNotFoundError(f"Arquivo '{PRODUTOS_FILENAME}' não encontrado no Drive/pasta indicada.") #lança um erro se nenhum arquivo com o nome especificado for encontrado
    if meta["mimeType"] == "application/vnd.google-apps.spreadsheet":
        # É Google Sheet: ler via Sheets API
        # Precisaremos do spreadsheetId -> o id do arquivo funciona
        df = read_sheet_df(services.sheets, meta["id"], PRODUTOS_SHEET_RANGE)
        return df, True
    else:
        # É binário (ex.: Excel): baixar e ler com pandas
        content = download_drive_file(services.drive, meta["id"])
        df = pd.read_excel(io.BytesIO(content))
        return df, False

def format_currency(value: float) -> str:
    try:
        return locale.currency(value, grouping=True)
    except Exception:
        return f"R$ {value:,.2f}".replace(",", "X").replace(".", ",").replace("X", ".")

def plus_address(addr: str) -> str:
    if not USE_PLUS:
        return addr
    at = addr.find("@")
    if at <= 0:
        return addr
    return f"{addr[:at]}+cliente{addr[at:]}"

def build_email(to_addr: str, subject: str, body_text: str, sender: str) -> dict:
    msg = EmailMessage()
    msg["To"] = to_addr
    msg["From"] = sender
    msg["Subject"] = subject
    msg.set_content(body_text)
    raw = base64.urlsafe_b64encode(msg.as_bytes()).decode("utf-8")
    return {"raw": raw}

@retry(wait=wait_exponential(multiplier=1, min=2, max=30), stop=stop_after_attempt(5))
def gmail_send_message(gmail, user_id: str, message: dict) -> dict:
    return gmail.users().messages().send(userId=user_id, body=message).execute()

def main() -> int:
    if not SHEETS_PEDIDOS_ID:
        console.print("[red]Configure SHEETS_PEDIDOS_ID no .env[/red]")
        return 2

    services = get_services()

    # 1) Leitura dos pedidos
    df_ped = read_sheet_df(services.sheets, SHEETS_PEDIDOS_ID, SHEETS_PEDIDOS_RANGE) #realiza a leitura da planilha de pedidos
    if df_ped.empty:
        console.print("[yellow]Nenhum pedido encontrado.[/yellow]")
        return 0

    # Normalizações: remover primeira coluna se for timestamp e garantir coluna de e-mail
    first_col = df_ped.columns[0]
    # Heurística: muitas respostas de formulário têm timestamp na primeira coluna
    if "endereço de e-mail" not in first_col.lower():
        # manter timestamp em backup se quiser auditar
        df_ped = df_ped.drop(columns=[first_col], errors="ignore")

    # Nome da coluna de e-mail (compatível com seu script)
    email_col = None
    for c in df_ped.columns:
        if "endereço de e-mail" in c.lower():
            email_col = c
            break
    if not email_col:
        raise ValueError("Não encontrei coluna de e-mail no Sheet de pedidos.")

    df_ped = df_ped.fillna(0)

    #2) Produtos
    df_prod, is_sheet = read_produtos_df(services)
    if df_prod.empty:
        raise ValueError("Tabela de produtos está vazia.")
   
    eMailsClientesVetor=np.array(df_ped['Endereço de e-mail']);
    arrayPedido=df_ped.to_numpy()
    arrayProdutos=df_prod.to_numpy()
    
    return
    #3) Envio de e-mails
    enviados = 0
    table = Table(title="Resumo de Cobranças")
    table.add_column("Cliente", justify="left")
    table.add_column("Total", justify="right")
    console.print()

    for idx, row in df_ped.iterrows():
        to_raw = str(row[email_col]).strip() # Pega o endereço de e-mail da linha atual e remove quaisquer espaços em branco do inicio ou final da string
        if not to_raw or "@" not in to_raw: # Se o to_raw estiver vazio, ou não tiver o @ ele não continua a execução
            continue
        to_addr = plus_address(to_raw)

        total_cobranca = 0
        linhas = ["Saudações!", "Seu pedido está sendo atendido:"] # Este é o array de linhas, cada item representa oque será uma linha no corpo do e-mail
        for j in range(len(arrayPedido[idx]) -1):
            try:
                arrayPedido[idx][j+1] = int(arrayPedido[idx][j+1])
            except:
                arrayPedido[idx][j+1] = 0
            if arrayPedido[idx][j+1] > 0 and j < len(arrayPedido[idx]):
                totalProduto = arrayPedido[idx][j+1]*arrayProdutos[j][4]
                linhas.append(f"Produto: {arrayProdutos[j][0]} | Quantidade: {arrayPedido[idx][j+1]} | Valor: {totalProduto:,.2f}")
                total_cobranca += totalProduto
        linhas.append("")
        linhas.append(f"Total a pagar: {format_currency(float(total_cobranca))}")

        subject = f"Confirmação de pedido para: {to_raw}"
        body = "\n".join(linhas)
        msg = build_email(to_addr, subject, body, SENDER_EMAIL)

        try:
            gmail_send_message(services.gmail, "me", msg)
            enviados += 1
        except Exception as e:
            console.print(f"[red]Falha ao enviar para {to_addr}: {e}[/red]")

        # Opcional: pequena pausa educada (cotas/antispam)
        time.sleep(0.2)

        table.add_row(to_addr, format_currency(float(total_cobranca)))

    console.print(table)
    console.print(f"[green]E-mails enviados:[/green] {enviados}")

    return 0

if __name__ == "__main__":
    try:
        sys.exit(main())
    except KeyboardInterrupt:
        sys.exit(130)


SystemExit: 0

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)
