In [None]:
# Install dependencies into the kernel's environment.
# NOTE(review): later cells also import dotenv, pymilvus, sentence_transformers
# and pyspark, none of which are installed here — confirm they are provided elsewhere.
%pip install neo4j
%pip install pandas

In [1]:
from neo4j import GraphDatabase
import os
from dotenv import load_dotenv

# Load environment variables via python-dotenv before reading the settings below.
load_dotenv()

# Neo4j connection settings; each one is None when its variable is not set.
URI = os.environ.get("NEO4J_URI")
USERNAME = os.environ.get("NEO4J_USERNAME")
PASSWORD = os.environ.get("NEO4J_PASSWORD")

# Function to run a query
def run_query(tx, query, parameters=None):
    """Run a Cypher query on a transaction and return its records as plain data.

    Designed to be passed as the unit of work to ``session.execute_read``.

    Args:
        tx: Transaction-like object exposing ``run(query, parameters)``.
        query: Cypher query text.
        parameters: Mapping of query parameters; ``None`` means no parameters.

    Returns:
        Whatever ``.data()`` returns for the query result.
    """
    # A None default avoids sharing a single mutable dict between calls.
    if parameters is None:
        parameters = {}
    return tx.run(query, parameters).data()

def grap_read_query(query, parameters=None):
    """Execute a read-only Cypher query on a short-lived driver.

    Args:
        query: Cypher query text.
        parameters: Mapping of query parameters; ``None`` means no parameters.

    Returns:
        The records produced by ``run_query`` for this query.
    """
    if parameters is None:
        parameters = {}

    # Both context managers guarantee cleanup even when the query raises.
    with GraphDatabase.driver(URI, auth=(USERNAME, PASSWORD)) as driver:
        with driver.session() as session:
            # Forward the query text as well as its parameters: run_query
            # expects (tx, query, parameters).
            return session.execute_read(run_query, query, parameters)


In [None]:
from pymilvus import MilvusClient
from sentence_transformers import SentenceTransformer
from pymilvus import connections, MilvusClient,model
_EMBEDDING_MODEL_NAME = "Muennighoff/SGPT-125M-weightedmean-nli-bitfit"
# Loaded SentenceTransformer instances, keyed by model name.
_embedding_model_cache = {}


def _get_embedding_model(model_name=_EMBEDDING_MODEL_NAME):
    """Return the SentenceTransformer for ``model_name``, loading it only once."""
    if model_name not in _embedding_model_cache:
        _embedding_model_cache[model_name] = SentenceTransformer(model_name)
    return _embedding_model_cache[model_name]


def emb_text(text):
    """Embed a single text and return its normalized vector as a list.

    Args:
        text: The string to embed.

    Returns:
        The embedding of ``text`` converted with ``.tolist()``.
    """
    # Reuse the cached encoder instead of re-instantiating the model per call.
    # The local is not called ``model`` so it does not shadow the ``model``
    # name imported from pymilvus.
    encoder = _get_embedding_model()
    embedding = encoder.encode([text], normalize_embeddings=True)
    return embedding[0].tolist()

def vector_query(query, limit, output_fields, collection_name):
    """Embed ``query`` and search a Milvus collection using the COSINE metric.

    Args:
        query: Text to embed with ``emb_text`` and search for.
        limit: Maximum number of hits requested.
        output_fields: Names of the stored fields to return with each hit.
        collection_name: Collection to search.

    Returns:
        The first element of the search response, i.e. the hits for the
        single query vector that was sent.
    """
    # The client is created from the "vector.db" path on every call.
    # NOTE(review): the client is never closed, and the original comment
    # ("server only") suggests a different setup for deployment — confirm.
    client = MilvusClient("vector.db")
    query_vector = emb_text(query)
    cosine_params = {"metric_type": "COSINE", "params": {}}
    hits_per_query = client.search(
        collection_name=collection_name,
        data=[query_vector],
        limit=limit,
        search_params=cosine_params,
        output_fields=output_fields,
    )
    return hits_per_query[0]

In [None]:
# Full query pipeline: vector search followed by a citation lookup in the graph.
def get_papers_and_citations(initial_query, limit=10, collection_name="papers"):
    """Find papers similar to a query and the CITES paths that touch them.

    Args:
        initial_query: Free-text query used for the vector search.
        limit: Maximum number of vector hits to retrieve (default 10).
        collection_name: Vector collection to search (default "papers").

    Returns:
        A dict with two keys: "vector" holds the hits returned by
        ``vector_query`` and "graph" holds the records returned for the
        CITES query restricted to the hit titles.
    """
    vector_result = vector_query(
        initial_query,
        limit,
        ["Abstract", "Title", "TLDR", "Conference"],
        collection_name,
    )
    paper_titles = [hit["entity"]["Title"] for hit in vector_result]

    query_graph = """
        MATCH r=(p:Paper)-[:CITES]-(:Paper)
        WHERE p.title IN $paper_titles
        RETURN r
    """
    # Context managers close the session and the driver even if the query raises.
    with GraphDatabase.driver(URI, auth=(USERNAME, PASSWORD), database="merged") as driver:
        with driver.session() as session:
            graph_result = session.execute_read(
                run_query, query_graph, {"paper_titles": paper_titles}
            )

    return {
        "vector": vector_result,
        "graph": graph_result
    }




