In [22]:
import functools
import itertools
from pathlib import Path
from typing import Iterable

import pandas as pd

# 🐍 Pipelines de données fonctionnels avec Python

> Implémenter des pipelines de traitement de données grâce aux concepts de programmation fonctionnelle inclus nativement avec Python

_Romain Clement - Meetup Python Grenoble - 23/11/2023_

## 🤷‍♂️ Contexte

- Essor du _Data Engineering_
- Paradigme de _graphes orientés acycliques_ (_DAGs_)


## ⚠️ Remarques

- Proposition de patterns
- Small / medium data
- Programmation fonctionnelle _light_

## ♽ DAG ?

_**D**irected **A**cyclic **G**raph_

(_Graphe Orienté Acyclique_)

- Dépendances et ordre d'exécution
- Modularité, réutilisabilité, testabilité
- Pipelines et algorithmes

### ♽ DAG - Représentation

```mermaid
graph LR;
    A-->B;
    A-->C;
    B-->D;
    C-->D;
```

### ♽ DAG - Exemple 1

```mermaid
graph LR;
    A[Compute some stuff];
```

### ♽ DAG - Exemple 2 - Nettoyer un fichier CSV

```mermaid
graph LR;
    A[Load CSV] -- Dataframe --> B[Clean Dataframe];
    B -- Dataframe --> C[Save to CSV];
```

### ♽ DAG - Exemple 2 - Nettoyer un fichier CSV

```mermaid
graph LR;
    subgraph F[Process]
        direction LR
            A[Load CSV] -- Dataframe --> B[Clean Dataframe];
            B -- Dataframe --> C[Save to CSV];
    end
    START[ ] -- Path --> A;
    C -- Path --> END[ ];
    style START fill:#FFFFFF00, stroke:#FFFFFF00;
    style END fill:#FFFFFF00, stroke:#FFFFFF00;
```

### ♽ DAG - Exemple 3 - Web-scraping ETL

```mermaid
graph LR;
    A[Extract data] --> B[Transform data];
    A --> C[Compute metadata];
    B --> D[Load data];
    C --> D;
```

### ♽ DAG - Exemple 4 - Traitement de fichiers (streaming)

```mermaid
graph LR;
    subgraph B[Per file process]
        direction LR
            B1[Read file] --> B2[Process file]
    end
    A[List files] -- files --> B1
```

### ♽ DAG - Exemple 5 - Machine Learning

```mermaid
graph LR;
    A[Load dataset] --> B[Train / test split];
    B -- train set --> C[Train model];
    C -- model --> D[Evaluate model] & E[Register];
    B -- test set --> D;
    D -- metrics --> F[Log];
```

## ƛ Programmation Fonctionnelle

Concepts utiles :

- Tout est fonction
- Fonctions pures
- Immutabilité
- Composition
- Réutilisabilité (_Curryfication_)
- Evaluation paresseuse

### ƛ Concepts fonctionnels en Python

Disponible nativement :

- Fonctions: `def`
- Fonctions d'ordre supérieur: `map`, `filter`, `itertools.reduce`, `lambda`
- Réutilisabilité: `functools.partial`
- Evaluation paresseuse: `yield`, `itertools.tee`
- Typage faible: `typing`

### ƛ Concepts fonctionnels en Python

Non disponible nativement :

- Composition de fonctions
- Structures de données complexes immutables
- Typage fort (~)

## ✨ DAGs + Programmation Fonctionnelle

<!--
    Demonstrate how to leverage functional programming concepts to build data pipelines in Python.
    Explore the use of map, filter, and reduce functions for processing data in a functional manner.
    Discuss the benefits of using functools.partial for creating partially applied functions.
    Showcase the power of generators and iterators for handling large datasets efficiently.
    Explain the concept of lazy evaluation and how it can improve the performance of data pipelines.
-->

## Suite
<!--
Examples and Code Walkthrough (5 minutes)

    Provide practical examples of writing DAGs using functional programming concepts.
    Walk through code snippets to illustrate the application of map, filter, reduce, functools.partial, generators, iterators, and lazy evaluation in building data pipelines.

Advantages and Drawbacks (2 minutes)

    Summarize the advantages of using functional data pipelines in Python:
        Readability and maintainability of code.
        Improved composability and reusability of functions.
        Efficient handling of large datasets through lazy evaluation.
    Discuss potential drawbacks or challenges:
        Learning curve for those unfamiliar with functional programming.
        Potential performance trade-offs in certain scenarios.

Conclusion (2 minutes)

    Recap the key points discussed in the talk.
    Emphasize the power and flexibility of functional programming in designing data pipelines.
    Encourage attendees to explore and experiment with functional programming concepts in their own data processing workflows.
-->

In [11]:
def list_files(directory: Path) -> list[Path]:
    return list(directory.glob("*.jpg"))

def read_file(filepath: Path) -> bytes:
    return filepath.read_bytes()

def process_file(content: bytes) -> None:
    ...

def imperative_pipeline():
    for filepath in list_files("mydir"):
        process_file(read_file(filepath))

In [None]:
def streaming_pipeline():
    map(process_file, map(read_file, list_files))

In [9]:
def load_csv(filename: str) -> pd.DataFrame:
    return pd.read_csv(filename)

def clean_csv(data: pd.DataFrame) -> pd.DataFrame:
    return data.dropna()

def save_csv(filename: str, data: pd.DataFrame) -> None:
    data.to_csv(filename)

def clean_csv_pipeline():
    save_csv("output.csv", clean_csv(load_csv("input.csv")))

## Réutilisabilité

In [21]:
class APIClient: ...

def get_api_key() -> str: ...

def api_client(api_key: str) -> APIClient: ...

def api_get_file(client: APIClient, filename: str) -> bytes: ...

def main():
    client = api_client(get_api_key())
    get_file = functools.partial(api_get_file, client)
    get_file("file1")
    get_file("file2")

## Evaluation paresseuse

In [24]:
def list_files() -> Iterable[Path]:
    return Path().glob("*")

def open_file(filepath: Path) -> bytes:
    return filepath.read_bytes()

def streaming_pipeline() -> None:
    files1, files2 = itertools.tee(list_files(), 2)
    list(map(len, map(open_file, files1)))
    list(map(print, files2))

## 👍 Avantages

- Fonctions Python pures
- Graphes de traitement avec style fonctionnel
- Force une conception générique
- Unités de traitement paramétrables et réutilisables
- Données volumineuses bénéficient du streaming avec les générateurs
- Le traitement des générateurs par évaluation paresseuse
- Test facilité
- Traitement parallèle possible

## 👎 Limitations

- Matérialisation des générateurs
- Résultats intermédiaires
- Introspection

## 🚀 Pour aller plus loin

Orchestrateurs:
- Airflow
- Dagster
- Prefect
- Spark

Bibliothèques:
- [`toolz`](https://toolz.readthedocs.io)
- [`functional-pipeline`](https://functional-pipeline.readthedocs.io)

## References

- [`itertools`](https://docs.python.org/3/library/itertools.html)
- [`functools`](https://docs.python.org/3/library/functools.html)