## Configurazione e avvio sessione Spark

In [None]:
# disattiva il parallelismo a livello delle librerie di calcolo per gestirlo con Spark
import os
os.environ["OMP_NUM_THREADS"] = "1"
os.environ["MKL_NUM_THREADS"] = "1"
os.environ["OPENBLAS_NUM_THREADS"] = "1"

In [2]:
from pyspark.sql import SparkSession
from pyspark.ml.functions import predict_batch_udf
from pyspark.sql.types import ArrayType, FloatType, StringType, StructType, StructField
from pyspark.sql.functions import col
from pyspark import SparkFiles, SparkContext

In [None]:
# configurazioni spark per esecuzione nel cluster yarn
configs = {
    "spark.app.name": "ResNetPredictNotebook",
    "spark.master": "yarn",                    
    "spark.submit.deployMode": "client",       
    "spark.executor.instances": "2",        
    "spark.executor.cores": "1",
    "spark.executor.memory": "1g",
    "spark.executor.memoryOverhead": "512m",
    "spark.driver.memory": "1.5g",
    "spark.sql.shuffle.partitions": "8",
    "spark.default.parallelism": "8",
    "spark.python.worker.reuse": "true",
    # passa env var agli executor
    "spark.executorEnv.OMP_NUM_THREADS": "1",
    "spark.executorEnv.MKL_NUM_THREADS": "1",
    "spark.executorEnv.OPENBLAS_NUM_THREADS": "1",
}
try:
    spark.stop()
except NameError:
    pass

if SparkContext._active_spark_context is not None:
    SparkContext._active_spark_context.stop()
    
builder = SparkSession.builder
for k, v in configs.items():
    builder = builder.config(k, v)

# crea la sessione spark se non disponibile
spark = builder.getOrCreate()
sc = spark.sparkContext

print(dict(sc.getConf().getAll()))

25/08/21 12:14:17 WARN Client: Neither spark.yarn.jars nor spark.yarn.archive is set, falling back to uploading libraries under SPARK_HOME.