In [None]:
# Ad-hoc check: print the titles of the 100 closest papers to a sample abstract.
results = vector_query(
    query="Serverless computing greatly simplifies the use of cloud resources. In particular, Function-as-a-Service (FaaS) platforms enable programmers to develop applications as individual functions that can run and scale independently. Unfortunately, applications that require fine-grained support for mutable state and synchronization, such as machine learning (ML) and scientific computing, are notoriously hard to build with this new paradigm. In this work, we aim at bridging this gap",
    limit=100,
    output_fields=["Abstract", "Title", "TLDR", "Conference"],
    collection_name="papers",
)

for result in results:
    print(result["entity"]["Title"])

In [None]:
# Run the retrieval pipeline (vector search + citation lookup) for a sample query.
# NOTE(review): a later cell repeats this exact call before merging the results,
# so one of the two calls may be redundant.
global_results = get_papers_and_citations("Stateful serverless computing") 


In [None]:
def get_paper_matches(title, graph):
    """Collect the citation paths in ``graph`` that start or end at ``title``.

    Each record in ``graph`` must hold, under the key "r", a sequence whose
    first and last items are mappings with a "title" entry.

    Args:
        title: Paper title to look for.
        graph: Iterable of records as described above.

    Returns:
        A dict with "matches" (every path touching the paper), "cites"
        (the last node of paths whose first node is the paper) and
        "cited_by" (the first node of paths whose last node is the paper).

    NOTE(review): the CITES pattern in get_papers_and_citations is
    undirected, so a node's position in the path may not reflect the real
    citation direction — confirm before relying on cites/cited_by.
    """
    outgoing, incoming, touching = [], [], []
    for record in graph:
        path = record["r"]
        head, tail = path[0], path[-1]
        if head["title"] == title:
            outgoing.append(tail)
        elif tail["title"] == title:
            incoming.append(head)
        else:
            # The path does not involve this paper at either end.
            continue
        touching.append(path)

    return {"matches": touching, "cites": outgoing, "cited_by": incoming}

In [None]:
import json
def get_merged_results(global_results):
    """Flatten vector hits and graph paths into one record per paper.

    For every hit in ``global_results["vector"]`` the citation paths are
    looked up with ``get_paper_matches``. When at least one path exists, the
    paper's graph metadata is read from the first path: from its first node
    if that node's title equals the hit's title, otherwise from its last
    node. The "Authors", "PredominantCountry" and "PredominantContinent"
    properties are decoded with ``json.loads``; for the last two only the
    first decoded element is kept. Without any path, every graph-derived
    field is None and both totals are 0.

    Args:
        global_results: Dict with "vector" (hits exposing
            ``hit["entity"]``) and "graph" (records for get_paper_matches).

    Returns:
        A list of dicts, one per vector hit, in hit order.
    """
    merged_results = []
    for hit in global_results["vector"]:
        entity = hit["entity"]
        citations = get_paper_matches(entity["Title"], global_results["graph"])
        matches = citations["matches"]
        cites = citations["cites"]
        cited_by = citations["cited_by"]

        if matches == []:
            # No citation path: the graph metadata is unavailable.
            cites = cited_by = None
            community_id = connected_component_id = None
            authors = predominant_country = predominant_continent = None
        else:
            first_path = matches[0]
            # Pick whichever end of the first path is the paper itself.
            if first_path[0]["title"] == entity["Title"]:
                node = first_path[0]
            else:
                node = first_path[-1]
            community_id = node["communityId"]
            connected_component_id = node["connectedComponentId"]
            authors = list(json.loads(node["Authors"]))
            predominant_country = json.loads(node["PredominantCountry"])[0]
            predominant_continent = json.loads(node["PredominantContinent"])[0]

        merged_results.append({
            "title": entity["Title"],
            "abstract": entity["Abstract"],
            "tldr": entity["TLDR"],
            "conference": entity["Conference"],
            "authors": authors,
            "predominant_country": predominant_country,
            "predominant_continent": predominant_continent,
            "community_id": community_id,
            "connected_component_id": connected_component_id,
            "cites": cites,
            "cites_total": 0 if cites is None else len(cites),
            "cited_by_total": 0 if cited_by is None else len(cited_by),
        })

    return merged_results



