Comme nous avons pu le voir, Apache Airflow est un outil très puissant pour automatiser des workflows. Il permet aussi bien d'automatiser des pipelines ETL que des séquences d'entrainement de modèles. De plus, son intégration avec différentes applications et services (donc ceux de Google Cloud) le rendent très opérationnel.

<blockquote><p>🙋 <b>Ce que nous allons faire</b></p>
<ul>
    <li>Utiliser les opérateurs Google Cloud présents sur Apache Airflow</li>
    <li>Déclencher le pipeline ML sur Airflow dans les deux environnements</li>
</ul>
</blockquote>

<img src="https://media.giphy.com/media/7zYmz8wSLMIV7qpgSU/giphy.gif" />

## La dérive de modèle

Airflow va être très utile pour contrer un phénomène très présent dans les environnements de production : la **dérive de modèle** (ou *model drift*). Il y a en réalité deux types de dérives qui peuvent survenir.

- La **dérive conceptuelle** : dans ce cas de figure, les propriétés statistiques de la variables réponse évoluent. C'est le cas par exemple où l'on considère que la variable réponse n'a plus la même signification au cours du temps. Un exemple classique est la notion de spam sur les réseaux sociaux : au tout début, les spams considérés étaient uniquement les messages comportant des caractères aléatoires. En avançant dans le temps, les spams ont englobés de plus en plus de cas, et plus seulement des messages aléatoires. Puisqu'il y a une **redéfinition de la variable réponse**, le modèle est, en soit, toujours cohérent dans ses prédictions, mais dans notre fait, ses prédictions n'ont plus la même valeur par rapport au problème posé.
- La **dérive des données** : à l'inverse, c'est lorsque les propriétés statistiques de l'estimateur/modèle évoluent. Automatiquement, le modèle n'est donc plus en phase avec le phénomène sous-jacent. Cela est d'autant plus fort qu'il y a une composante temporelle. Prenons par exemple un modèle qui cherche à prédire la durée de trajet dans une ville à forte densité (Paris, Lyon, etc). Au moins d'août, il y a par exemple beaucoup moins de circulation qu'en février. Un modèle, qui se serait entraîné sur quelques mois d'historique, ne fournirait pas des prédictions satisfaisantes en plein été. Cela est du au fait que la durée d'un trajet dépend de la circulation, qui elle-même dépend (en partie) du moment de l'année. Il serait donc indispensable de mettre à jour le modèle avec des données « plus fraîches ».

La méthode la plus efficace pour corriger ces deux dérives est d'entraîner le modèle régulièrement avec des données récentes.

> ❓ Est-ce que cela veut dire que l'on oublie les données plus ancienne ?

Pas forcément. Pour notre exemple de prédiction de comportement utilisateur, on souhaite bien évidemment avoir l'information des comportements récents, car cela reflète le plus fidèlement possible les utilisateurs. Des comportements d'il y a deux mois ou depuis Noël ne sont pas forcément pertinent à l'heure actuelle.

Dans d'autres situations, néanmoins, il est toujours utile de garder une *mémoire* des données plus anciennes. On retrouve alors des situations où l'on pondérise les données en fonction de leur ancienneté, de sorte à conserver des données anciennes sans pour autant leur donner la même importance que les données plus récentes.

Maintenant que nous avons la solution, une question fatidique se pose.

> ❓ À quelle fréquence doit-on rafraîchir le modèle ?

Et forcément, il n'y a pas de réponse toute faite. 😅
En principe, c'est bien entendu le domaine d'application et le cas d'usage qui va définir cette fréquence de rafraîchissement. Dans notre exemple de ECommerce, on souhaite avoir un comportement utilisateur assez récent, sans pour autant prendre trop ancien : si l'on considérait un rafraîchissement tous les deux mois, les événements particuliers comme Noël seront absorbés par tous les autres événements. En ne prenant que quelques jours (2 ou 3), on ne prends plus en compte le cycle hebdomaire semaine/week-end, qui peut potentiellement influencer le comportement des utilisateurs.

Le plus adapté serait donc de considérer une fréquence de rafraîchissement de 1 à 2 semaines.

## Automatisation sur Airflow

Rappelons-nous de l'automatisation déjà mise en place dans notre infrastructure. D'une part, le pipeline de pré-production, exécuté à chaque mise à jour d'un code sur un des dépôts (du modèle ou de l'API). Le pipeline de pré-production est la succession du pipeline CI/CD construit pour entraîner le modèle et l'envoyer vers MLflow, avec le pipeline qui va automatiquement déployer une image Docker de l'API sur Cloud Run.

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/pipeline_preprod.png" />

De manière quasi-symmétrique, nous avons également le pipeline de production

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/pipeline_prod1.png" />