{'spark.executorEnv.OMP_NUM_THREADS': '1', 'spark.app.name': 'ResNetPredictNotebook', 'spark.driver.memory': '1.5g', 'spark.python.worker.reuse': 'true', 'spark.default.parallelism': '8', 'spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_URI_BASES': 'http://namenode:8088/proxy/application_1755771130202_0001', 'spark.driver.host': 'namenode', 'spark.executor.memory': '1g', 'spark.serializer.objectStreamReset': '100', 'spark.ui.proxyBase': '/proxy/application_1755771130202_0001', 'spark.submit.deployMode': 'client', 'spark.driver.port': '34487', 'spark.ui.filters': 'org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter', 'spark.org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter.param.PROXY_HOSTS': 'namenode', 'spark.driver.appUIAddress': 'http://namenode:4040', 'spark.app.id': 'application_1755771130202_0001', 'spark.executor.memoryOverhead': '512m', 'spark.driver.extraJavaOptions': '-Djava.net.preferIPv6Addresses=false -XX:+IgnoreUnrecognizedVMOp

In [4]:
print("Spark master:", sc.master)

Spark master: yarn


## Calcolo distribuito degli embeddings

In [None]:
import torch
import torchvision.models as models

# carica il modello resnet50 e salva i pesi pre-addestrati in spark
weights = models.ResNet50_Weights.DEFAULT
model = models.resnet50(weights=weights)
state_dict = model.state_dict()
torch.save(state_dict, "/tmp/resnet50_statedict2.pth")

sc.addFile("/tmp/resnet50_statedict2.pth")

In [None]:
def make_resnet_fn():
    """
    Questa funzione viene eseguita una volta per python executor
    Restituisce una funzione "predict"
    """
    import io
    import numpy as np
    from PIL import Image
    import torch
    import torch.nn as nn
    import torchvision.models as models
    import torchvision.transforms as T

    #legge i pesi del modello salvati in precedenza in spark
    local_path = SparkFiles.get("resnet50_statedict2.pth")
    # ottiene le operazioni di preprocessing per resnet50
    weights = models.ResNet50_Weights.DEFAULT
    preprocess = weights.transforms() 
    
    # carica il modello con i pesi pre-addestrati
    model = models.resnet50(weights=None)
    state_dict = torch.load(local_path, map_location="cpu", weights_only=False)
    model.load_state_dict(state_dict)
    #prepara feature_extractor rimuovendo ultimo layer di classificazione
    feature_extractor = nn.Sequential(*list(model.children())[:-1])  
    feature_extractor.eval()
    device = "cpu"
    feature_extractor.to(device)

    def predict(content_bytes_arr: np.ndarray) -> np.ndarray:
        """
        input: numpy array 1D di bytes (batch)
        output: numpy array 2D float32 con shape (batch, 2048)
        """
        # costruisce batch tensor
        imgs = []
        for b in content_bytes_arr:
            try:
                img = Image.open(io.BytesIO(b)).convert("RGB")
            except Exception:
                # se immagine corrotta -> sostituisce con immagine nera
                img = Image.new("RGB", (224,224))
            imgs.append(preprocess(img))

        batch = torch.stack(imgs).to(device)                    # shape (batch, 3, 224, 224)
        # estrazione features
        with torch.no_grad():
            feats = feature_extractor(batch)                    # (batch, 2048, 1, 1)
            feats = feats.reshape(feats.size(0), -1)            # (batch, 2048)
            out = feats.cpu().numpy().astype(np.float32)        # numpy 2D array
        return out

    return predict


In [None]:
# Legge le immagini da hdfs come file binari (binaryFile) e le memorizza in un dataframe
df = spark.read.format("binaryFile").load("hdfs:///user/hadoopuser/flickr30k_images/flickr30k_images/").select("path", "content")

25/08/21 12:16:05 WARN SharedInMemoryCache: Evicting cached table partition metadata from memory due to size constraints (spark.sql.hive.filesourcePartitionFileCacheSize = 262144000 bytes). This may impact query planning performance.


In [None]:
#df.show()
df.summary().show()

                                                                                

+-------+--------------------+
|summary|                path|
+-------+--------------------+
|  count|               31784|
|   mean|                NULL|
| stddev|                NULL|
|    min|hdfs://namenode:9...|
|    25%|                NULL|
|    50%|                NULL|
|    75%|                NULL|
|    max|hdfs://namenode:9...|
+-------+--------------------+



In [None]:
# crea la predict_batch_udf che restituisce una pandas UDF per l'inferenza
resnet_udf = predict_batch_udf(
    make_resnet_fn,
    return_type=ArrayType(FloatType()),
    batch_size=4
)

# Applica la predict_batch_udf alla colonna "content" del dataframe (bytes immagine) e genera gli embedding
df_with_emb = df.withColumn("embedding", resnet_udf(col("content"))).select("path", "embedding")

# Scrive i risultati su hdfs in formatopParquet
df_with_emb.write.mode("overwrite").parquet("hdfs:///user/hadoopuser/flickr_image_embeddings_parquet/")

[Stage 3:=>                                                     (36 + 1) / 1012]

## Upload embeddings su database Milvus

In [None]:
# Carica embeddings dal parquet
df_from_parquet = spark.read.parquet("hdfs:///user/hadoopuser/flickr_image_embeddings_parquet/")

# verifica dataframe
df_from_parquet.printSchema()
df_from_parquet.show()
df_from_parquet.summary().show()

root
 |-- path: string (nullable = true)
 |-- embedding: array (nullable = true)
 |    |-- element: float (containsNull = true)



                                                                                

+--------------------+--------------------+
|                path|           embedding|
+--------------------+--------------------+
|hdfs://namenode:9...|[0.0038882254, 0....|
|hdfs://namenode:9...|[0.0, 0.0, 0.0, 0...|
|hdfs://namenode:9...|[0.04085762, 0.0,...|
|hdfs://namenode:9...|[0.022724897, 0.0...|
|hdfs://namenode:9...|[0.062641725, 0.0...|
|hdfs://namenode:9...|[1.0650856, 0.003...|
|hdfs://namenode:9...|[0.9971608, 0.0, ...|
|hdfs://namenode:9...|[0.0, 0.0, 0.0, 0...|
|hdfs://namenode:9...|[0.48371467, 0.01...|
|hdfs://namenode:9...|[0.11777613, 0.0,...|
|hdfs://namenode:9...|[0.018268155, 0.0...|
|hdfs://namenode:9...|[0.033272073, 0.0...|
|hdfs://namenode:9...|[0.033873044, 0.0...|
|hdfs://namenode:9...|[0.0, 0.0, 0.0, 0...|
|hdfs://namenode:9...|[0.0, 0.047265064...|
|hdfs://namenode:9...|[0.07706165, 0.0,...|
|hdfs://namenode:9...|[0.03996246, 0.13...|
|hdfs://namenode:9...|[2.1814766, 0.0, ...|
|hdfs://namenode:9...|[0.0, 0.0, 0.0067...|
|hdfs://namenode:9...|[0.2139859



+-------+--------------------+
|summary|                path|
+-------+--------------------+
|  count|               31784|
|   mean|                NULL|
| stddev|                NULL|
|    min|hdfs://namenode:9...|
|    25%|                NULL|
|    50%|                NULL|
|    75%|                NULL|
|    max|hdfs://namenode:9...|
+-------+--------------------+



                                                                                

In [None]:
# verifica dimensione embeddings
from pyspark.sql.functions import col, size
bad_dim_count = df_from_parquet.filter(size(col("embedding")) != 2048).count()
print(bad_dim_count)

                                                                                

In [None]:
from pyspark.sql.functions import monotonically_increasing_id
from pymilvus import connections, utility, Collection, FieldSchema, CollectionSchema, DataType
import numpy as np

# Configurazione e creazione della Collezione su Milvus (eseguito sul driver) 

# Parametri di connessione e della collezione
MILVUS_HOST = "192.168.100.4"
MILVUS_PORT = "19530"
COLLECTION_NAME = "image_embeddings_spark"
DIMENSION = 2048 # Dimensione dei vettori ResNet50

# Connessione a Milvus dal driver
print(f"Connessione a Milvus su {MILVUS_HOST}:{MILVUS_PORT}")
connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)

