In [17]:
import os
import requests
import json
import datetime

from math import ceil
from copy import deepcopy

total_rows = []

def update_from_date(start_date: datetime.date) -> None:
    today = datetime.datetime.today()

    if start_date == today:
        print("All available data is already collected")
        return False
    end_date = get_end_date(start_date, today)

    print(f"data requested for range {start_date} - {end_date}")
    server_data = get_server_data(start_date, end_date)

    print("data collected, organizing data...")
    rows = format_server_data(server_data, start_date)
    if rows:
        print("data organized, updating db...")
        total_rows.extend(rows)
    else:
        print(f"no data available for range {start_date} - {end_date}")
        print('-----------------------------------------------------')
        return update_from_date(end_date + datetime.timedelta(days=1))
    
    return True


def get_server_data(start_date: datetime.date, end_date: datetime.date) -> dict:
    url = 'http://dw.ceasa.gov.br/saiku/rest/saiku/api/query/execute'
    payload = get_payload(start_date, end_date)
    headers = {
        'Accept': 'application/json, text/javascript, */*; q=0.01',
        'Accept-Encoding': 'gzip, deflate',
        'Accept-Language': 'pt-BR,pt;q=0.9,en-US;q=0.8,en;q=0.7',
        'Connection': 'keep-alive',

        'Content-Length': f'{len(payload)}',
        'Content-Type': 'application/json',
        'Cookie': f'JSESSIONID={get_j_session_id()}',
        'Host': 'dw.ceasa.gov.br',
        'Origin': 'http://dw.ceasa.gov.br',
        'Referer': 'http://dw.ceasa.gov.br/',
        'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) '
                      'Chrome/109.0.0.0 Safari/537.36',
        'X-Requested-With': 'XMLHttpRequest'
    }

    response = requests.request(
        method='POST',
        url=url,
        data=payload,
        headers=headers
    ).json()
    return response


def format_server_data(server_data: dict, start_date: datetime.date) -> list[dict]:
    price_values = []
    price_table = server_data['cellset']
    for date in range(4, server_data['height']):
        for column in range(1, server_data['width']):
            try:
                current_row = [price_table[row][column]['value'] for row in [0, 1, 2, date]]
                current_row[3] = current_row[3].replace(".", "")
                current_row[3] = current_row[3].replace(",", ".")  # Remove ',' for csv
                current_row[0] = current_row[0].replace("/", "_")
                price = float(current_row[3])
            except Exception:
                # Invalid price
                continue
            row = {
                'product': current_row[1],
                'ceasa': current_row[0],
                'price': price,
                'date': datetime.date(year=start_date.year, month=start_date.month, day=int(price_table[date][0]['value'])),
            }
            price_values.append(row)
    return price_values


def get_payload(start_date: datetime.date, end_date: datetime.date) -> str:
    payload_file = open("payload.json", "r")
    payload = json.load(payload_file)

    # SET YEAR
    payload['queryModel']['axes']['FILTER']['hierarchies'][0]['levels']['Ano']['selection']['members'][0][
        'uniqueName'] = f"[06-Ano].[06-Ano].[{start_date.year}]"
    payload['queryModel']['axes']['FILTER']['hierarchies'][0]['levels']['Ano']['selection']['members'][0][
        'caption'] = f"{start_date.year}"

    # SET MONTH
    month_name = number_to_month_name(start_date.month)
    payload['queryModel']['axes']['FILTER']['hierarchies'][1]['levels']['Mes']['selection']['members'][0][
        'uniqueName'] = f"[07-Mes].[07-Mes].[{ceil(start_date.month / 6)}º SEM].[{month_name}]"
    payload['queryModel']['axes']['FILTER']['hierarchies'][1]['levels']['Mes']['selection']['members'][0][
        'caption'] = f"{number_to_month_name(start_date.month)}"

    day_template = payload['queryModel']['axes']['ROWS']['hierarchies'][0]['levels']['Dia Mes']['selection']['members'][
        0]
    query_days = payload['queryModel']['axes']['ROWS']['hierarchies'][0]['levels']['Dia Mes']['selection']['members']
    query_days.clear()
    delta = datetime.timedelta(days=1)

    current_date = start_date
    while current_date <= end_date:
        new_day = deepcopy(day_template)
        new_day['uniqueName'] = f"[08-Dia].[08-Dia].[{current_date.day:02d}]"
        new_day['caption'] = f"{current_date.day:02d}"
        query_days.append(new_day)
        current_date += delta

    return json.dumps(payload)


def get_j_session_id() -> str:
    with requests.Session() as session:
        form_data = {"username": "almir.costa", "password": 123123}
        session.post("http://dw.ceasa.gov.br/saiku/rest/saiku/session", data=form_data)
        return session.cookies.get("JSESSIONID")


def get_start_date() -> datetime.date:
    return datetime.date(year=2015,month=1,day=1)


def get_end_date(start_date: datetime.date, today: datetime.date) -> datetime.date:
    if today.year == start_date.year and today.month == start_date.month:
        end_date = datetime.date(start_date.year, start_date.month, min(today.day - 1, start_date.day + 31))
    else:
        end_date = datetime.date(start_date.year, start_date.month,
                                 min(last_day_of_month(start_date).day, start_date.day + 31))
    return end_date


def number_to_month_name(month: int) -> str:
    dictionary = {
        1: "JANEIRO",
        2: "FEVEREIRO",
        3: "MARÇO",
        4: "ABRIL",
        5: "MAIO",
        6: "JUNHO",
        7: "JULHO",
        8: "AGOSTO",
        9: "SETEMBRO",
        10: "OUTUBRO",
        11: "NOVEMBRO",
        12: "DEZEMBRO",
    }
    return dictionary[month]


def last_day_of_month(date: datetime.date) -> datetime.date:
    next_month = date.replace(day=28) + datetime.timedelta(days=4)
    return next_month - datetime.timedelta(days=next_month.day)


In [20]:
datas = [datetime.date(year=x, month=y,day=1) for x in range(2015, 2023) for y in range(1,13) ]


In [None]:
start_date = get_start_date()
update_from_date(start_date=start_date)