Les petites différences résident dans les branches considérées (`staging` ou `master`) des projets Git, ainsi que sur la plateforme cible qui est Cloud Run dans l'environnement de pré-production et Kubernetes dans l'environnement de production.

Le principal avantage de nos deux pipelines, c'est que mis à part les déclenchements qui sont manuels lors d'un push sur Git, toutes les autres étapes sont réalisés automatiquement. En d'autres termes, il n'y a **pas besoin de ré-écrire toutes les étapes** : il nous suffirait de déclencher automatiquement le build sur Cloud Build pour que les pipelines soient ensuite exécuté en intégralité de manière automatisée.

Et c'est justement notre intérêt ici : à l'aide d'Airflow, nous allons simplement déclencher le build qui va construire le modèle (`purchase-predict`). Avec les pipelines CI/CD que nous avons déjà configuré, le modèle sera ensuite envoyé sur Airflow puis l'API, à son tour, sera conteneurisée pour être ensuite déployée vers la plateforme cible associée. Nous avons donc uniquement besoin de déclencher un build via Airflow pour lancer toute la séquence de manière automatisée.

> ❓ Mais les données, elles, ne changent pas ?

Et oui ! Rappelons-nous que les données les plus à jour sont stockées sur une table BigQuery !

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/bigquery_10.jpg" />

La seule hypothèse que nous avons ici est que les données arrivent *en continu* sur cette table par d'autres applications. Sauf que cette table ne contient pas les données « prêtes à l'emploi ». Si l'on se rappelle bien, nous avions réalisé un script Spark qui allait justement faire tout le travail de transformation de données pour ensuite créer les fichiers CSV dans le bucket.

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/dataproc_spark2.png" />

Ces fichiers CSV qui sont ensuite lus par Kedro dans le pipeline `loading` sur le projet `purchase-predict`, et la suite que nous connaissons très bien. 😉

Il nous faut donc, avant de déclencher le build, exécuter la tâche Spark qui va récupérer les données depuis la table BigQuery avec une intervalle de dates spécifié pour ensuite transformer les données et exporter le tout sur un bucket. Ainsi, Airflow devra au préalable exécuter la tâche PySpark dans un cluster Dataproc avant de déclencher le build, que nous pouvons résumer dans le schéma suivant.

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/airflow_pipelines1.png" />

## Construction du DAG

Pour l'instant, intéressons-nous uniquement au DAG qui va déclencher le pipeline ML dans l'environnement de pré-production. Nous pouvons décomposer notre DAG de manière successive.

- Création d'un cluster Dataproc.
- Une fois le cluster crée, on envoie la tâche PySpark avec les arguments de temporalité.
- On détruit le cluster, et en parallèle, on exécute le build.

Comme mentionné plus haut, Airflow dispose <a href="https://airflow.apache.org/docs/apache-airflow-providers-google/stable/operators/cloud/dataproc.html" target="_blank">d'opérateurs Dataproc</a> pour créer, supprimer des clusters Dataproc ou encore envoyer des tâches. Pour cela, nous devons installer les dépendances supplémentaires Google Cloud sous Airflow.

<div class="alert alert-block alert-info">
    La référence de tous les opérateurs et hooks est <a href="https://airflow.apache.org/docs/apache-airflow-providers/operators-and-hooks-ref/index.html" target="_blank">disponible ici</a>.
</div>

Commençons par construire un premier DAG `pipeline_ml_staging` avec seulement deux tâches : une pour créer un cluster Dataproc, et une autre pour le supprimer.

Dans un premier temps, nous importons le module `dataproc_operator`, installé via `pip`, qui va nous permettre d'interagir avec le service Dataproc de Google Cloud.

Nous définissons ensuite plusieurs variables.

- `BUCKET`, qui est le nom du bucket où sont stockés les données, les fichiers de configuration, etc.
- `CLUSTER_NAME_TEMPLATE`, qui est le modèle de nom qui sera attribué au cluster Dataproc que nous allons créer.
- `CLUSTER_CONFIG` qui va contenir les informations du cluster qui sera crée.

Notons dans le dictionnaire `default_args`, la présence des champs `project_id` et `region`. Ici, nous spécifions au DAG entier le nom du projet Google Cloud, qui sera ensuite hérité pour chaque tâche : nous n'aurons donc pas besoin de re-spécifier le nom du projet à chaque instanciation de tâche. Cette variable doit être présente en tant que variable sur Airflow.

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/airflow_pipelines2.png" />

Nous pouvons ensuite spécifier la clé de la variable ainsi que sa valeur.

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/airflow_pipelines3.png" />

Elle sera ensuite automatiquement ajoutée.

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/airflow_pipelines4.png" />

