# Projet Prévision Météo en Australie - MLOps Juillet 2024

Ce projet déploie un modèle **Random Forest** dans une application de prévision de pluie à J+1 sur une ville donnée en Australie. Le projet intègre des outils MLOps tels que **Airflow**, **MLflow** pour la pipeline data-model, **Prometheus** et **Grafana** pour le monitoring des ressources machines, ainsi que **FastAPI** et **Streamlit** pour l'inférence.

## Table des matières

- [Description du projet](#description-du-projet)
- [Architecture du projet](#architecture-du-projet)
- [Les DAGs Airflow](#les-dags-airflow)
- [Outils utilisés](#outils-utilisés)
- [Installation et utilisation](#installation-et-utilisation)
  - [Version de production](#version-de-production)
  - [Version de développement](#version-de-développement)
- [CI/CD](#cicd)
- [Monitoring](#monitoring)
- [Contribution](#contribution)
- [Licence](#licence)
- [Contact](#contact)

---

## Description du projet

Le projet vise à prédire la probabilité de pluie le lendemain pour une ville spécifique en Australie. Il s'appuie sur un modèle **Random Forest** entraîné sur des données météorologiques actualisées quotidiennement. Les principaux composants du projet sont :

- **Airflow** pour orchestrer les pipelines de données (ETL) et l'entraînement du modèle.
- **MLflow** pour gérer les expériences de machine learning et suivre les performances des modèles.
- **FastAPI** et **Streamlit** pour fournir une interface utilisateur pour les prédictions et une interface administrateur pour gérer les mises à jour et les entraînements.
- **Prometheus** et **Grafana** pour le monitoring des ressources serveurs et la visualisation des métriques.
- Utilisation de **Docker** pour la containerisation et de **Docker Hub** pour le déploiement des images.

---

## Architecture du projet

![Architecture du Projet][] <!-- Assurez-vous d'inclure une image représentant l'architecture de votre projet -->

Le projet est entièrement containerisé, ce qui facilite le déploiement et la scalabilité. L'architecture se compose des éléments suivants :

- **Scraping des données** : Récupération quotidienne des relevés météorologiques via des scripts Python.
- **Pipeline ETL avec Airflow** : Extraction, transformation et chargement des données dans une base de données PostgreSQL.
- **Entraînement du modèle avec MLflow** : Entraînement hebdomadaire du modèle Random Forest, comparaison avec le modèle précédent selon le F1-score, et déploiement du meilleur modèle.
- **API d'inférence avec FastAPI** : Fournit des prédictions basées sur le modèle déployé.
- **Interface utilisateur avec Streamlit** : Permet aux utilisateurs de faire des prédictions et aux administrateurs de lancer manuellement une recuperation des données du jour et un entrainement selection du meilleur modele.
- **Monitoring avec Prometheus et Grafana** : Collecte et visualisation des métriques du système et des performances du modèle.
- **CI/CD avec GitHub Actions** : Tests automatisés et déploiement continu sur Docker Hub.

---

## Les DAGs Airflow

- **DAG de collecte des données (quotidien)**
  - **Tâches** :
    - Scraping du site météorologique pour obtenir les relevés journaliers.
    - Nettoyage et préparation des données.
    - Insertion des données dans la base de données PostgreSQL.
- **DAG d'entraînement du modèle (hebdomadaire)**
  - **Tâches** :
    - Chargement des données depuis la base de données.
    - Entraînement du modèle Random Forest avec MLflow.
    - Comparaison avec le modèle précédent en utilisant le F1-score.
    - Enregistrement du meilleur modèle pour l'inférence.
- **DAG combiné (exécution manuelle)**
  - **Tâches** :
    - Exécution des tâches de collecte des données.
    - Entraînement du modèle et sélection du meilleur.
  - **Utilisation** :
    - Peut être déclenché depuis le panneau administrateur de l'application Streamlit pour forcer une mise à jour du modèle.
- **DAG de tests unitaires**
  - **Tâches** :
    - Exécution de la suite de tests pour valider le bon fonctionnement des pipelines et du modèle.

---

## Outils utilisés

- **Langage** : Python 3.8+
- **Outils MLOps** :
  - **Apache Airflow** : Orchestration des pipelines ETL et des entraînements.
  - **MLflow** : Gestion des expériences de machine learning et suivi des modèles.
- **Développement Web** :
  - **FastAPI** : Création de l'API d'inférence.
  - **Streamlit** : Interface utilisateur pour les prédictions et les actions administratives.
- **Monitoring** :
  - **Prometheus** : Collecte des métriques système.
  - **Grafana** : Visualisation des métriques via des tableaux de bord.
- **Gestion des données** :
  - **PostgreSQL** : Base de données pour stocker les données préparées.
- **Containerisation et Déploiement** :
  - **Docker** et **Docker Compose** : Containerisation des services.
  - **Docker Hub** : Stockage et distribution des images Docker.
  - **GitHub Actions** : Intégration continue et déploiement continu (CI/CD).

---

## Installation et utilisation

### Pré-requis

- **Docker** et **Docker Compose** installés sur votre machine.
- **Make** installé pour utiliser les Makefiles.

### Version de production

1. **Initialiser Airflow** :

   ```bash
   make -f Makefile.prod init-airflow
   ```

2. **Démarrer les services** :

   ```bash
   make -f Makefile.prod start
   ```

### Version de développement

1. **Initialiser Airflow** :

   ```bash
   make -f Makefile.dev init-airflow
   ```

2. **Démarrer les services** :

   ```bash
   make -f Makefile.dev start
   ```

---

## CI/CD

Le projet utilise **GitHub Actions** pour l'intégration continue et le déploiement continu :

- **Tests automatisés** : À chaque push ou pull request, les tests unitaires sont exécutés pour s'assurer que le code est fonctionnel.
- **Build des images Docker** : Les images Docker sont construites et testées.
- **Déploiement sur Docker Hub** : Si les tests réussissent, les images sont poussées sur Docker Hub avec un nouveau tag de version.

---

## Monitoring

**Prometheus** collecte les métriques système, telles que l'utilisation du CPU, de la mémoire et des ressources réseau. **Grafana** est utilisé pour visualiser ces métriques à travers des tableaux de bord personnalisables.

- **Accéder à Grafana** :

  Rendez-vous sur `http://localhost:3000` et connectez-vous avec les identifiants par défaut (configurés dans le docker-compose).

- **Dashboard permettant de visualiser entre autres** :

  - Utilisation du CPU.
  - Utilisation de la mémoire.
  - Utilisation disque.
  - Utilisation réseau.
  - Performances des services Docker

---


## Licence

Ce projet est sous licence MIT - voir le fichier [LICENSE](./LICENSE) pour plus de détails.

---

Ce projet a été développé par l'équipe suivante :

- Shirley GERVOLINO ([GitHub](https://github.com/Shirley687) / [LinkedIn](https://www.linkedin.com/in/))
- Tristan ([GitHub](https://github.com/tristandatascience) / [LinkedIn](https://www.linkedin.com/in/))
- Prudence Amani ([GitHub](https://github.com/) / [LinkedIn](https://www.linkedin.com/in/))
- Stéphane Los ([GitHub](https://github.com/hil-slos) / [LinkedIn](https://fr.linkedin.com/in/losstephane/))

---

*Ce projet a été réalisé dans le cadre du programme MLOps de Juillet 2024.*


In [None]:
%%capture
!pipenv install dlt[lancedb]==0.5.1a0
!pipenv install sentence-transformers

In [None]:
import requests
import dlt

qa_dataset = requests.get("https://github.com/DataTalksClub/llm-zoomcamp/blob/main/01-intro/documents.json?raw=1").json()

@dlt.resource
def qa_documents():
  for course in qa_dataset:
    yield course["documents"]

pipeline = dlt.pipeline(pipeline_name="from_json", destination="lancedb", dataset_name="qanda")

load_info = pipeline.run(qa_documents, table_name="documents")

print(load_info)

In [None]:
import lancedb

db = lancedb.connect("./.lancedb")
print(db.table_names())

In [None]:

%%capture
!pipenv install dlt[lancedb]==0.5.1a0
!pipenv install sentence-transformers
import requests
import dlt

qa_dataset = requests.get("https://github.com/DataTalksClub/llm-zoomcamp/blob/main/01-intro/documents.json?raw=1").json()

@dlt.resource
def qa_documents():
  for course in qa_dataset:
    yield course["documents"]

pipeline = dlt.pipeline(pipeline_name="from_json", destination="lancedb", dataset_name="qanda")

load_info = pipeline.run(qa_documents, table_name="documents")

print(load_info)
import lancedb

db = lancedb.connect("./.lancedb")
print(db.table_names())
db_table = db.open_table("qanda___documents")

db_table.to_pandas()
import os
from dlt.destinations.adapters import lancedb_adapter

os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER"] = "sentence-transformers"
os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL"] = "all-MiniLM-L6-v2"

pipeline = dlt.pipeline(pipeline_name="from_json_embedded", destination="lancedb", dataset_name="qanda_embedded")

load_info = pipeline.run(lancedb_adapter(qa_documents, embed=["text", "question"]), table_name="documents")
print(load_info)
db = lancedb.connect("./.lancedb")
print(db.table_names())
db_table = db.open_table("qanda_embedded___documents")

db_table.to_pandas()
!yes | dlt init rest_api lancedb
import os
from google.colab import userdata

os.environ["SOURCES__REST_API__NOTION__API_KEY"] = userdata.get("SOURCES__REST_API__NOTION__API_KEY")

os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL_PROVIDER"] = "sentence-transformers"
os.environ["DESTINATION__LANCEDB__EMBEDDING_MODEL"] = "all-MiniLM-L6-v2"

os.environ["DESTINATION__LANCEDB__CREDENTIALS__URI"] = ".lancedb"
from datetime import datetime, timezone

class PostBodyPaginator(BasePaginator):
    def __init__(self):
        super().__init__()
        self.cursor = None

    def update_state(self, response: Response) -> None:
        # Assuming the API returns an empty list when no more data is available
        if not response.json():
            self._has_next_page = False
        else:
            self.cursor = response.json().get("next_cursor")
            if self.cursor is None:
                self._has_next_page = False

    def update_request(self, request: Request) -> None:
        if request.json is None:
            request.json = {}

        # Add the cursor to the request body
        request.json["start_cursor"] = self.cursor

@dlt.resource(name="employee_handbook")
def rest_api_notion_resource():
    notion_config: RESTAPIConfig = {
        "client": {
            "base_url": "https://api.notion.com/v1/",
            "auth": {
                "token": dlt.secrets["sources.rest_api.notion.api_key"]
            },
            "headers":{
            "Content-Type": "application/json",
            "Notion-Version": "2022-06-28"
            }
        },
        "resources": [
            {
                "name": "search",
                "endpoint": {
                    "path": "search",
                    "method": "POST",
                    "paginator": PostBodyPaginator(),
                    "json": {
                        "query": "Homework: Employee handbook",
                        "filter": {
                            "property": "object",
                            "value": "page"
                        },
                        "sort": {
                            "direction": "ascending",
                            "timestamp": "last_edited_time"
                        }
                    },
                    "data_selector": "results"
                }
            },
            {
                "name": "page_content",
                "endpoint": {
                    "path": "blocks/{page_id}/children",
                    "paginator": JSONResponsePaginator(),
                    "params": {
                        "page_id": {
                            "type": "resolve",
                            "resource": "search",
                            "field": "id"
                        }
                    },
                }
            }
        ]
    }

    yield from rest_api_source(notion_config,name="employee_handbook")

def extract_page_content(response):
    block_id = response["id"]
    last_edited_time = response["last_edited_time"]
    block_type = response.get("type", "Not paragraph")
    if block_type != "paragraph":
        content = ""
    else:
        try:
            content = response["paragraph"]["rich_text"][0]["plain_text"]
        except IndexError:
            content = ""
    return {
        "block_id": block_id,
        "block_type": block_type,
        "content": content,
        "last_edited_time": last_edited_time,
        "inserted_at_time": datetime.now(timezone.utc)
    }

@dlt.resource(
    name="employee_handbook",
    write_disposition="merge",
    primary_key="block_id",
    columns={"last_edited_time":{"dedup_sort":"desc"}}
    )
def rest_api_notion_incremental(
    last_edited_time = dlt.sources.incremental("last_edited_time", initial_value="2024-06-26T08:16:00.000Z",primary_key=("block_id"))
):
    # last_value = last_edited_time.last_value
    # print(last_value)

    for block in rest_api_notion_resource.add_map(extract_page_content):
        if not(len(block["content"])):
            continue
        yield block

def load_notion() -> None:
    pipeline = dlt.pipeline(
        pipeline_name="company_policies",
        destination="lancedb",
        dataset_name="notion_pages",
        # full_refresh=True
    )

    load_info = pipeline.run(
        lancedb_adapter(
            rest_api_notion_incremental,
            embed="content"
        ),
        table_name="homework",
        write_disposition="merge"
    )
    print(load_info)

load_notion()
import lancedb

db = lancedb.connect(".lancedb")
dbtable = db.open_table("notion_pages___homework")

dbtable.to_pandas()
dbtable.to_pandas().sort_values(by="last_edited_time", ascending=False)
#Q1 14 
#Q2 2024-07-05 23:33:00+00:00
!curl -fsSL https://ollama.com/install.sh | sh
!nohup ollama serve > nohup.out 2>&1 &
%%capture
!ollama pull llama2-uncensored
!pip install ollama
import ollama

def retrieve_context_from_lancedb(dbtable, question, top_k=2):

    query_results = dbtable.search(query=question).to_list()
    context = "\n".join([result["content"] for result in query_results[:top_k]])

    return context

def main():
  # Connect to the lancedb table
  db = lancedb.connect(".lancedb")
  dbtable = db.open_table("notion_pages___employee_handbook")

  # A system prompt telling ollama to accept input in the form of "Question: ... ; Context: ..."
  messages = [
      {"role": "system", "content": "You are a helpful assistant that helps users understand policies inside a company's employee handbook. The user will first ask you a question and then provide you relevant paragraphs from the handbook as context. Please answer the question based on the provided context. For any details missing in the paragraph, encourage the employee to contact the HR for that information. Please keep the responses conversational."}
  ]

  while True:
    # Accept user question
    question = input("You: how many PTO days are the employees entitled to in a year?")

    # Retrieve the relevant paragraphs on the question
    context = retrieve_context_from_lancedb(dbtable,question,top_k=2)

    # Create a user prompt using the question and retrieved context
    messages.append(
        {"role": "user", "content": f"Question: '{question}'; Context:'{context}'"}
    )

    # Get the response from the LLM
    response = ollama.chat(
        model="llama2-uncensored",
        messages=messages
    )
    response_content = response['message']['content']
    print(f"Assistant: {response_content}")

    # Add the response into the context window
    messages.append(
        {"role": "assistant", "content":response_content}
    )
main()
#Q3

In [None]:
print('essai print \n')