# TP : Spark Streaming

Le but de ce TP est d'explorer la librairie de calcul Spark Streaming, et de mettre en évidence les particularités de traitement de flux par rapport aux données statiques.

# Cas d'usage

Nous sommes toujours sur notre produit PomSort. On s'intéresse plus particulièrement au reporting en temps réel : nous allons mettre en place les traitements qui pourraient alimenter ce reporting.

# Données

Les données sont fournies sous forme de fichiers CSV dans le répertoire `data/`. Bien que ce soit des fichiers déjà constitués, nous demanderons à Spark Streaming de les traiter comme des flux, en simulant l'écoulement du temps grâce à une colonne `timestamp`.

Dans cette introduction, on affiche le début des fichiers avec Pandas, pour montrer leur structure.

## Poids des pommes

La balance pèse en continu ce qui passe sur le tapis. Toutes les secondes exactement, on a une mesure : `weight`, exprimée en grammes.

In [24]:
import pandas as pd
pd.read_csv('data/weights/weights.csv').head()

Unnamed: 0,timestamp,weight
0,2025-03-01 00:00:00,236.474553
1,2025-03-01 00:00:01,238.870695
2,2025-03-01 00:00:02,240.635145
3,2025-03-01 00:00:03,237.33909
4,2025-03-01 00:00:04,234.678282


## Diamètres des pommes

Le diamètre est mesuré par un algorithme à chaque fois qu'une pomme est détectée sur le tapis d'arrivée. Les timestamps sont donc irréguliers. On a une mesure à chaque fois : `diameter`, exprimée en centimètres.

In [25]:
pd.read_csv('data/diameters/diameters.csv').head()

Unnamed: 0,timestamp,diameter
0,2025-03-01 00:00:00,11.08
1,2025-03-01 00:00:08,10.44
2,2025-03-01 00:00:22,7.36
3,2025-03-01 00:00:26,6.56
4,2025-03-01 00:00:30,9.17


## Identification des variétés de pommes

L'identification se fait à la détection des pommes, comme pour la mesure du diamètre. Cependant, le modèle de classification étant plus lent que celui de mesure du diamètre, le timestamp est décalé de quelques secondes par rapport à celui-ci.

On a une colonne supplémentaire, `identification_id`, unique pour chaque événement d'identification.

In [26]:
pd.read_csv('data/identifications/identifications.csv').head()

Unnamed: 0,identification_id,timestamp,variety
0,0,2025-03-01 00:00:04,Golden
1,1,2025-03-01 00:00:17,Fuji
2,2,2025-03-01 00:00:33,Fuji
3,3,2025-03-01 00:00:33,Boskoop
4,4,2025-03-01 00:00:41,Fuji


# Initialisation de Spark

On crée la session Spark :

In [None]:
%pip install pyspark

In [None]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("PomSortStreaming") \
    .getOrCreate()

On charge aussi la librairie des fonctions PySpark :

In [None]:
from pyspark.sql import functions as F

# Tutoriel : Création d'un dataframe de streaming pour les poids

Prenez le temps d'exécuter ce tutoriel en en comprenant la logique.

Pour créer un dataframe de streaming, il faut quelques ingrédients :
- le **schéma** : structure (colonnes et types) des données élémentaires qui sont dans le flux
- le **format** de représentation : ici, CSV
- l'**emplacement** : c'est forcément un répertoire. Chaque fichier est donc dans son propre répertoire, pour que Spark Streaming ne les mélange pas

In [151]:
# Le schéma est donné sous forme de chaîne analogue à la définition de colonnes en SQL
weights_schema = 'timestamp timestamp, weight float'

# Le dataframe lui-même
weights = spark.readStream.schema(weights_schema).format('csv').option('header', True).load('data/weights')

Le résultat est un dataframe Spark, qui a la particularité d'être "streamable".

In [27]:
type(weights)

pyspark.sql.dataframe.DataFrame

In [29]:
weights.isStreaming

True

Pour afficher le contenu du dataframe, il faut créer une requête. En streaming, le dataframe ne mémorise pas les données : ce n'est qu'un tuyau branché sur la source, et c'est la requête qui "aspire" les données. Ici elle sort les données sur la console = la sortie de la cellule Jupyter.

Comme on est en streaming, les données sont infinies et la requête ne s'arrête jamais : on lui demande de s'interrompre toute seule au bout de quelques secondes, ce qui est suffisant pour notre volume. Par défaut, sur la console elle n'affiche que 20 lignes de résultat.