<div class="alert alert-block alert-info">
    Les variables seront <b>automatiquement cryptées</b> sur Airflow si elles contiennent le mot KEY ou SECRET.
</div>

Continuons ensuite la construction du DAG.

La première tâche `task_create_dataproc` va démarrer un cluster Dataproc. Nous ré-utilisons le modèle de nom auquel nous collons la date d'exécution du DAG au format `YYYYMMDD` grâce à la macro `{{ ds_nodash }}`. Ensuite nous précision la version de l'image Dataproc (Spark 3.2), et nous choisissons ensuite 2 machines avec `worker_config` (depuis `CLUSTER_CONFIG`) de type `n1-standard-4` (pareil pour la machine qui gère le cluster `master_config`). Pour les versions gratuites de GCP, on sélectionnera plutôt `n1-standard-1` ou `n1-standard-2` pour éviter de dépasser les quotas imposés.

Tout comme nous l'avions fait sur l'interface, le paramètre `idle_delete_ttl` fixé à 3600 indique que le cluster sera automatiquement détruit au bout de 1 heure (3600 secondes) si aucune tâche n'est en cours d'exécution.

La seconde tâche `task_delete_dataproc` va simplement supprimer le cluster créer précédemment. Attention toutefois, nous spécifions la règle de déclenchement `all_done`, qui signifie que même si des tâches parentes n'ont pas pu être effectués, on supprimera quand même le cluster Dataproc pour éviter de consommer des crédits.

Il ne reste plus qu'à tester nos tâches ... mais nous avons oublié l'authentification ! Heureusement, Airflow a pensé à tout et dispose de **connexions**.

Les connexions (visibles dans la barre de navigation `Admin -> Connections`) permet d'établir des connexions sécurisés de manière globale sans à chaque fois re-définir dans le code les méthodes d'authentification. Cela permet donc de faciliter la maintenance.

Éditons la connexion nommée `google_cloud_default` dans la liste des connexions.

<div class="alert alert-block alert-warning">
    Pour que le type de connexion Google Cloud apparaisse, il faut redémarrer le <code>webserver</code> et le <code>scheduler</code>.
</div>

Pour cela, nous allons créer un nouveau <a href="https://console.cloud.google.com/iam-admin/serviceaccounts" target="_blank">compte de service</a> spécifique à Airflow avec les autorisations suivantes.

- Administrateur Dataproc
- Utilisateur du compte de service
- Lecteur des objets de l'espace de stockage
- Lecteur de dépôt source
- Compte de service Cloud Build

Ce dernier rôle nous sera utile pour exécuter Cloud Build à partir d'un fichier YAML présent sur le bucket. Créons une nouvelle clé JSON et insérons là dans Airflow (`Keyfile JSON`).

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/airflow_pipelines5.png" />

Maintenant que tout est configuré, nous devrions voir notre DAG apparaître avec les deux tâches.

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/airflow_pipelines6.png" />

Ouvrons dans un nouvel onglet <a href="https://console.cloud.google.com/dataproc/clusters" target="_blank">l'interface Dataproc</a> et essayons de tester notre première tâche.

Si tout est correctement configuré, nous pouvons voir le cluster apparaître dans la région `us-central1`.

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/airflow_pipelines7.png" />

Testons maintenant la tâche suivante pour supprimer le cluster.

Essayons maintenant de glisser au milieu la tâche PySpark qui va transformer le jeu de données.

L'opérateur `DataprocSubmitPySparkJobOperator` va nous permettre d'envoyer une tâche PySpark au cluster spécifié en y indiquant des paramètres additionnels. En l'occurrence ici, nous ajoutons le JAR BigQuery car nous avons besoin de récupérer les trajets présents sur BigQuery.

Le paramètre `argments` nous permet d'indiquer la fenêtre temporelle sur laquelle nous allons récolter les données. Notons qu'ici nous choisissons l'intervalle de planification du DAG (qui correspond ici à une semaine).

N'oublions pas qu'Airflow exécute **toujours un DAG à la fin de sa période d'exécution !** Ainsi, le DAG correspond au lundi 04 janvier 2021 à 5h (valeur de `ds`) sera en réalité exécuté le lundi 11 janvier 2021 à 5h. C'est pour cela que nous choisissons `next_ds` pour récolter les observations du 04/01 au 11/01, sinon nous aurions un décalage d'une semaine dans le passé.

Testons alons avec un backfill.

Avec ce backfill, ce sera l'exécution du lundi 11/11/2019 qui sera déclenchée. Une fois le cluster en cours d'exécution, nous pouvons voir la tâche PySpark envoyé par Airflow.

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/airflow_pipelines8.png" />

