# Projeto CEASA

## Objetivo

Facilitar o acesso aos dados do Centrais de Abastecimento do Estado do Rio de Janeiro S.A (CEASA-RJ) por meio de um processo automatizado de coleta, transformação e análise de dados.

! [diagrama de fluxo do projeto: coleta, transformação, armazenamento e visualização](./img/Diagram.png)

## Motivação

A necessidade de disponibilizar informações do CEASA-RJ de maneira mais acessível, transformando dados presentes em PDFs em formatos utilizáveis para análise e insights mais rápidos.

## Etapas do Projeto
1. Coleta de Dados
2. Transformação de Dados
3. Armazenamento
4. Análise e Visualização

# 0. configurações inicia

In [38]:
!source .venv/bin/activate
# %pip install -r minimal.requirements.txt # type 

In [145]:
# # carregando variaveis do embiente 
# from dotenv import load_dotenv, find_dotenv
# load_dotenv(".env")
# ceasa_lista_pdf

# variaeis de embiente

URL_CEASA   = 'https://www.ceasa.rj.gov.br'
URL_CEASA_cotacao   = URL_CEASA + '/Cota%C3%A7%C3%A3o'

PASTA_PDFs  = "./pdfs/"
PASTA_DADOS = "./dados/"

ceasa_lista_pdf = PASTA_DADOS + "ceasa_lista_pdf.parquet"

In [57]:
# barra de progresso
# def print_percent_done(index, total, progress_state=None, bar_len=15, title='Processando'):
    # # font:https://stackoverflow.com/questions/6169217/replace-console-output-in-python
    # '''
    # index is expected to be 0 based index. 
    # 0 <= index < total
    # '''
    # percent_done = (index+1)/total*100
    # percent_done = round(percent_done, 1)

    # done = round(percent_done/(100/bar_len))
    # togo = bar_len-done

    # done_str = '█'*int(done)
    # togo_str = '░'*int(togo)

    
    # if round(percent_done) != 100:
    #     progress_state = f"\t {index}/{total}status: {progress_state}" if progress_state else ''

    #     print(f'⏳{title}: [{done_str}{togo_str}] {percent_done}% done.{progress_state}', end='\r')

    # else:
    #     print('\t 100% ✅')



# 1. Coletar Dados

consiste nas etapas:
1. Encontar as urls dos documentos pdf
2. com a url do documento, extrair nome e data do documento
3. Baixar e armazenar url, nome do documento e data em uma base de dados

### Coleta linear

In [60]:
import requests
from bs4 import BeautifulSoup

import re #regex
from datetime import datetime as dt
import pandas as pd

def foi_extraida(url):
	"""
	verifica se uma url ja está contida na base de dados.
	"""
	return True if url in((df_ceasa_lista_pdf['URL'].eq(url))) else False

def pegar_links(url):
	reqs = requests.get(url)
	soup = BeautifulSoup(reqs.text, 'html.parser')

	for link in soup.select('#main a'):
		a = str(link.get('href'))
		if ".pdf" in a:
			a = URL_CEASA + a
			if a in urls_na_base:
				continue
			urls.append(a)

		else:
				pegar_links(a)

def extrair_dados_url(url):
	# pega nome do arquivo na URL
	nome_arquivo = url.split("/")[-1]
	nome_arquivo = requests.utils.unquote(nome_arquivo) # type: ignore

	# padrao de dd mm yyyy para data
	matches = re.findall(r'(\d{2})\s(\d{2})\s(\d{4})', nome_arquivo)
	data = dt.strptime("/".join(matches[0]), "%d/%m/%Y")

	return url,"CEASA-RJ_" + nome_arquivo.replace(" ", "_") ,data.strftime("%d-%m-%Y")

def pegar_pdf(url,nome_arquivo):
	# conferir se ja está na base local
	response = requests.get(url)

	if response.status_code == 200:
		with open(PASTA_PDFs+nome_arquivo, "wb") as f:
			f.write(response.content)
		
		return True

def atualizar_csv(url, nome_arquivo,data):
	data_to_file = {
		'URL': [url],
		'NOME_ARQUIVO': [nome_arquivo],
		'DATA': [data]
	} 
	df = pd.DataFrame(data_to_file)
	# df.parquer(ceasa_lista_pdf, mode='a', index=False, header=False)
	df.to_parquet(ceasa_lista_pdf, index=False, engine='fastparquet', append=True)