In [152]:
def show_result(df, timeout=30, complete=False):
    writer = df.writeStream
    if complete:
        writer = writer.outputMode('complete')
    query = writer.format("console").start()
    query.awaitTermination(timeout=timeout)

show_result(weights)

25/03/23 17:10:56 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-06aa1693-48d1-4c99-b624-813020eec342. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/23 17:10:56 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+---------+
|          timestamp|   weight|
+-------------------+---------+
|2025-03-01 00:00:00|236.47455|
|2025-03-01 00:00:01| 238.8707|
|2025-03-01 00:00:02|240.63515|
|2025-03-01 00:00:03| 237.3391|
|2025-03-01 00:00:04|234.67828|
|2025-03-01 00:00:05|233.77025|
|2025-03-01 00:00:06|245.13695|
|2025-03-01 00:00:07|236.12573|
|2025-03-01 00:00:08|238.05058|
|2025-03-01 00:00:09|245.78108|
|2025-03-01 00:00:10|  238.143|
|2025-03-01 00:00:11| 245.8881|
|2025-03-01 00:00:12|235.54253|
|2025-03-01 00:00:13|236.20477|
|2025-03-01 00:00:14|245.88734|
|2025-03-01 00:00:15|237.60182|
|2025-03-01 00:00:16| 239.8839|
|2025-03-01 00:00:17|238.97714|
|2025-03-01 00:00:18| 239.6219|
|2025-03-01 00:00:19|242.91342|
+-------------------+---------+
only showing top 20 rows



# Exercices

## Création des autres dataframes

En s'inspirant de l'exemple pour les poids, créer un dataframe pour les 2 autres sources : `diameters`, `identifications`.

Attention à bien spécifier le schéma ; pour cela se référer aux fichiers ou aux extraits Pandas en haut du notebook. Voici les types de données utiles pour le schéma :
- pour un timestamp : `timestamp`
- pour un nombre flottant : `float`
- pour un nombre entier : `int`
- pour une chaîne de caractères : `string`

In [140]:
diameters = ...

In [141]:
identifications = ...

## Calcul d'une moyenne glissante

Le flux étant infini, une moyenne globale comme en Pandas n'a pas de sens. Il faut forcément la calculer sur des fenêtres temporelles.

En Spark Streaming, cela se fait au moyen d'un regroupement (`groupBy`) sur la fenêtre temporelle. Une fenêtre glissante est construite comme suit :

```
F.window('nom_de_la_colonne_timestamp', 'durée_de_la_fenêtre', 'durée_de_glissement')
```

Sortir la moyenne glissante des diamètres, avec une fenêtre de 1 min qui se décale de 30 s à chaque fois.

In [153]:
diameter_moving_avg = ...

# Comme il y a un groupBy, la requête doit attendre l'arrivée des données complètes avant de sortir un résultat fiable
show_result(diameter_moving_avg, complete=True)

