# Exercice Capstone – Company Sales

## Objectifs Exceptions
- Gérer les erreurs classiques avec `try / except / else / finally`.
- Écrire des messages **ciblés** (pas de `except Exception` global).
- Rendre le pipeline robuste : lecture CSV → agrégations → export.

---

### Imports & chemins

In [3]:
from pathlib import Path
import csv

# TODO: définir les chemins des fichiers
SALES_FILE = Path("./files/sales_data.csv")
EMPLOYEES_FILE = Path("./files/employee_data.csv")
REPORT_FILE = Path("./files/report.csv")

print("Files existants:")
print("- Sales_data.csv:", SALES_FILE.exists())
print("- Employee_data.csv:", EMPLOYEES_FILE.exists())

class EmptyOrNoHeaderCSVError(Exception):
    """Erreur personnalisée pour CSV vide ou sans header."""
    pass


Files existants:
- Sales_data.csv: True
- Employee_data.csv: True


### Lecture des CSV

In [4]:
# TODO: implémenter une fonction read_csv_rows(path: Path) -> list[dict]
# Utiliser csv.DictReader
def read_csv_rows(path: Path) -> list[dict]:
    """
    Lecture simple d'un CSV en liste de dictionnaires avec validations:
    - existence du fichier
    - présence des en-têtes
    - présence d'au moins une ligne de données
    Lève : FileNotFoundError, EmptyOrNoHeaderCSVError
    :param path: Path
    :return: List[dict]
    """
    if not path.exists():
        raise FileNotFoundError(f"Le fichier {path} n'existe pas.")

    with open(path, encoding='utf-8') as file:
        reader = csv.DictReader(file)
        rows = list(reader)

        if not reader.fieldnames:
            raise EmptyOrNoHeaderCSVError(f"Le fichier {path} n'a pas d'en-têtes.")
        if not rows:
            raise EmptyOrNoHeaderCSVError(f"Le fichier {path} est vide ou n'a pas de données.")
        return rows

sales_rows = read_csv_rows(SALES_FILE)
employee_rows = read_csv_rows(EMPLOYEES_FILE)

print("Exemples de lignes:")
len(sales_rows), len(employee_rows)

Exemples de lignes:


(100, 10)

### Ventes par employé

In [6]:
# TODO: implémenter aggregate_sales_by_employee(rows: list[dict]) -> dict[str, float]
# Agréger les montants par employee_id
def aggregate_sales_by_employee(rows: list[dict]) -> dict[str, float]:
    """
    Agrège les montants des ventes par employé.
    Colonnes attendues ( casse exacte ): EmployeeID, Amount
    Léve KeyError si les colonnes sont manquantes, ValueError si montant invalide.
    :param rows: list[dict]
    :return: dict[str, float]
    """
    totals: dict[str, float] = {}
    print("Colonnes des ventes:", rows[0].keys())
    print("Nombre de lignes de ventes:", len(rows[0].values()))

    for row in rows:
        try:
            emp_id = row["employee_id"]
            amount = float(row['Amount'])
        except KeyError as e:
             KeyError(f"Colonne manquante dans les données de vente: {e}")
        except ValueError:
            raise ValueError(f"Montant invalide pour l'employé {emp_id}: {row['Amount']}")
        totals[emp_id] = totals.get(emp_id, 0.0) + amount

    return totals

sales_by_employee = aggregate_sales_by_employee(sales_rows)
print("Exemple de ventes par employé:", list(sales_by_employee.items())[:5])


Colonnes des ventes: dict_keys(['SalesID', 'Date', 'Amount', 'EmployeeID'])
Nombre de lignes de ventes: 4


AttributeError: 'KeyError' object has no attribute 'items'

### Index employé → département

In [4]:
# TODO: implémenter build_employee_department_index(rows: list[dict]) -> dict[str, str]
# Construire un dictionnaire {employee_id: department}
def build_employee_department_index(rows: list[dict]) -> dict[str, str]:
    """
    Construit un index employee_id -> department (lecture simple)
    :param rows: list[dict]
    :return: dict[str, str]
    """
    idx: dict[str, str] = {}

    for row in rows:
        try:
            idx[row["EmployeeID"]] = row["Department"]
        except KeyError as e:
            raise KeyError(f"Colonne manquante dans les données des employés: {e}")
    return idx