# Controlla se la collezione esiste già e, in caso, la elimina per rieseguire lo script
if utility.has_collection(COLLECTION_NAME):
    print(f"Collezione '{COLLECTION_NAME}' esistente. Verrà eliminata e ricreata.")
    utility.drop_collection(COLLECTION_NAME)

# Definisce lo schema della collezione
fields = [
    FieldSchema(name="id", dtype=DataType.INT64, is_primary=True, auto_id=False),
    FieldSchema(name="path", dtype=DataType.VARCHAR, max_length=65535), 
    FieldSchema(name="embedding", dtype=DataType.FLOAT_VECTOR, dim=DIMENSION)
]
schema = CollectionSchema(fields, description="Image embeddings generated with Spark")

# Crea la collezione
print(f"Creazione della collezione '{COLLECTION_NAME}'...")
collection = Collection(name=COLLECTION_NAME, schema=schema)
print("Collezione creata con successo.")

In [None]:
def upload_partition_to_milvus(partition):
    """
    Funzione per il caricamento distribuito su Milvus
    verrà eseguita su ogni partizione (worker) del DataFrame
    """
    from pymilvus import connections, Collection
    
    # ogni worker deve stabilire la propria connessione a Milvus
    connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)
    collection = Collection(name=COLLECTION_NAME)
    
    rows = list(partition)
    if not rows:
        connections.disconnect("default")
        return

    # estrae i dati in colonne
    ids = [row["id"] for row in rows]
    paths = [row["path"] for row in rows]
    vectors = [row["embedding"] for row in rows]
    
    data_to_insert = [ids, paths, vectors]
    
    # inserisce i dati della partizione
    collection.insert(data_to_insert)
    
    # disconnette il worker
    connections.disconnect("default")

