In [1]:
import requests
import json
import pandas as pd
import time
from dotenv import load_dotenv
import os
import re
from datetime import datetime
from decimal import Decimal, InvalidOperation
import base64
from datetime import datetime, timedelta
import pytz


pd.options.display.float_format = '{:,.14f}'.format

Pyarrow will become a required dependency of pandas in the next major release of pandas (pandas 3.0),
(to allow more performant data types, such as the Arrow string type, and better interoperability with other libraries)
but was not found to be installed on your system.
If this would cause problems for you,
please provide us feedback at https://github.com/pandas-dev/pandas/issues/54466
        
  import pandas as pd


In [2]:
load_dotenv()
TOKEN_URL = 'https://api.ticket360.com.br/auth/oauth/access_token'
API_URL = 'https://api.ticket360.com.br'
EVENT_ID = '30617'
EVENT_NAME = 'Camarote-essepe-2026-grupo-especial-thiaguinho'
MAX_RETRIES = 3
TIMEOUT = 30
DATA_DIR = "ticket_data"
CONSOLIDATED_FILE = os.path.join(DATA_DIR, "consolidated.json")

In [None]:
def get_access_token():
    """Get access token with treatment errors"""
    for attempt in range(MAX_RETRIES):
        try:
            auth_string = f"{os.getenv('CLIENT_ID')}:{os.getenv('CLIENT_SECRET')}"
            auth_base64 = base64.b64encode(auth_string.encode()).decode()
            
            response = requests.post(
                TOKEN_URL,
                headers={
                    "Authorization": f"Basic {auth_base64}",
                    "Content-Type": "application/x-www-form-urlencoded"
                },
                data={"grant_type": "client_credentials"},
                timeout=TIMEOUT
            )
            response.raise_for_status()
            return response.json().get("access_token")
        except requests.exceptions.Timeout:
            print(f"Timeout (tentativa {attempt + 1}/{MAX_RETRIES})")
            if attempt < MAX_RETRIES - 1:
                time.sleep(5)
        except Exception as e:
            print(f"Error to get token: {type(e).__name__} - {str(e)}")
            return None
    return None

In [None]:
def fetch_report(token, start_date=None, end_date=None):
    '''Search report from API with page and aplly filter conditions'''
    try:
        base_url = f"{API_URL}/sales/reports/consolidated/{EVENT_ID}?filter=status=paid&ticket.status=active"
        
        offset = 0
        limit = 1000
        all_sales = []
        
        while True:
            url = f"{base_url}&limit={limit}&offset={offset}"
            response = requests.get(
                url,
                headers={"Authorization": f"Bearer {token}"},
                timeout=TIMEOUT
            )
            response.raise_for_status()
            data = response.json()
            sales = data.get('sales', [])
            all_sales.extend(sales)
            
            if len(sales) < limit:
                break
            offset += limit
        
        # Convert to dataframe with error treatment
        df_sales = pd.DataFrame(all_sales)
        if "date" in df_sales.columns and not df_sales.empty:
            #If the column is numerical then timestamp in ms
            if pd.api.types.is_numeric_dtype(df_sales["date"]):
                df_sales["date"] = pd.to_datetime(df_sales["date"], errors="coerce", unit="ms")
            else:
                # If the column is already ISO
                df_sales["date"] = pd.to_datetime(df_sales["date"], errors="coerce")
            
            # Apply filter to start_date/end_date if already existis
            if start_date:
                df_sales = df_sales[df_sales["date"].dt.date >= pd.to_datetime(start_date).date()]
            if end_date:
                df_sales = df_sales[df_sales["date"].dt.date <= pd.to_datetime(end_date).date()]
        
        return {"sales": df_sales.to_dict(orient="records")}
    
    except Exception as e:
        print(f"Error to fetch the report: {type(e).__name__} - {str(e)}")
        return None


In [None]:
def normalize_dates(df: pd.DataFrame, date_column: str = "date") -> pd.DataFrame:
    ''''Normalize data to ISO-8601 format (YYYY-MM-DDTHH:MM:SS±HH:MM)'''
    if date_column not in df.columns:
        return df
    
    df[date_column] = pd.to_datetime(df[date_column], utc=True, errors="coerce")
    
    # Convert to string in ISO format
    df[date_column] = df[date_column].dt.strftime("%Y-%m-%dT%H:%M:%S%z")
    
    df[date_column] = df[date_column].str.replace(
        r"(\+)(\d{2})(\d{2})$", r"\1\2:\3", regex=True
    )
    
    return df

In [6]:
def remove_today_data(df):
    hoje = datetime.now().date()

    if "date" in df.columns:
        df["date"] = pd.to_datetime(df["date"], errors="coerce", utc=True)
        df["date_only"] = df["date"].dt.date
        df = df[df["date_only"] != hoje].drop(columns=["date_only"])
    else:
        print("'Date' column not found")

    return df 

In [None]:
def load_consolidated_ndjson(filepath):
    # Read data from JSON file in array format
    if not os.path.exists(filepath):
        return []

    try:
        with open(filepath, "r", encoding="utf-8") as f:
            data = json.load(f)
            if isinstance(data, list):
                return data
            elif isinstance(data, dict):
                return [data] 
            else:
                print(f"Unnexpected format at file {filepath}")
                return []
    except json.JSONDecodeError as e:
        print(f"Error to load {filepath}: {e}")
        return []




