In [31]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *
import subprocess
import weaviate
import json
import pandas as pd
import random
from typing import List, Dict, Any
import os
from sentence_transformers import SentenceTransformer

from scripts import weaviate_utils


from weaviate.classes.query import MetadataQuery
from scripts.weaviate_utils import get_external_ips


# Row layout fed to the search UDF: one Weaviate service per row plus the query text.
# All four columns are nullable strings.
input_schema = StructType(
    [StructField(col_name, StringType(), True) for col_name in ("name", "ip", "port", "query")]
)

# Layout of a single retrieved chunk; the search UDF returns an array of these.
# NOTE: "certainity" (sic) is the column name later cells select by; keep the spelling.
output_schema = StructType(
    [
        StructField(field_name, field_type, True)
        for field_name, field_type in (
            ("rag_text", StringType()),
            ("certainity", FloatType()),
            ("distance", FloatType()),
        )
    ]
)

# Query embeddings are stored as arrays of floats.
vector_schema = ArrayType(FloatType())


# NOTE(review): this name looks like a Weaviate server-side setting; setting it here only
# affects this Python process (and its children) — confirm it has the intended effect.
os.environ.update({"COLLECTION_RETRIEVAL_STRATEGY": "LocalOnly"})


spark = (
    SparkSession.builder
    .appName("weaviate_deneme")
    .getOrCreate()
)
# Model used by generate_embedding() to embed query text.
embedding_model = SentenceTransformer("sentence-transformers/all-MiniLM-L6-v2")

# Mapping of service name -> service info; the "grpc" entry supplies the gRPC host
# shared by every cluster, the remaining entries become rows of the query DataFrame.
services = weaviate_utils.get_external_ips()
grpc_ip = services["grpc"]["ip"]

25/04/15 02:04:32 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


In [32]:
def generate_embedding(query: str) -> list:
    """Embed ``query`` with the module-level ``embedding_model``.

    The encoder output is converted with ``.tolist()`` so Spark can store it in the
    ``vector_schema`` (array-of-float) column. Any exception raised while encoding is
    printed and an empty list is returned instead of propagating the error.
    """
    # NOTE(review): this function references the global ``embedding_model``; when wrapped
    # as a Spark UDF its closure is serialized for the executors — confirm that is intended.
    try:
        return embedding_model.encode(query).tolist()
    except Exception as err:
        print(f"err: {err}")
        return []


generate_embedding_udf = F.udf(generate_embedding, returnType=vector_schema)

In [33]:

def search_weaviate(
    cluster_name: str,
    cluster_ip: str,
    cluster_port: str,
    grpc_ip: str,
    query_vector: list,
    grpc_port: int = 50051,
    limit: int = 1,
) -> list:
    """Return the stored chunks nearest to ``query_vector`` on one Weaviate cluster.

    Meant to run inside a Spark UDF (``search_weaviate_udf`` below), one call per
    cluster row. Errors are printed and turned into an empty list so a single bad
    cluster does not abort the whole Spark job.

    Args:
        cluster_name: Service name; its last character selects the collection
            ``dist_data_<char>``.
        cluster_ip: HTTP host of the Weaviate instance.
        cluster_port: HTTP port; arrives as a string column and is converted to int.
        grpc_ip: gRPC host (a single value shared by all clusters in this notebook).
        query_vector: Query embedding; an empty or missing vector is skipped.
        grpc_port: gRPC port (defaults to the previously hard-coded 50051).
        limit: Maximum number of neighbours to fetch (defaults to the previous 1).

    Returns:
        A list of ``[rag_text, certainty, distance]`` rows matching ``output_schema``;
        empty on any error or when nothing matched.
    """
    if not query_vector:
        # generate_embedding() returns [] when encoding fails; nothing to search with.
        print(f"empty query vector for cluster {cluster_name}")
        return []

    try:
        # NOTE(review): only the last character of the name is used, so e.g. "...-12"
        # maps to dist_data_2; confirm this matches how collections are named per cluster.
        collection_name = f"dist_data_{cluster_name[-1]}"

        # The context manager closes the client on exit, including when an error is raised.
        with weaviate.connect_to_custom(
            http_host=cluster_ip,
            http_port=int(cluster_port),
            http_secure=False,
            grpc_host=grpc_ip,
            grpc_port=grpc_port,
            grpc_secure=False,
        ) as client:
            if not client.is_ready():
                raise Exception("Weaviate instance is not ready.")

            results = client.collections.get(collection_name).query.near_vector(
                query_vector,
                limit=limit,
                return_metadata=MetadataQuery(distance=True, certainty=True),
            )
    except Exception as e:
        # Covers both connection failures and query failures (e.g. a missing collection).
        print(f"Error querying Weaviate cluster {cluster_name}: {e}")
        return []

    # The query result is a wrapper object (always truthy); the hits live in ``.objects``.
    if not results.objects:
        print("empty results")
        return []

    # Field order must match output_schema: rag_text, certainity, distance.
    return [
        [obj.properties.get("context"), obj.metadata.certainty, obj.metadata.distance]
        for obj in results.objects
    ]


search_weaviate_udf = F.udf(search_weaviate, ArrayType(output_schema))

In [34]:
query = "What is the capital of France?"

# One row per Weaviate HTTP service; the shared "grpc" entry is passed as a literal instead.
service_rows = [info for service_name, info in services.items() if service_name != "grpc"]

df = (
    # Create the Spark DataFrame of clusters.
    spark.createDataFrame(service_rows, input_schema)
    # Overwrite the "query" column with the same query text on every row (F.lit is already a string).
    .withColumn("query", F.lit(query))
    # Compute the query embedding through the UDF.
    .withColumn("query_embedding", generate_embedding_udf(F.col("query")))
    # Search each cluster; yields an array of (rag_text, certainity, distance) structs.
    .withColumn(
        "result_struct",
        search_weaviate_udf(
            F.col("name"),
            F.col("ip"),
            F.col("port"),
            F.lit(grpc_ip),
            F.col("query_embedding"),
        ),
    )
    # One output row per retrieved chunk. NOTE: explode drops clusters whose search
    # failed or found nothing (the UDF returns [] then); use F.explode_outer to keep them.
    .withColumn("exploded_result", F.explode("result_struct"))
    # "exploded_result.*" expands to rag_text, certainity, distance.
    .select("name", "ip", "port", "exploded_result.*")
)

df.show(truncate=False)
    

Error connecting to Weaviate instance: Query call with protocol GRPC search failed with message <AioRpcError of RPC that terminated with:
	status = StatusCode.UNKNOWN
	details = "could not find class Dist_data_5 in schema"
	debug_error_string = "UNKNOWN:Error received from peer  {grpc_message:"could not find class Dist_data_5 in schema", grpc_status:2, created_time:"2025-04-15T02:04:48.228342539+03:00"}"
>.


+------------------+-------------+----+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+----------+----------+
|name              |ip           |port|rag_text                                                                                                                                                                                                                                                                                                                                                                          |certainity|distance  |
+------------------+-------------+----+-----------------------------------------------------------------------------------------------

                                                                                