def atualizar_base_pdf():
	# atualiza as urls de novos PDFf
	pegar_links(URL_CEASA_cotacao)
	
	# Coleta pdf, armazena o arquivo e atualiza a base 
	for index, url in enumerate(urls):
		print(f"{index} de {len(urls)}",end="\t\t\r")

		# if foi_extraida:
		# 	continue
		
		url, nome_arquivo, data = extrair_dados_url(url)
		pegar_pdf(url,nome_arquivo)
		atualizar_csv(url,nome_arquivo,data)
	


try:
	df_ceasa_lista_pdf = pd.read_parquet(ceasa_lista_pdf)
except:
	print(f'criando arquivo: {ceasa_lista_pdf}')

	df_tmp = pd.read_parquet(ceasa_lista_pdf, names = ["URL","NOME_ARQUIVO","DATA"])
	df_tmp.to_parquet(ceasa_lista_pdf,index=False)

df_ceasa_lista_pdf = pd.read_parquet(ceasa_lista_pdf)

urls_na_base = df_ceasa_lista_pdf["URL"].tolist()
urls = []

atualizar_base_pdf()

0


# 2. Transformando Dados

A parte mais deliada é garantir a consistencia dos dados extraidos dos documentos pdf. 

##### Compressão paralelizada (muito custoso)

In [None]:
import PyPDF2
import concurrent.futures

def compress_pdf(input_path, output_path):
    with open(input_path, 'rb') as file:
        pdf_reader = PyPDF2.PdfReader(file)
        pdf_writer = PyPDF2.PdfWriter()

        for page_num in range(len(pdf_reader.pages)):
            page = pdf_reader.pages[page_num]
            page.compress_content_streams()
            pdf_writer.add_page(page)

        with open(output_path, 'wb') as output_file:
            pdf_writer.write(output_file)

def process_pdf(file_name):
    print(f'arquivo atual: {file_name}')
    compress_pdf(PASTA_PDFs + file_name, PASTA_PDFs + file_name)

# Sua lista de arquivos
files_to_process = df_extração["nome_arquivo"].tolist()

# Número máximo de threads (ajuste conforme necessário)
max_threads = 4

# Usando ThreadPoolExecutor para paralelizar
with concurrent.futures.ThreadPoolExecutor(max_threads) as executor:
    futures = [executor.submit(process_pdf, file_name) for file_name in files_to_process]

    # Esperar que todas as threads concluam
    for future in concurrent.futures.as_completed(futures):
        count += 1 # type: ignore
        print_percent_done(count, len(files_to_process), future.result()) # type: ignore


### Lendo Tabelas 

Apartir de 01-03-2023 ouver mudança na formatação da tabela.
trataremos as duas versões de PDF:

In [None]:
import pandas as pd
import pdfplumber

def extrair_tabela_pdf(pdf_path, pagina=0):
    # Abre o PDF com pdfplumber
    with pdfplumber.open(pdf_path) as pdf:
        # Seleciona a página desejada
        page = pdf.pages[pagina]
        
        # Extrai a tabela como uma lista de dicionários
        table_data = page.extract_table()

        # Converte a lista de dicionários para um DataFrame do pandas
        df = pd.DataFrame(table_data)

    return df


fazendo tratamento na tabela 1 do pdf modelo 2

In [None]:
# Cabeçalho
header = ['PRODUTOS', 'TIPO', 'UNIDADE EMBALAGEM', 'VARIAÇÃO ULTIMOS 12 MESES', 'MIN', 'MODAL', 'MAX']

#c Classes de alimentos
classes = ['1. FRUTAS NACIONAIS',
          '2. FRUTAS IMPORTADAS',
          '3. HORTALIÇAS FRUTO',
          '4. HORTALIÇAS FOLHA, FLOR E HASTE',
          '5. HORTALIÇAS RAIZ, BULBO,TUBÉRCULO E RIZOMA',
          '6. OVOS',
          '7. PEIXE']



##### tramento PDF v2

In [147]:
import pandas as pd

# tratamendo geral pdf v2
def tratamento_1_tbv2(df):
    # define cabeçalhos
    df.columns = header
    
    # Remove primeiras linhas
    df = df.iloc[2: , :]
    
    # Adiciona colula classe
    df.loc[:, 'CLASSE'] = ''

    return df