emp_to_dept = build_employee_department_index(employee_rows)
list(emp_to_dept.items())[:5]

[('1', 'Sales'),
 ('2', 'Operations'),
 ('3', 'Customer Service'),
 ('4', 'Operations'),
 ('5', 'Finance')]

### Ventes par département

In [5]:
# TODO: implémenter aggregate_sales_by_department(sales_by_emp: dict[str, float], emp_to_dept: dict[str, str]) -> dict[str, float]
# Agréger les ventes par département
def aggregate_sales_by_department(sales_by_employee: dict[str, float], emp_to_dept: dict[str, str]) -> dict[str, float]:
    """
    Jointure simple employee_id -> department, puis agrégation par department.
    Employées sans department sont ignorées, les ignorés (comptabilisés).
    :param sales_by_employee: dict[str, float]
    :param emp_to_dept: dict[str, str]
    :return: dict[str, float]
    """
    totals: dict[str, float] = {}
    skipped = 0

    for emp_id, total in sales_by_employee.items():
        dep = emp_to_dept.get(emp_id)
        if dep is None:
            skipped += 1
            continue  # Employée sans département, on ignore
        totals[dep] = totals.get(dep, 0.0) + total

    if skipped:
        print(f"Attention: {skipped} employés sans département ont été ignorés.")
    return totals

sales_by_department = aggregate_sales_by_department(sales_by_employee, emp_to_dept)
sales_by_department

{'Customer Service': 197261.0,
 'Operations': 104549.0,
 'Sales': 57520.0,
 'Finance': 123519.0,
 'IT': 78752.0}

### Écriture du rapport

In [None]:
# TODO: implémenter write_report(path: Path, dept_totals: dict[str, float])
# Écrire un CSV department,total_sales
def write_report(path: Path, dept_totals: dict[str, float]) -> None:
    """
    Écrit un rapport CSV simple department, total_sales (formatée à 2 décimales)
    :param path: Path
    :param dept_totals: dict[str, float]
    """
    with open(path, 'w', encoding='utf-8', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["department", "total_sales"])
        for dept, total in dept_totals.items():
            writer.writerow([dept, f"{total:.2f}"])  # Formatage à 2 décimales

write_report(REPORT_FILE, sales_by_department)
REPORT_FILE.resolve()

### Vérifications

In [6]:
# TODO: vérifier que report.csv existe et contient au moins un header + 1 ligne
if REPORT_FILE.exists():
    with open(REPORT_FILE, encoding='utf-8') as file:
        lines = file.readlines()
    assert len(lines) >= 2, "Le fichier report.csv ne contient pas de données"
else:
    print("Le fichier report.csv n'existe pas")

Le fichier report.csv n'existe pas


## Bonus
- Trier les départements par ventes décroissantes.
- Sauvegarder dans un fichier `report_sorted.csv`.
- (Optionnel) Si matplotlib est installé, tracer un bar chart des ventes.


In [None]:
# Bonus : trier par ventes décroissantes et écrire report_sorted.csv
def sorted_totals_desc(deps_totals: dict[str, float]) -> list[tuple[str, float]]:
    """
    Trie les départements par ventes décroissantes.
    :param deps_totals: dict[str, float]
    :return: list[tuple[str, float]]
    """
    return sorted(deps_totals.items(), key=lambda item: item[1], reverse=True)

sorted_depts = sorted_totals_desc(sales_by_department)

REPORT_SORTED_FILE = Path("./files/report_sorted.csv")
with open(REPORT_SORTED_FILE, 'w', encoding='utf-8', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(["department", "total_sales"])
    for dept, total in sorted_depts:
        writer.writerow([dept, f"{total:.2f}"])  # Formatage à 2 décimales

print("Rapport trié : ", REPORT_SORTED_FILE.resolve())
sorted_depts[:5]  # Top 5 départements