In [None]:
# Retrieve the papers and their citation paths, then flatten them into one record per paper.
global_results = get_papers_and_citations("Stateful serverless computing")
merged_results = get_merged_results(global_results)


In [None]:
# Inicia sesión de spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, desc, row_number
from pyspark.sql.window import Window

# Create (or reuse) the Spark session used by the aggregations below.
spark = (
    SparkSession.builder
    .appName("Milvus-PySpark")
    .config("spark.driver.memory", "2g")
    .getOrCreate()
)

In [None]:
# Build a Spark DataFrame from the merged records (no explicit schema is passed)
# and print the resulting schema for inspection.
df_initial = spark.createDataFrame(merged_results)
df_initial.printSchema()

In [None]:
def get_communities_count_df(df):
    """Count rows per non-null ``community_id``, largest count first.

    Args:
        df: DataFrame with a ``community_id`` column.

    Returns:
        DataFrame with columns ``community_id`` and ``count``.
    """
    key = "community_id"
    non_null_ids = df.select(key).where(col(key).isNotNull())
    counts = non_null_ids.groupBy(key).count()
    return counts.orderBy("count", ascending=False)

def get_connected_components_count_df(df):
    """Count rows per non-null ``connected_component_id``, largest count first.

    Args:
        df: DataFrame with a ``connected_component_id`` column.

    Returns:
        DataFrame with columns ``connected_component_id`` and ``count``.
    """
    key = "connected_component_id"
    non_null_ids = df.select(key).where(col(key).isNotNull())
    counts = non_null_ids.groupBy(key).count()
    return counts.orderBy("count", ascending=False)

def get_top_authors_df(df, top=3):
    """Return the ``top`` most frequent authors across all rows.

    The ``authors`` column holds one list per paper, so it is exploded into
    one ``author`` row per list element before counting.

    Args:
        df: DataFrame with an array-typed ``authors`` column.
            NOTE(review): assumes the elements are plain author names
            (strings) — confirm against the graph data.
        top: Number of authors to keep (default 3).

    Returns:
        DataFrame with columns ``author`` and ``count``, largest count first.
    """
    # Local import: ``explode`` is not part of the notebook's import cell.
    from pyspark.sql.functions import explode

    # Rows with a null ``authors`` list produce no output rows when exploded.
    per_author = df.select(explode(col("authors")).alias("author"))
    return (
        per_author
        .where(col("author").isNotNull())
        .groupBy("author")
        .count()
        .orderBy("count", ascending=False)
        .limit(top)
    )

def get_top_countries_df(df, top=3):
    """Return the ``top`` most frequent non-null ``predominant_country`` values.

    Args:
        df: DataFrame with a ``predominant_country`` column.
        top: Number of countries to keep (default 3).

    Returns:
        DataFrame with columns ``predominant_country`` and ``count``,
        largest count first.
    """
    key = "predominant_country"
    non_null = df.select(key).where(col(key).isNotNull())
    ranked = non_null.groupBy(key).count().orderBy("count", ascending=False)
    return ranked.limit(top)

def get_top_institutions_df(df, top=3):
    """Return the ``top`` most frequent non-null ``institution`` values.

    NOTE(review): the records built by get_merged_results have no
    ``institution`` key, so this presumably targets a different DataFrame
    than df_initial — confirm the intended input.

    Args:
        df: DataFrame with an ``institution`` column.
        top: Number of institutions to keep (default 3).

    Returns:
        DataFrame with columns ``institution`` and ``count``, largest count
        first.
    """
    key = "institution"
    non_null = df.select(key).where(col(key).isNotNull())
    ranked = non_null.groupBy(key).count().orderBy("count", ascending=False)
    return ranked.limit(top)

In [None]:
# Routines that run after the initial processing.
# Fetch the nodes associated with each of the connected components.
def get_component_node_data(connected_components):
    """Fetch papers of the given connected components with institution and country.

    Args:
        connected_components: Connected-component ids matched against
            ``p.connectedComponentId``.

    Returns:
        A list of records with keys "p", "i" and "c"; an empty list when no
        ids are given.
    """
    # Nothing can match an empty id list, so skip the database round-trip.
    if not connected_components:
        return []

    query_graph = """
        MATCH (p:Paper)-[:HAS_INSTITUTION]->(i:Institution)-[:LOCATED_IN]->(c:Country)
        WHERE p.connectedComponentId IN $connected_components
        RETURN p, i, c
    """
    # Context managers close the session and the driver even if the query raises.
    with GraphDatabase.driver(URI, auth=(USERNAME, PASSWORD), database="merged") as driver:
        with driver.session() as session:
            return session.execute_read(
                run_query, query_graph, {"connected_components": connected_components}
            )