def tratamento_2_tbv2(df, arquivo='', data_pdf = '', url='' ):
    # preencher class
    df.reset_index(drop=True, inplace=True)

    # Colulas de arquivo e data
    df['DATA'] = data_pdf
    df['ARQUIVO'] = arquivo
    df['URL'] = url
    
    # preenche classe
    def preenche_classe(df):
        df = df.reset_index(drop=True)    
        
        current_class = ''  # Para rastrear a classe atual
        for index, row in df.iterrows():
            if row['PRODUTOS'] in classes:
                current_class = row['PRODUTOS']
            df.at[index,'CLASSE'] = current_class
    
        df = df[df['PRODUTOS'] != df['CLASSE']]
        df = df.reset_index(drop=True)    
    
        return df
    
    df = preenche_classe(df)

    
    # Replace all instances of "," with "." in columns: 'MIN', 'MODAL', 'MAX'
    df['MIN'] = df['MIN'].str.replace(",", ".", case=False, regex=False)
    df['MODAL'] = df['MODAL'].str.replace(",", ".", case=False, regex=False)
    df['MAX'] = df['MAX'].str.replace(",", ".", case=False, regex=False)

    # Replace all instances of "" with "0" in columns: 'MIN', 'MODAL', 'MAX'
    df.loc[df['MIN'].str.lower() == "".lower(), 'MIN'] = "0"
    df.loc[df['MODAL'].str.lower() == "".lower(), 'MODAL'] = "0"
    df.loc[df['MAX'].str.lower() == "".lower(), 'MAX'] = "0"

    # Função para limpar os valores da coluna
    def limpar_valor(valor):
        # Remover símbolos duplicados
        valor = ''.join(ch for ch, _ in zip(valor, valor[1:] + '\0') if ch != _)
        # Remover espaços em branco duplicados
        valor = ' '.join(valor.split())
        return valor

    # Aplicar a função de limpeza à coluna
    df['MIN'] = df['MIN'].apply(limpar_valor)
    df['MAX'] = df['MAX'].apply(limpar_valor)
    df['MODAL'] = df['MODAL'].apply(limpar_valor)

    # df = df.applymap(lambda x: limpar_valor(x))  # Remove espaços no início e no final
    df = df.map(lambda x: str(x).strip())  # Remove espaços no início e no final
    df = df.map(lambda x: ' '.join(str(x).split()))  # Remove espaços duplicados


    # Change column type to float32 for columns: 'MIN', 'MODAL', 'MAX'
    df = df.astype({'MIN': 'float32', 'MODAL': 'float32', 'MAX': 'float32'})

    # Replace all instances of "S/C" with "0" in column: 'VARIAÇÃO ULTIMOS 12 MESES'
    df['VARIAÇÃO ULTIMOS 12 MESES'] = df['VARIAÇÃO ULTIMOS 12 MESES'].str.replace("S/C", "0", case=False, regex=False)

    # Replace all instances of "" with "" in column: 'VARIAÇÃO ULTIMOS 12 MESES'
    df['VARIAÇÃO ULTIMOS 12 MESES'] = df['VARIAÇÃO ULTIMOS 12 MESES'].str.replace("", "", case=False, regex=False)

    df['VARIAÇÃO ULTIMOS 12 MESES'] = df['VARIAÇÃO ULTIMOS 12 MESES'].str.replace('%', '')  # Remover o símbolo de percentagem
    df['VARIAÇÃO ULTIMOS 12 MESES'] = df['VARIAÇÃO ULTIMOS 12 MESES'].str.replace(',', '.')  # Remover o símbolo de percentagem
    df['VARIAÇÃO ULTIMOS 12 MESES'] = pd.to_numeric(df['VARIAÇÃO ULTIMOS 12 MESES']) / 100  # Converter para float e dividir por 100
    df = df.astype({'VARIAÇÃO ULTIMOS 12 MESES': 'float32'})


    # Convert text to uppercase in columns: 'PRODUTOS', 'TIPO'
    df['PRODUTOS'] = df['PRODUTOS'].str.upper()
    df['TIPO'] = df['TIPO'].str.upper()

    # Change column type to string for columns: 'PRODUTOS', 'TIPO'
    df = df.astype({'PRODUTOS': 'string', 'TIPO': 'string','UNIDADE EMBALAGEM': 'string'})

    # Convert text to uppercase in column: 'UNIDADE EMBALAGEM'
    df['UNIDADE EMBALAGEM'] = df['UNIDADE EMBALAGEM'].str.upper()

    # Replace all instances of "," with "." in column: 'UNIDADE EMBALAGEM'
    df['UNIDADE EMBALAGEM'] = df['UNIDADE EMBALAGEM'].str.replace(",", ".", case=False, regex=False)

    # Round columns 'VARIAÇÃO ULTIMOS 12 MESES', 'MIN' and 2 other columns (Number of decimals: 4)
    df = df.round({'VARIAÇÃO ULTIMOS 12 MESES': 4, 'MIN': 2, 'MODAL': 2, 'MAX': 2})

    # Change column type to datetime64[ns] for column: 'DATA
    df = df.astype({'CLASSE': 'string', 'ARQUIVO': 'string', 'URL': 'string','DATA': 'datetime64[ns]'})

    # Sort by column: 'DATA' (descending)
    df = df.sort_values(['DATA'], ascending=[False])

    df = df.reset_index(drop=True)

    return df