In [None]:
# Caricamento embeddings distribuito con Spark 

# aggiunge una colonna con ID univoci al dataframe letto da parquet
df_with_ids = df_from_parquet.withColumn("id", monotonically_increasing_id())

# riesegue la cache del df per evitare di ricalcolarlo
df_with_ids.cache()

print("Inizio del caricamento distribuito dei dati su Milvus...")
# applica la funzione "upload_partition_to_milvus" a ogni partizione del dataframe
df_with_ids.foreachPartition(upload_partition_to_milvus)
print("Caricamento dati completato.")

In [None]:
# Creazione dell'indice e caricamento in memoria (eseguito sul driver) 

# assicura che tutti i dati siano stati scritti su disco in Milvus
print("Flushing dei dati...")
collection.flush()
print(f"Numero totale di entità nella collezione: {collection.num_entities}")

# definisce i parametri per l'indice
index_params = {
    "metric_type": "COSINE",       # Metrica di distanza (cosine)
    "index_type": "IVF_FLAT",  
    "params": {"nlist": 1024}  
}

# crea l'indice sul campo vettoriale
print(f"Creazione dell'indice {index_params['index_type']}...")
collection.create_index(field_name="embedding", index_params=index_params)
print("Indice creato con successo.")

# carica la collezione in memoria per renderla disponibile alle ricerche
print("Caricamento della collezione in memoria...")
collection.load()
print("Collezione caricata e pronta per le ricerche.")

# disconnessione dal driver
connections.disconnect("default")

In [None]:
# Verifica che il DB risponda correttamente a una ricerca di similarità

print(f"Connessione a Milvus su {MILVUS_HOST}:{MILVUS_PORT}...")
connections.connect("default", host=MILVUS_HOST, port=MILVUS_PORT)

if not utility.has_collection(COLLECTION_NAME):
    print(f"ERRORE: La collezione '{COLLECTION_NAME}' non è stata trovata.")
else:
    print("1. Conteggio delle entità")
    collection = Collection(name=COLLECTION_NAME)
    collection.flush() # Assicura che gli ultimi dati scritti siano indicizzati
    print(f"La collezione '{COLLECTION_NAME}' contiene {collection.num_entities} entità.")

    print("2. Stato della collezione e dell'indice")
    print(f"Indici presenti: {collection.indexes}")
    
    load_state = utility.get_query_segment_info(COLLECTION_NAME)[0].state
    print(f"Stato di caricamento in memoria: {load_state}")

    print("3. Recupero di un'entità di esempio")
    sample_entity = collection.query(
      expr = "id == 0",
      output_fields = ["id", "path"]
    )
    if sample_entity:
        print("Dati recuperati per id=0:")
        print(sample_entity)
        sample_path = sample_entity[0]['path']
    else:
        print("Nessuna entità trovata con id=0.")
        sample_path = None

    print("4. Test di ricerca per similitudine")
    if sample_path:
        # input per la ricerca: il vettore appena recuperato
        vector_to_search = collection.query(expr=f"path == '{sample_path}'", output_fields=["embedding"])[0]['embedding']
        
        search_params = {
            "metric_type": "COSINE",
            "params": {"nprobe": 10}, 
        }

        # esegue la ricerca
        results = collection.search(
            data=[vector_to_search],          # vettore da cercare
            anns_field="embedding",           # campo su cui cercare
            param=search_params,
            limit=3,                          # ricerca dei 3 risultati più simili
            output_fields=["path"]            # restituisce il path delle immagini trovate
        )
        
        # Se il primo risultato ha distanza 1 e corrisponde all'immagine di partenza, la ricerca funziona
        print("La ricerca ha prodotto i seguenti risultati:")
        for i, hit in enumerate(results[0]):
            print(f"-- Risultato {i+1}:")
            print(f"   ID: {hit.id}")
            print(f"   Distanza: {hit.distance:.4f} ")
            print(f"   Path: {hit.entity.get('path')}")
            
connections.disconnect("default")

In [18]:
spark.stop()