# Fetch the node data for the given communities.
def get_communities_node_data(communities):
    """Fetch papers of the given communities with institution and country.

    Args:
        communities: Community ids matched against ``p.communityId``.

    Returns:
        A list of records with keys "p", "i" and "c"; an empty list when no
        ids are given.
    """
    # Nothing can match an empty id list, so skip the database round-trip.
    if not communities:
        return []

    query_graph = """
        MATCH (p:Paper)-[:HAS_INSTITUTION]->(i:Institution)-[:LOCATED_IN]->(c:Country)
        WHERE p.communityId IN $communities
        RETURN p, i, c
    """
    # Context managers close the session and the driver even if the query raises.
    with GraphDatabase.driver(URI, auth=(USERNAME, PASSWORD), database="merged") as driver:
        with driver.session() as session:
            return session.execute_read(
                run_query, query_graph, {"communities": communities}
            )

# Fetch, in one go, the graph data related to the retrieved nodes (by aggregation).
def get_related_nodes_data(df):
    """Query the graph for every community and connected component present in ``df``.

    Args:
        df: DataFrame with ``community_id`` and ``connected_component_id``
            columns.

    Returns:
        A dict with "communities" and "connected_components", each holding
        the records returned by the matching graph lookup.
    """
    community_rows = get_communities_count_df(df).select("community_id").collect()
    communities = [row["community_id"] for row in community_rows]

    component_rows = (
        get_connected_components_count_df(df)
        .select("connected_component_id")
        .collect()
    )
    connected_components = [row["connected_component_id"] for row in component_rows]

    return {
        "communities": get_communities_node_data(communities),
        "connected_components": get_component_node_data(connected_components),
    }

# Build DataFrames from the graph data related to the retrieved nodes (by aggregation).
def get_related_nodes_df(df):
    """Build one DataFrame per grouping from the graph nodes related to ``df``.

    Each row merges the paper's properties with an "institution" and a
    "country" entry taken from the same graph record.

    Args:
        df: DataFrame passed on to ``get_related_nodes_data``.

    Returns:
        A dict with "communities" and "connected_components" DataFrames.
    """
    # Use the DataFrame that was passed in, not the notebook-level df_initial.
    nodes_data = get_related_nodes_data(df)

    def _to_rows(nodes):
        """Flatten graph records into one dict per paper."""
        return [
            {**node["p"], "institution": node["i"], "country": node["c"]}
            for node in nodes
        ]

    communities_df = spark.createDataFrame(_to_rows(nodes_data["communities"]))
    connected_components_df = spark.createDataFrame(
        _to_rows(nodes_data["connected_components"])
    )

    return {
        "communities": communities_df,
        "connected_components": connected_components_df
    }


In [None]:
# Computations related to communities and connected components.
# df_initial is cached while get_related_nodes_df runs, then released.
df_initial.persist()
related_nodes_dfs = get_related_nodes_df(df_initial)
communities_df = related_nodes_dfs["communities"]
connected_components_df = related_nodes_dfs["connected_components"]

df_initial.unpersist()

In [None]:
# Drop any cache left by a previous run of this cell, then cache again.
# NOTE(review): no Spark action runs between persist() and the unpersist()
# at the end of this cell, so the cache is presumably never populated here.
communities_df.unpersist()
communities_df.persist()

# Top institutions per community.
# Rank institutions inside each community by paper count. Partitioning by
# (community_id, institution_name) would leave a single row per partition,
# which makes the top-5 filter a no-op.
window = Window.partitionBy("community_id").orderBy(desc("count"))

communities_top_institution_df = (
    communities_df
    .select(
        "title",
        col("communityId").alias("community_id"),
        col("institution.name").alias("institution_name"),
    )
    .distinct()
    .groupBy("community_id", "institution_name")
    .count()
    .withColumn("row_number", row_number().over(window))
    # Filter while row_number is still a column of the frame.
    .where(col("row_number") <= 5)
    .orderBy("count", ascending=False)
    .select("community_id", "institution_name", "count")
)

# Top countries per community, ranked the same way.
window = Window.partitionBy("community_id").orderBy(desc("count"))
communities_top_countries_df = (
    communities_df
    .select(
        "title",
        col("communityId").alias("community_id"),
        col("country.name").alias("country_name"),
    )
    .distinct()
    .groupBy("community_id", "country_name")
    .count()
    .withColumn("row_number", row_number().over(window))
    .where(col("row_number") <= 5)
    .orderBy("community_id", desc("count"))
    .select("community_id", "country_name", "count")
)

communities_df.unpersist()


In [None]:
# Display up to 100 rows of the per-community institution ranking.
communities_top_institution_df.show(100)

In [None]:
# Inspect the institution names attached to the community papers.
communities_df.select("institution.name").show()

In [None]:
# Preview the community papers DataFrame.
communities_df.show()