# Code

In [None]:
import json
import re
import os
import pandas as pd

from typing import Union, List

def firestore_to_json(value_dict):
    """Unwrap a Firestore-style typed value into a plain Python value.

    Anything that is not a dict is returned untouched. For a dict, only the
    first key is inspected and treated as the type tag:

    * ``stringValue``, ``booleanValue``, ``timestampValue``: the payload as is.
    * ``integerValue`` / ``doubleValue``: the payload passed through ``int`` /
      ``float``.
    * ``nullValue``: ``None``.
    * ``mapValue``: a dict built by recursively unwrapping ``fields``
      (``{}`` when ``fields`` is absent).
    * ``arrayValue``: a list built by recursively unwrapping ``values``
      (``[]`` when ``values`` is absent).
    * any other tag: ``{tag: payload}``, unchanged.

    An empty dict yields ``None``.
    """
    if not isinstance(value_dict, dict):
        return value_dict
    if not value_dict:
        return None

    # Only the first entry decides the result; later entries are ignored.
    tag, payload = next(iter(value_dict.items()))

    if tag in ("stringValue", "booleanValue", "timestampValue"):
        return payload
    if tag == "integerValue":
        return int(payload)
    if tag == "doubleValue":
        return float(payload)
    if tag == "nullValue":
        return None
    if tag == "mapValue":
        if "fields" not in payload:
            return {}
        return {
            name: firestore_to_json(inner)
            for name, inner in payload["fields"].items()
        }
    if tag == "arrayValue":
        if "values" not in payload:
            return []
        return [firestore_to_json(element) for element in payload["values"]]

    # Unknown tag: hand the wrapper back as a one-entry dict.
    return {tag: payload}


def rename_df(df : pd.DataFrame) -> pd.DataFrame:
    """Return *df* with presentation-style column names.

    ``price_out`` becomes ``Price`` and ``format`` becomes ``Volume``; every
    other column name is passed through ``str.capitalize`` (first character
    upper-cased, the rest lower-cased). The input frame is not modified.
    """
    original_names = df.columns.tolist()
    if not original_names:
        # Nothing to rename: hand the frame back as is.
        return df

    # Default rule first, then the two special cases override it.
    new_names = {label: label.capitalize() for label in original_names}
    if "price_out" in original_names:
        new_names["price_out"] = "Price"
    if "format" in original_names:
        new_names["format"] = "Volume"

    return df.rename(columns=new_names)


def display_top(df: pd.DataFrame, n=20) -> pd.DataFrame:
    """Return the first *n* rows of *df* with a 1-based ``Rank`` index.

    The existing index is discarded; rows keep the order they have in *df*.
    The input frame is left untouched.
    """
    # reset_index already returns a new frame, so the caller's df is safe.
    ranked = df.reset_index(drop=True)
    ranked.index = (ranked.index + 1).rename("Rank")
    return ranked.head(n)


def parse(filename: str, columns_to_keep: List[str]) -> Union[pd.DataFrame, None]:
    """Extract the article table from a raw dump file.

    The file is searched for the first ``[2, [ {...} ]]`` wrapper; the JSON
    object inside it is decoded and the Firestore-encoded map found under
    ``documentChange.document.fields.articles.mapValue.fields`` is converted
    to plain values with ``firestore_to_json``.

    Side effect: the simplified articles are written as pretty-printed JSON
    to ``db/<stem>.json``, where ``<stem>`` is the base name of *filename*
    without its extension. The ``db`` directory is created if missing.

    Args:
        filename: Path of the raw text file to read (UTF-8).
        columns_to_keep: Article fields to keep, in order; fields absent from
            the data become empty columns (``reindex`` semantics).

    Returns:
        A DataFrame holding only the rows whose ``article_type`` equals 1,
        restricted to *columns_to_keep*, with columns renamed by
        ``rename_df`` and a fresh 0-based index. ``None`` when the wrapper is
        not found or when decoding/processing fails (a message is printed).

    Raises:
        OSError: If *filename* cannot be opened for reading.
    """
    with open(filename, 'r', encoding="utf-8") as file:
        content = file.read()

    # Non-greedy capture: the shortest {...} that is followed by "]]".
    match = re.search(r'\[2,\s*\[\s*(\{.*?\})\s*\]\s*\]', content, re.DOTALL)
    if not match:
        print("Could not find the relevant data structure '[2, [{...}]]' in the input string.")
        return None

    try:
        data = json.loads(match.group(1))

        articles_firestore_map = (
            data.get("documentChange", {})
            .get("document", {})
            .get("fields", {})
            .get("articles", {})
            .get("mapValue", {})
            .get("fields", {})
        )

        # Entries that are not a mapValue with fields are skipped.
        simplified_articles = {
            article_id: {
                field_name: firestore_to_json(field_value_dict)
                for field_name, field_value_dict in article_data["mapValue"]["fields"].items()
            }
            for article_id, article_data in articles_firestore_map.items()
            if "mapValue" in article_data and "fields" in article_data["mapValue"]
        }

        final_json_output = json.dumps(simplified_articles, indent=2, ensure_ascii=False)

        # Articles are the columns of the raw frame; transpose to one row each.
        df = pd.DataFrame(simplified_articles).T
        df = df[df['article_type'] == 1]
        df = df.reindex(columns=columns_to_keep)
        df = rename_df(df)
        df = df.reset_index(drop=True)

        # Derive the output name from the file's base name only, so dots or
        # separators elsewhere in the path (e.g. "./db/raw/x.txt") cannot
        # truncate or empty it.
        stem = os.path.splitext(os.path.basename(filename))[0]
        os.makedirs("db", exist_ok=True)
        with open(f"db/{stem}.json", "w", encoding="utf-8") as out:
            out.write(final_json_output)

        return df

    except json.JSONDecodeError as e:
        print(f"Error decoding JSON part: {e}")
    except KeyError as e:
        print(f"Error navigating the data structure, missing key: {e}")
    except Exception as e:
        # Best-effort boundary: report and fall through to the None return.
        print(f"An unexpected error occurred: {e}")

    return None


