In [1]:
# -*- coding: utf-8 -*-

import os
import json
import pika
import logging
import pymongo
import pandas as pd
from datetime import datetime
from dotenv import load_dotenv
from azure.storage.blob import BlobClient

In [2]:
logging.basicConfig(
    format='%(asctime)s %(levelname)s %(message)s',
    level=logging.INFO,
    datefmt='%Y-%m-%d %H:%M:%S'
)

load_dotenv()

True

In [3]:
# Realizando conexão com o banco
if os.getenv("MONGO_CONN_STR") is None:
    logging.error("Unable to get environment variables.")
    # exit()

client = pymongo.MongoClient(
    os.getenv("MONGO_CONN_STR"), serverSelectionTimeoutMS=5000)

try:
    client.server_info()
except Exception:
    logging.error("Unable to connect to Mongo server.")
    # exit()

database = client.get_database()

In [4]:
# Extraindo coleções
cart_collection = database.get_collection("carts")
demand_collection = database.get_collection("demands")


In [5]:
# Extraindo demandas que fecham na data de hoje
today = datetime.today()
tomorrow = today.replace(day=today.day + 1, hour=0,
                         minute=0, second=0, microsecond=0)
today = today.replace(hour=0, minute=0, second=0, microsecond=0)

demands = demand_collection.find({
    "end_date": {"$gte": today, "$lt": tomorrow}
})

closed_demands = [str(demand["_id"]) for demand in demands]
if len(closed_demands) == 0:
    logging.info("There's no demand closing today!")
    # exit()

In [6]:
# Extraindo carrinhos fechados das demandas fechadas
carts = cart_collection.find({
    "state" : { "$eq" : "closed"},
    "demand_id": { "$in": closed_demands}
  })

In [7]:
# Transformando as coleções em dataframes
df_carts = pd.DataFrame(list(carts))

logging.info(str(len(df_carts)) + " orders for today.")

2023-05-03 12:25:10 INFO 3 orders for today.


In [8]:
# Pré-processamento
df_carts = df_carts.rename(columns={'_id': 'cart_id'})

In [9]:
# Expandindo a coluna de produtos
df_exploded = df_carts.explode(column="products")

In [10]:
# Extraindo atributos dos produtos e colocando-os em outras colunas
def getAtrr(row, atrribute):
  rowDict = dict(row)
  return rowDict[atrribute]

df_exploded["product_id"] = df_exploded["products"].apply(lambda x: getAtrr(x, "_id"))
df_exploded["quantity"] = df_exploded["products"].apply(lambda x: getAtrr(x, "quantity"))

In [11]:
# Montando o nome final do produto e adicionando-o na coluna descrição
def getProductDesc(row):
  measurementsList = list(row["products"]["measurements"])
  newList = []
  for m in measurementsList:
    newList.append(" ".join(list(m.values())))

  normsList = list(row["products"]["norms"])
  
  return row["products"]["name"] + " " + " ".join(newList) + " " + " ".join(normsList)
  
df_exploded["description"] = df_exploded.apply(lambda x: getProductDesc(x), axis=1)

In [12]:
# Exportando o dataframe final para planilha em Excel na Azure
def uploadToStorage(file_name):
    try:
        blob = BlobClient.from_connection_string(conn_str=os.getenv(
            "AZURE_CONN_STR"), container_name="cisab-consolidados", blob_name=file_name)
    except:
        logging.error("Unable to connect with Blob Storage.")
        # exit()

    try:
        with open(file_name, "rb") as data:
            blob.upload_blob(data)
    except:
        logging.error("Unable to upload the file to Blob Storage.")
        # exit()

In [13]:
# Enviando evento para o RabbitMQ mandar o email
def createAndSendEvent(connection, file_name):
    channel = connection.channel()

    to = os.getenv("RABBITMQ_TO")
    event = {
        "pattern": "send_email",
        "data": {
        "message": {
            "to": to,
            "subject": "Consolidado de pedidos",
            "body": "A demanda fechou e você pode baixar o consolidado de pedidos pelo link: <a href='" + os.getenv("AZURE_BLOB_STORAGE") + file_name + "'>clique aqui</a>"
        }
        }
    }

    channel.queue_declare(queue='notifier')
    channel.basic_publish(exchange='',
                        routing_key='notifier',
                        body=json.dumps(event, ensure_ascii=False))

    logging.info(f"Email has been sent to {to}")

    channel.close()
    
def sendEmail(file_name):
    try:
        credentials = pika.PlainCredentials(os.getenv("RABBITMQ_USER"),
                                            os.getenv("RABBITMQ_PASSWORD"))
        connection = pika.BlockingConnection(
            pika.ConnectionParameters(os.getenv("RABBITMQ_CONN_STR"),
                                    int(os.getenv("RABBITMQ_PORT")),
                                    '/',
                                    credentials))
        
        createAndSendEvent(connection, file_name)

    except:
        logging.error("Unable to connect with RabbitMQ.")
        # exit()

In [14]:
# Gerando a planilha a partir do dataframe final
def generateSheet(df, demand_id):
    df_demand = df[df.demand_id == demand_id].reset_index()
    
    if(len(df_demand) > 0):
        df_pivot = df_demand.pivot_table(index="Produto", values="Quantidade", columns="Município / Autarquia")
        df_pivot["Total"] = df_pivot.sum(axis=1)
        df_pivot = df_pivot.sort_values(by="Produto")

        format_data = "%Y-%m-%d %H-%M-%S"
        file_name = datetime.strftime(datetime.today(), format_data) + \
            " " + df_demand.loc[0, "demand_name"] + ".xlsx"
        df_pivot.to_excel(file_name)

        uploadToStorage(file_name)
        sendEmail(file_name)
    else:
        logging.warning("There weren't any orders for " + demand_id)

In [15]:
# Pós-processamento
newNames = {"county_name": "Município / Autarquia", "description": "Produto", "quantity": "Quantidade"}
df = df_exploded.rename(columns=newNames)

In [16]:
# Executando processos finais para cada demanda fechada no dia
for demand in closed_demands:
    generateSheet(df, demand)

2023-05-03 12:25:11 INFO Request URL: 'https://catalogv2.blob.core.windows.net/cisab-consolidados/2023-05-03%2012-25-10%20El%C3%A9trico%202023.xlsx'
Request method: 'PUT'
Request headers:
    'Content-Length': '5054'
    'x-ms-blob-type': 'REDACTED'
    'If-None-Match': '*'
    'x-ms-version': 'REDACTED'
    'Content-Type': 'application/octet-stream'
    'Accept': 'application/xml'
    'User-Agent': 'azsdk-python-storage-blob/12.14.1 Python/3.10.5 (Windows-10-10.0.19044-SP0)'
    'x-ms-date': 'REDACTED'
    'x-ms-client-request-id': 'b1bfcc43-e9c6-11ed-8c7e-3052cb8340ee'
    'Authorization': 'REDACTED'
A body is sent with the request
2023-05-03 12:25:11 INFO Response status: 201
Response headers:
    'Content-Length': '0'
    'Content-MD5': 'REDACTED'
    'Last-Modified': 'Wed, 03 May 2023 15:25:10 GMT'
    'ETag': '"0x8DB4BEA958E6CAE"'
    'Server': 'Windows-Azure-Blob/1.0 Microsoft-HTTPAPI/2.0'
    'x-ms-request-id': 'ef8a28a5-e01e-0000-3bd3-7d5baa000000'
    'x-ms-client-request-id'