Cette tâche peut être plus longue en terme d'exécution puisque nous avons plus de données à cette période par rapport à début octobre 2019. Si les temps de calcul sont trop longs, pour tester, nous pouvons réduire la taille de la fenêtre.

Une fois la tâche terminé, nous devrions voir nos données toutes fraîches dans <a href="https://console.cloud.google.com/storage/browser" target="_blank">notre bucket</a>.

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/airflow_pipelines9.png" />

Dans le même temps, le cluster Dataproc est supprimé par la dernière tâche Airflow.

Il ne reste plus qu'à exécuter notre build via Cloud Build, et Kedro ira directement chercher ces fichiers CSV. Tout comme nous avions exécuté le build du `purchase-predict-api` à partir du `cloudbuild.yaml` du projet `purchase-predict`, nous allons faire la même chose ici avec l'opérateur `CloudBuildCreateBuildOperator`.

Commençons par importer cet opérateur.

La variable `CLOUD_BUILD_STEP_ARGS` contient les arguments de la commande `bash` de l'étape qui va exécuter un autre build depuis le dépôt `purchase-predict`. Notons ici que contrairement à l'API, nous ne fournissons pas le dossier dans `gcloud builds submit` mais le contenu du dossier compressé, car par défaut `gcloud builds submit` ne garde pas les sous-dossiers vides dont `conf/local` dans Kedro, alors que ce dernier en a besoin.

Esuite, nous devons notamment y substituer des variables, dont le `SHORT_SHA` (qui sera généré manuellement lors de l'appel de la tâche Airflow), l'adresse du serveur MLflow avec `_MLFLOW_SERVER` et enfin le nom de la branche Git (ici `staging`).

Rajoutons la tâche dans le DAG et connectons-la avec les autres.

À l'instar du fichier `cloudbuild.yaml`, dans le paramètre `body` de la tâche, nous définissons les étapes du build. Ici, même principe, il n'y a qu'une seule étape qui va elle-même déclencher le build de `purchase-predict`.

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/airflow_pipelines10.png" />

Avant d'exécuter un backfill, rajoutons la variable `MLFLOW_SERVER` sur Airflow. Par ailleurs, il faut penser à nettoyer les tâches du DAG avant de faire le backfill, qui est considéré comme succès, pour retester l'intégralité du DAG.

<div class="alert alert-block alert-info">
    En comptabilisant le calcul Spark et tous les builds, cela peut prendre une trentaine de minutes. Pour ne pas être bloqué par la limite de temps pour <code>purchase-predict</code>, on peut augmenter le <code>timeout</code> ou changer le type de machine pour ne obtenir une plus puissante.
</div>

Et après toute cette attente, nous allons finalement pouvoir profiter de la dernière version mise à jour sur Cloud Run (car nous sommes dans l'environnement de pré-production).

<img src="https://blent-learning-user-ressources.s3.eu-west-3.amazonaws.com/training/ml_engineer_facebook/img/airflow_pipelines11.png" />

Magique, non ? 😲

Il ne reste plus qu'à faire le même DAG mais pour l'environnement de production, qui bien entendu ne devrait pas poser de difficulté car le déploiement sur Kubernetes est géré par Cloud Build.

En résumé, le fichier `pipeline_ml_production.py` est très proche de ce que nous venons de faire.

> ❓ Est-ce que l'on ne pourrait pas optimiser les deux DAGs ?

Bien sûr ! Si l'on regarde en détails, on réalise ici deux fois le même calcul Spark. Nous pourrions par exemple fusionner sur un seul DAG, ou encore avoir un DAG qui fait le traitement Spark, puis ensuite déclencher les deux autres DAGs par la suite.

Ici nous avons fait le choix de la flexibilité si, par exemple, on ne souhaiterai pas lancer les deux calculs en même temps.

<div class="alert alert-block alert-warning">
    Dans le cadre de la formation, il faut faire attention à vérifier que le cluster est bien supprimé à la fin. En effet, n'oublions pas qu'<i>in fine</i>, ce sont des ressources qui nécessitent des crédits.
</div>

Pour terminer, on peut ensuite envoyer ce DAG dans Airflow sur le serveur (en pensant à installer les dépendances supplémentaires que nous avons fait au tout début).

## ✔️ Conclusion

Et voilà ! Cela représente l'aboutissement de l'approche MLOps ! Tu peux être très fier(e) de toi pour tout ce chemin parcouru ! 🥳

- Nous avons pu créer et supprimer des clusters Dataproc depuis Airflow.
- Nous avons ensuite intégré le déclenchement Cloud Build directement dans le DAG Airflow.

> ➡️ Il y aurait encore tant de choses à voir... mais pour terminer, il est important de voir quelques bonnes pratiques, notamment au niveau de la **sécurité** ou encore du **monitoring** !