def pdf_v2_para_tabela(pdf_v2, data=None, url=''):        
    # Compreensão de lista para criar a lista de dataframes
    list_df_tratamento_1_tbv2 = [ tratamento_1_tbv2( extrair_tabela_pdf(PASTA_PDFs + pdf_v2, i)) for i in range(5) ]

    # Concatenar os dataframes da lista
    df = pd.concat(list_df_tratamento_1_tbv2.copy())
    return tratamento_2_tbv2(df,pdf_v2,data,url) # type: ignore


df = pdf_v2_para_tabela('CEASA-RJ_Boletim_diário_de_preços__31_07_2023.pdf')
df

ValueError: Unable to parse string "#VALOR!" at position 150

In [143]:
from datetime import datetime as dt
import os


# Suprime os avisos SettingWithCopyWarning
pd.options.mode.chained_assignment = None



def extrair_tb_pdf_V2():
    # comparar com
    df_pdfs_v2 = filtar_para_novos_pdf()
    # df_pdfs_v2 = pd.read_parquet(ceasa_lista_pdf)
    
    df_pdfs_v2 = df_pdfs_v2[df_pdfs_v2['DATA']  >= '2023-03-01']
    df_pdfs_v2 = df_pdfs_v2[:]
    
    len = df_pdfs_v2.shape[0]
    lista_pdf_v2 = []
    try:
        for index, row in df_pdfs_v2.iterrows():
            arquivo = row['NOME_ARQUIVO']

            data = row['DATA']
            url = row['URL']
            
            print(f"{index} de {len}", data, end="\t\t\r")

            df_tmp = pdf_v2_para_tabela(
                arquivo, data, url)

            lista_pdf_v2.append(df_tmp)
    except:
        print("erro")

    finally:
        df = pd.concat(lista_pdf_v2)
        return df

def filtar_para_novos_pdf():

    df_pdfs_v2 = pd.read_parquet(ceasa_lista_pdf)
    df_dados_pdfs_v2 = PASTA_DADOS + "dados_pdfs_v2.parquet"
    
    if os.path.exists(df_dados_pdfs_v2):

        df_pdfs_v2 = pd.read_parquet(ceasa_lista_pdf)

        df_dados_pdfs_v2 = pd.read_parquet(PASTA_DADOS + "dados_pdfs_v2.parquet")
        df_dados_pdfs_v2 = df_dados_pdfs_v2.groupby(['DATA']).agg(C=('DATA', 'first')).reset_index()

        df_pdfs_v2 = df_pdfs_v2[~df_pdfs_v2['DATA'].isin(df_dados_pdfs_v2['DATA'])]

        # df_dados_pdfs_v2 = pd.read_parquet(df_dados_pdfs_v2)
        # # df_dados_pdfs_v2 = df_dados_pdfs_v2["URL"].drop_duplicates()
        # print(df_pdfs_v2.shape[0]," / ",df_dados_pdfs_v2.shape[0])

        # df_pdfs_v2=pd.merge(df_pdfs_v2,df_dados_pdfs_v2,on='DATA', how='left')

        # # df_pdfs_v2 = df_pdfs_v2[~df_pdfs_v2['URL'].isin(df_dados_pdfs_v2)]

    return df_pdfs_v2


df = extrair_tb_pdf_V2()

try:
    # df.to_csv(PASTA_DADOS + 'dados_pdfs_v2.csv',index=False, mode='a')
    df.to_parquet(PASTA_DADOS + 'dados_pdfs_v2.parquet',index=False,engine='fastparquet', append=True)
except:
    # df.to_csv(PASTA_DADOS + 'dados_pdfs_v2.csv',index=False)
    df.to_parquet(PASTA_DADOS + 'dados_pdfs_v2.parquet',index=False)



erroe 103 2023-07-31 00:00:00		


ValueError: No objects to concatenate