In [8]:
def append_to_consolidated_json(filepath, new_records):
    #Add new records to consolidated.json file keeping the array format
    existing_data = load_consolidated_ndjson(filepath)
    updated_data = existing_data + new_records

    with open(filepath, "w", encoding="utf-8") as f:
        json.dump(updated_data, f, indent=2, ensure_ascii=False, default=str)

In [9]:
def get_last_date_from_consolidated():
    records = load_consolidated_ndjson(CONSOLIDATED_FILE)
    if not records:
        return None
    df = pd.DataFrame(records)
    if "date" not in df.columns or df.empty:
        return None
    df["date"] = pd.to_datetime(df["date"], errors="coerce", utc=True)
    return df["date"].max().date()


In [None]:
def save_to_json(data, filename):
    '''Save the data in JSON file with data treatment'''
    def convert_timestamps(obj):
        if isinstance(obj, pd.Timestamp):
            return obj.isoformat()
        if isinstance(obj, datetime):
            return obj.isoformat()
        return obj
    
    with open(filename, 'w') as f:
        if isinstance(data, pd.DataFrame):
            json_data = data.to_dict(orient='records')
            json.dump(json_data, f, indent=2, default=convert_timestamps)
        else:
            json.dump(data, f, indent=2, default=convert_timestamps)

In [None]:
def consolidate_data(new_data_file):
    ''''Append new data to main file "consolidated.json"'''
    # Carregar dados existentes
    df_new = pd.read_json(CONSOLIDATED_FILE)

    if os.path.exists(CONSOLIDATED_FILE) and os.path.getsize(CONSOLIDATED_FILE) > 2:
        try:
            df_old = pd.read_json(CONSOLIDATED_FILE)
        except ValueError:
            with open(CONSOLIDATED_FILE, "r", encoding="utf-8") as f:
                raw = f.read()
            if "]" in raw:
                raw = raw[:raw.rfind("]")+1]
                df_old = pd.read_json(pd.io.common.StringIO(raw))
            else:
                df_old = pd.DataFrame()
    else:
        df_old = pd.DataFrame()

    if not df_old.empty:
        df_all = pd.concat([df_old, df_new], ignore_index=True)
    else:
        df_all = df_new.copy()

    #Remove duplicates
    keys_prioridade = ['ticket.id', 'ticket.code', 'id']
    subset = [c for c in keys_prioridade if c in df_all.columns]
    if subset:
        if 'date' in df_all.columns:
            df_all = df_all.sort_values('date')
        df_all = df_all.drop_duplicates(subset=subset, keep='last')

    df_all.to_json(CONSOLIDATED_FILE, orient='records', force_ascii=False, indent=2)

    return df_all

In [None]:
def get_latest_date():
    '''Get the latest data from consolidated.json file'''
    if not os.path.exists(CONSOLIDATED_FILE):
        return None
    
    try:
        df = pd.read_json(CONSOLIDATED_FILE)
        if 'date' not in df.columns or df.empty:
            return None
        if pd.api.types.is_integer_dtype(df['date']):
            s = pd.to_datetime(df['date'], unit='ms', utc=True)
        else:
            s = pd.to_datetime(df['date'],errors='coerce',utc=True)

        if s.isna().all():
            return None 
        
        return s.max().date()
    except Exception as e:
        print(f"Error to get last date: {e}")
        return None

        


In [13]:
def save_incremental_file(data, prefix="incremental"):
    timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
    filename = os.path.join(DATA_DIR, f"{prefix}_{timestamp}.json")
    save_to_json(data, filename)
    print(f"Incremental data saved to {filename}")

In [None]:
def run_incremental_pipeline():
    token = get_access_token()
    if not token:
        print("Error to get token")
        return 
    
    # Requisição à API
    report_data = fetch_report(token)
    if not report_data or "sales" not in report_data:
        print("No data returned from API")
        return
    
    df = pd.DataFrame(report_data["sales"])
    if df.empty:
        print("No sales data found")
        return
    
    df = normalize_dates(df, date_column="date")

    last_date = get_last_date_from_consolidated()   # Most updated data alredy saved
    if last_date is None:
        print("No consolidated data found")
        return
    
    start_date = last_date + timedelta(days=1)     
    today = datetime.now().date()

    # Filter the records after the last_date
    df_filtered = df[df["date"].apply(lambda x: pd.to_datetime(x).date() >= start_date)]

    # Remover registros de hoje
    df_filtered = remove_today_data(df_filtered)

    if df_filtered.empty:
        print("Already up to date (no new data)")
        return
    
    # Oldest date avaliable into df_filtered
    min_date = df_filtered["date"].apply(lambda x: pd.to_datetime(x).date()).min()

    if min_date == today:
        print("Already up to date (only today’s data available)")
        return
    
    if min_date > last_date and min_date != today:
        # Append ao consolidated.json
        append_to_consolidated_json(CONSOLIDATED_FILE, df_filtered.to_dict(orient="records"))
        # Save incremental data into new json filte
        save_incremental_file(df_filtered)
        print(f"Incremental file created: {CONSOLIDATED_FILE} ({len(df_filtered)} records)")
    else:
        print("Already up to date (no valid new data)")


In [15]:
if __name__ == "__main__":
    run_incremental_pipeline()

Incremental data saved to ticket_data\incremental_20250908_103132.json
Incremental file created: ticket_data\consolidated.json (28 records)


A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["date"] = pd.to_datetime(df["date"], errors="coerce", utc=True)
A value is trying to be set on a copy of a slice from a DataFrame.
Try using .loc[row_indexer,col_indexer] = value instead

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  df["date_only"] = df["date"].dt.date