25/03/23 17:13:22 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-7eef7735-c061-4603-83ca-0df5ad334f99. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/23 17:13:22 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+------------------+
|              window|     avg(diameter)|
+--------------------+------------------+
|{2025-02-28 23:59...| 8.859999895095825|
|{2025-03-01 00:00...| 9.064999878406525|
|{2025-03-01 00:00...| 9.436666568120321|
|{2025-03-01 00:01...|10.418000030517579|
|{2025-03-01 00:01...| 9.878571510314941|
|{2025-03-01 00:02...| 9.532222323947483|
|{2025-03-01 00:02...|10.434285708836146|
|{2025-03-01 00:03...| 11.19714287349156|
|{2025-03-01 00:03...| 9.570000065697563|
|{2025-03-01 00:04...| 8.932500004768372|
|{2025-03-01 00:04...| 9.982222292158339|
|{2025-03-01 00:05...|10.485714367457799|
|{2025-03-01 00:05...|10.891666650772095|
|{2025-03-01 00:06...| 10.23857137135097|
|{2025-03-01 00:06...|10.612857137407575|
|{2025-03-01 00:07...|  9.96571431841169|
|{2025-03-01 00:07...| 8.835000097751617|
|{2025-03-01 00:08...|  8.99750006198883|
|{2025-03-01 00:08...

## Jointure simple entre 2 dataframes

La jointure classique sur des champs fonctionne de la même manière qu'en Spark classique.

Créer un dataframe qui associe, pour chaque diamètre mesuré, la mesure de poids qui a eu lieu au même instant.

In [154]:
diameters_with_weights = ...

show_result(diameters_with_weights)

25/03/23 17:15:14 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-d490562e-3134-415d-9e40-e0462f9265fe. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/23 17:15:14 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+--------+----------+
|          timestamp|diameter|    weight|
+-------------------+--------+----------+
|2025-03-01 00:07:08|   10.65| 232.90999|
|2025-03-01 00:08:39|    7.52| 91.157524|
|2025-03-01 00:06:01|   12.61| 343.35535|
|2025-03-01 00:08:09|   11.43|  284.0077|
|2025-03-01 00:02:57|   13.01| 419.60422|
|2025-03-01 00:04:23|    8.59| 123.86406|
|2025-03-01 00:08:46|    8.13|105.523026|
|2025-03-01 00:01:41|   11.31| 323.81674|
|2025-03-01 00:00:59|    9.74| 193.77864|
|2025-03-01 00:01:54|   12.92|   410.609|
|2025-03-01 00:07:20|   12.32| 363.54865|
|2025-03-01 00:00:26|    6.56| 60.788555|
|2025-03-01 00:04:11|    7.74|  94.62217|
|2025-03-01 00:04:51|   11.22|  250.1715|
|2025-03-01 00:09:12|   10.47| 224.13348|
|2025-03-01 00:09:31|    9.18|   160.633|
|2025-03-01 00:03:38|   10.67| 232.72371|
|2025-03-01 00:02:28|    8.52| 126.13467|
|2025-03-01 00:04:34|

## Jointure sur des fenêtres

On va faire une jointure similaire, mais sur le dataframe d'identification des variétés (`identifications`). Pour rappel, les timestamps ne sont pas les mêmes que ceux de la mesure du diamètre : il faut trouver un moyen d'associer les timestamps "proches".

La stratégie retenue est la suivante :
- affectation des mesures de diamètre à une fenêtre temporelle assez grande
- affectation des événements d'identification à une fenêtre identique
- jointure sur la fenêtre
  - chaque fenêtre du résultat "contiendra" _n_ mesures de diamètre et _n'_ événements d'identification
- pour chaque timestamp mesure de diamètre, on cherche l'événement d'identification postérieur le plus proche

### Affectation aux fenêtres

Créer un dataframe dérivé de `diameters_with_weights`, qui contient une colonne supplémentaire : une fenêtre temporelle d'1 minute. La fenêtre doit être non glissante, pour que chaque mesure soit associée à une et une seule fenêtre.

Pour créer une fenêtre non glissante : `F.window('nom_de_la_colonne_timestamp', 'durée_de_la_fenêtre')`

In [155]:
diameters_with_weights_1m = ...
show_result(diameters_with_weights_1m)

25/03/23 17:26:37 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-6899a0e5-fdda-4822-890d-c636b7d15517. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/23 17:26:37 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+--------+----------+--------------------+
|          timestamp|diameter|    weight|              window|
+-------------------+--------+----------+--------------------+
|2025-03-01 00:07:08|   10.65| 232.90999|{2025-03-01 00:07...|
|2025-03-01 00:08:39|    7.52| 91.157524|{2025-03-01 00:08...|
|2025-03-01 00:06:01|   12.61| 343.35535|{2025-03-01 00:06...|
|2025-03-01 00:08:09|   11.43|  284.0077|{2025-03-01 00:08...|
|2025-03-01 00:02:57|   13.01| 419.60422|{2025-03-01 00:02...|
|2025-03-01 00:04:23|    8.59| 123.86406|{2025-03-01 00:04...|
|2025-03-01 00:08:46|    8.13|105.523026|{2025-03-01 00:08...|
|2025-03-01 00:01:41|   11.31| 323.81674|{2025-03-01 00:01...|
|2025-03-01 00:00:59|    9.74| 193.77864|{2025-03-01 00:00...|
|2025-03-01 00:01:54|   12.92|   410.609|{2025-03-01 00:01...|
|2025-03-01 00:07:20|   12.32| 363.54865|{2025-03-01 00:07...|
|2025-03-01 00:00:26|

Faire de même à partir du dataframe `identifications`

In [156]:
identifications_1m = ...
show_result(identifications_1m)

25/03/23 17:28:25 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-005824ff-8c1b-4d54-8a34-3cd0801301d2. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/23 17:28:25 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-----------------+-------------------+------------+--------------------+
|identification_id|          timestamp|     variety|              window|
+-----------------+-------------------+------------+--------------------+
|                0|2025-03-01 00:00:04|      Golden|{2025-03-01 00:00...|
|                1|2025-03-01 00:00:17|        Fuji|{2025-03-01 00:00...|
|                2|2025-03-01 00:00:33|        Fuji|{2025-03-01 00:00...|
|                3|2025-03-01 00:00:33|     Boskoop|{2025-03-01 00:00...|
|                4|2025-03-01 00:00:41|        Fuji|{2025-03-01 00:00...|
|                5|2025-03-01 00:00:46|     Boskoop|{2025-03-01 00:00...|
|                6|2025-03-01 00:00:52|        Fuji|{2025-03-01 00:00...|
|                7|2025-03-01 00:01:06|      Golden|{2025-03-01 00:01...|
|                8|2025-03-01 00:01:20|Granny smith|{2025-03-01 00:01...|
|              

### Jointure sur la fenêtre

Nos 2 dataframes dérivés ont maintenant une fenêtre temporelle commune, sur laquelle on peut les joindre. Faire cette jointure, et filtrer le résultat pour ne garder que les lignes dont `identification_timestamp` est postérieur ou égal à `timestamp`.

Pour filtrer, utiliser la méthode `filter()` d'un dataframe.

Lors de la référence à `identifications_1m`, renommer sa colonne `timestamp` en `identification_timestamp`, pour éviter les ambiguïtés avec le `timestamp` de `diameters_with_weights_1m`. Le renommage se fait avec la fonction `withColumnRenamed()` appliquée à un dataframe.

In [170]:
diameters_with_weights_and_identifications = ...

show_result(diameters_with_weights_and_identifications)

25/03/23 17:53:13 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-f818c996-8b9f-48b5-8107-78fdd2716126. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/23 17:53:13 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+--------------------+-------------------+--------+---------+-----------------+------------------------+------------+
|              window|          timestamp|diameter|   weight|identification_id|identification_timestamp|     variety|
+--------------------+-------------------+--------+---------+-----------------+------------------------+------------+
|{2025-03-01 00:05...|2025-03-01 00:05:10|   10.37|192.08461|               37|     2025-03-01 00:05:15|Granny smith|
|{2025-03-01 00:05...|2025-03-01 00:05:05|   11.06|268.72205|               37|     2025-03-01 00:05:15|Granny smith|
|{2025-03-01 00:05...|2025-03-01 00:05:20|     9.5|200.91382|               38|     2025-03-01 00:05:21|      Canada|
|{2025-03-01 00:05...|2025-03-01 00:05:10|   10.37|192.08461|               38|     2025-03-01 00:05:21|      Canada|
|{2025-03-01 00:05...|2025-03-01 00:05:05|   11.06|268.72205|               3

### Raccrochage de l'événement postérieur le plus proche

Cette étape est un peu plus compliquée. On va regrouper les données de `diameters_with_weights_and_identifications` par `timestamp` (timestamp de mesure du diamètre), et appliquer une fonction Python à tout le groupe pour trouver le timestamp `identification_timestamp` postérieur le plus proche.

Cette fonction Python doit prendre un dataframe Pandas en entrée, en retourner un nouveau. PySpark va mettre bout à bout tous les dataframes Pandas résultant pour chaque valeur de `timestamp`.

Pour ce résultat, les colonnes doivent être :
- `timestamp` (celui du regroupement)
- `diameter` (valeur du diamètre)
- `identification_id` (ID de l'événement postérieur le plus proche)
- `variety` (variété de pomme associée)
- 
Il faudra aussi donner à PySpark le schéma du dataframe ainsi retourné.

In [171]:
def process_group(df_pandas):
    # Se rappeler qu'ici, on traite du dataframe *Pandas* et pas Spark
    return ...

diameters_with_weights_and_single_identification = ...

show_result(diameters_with_weights_and_single_identification, timeout=60)

25/03/23 17:54:51 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-9c8b22aa-08e8-475c-88bb-8b4d9c10efc0. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/23 17:54:51 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-------------------+--------+-----------------+------------+
|          timestamp|diameter|identification_id|     variety|
+-------------------+--------+-----------------+------------+
|2025-03-01 00:07:08|   10.65|               52|Granny smith|
|2025-03-01 00:08:39|    7.52|               63|     Boskoop|
|2025-03-01 00:06:01|   12.61|               43|      Canada|
|2025-03-01 00:08:09|   11.43|               58|Granny smith|
|2025-03-01 00:04:23|    8.59|               32|        Fuji|
|2025-03-01 00:08:46|    8.13|               65|        Fuji|
|2025-03-01 00:01:41|   11.31|               10|Granny smith|
|2025-03-01 00:07:20|   12.32|               53|      Golden|
|2025-03-01 00:00:26|    6.56|                2|        Fuji|
|2025-03-01 00:04:11|    7.74|               30|     Boskoop|
|2025-03-01 00:04:51|   11.22|               36|      Golden|
|2025-03-01 00:09:12|   10.47|     

## Jointure avec un dataframe statique

Il y a un fichier en plus, dans le fichier `data/truth/truth.csv` : les "vraies" variétés identifiées après annotation par un expert :

In [172]:
pd.read_csv('data/truth/truth.csv').head()

Unnamed: 0,identification_id,true_variety
0,0,Golden
1,1,Fuji
2,2,Fuji
3,3,Boskoop
4,4,Fuji


Lire le fichier sous forme de dataframe statique (non streaming) :

In [173]:
truth = ...
truth.show()

+-----------------+------------+
|identification_id|true_variety|
+-----------------+------------+
|                0|      Golden|
|                1|        Fuji|
|                2|        Fuji|
|                3|     Boskoop|
|                4|        Fuji|
|                5|     Boskoop|
|                6|        Fuji|
|                7|Granny smith|
|                8|Granny smith|
|                9|      Golden|
|               10|Granny smith|
|               11|        Fuji|
|               12|      Golden|
|               13|      Golden|
|               14|        Fuji|
|               15|Granny smith|
|               16|     Boskoop|
|               17|      Canada|
|               18|      Golden|
|               19|     Boskoop|
+-----------------+------------+
only showing top 20 rows



In [174]:
truth.isStreaming

False

Nous pouvons maintenant faire une jointure entre notre dataframe de streaming et celui-ci, pour avoir en même temps l'identification algorithmique et la vraie valeur.

In [175]:
final_df = ...
show_result(final_df, timeout=60)

25/03/23 17:59:18 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-0cd0dc69-ca60-4a30-9306-e372e8c6bbc0. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/23 17:59:18 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
                                                                                

-------------------------------------------
Batch: 0
-------------------------------------------
+-----------------+-------------------+--------+------------+------------+
|identification_id|          timestamp|diameter|     variety|true_variety|
+-----------------+-------------------+--------+------------+------------+
|               51|2025-03-01 00:07:08|   10.65|      Golden|      Golden|
|               58|2025-03-01 00:08:39|    7.52|Granny smith|Granny smith|
|               43|2025-03-01 00:06:01|   12.61|      Canada|      Canada|
|               58|2025-03-01 00:08:09|   11.43|Granny smith|Granny smith|
|               12|2025-03-01 00:02:57|   13.01|      Golden|      Golden|
|               29|2025-03-01 00:04:23|    8.59|     Boskoop|     Boskoop|
|               58|2025-03-01 00:08:46|    8.13|Granny smith|Granny smith|
|                7|2025-03-01 00:01:41|   11.31|      Golden|Granny smith|
|                0|2025-03-01 00:00:59|    9.74|      Golden|      Golden|
|  

## Récupération explicite des données par itération

In [182]:
final_df.writeStream.foreach(print).start().awaitTermination(timeout=120)

25/03/23 19:11:58 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-29580635-e7c1-42c4-8abc-f3d3a21b43a9. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
25/03/23 19:11:58 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
Row(identification_id=12, timestamp=datetime.datetime(2025, 3, 1, 0, 2, 57), diameter=13.010000228881836, variety='Golden', true_variety='Golden')
Row(identification_id=29, timestamp=datetime.datetime(2025, 3, 1, 0, 4, 23), diameter=8.59000015258789, variety='Boskoop', true_variety='Boskoop')
Row(identification_id=58, timestamp=datetime.datetime(2025, 3, 1, 0, 8, 46), diameter=8.130000114440918, variety='Granny smith', true_variety='Granny smith')
Row(identification_

False