def create_dir_if_absent(dir: str) -> None:
    """Create directory *dir*, including missing parents, if it is absent.

    Calling it on an existing directory is a no-op.

    Raises:
        FileExistsError: If *dir* exists but is not a directory.
        OSError: If the directory cannot be created.
    """
    # exist_ok=True folds the existence check into the creation itself, so
    # there is no window between "check" and "create" for another process to
    # create the directory and make this call fail.
    os.makedirs(dir, exist_ok=True)


# Period identifier: selects the raw input file and the CSV output folder.
YEAR = "2025-2026-Q1"
df = parse(f"db/raw/{YEAR}.txt", ['name', 'price_out', 'format', 'type', 'degree', 'available'])

# parse() prints a message and returns None on failure; stop here with a
# clear error instead of an opaque "'NoneType' object is not subscriptable".
if df is None:
    raise RuntimeError(f"Could not build the article table from db/raw/{YEAR}.txt")

# Keep available articles only. The explicit comparison with True is kept
# (rather than using the column as a mask) so that missing values count as
# "not available". .copy() makes the result an independent frame, so adding
# the Ratio column below does not write into a slice of the parsed frame.
df = df[df["Available"].eq(True)].copy()
# Ranking metric: degree times volume, per unit of price.
df["Ratio"] = (df['Degree'] * df['Volume']) / df['Price']

# Best ratio first; ties go to the lower price, then to the larger volume.
df = df.sort_values(by=["Ratio", "Price", "Volume"], ascending=[False, True, False])
df = df.reset_index(drop=True)
df.index.name = "Rank"
df.index = df.index + 1  # ranks start at 1

create_dir_if_absent(f"csv/{YEAR}")
df.to_csv(f"csv/{YEAR}/ranker.csv")

# Top 20, any type of beer

In [None]:
# First rows of the ranking, restricted to a Volume below 75 (cell output).
display_top(df.loc[df["Volume"].lt(75)])

# Top 20, blonde beers

In [None]:
# Ranking limited to Type "Blonde" with a Volume below 75; saved to CSV and
# shown as the cell output.
blonde_beers = display_top(
    df.loc[df["Type"].eq("Blonde") & df["Volume"].lt(75)]
)
blonde_beers.to_csv(f"csv/{YEAR}/blonde_ranker.csv")
blonde_beers

# Top 20, ambrées

In [None]:
# Ranking limited to Type "Ambrée" with a Volume below 75; saved to CSV and
# shown as the cell output.
amber_beers = display_top(
    df.loc[df["Type"].eq("Ambrée") & df["Volume"].lt(75)]
)
amber_beers.to_csv(f"csv/{YEAR}/amber_ranker.csv")
amber_beers

# Top 20, trappistes

In [None]:
# Ranking limited to Type "Trappiste" with a Volume below 75; saved to CSV
# and shown as the cell output.
trapist_beers = display_top(
    df.loc[df["Type"].eq("Trappiste") & df["Volume"].lt(75)]
)
trapist_beers.to_csv(f"csv/{YEAR}/trapist_ranker.csv")
trapist_beers

# Top 20, brunes

In [None]:
# Ranking limited to Type "Brune" with a Volume below 75; saved to CSV and
# shown as the cell output.
brown_beers = display_top(
    df.loc[df["Type"].eq("Brune") & df["Volume"].lt(75)]
)
brown_beers.to_csv(f"csv/{YEAR}/brown_ranker.csv")
brown_beers

# Top 20, fruitées

In [None]:
# Ranking limited to Type "Fruitée" with a Volume below 75; saved to CSV and
# shown as the cell output.
fruit_beers = display_top(
    df.loc[df["Type"].eq("Fruitée") & df["Volume"].lt(75)]
)
fruit_beers.to_csv(f"csv/{YEAR}/fruit_ranker.csv")
fruit_beers

# Top 20, blanches

In [None]:
# Ranking limited to Type "Blanche" with a Volume below 75; saved to CSV and
# shown as the cell output.
white_beers = display_top(
    df.loc[df["Type"].eq("Blanche") & df["Volume"].lt(75)]
)
white_beers.to_csv(f"csv/{YEAR}/white_ranker.csv")
white_beers

# Top 20, 75cl

In [None]:
# Ranking limited to rows whose Volume is exactly 75; saved to CSV and shown
# as the cell output.
big = display_top(df.loc[df["Volume"].eq(75)])
big.to_csv(f"csv/{YEAR}/75cl_ranker.csv")
big

# Top 50 du rat (à plus que 5° quand même (big up à Hunter))

In [None]:
# Up to 50 rows with Degree >= 5, Volume < 75 and Price <= 2.5, then ordered
# by Ratio (descending) and Volume (ascending); shown as the cell output.
# NOTE(review): the re-sort runs after display_top has assigned the Rank
# index, so Rank labels of rows with equal Ratio may appear out of order.
# Confirm this is intended.
cheap_and_strong = df["Degree"].ge(5) & df["Volume"].lt(75) & df["Price"].le(2.5)
rat = display_top(df.loc[cheap_and_strong], n=50).sort_values(
    by=["Ratio", "Volume"],
    ascending=[False, True],
)
rat