# **Knowledge Representation in RAG methods**

Contributors:
* Szymon Pająk
* Tomasz Ogiołda

## Temporary notes

### Plan

1. Introduction
2. Background
  - What is RAG? Why is it used?
  - What kinds of knowledge representations can RAG use?
    - Vectorized embeddings
    - Knowledge graph
    - Combination of both
    - Comparison https://neo4j.com/blog/genai/graphrag-manifesto/

  - Explain the dataflow for both knowledge representations (the whole process, from raw data to querying the knowledge base)
3. Demo
  - Tools to be used:
    - langchain?
    - neo4j
4. Resources
  - https://neo4j.com/blog/genai/graphrag-manifesto/
  - https://neo4j.com/blog/developer/langchain4j-graphrag-vector-stores-retrievers/
  - https://neo4j.com/blog/genai/what-is-retrieval-augmented-generation-rag/
  - https://neo4j.com/blog/developer/knowledge-graph-rag-application/
  - https://neo4j.com/blog/news/graphrag-ecosystem-tools/

## **RAG quickstart & Motivation**

Some text

In [21]:
!pip install neo4j google-generativeai



In [5]:
from google.colab import userdata

NEO4J_URI = userdata.get('NEO4J_URI')
NEO4J_PASS = userdata.get('NEO4J_PASS')
NEO4J_DB_USER = userdata.get('NEO4J_DB_USER')
GOOGLE_API_KEY = userdata.get('GOOGLE_API_KEY')

In [110]:
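from neo4j import GraphDatabase
import google.generativeai as genai

genai.configure(api_key=GOOGLE_API_KEY)

# Embedding models are not generative models: embeddings are requested with
# genai.embed_content(model=embedding_model, content=...), so only the model name is kept here.
embedding_model = 'models/text-embedding-004'
generative_llm = genai.GenerativeModel('gemini-1.5-flash-latest')

def get_db():
    """Return a connected Neo4j driver. The caller is responsible for closing it."""
    # Do not create the driver in a `with` block here: it would be closed before it is returned.
    driver = GraphDatabase.driver(NEO4J_URI, auth=(NEO4J_DB_USER, NEO4J_PASS))
    driver.verify_connectivity()
    return driver

# Shared driver used by the cells below.
db = get_db()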

## **Data preparation & Indexing**

In [24]:
import kagglehub

path = kagglehub.dataset_download("devdope/900k-spotify")

Downloading from https://www.kaggle.com/api/v1/datasets/download/devdope/900k-spotify?dataset_version_number=3...


100%|██████████| 1.00G/1.00G [00:25<00:00, 41.8MB/s]

Extracting files...





In [44]:
import numpy as np
import pandas as pd

songs_csv = path + '/spotify_dataset.csv'

full_df = pd.read_csv(songs_csv)

In [98]:
np.random.seed(9)

df = full_df.sample(20000)
df = df[['Artist(s)','song', 'text', 'emotion', 'Length', 'Album', 'Genre', 'Energy', 'Popularity', 'Danceability', 'Positiveness']]
df[['Energy', 'Popularity', 'Danceability', 'Positiveness']] = df[['Energy', 'Popularity', 'Danceability', 'Positiveness']].astype(int)/100

In [102]:
df.head()

| Unnamed: 0 | Artist(s) | song | text | emotion | Length | Tempo | Album | Genre | Release Date | Explicit | Energy | Popularity | Danceability | Positiveness | Liveness |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| 465551 | The Cast of Mary Poppins | Mary Poppins | [Verse 1] Rose gold, rose quartz Stone cold, w... | anger | 02:02 | 80 | Mary Poppins Original Soundtrack | hip hop | 6th February 2018 | No | 0.61 | 0.35 | 0.53 | 0.92 | 0.23 |
| 5902 | 98º | If Only She Knew | If only she knew What was going right, I... | joy | 04:27 | 96 | 98 Degrees And Rising | hip hop | 1st January 1998 | No | 0.46 | 0.29 | 0.76 | 0.59 | 0.11 |
| 84105 | CHASETHEMONEY | Been Dat | [Intro: Lil Yachty] Chase the money 'til a nig... | joy | 01:42 | 115 | Slim.E and Friends | hip hop | 31st October 2020 | Yes | 0.42 | 0.22 | 0.86 | 0.51 | 0.15 |
| 31291 | Arthur Sullivan,Richard Lewis/Ian Wallace/Pro ... | And Have I Journeyd for a Month | [NANKI-POO] And have I journeyed for a month, ... | joy | 00:50 | 86 | Gilbert & Sullivan: The Mikado | hip hop | 13th June 2011 | No | 0.04 | 0.0 | 0.45 | 0.31 | 0.56 |
| 48636 | Beyond Creation | Ethereal Kingdom | Alone among the living Man is haunted by the k... | sadness | 05:19 | 96 | Algorythm | rock,garage rock | 12th October 2018 | No | 0.96 | 0.2 | 0.37 | 0.09 | 0.35 |
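
The `Length` column holds durations as `mm:ss` strings (for example `02:02`), and the ingestion code stores that string unchanged as `time_length`. If numeric durations are needed later, for instance to filter songs by length, a conversion along the following lines could be applied first. This is only a minimal sketch: `length_to_seconds` is a hypothetical helper that is not part of the notebook, and it assumes valid values follow the `mm:ss` or `hh:mm:ss` pattern.

```python
from typing import Optional


def length_to_seconds(length: object) -> Optional[int]:
    """Convert 'mm:ss' or 'hh:mm:ss' to seconds; return None if the value cannot be parsed."""
    if not isinstance(length, str):
        return None  # e.g. NaN coming from pandas
    parts = length.strip().split(":")
    if len(parts) not in (2, 3) or not all(p.isdigit() for p in parts):
        return None
    seconds = 0
    for part in parts:
        # Each step shifts the running total by one base-60 position.
        seconds = seconds * 60 + int(part)
    return seconds


print(length_to_seconds("02:02"))  # 122
print(length_to_seconds("04:27"))  # 267
```

On the DataFrame this could be applied as `df['Length'].map(length_to_seconds)`, with the result stored in a new column of your choice.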


In [129]:
import pandas as pd
from concurrent.futures import ThreadPoolExecutor, as_completed
import neo4j

# Neo4j sessions are not thread-safe, so each worker task opens its own session.
# The connection details are passed to the worker explicitly.

# --- Helper function for processing a single row/song ---
def process_song_row(row_data, song_id, db_uri, db_user, db_password):
    """
    Processes a single song row and ingests it into Neo4j.
    Each call opens its own session to ensure thread safety.
    """
    try:
        # Open a driver and a session for this task only. The context managers
        # release both even if one of the queries below raises.
        with neo4j.GraphDatabase.driver(db_uri, auth=(db_user, db_password)) as driver, driver.session() as session:
            song_title = row_data['song']
            lyrics = str(row_data['text'])
            emotion = row_data['emotion']
            time_length = row_data['Length']
            album_name = row_data['Album']
            energy = row_data['Energy']
            popularity = row_data['Popularity']
            danceability = row_data['Danceability']
            positiveness = row_data['Positiveness']

            # Create or update the Song node (one query per song).
            # The uniqueness constraint on Song.id keeps concurrent MERGEs
            # from creating duplicate nodes for the same song.
            session.run("""
                MERGE (s:Song {id: $song_id})
                SET
                    s.title = $song_title,
                    s.lyrics = $lyrics,
                    s.time_length = $time_length,
                    s.energy = $energy,
                    s.popularity = $popularity,
                    s.danceability = $danceability,
                    s.positiveness = $positiveness
            """, song_id=song_id, song_title=song_title, lyrics=lyrics,
               time_length=time_length, energy=float(energy) if pd.notna(energy) else None,
               popularity=float(popularity) if pd.notna(popularity) else None,
               danceability=float(danceability) if pd.notna(danceability) else None,
               positiveness=float(positiveness) if pd.notna(positiveness) else None)

            # Artists
            artist_names = []
            if pd.notna(row_data['Artist(s)']):
                artist_names = [name.strip() for name in str(row_data['Artist(s)']).split(',')]
            for artist_name in artist_names:
                if artist_name:
                    session.run("""
                        MERGE (ar:Artist {name: $artist_name})
                        WITH ar
                        MATCH (s:Song {id: $song_id})
                        MERGE (ar)-[:PERFORMED]->(s)
                    """, artist_name=artist_name, song_id=song_id)

            # Album
            if pd.notna(album_name) and album_name.strip():
                session.run("""
                    MERGE (al:Album {name: $album_name})
                    WITH al
                    MATCH (s:Song {id: $song_id})
                    MERGE (s)-[:APPEARS_ON]->(al)
                """, album_name=album_name.strip(), song_id=song_id)

            # Genre
            genre_names = []
            if pd.notna(row_data['Genre']):
                genre_names = [name.strip() for name in str(row_data['Genre']).split(',')]
            for genre_name in genre_names:
                if genre_name:
                    session.run("""
                        MERGE (g:Genre {name: $genre_name})
                        WITH g
                        MATCH (s:Song {id: $song_id})
                        MERGE (s)-[:HAS_GENRE]->(g)
                    """, genre_name=genre_name, song_id=song_id)

            # Emotion
            if pd.notna(emotion) and emotion.strip():
                session.run("""
                    MERGE (e:Emotion {name: $emotion})
                    WITH e
                    MATCH (s:Song {id: $song_id})
                    MERGE (s)-[:EVOKES]->(e)
                """, emotion=emotion.strip(), song_id=song_id)
        driver.close() # Close driver for this task
        return f"Successfully processed song ID: {song_id}"
    except Exception as e:
        return f"Error processing song ID {song_id}: {e}"

# --- Main ingestion function ---
def ingest_music_data_multithreaded(df, db_uri, db_user, db_password, max_workers=8):
    print(f"Starting multithreaded data ingestion for {len(df)} songs with {max_workers} workers...")

    # Connection details are passed to every task so that each one can open
    # its own session; sessions must not be shared between threads.

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = []
        for idx, row in df.iterrows():
            song_id = str(idx) # Ensure ID is a string for consistency
            futures.append(executor.submit(process_song_row, row, song_id, db_uri, db_user, db_password))

        processed_count = 0
        for future in as_completed(futures):
            result = future.result()
            processed_count += 1
            if "Error" in result:
                print(f"Error: {result}")
            # else:
            #     print(result) # Uncomment if you want to see success messages

            if processed_count % 100 == 0:
                print(f"Processed {processed_count}/{len(df)} songs.")

    print(f"Finished multithreaded data ingestion. Total processed: {processed_count}/{len(df)} songs.")


with db.session() as session:
    # The DataFrame index is used as the unique Song.id. A composite key of
    # title and artist names would be more robust if the index were not stable.
    session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (s:Song) REQUIRE s.id IS UNIQUE")
    session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (ar:Artist) REQUIRE ar.name IS UNIQUE")
    session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (al:Album) REQUIRE al.name IS UNIQUE") # Album names might not be unique across artists
    session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (g:Genre) REQUIRE g.name IS UNIQUE")
    session.run("CREATE CONSTRAINT IF NOT EXISTS FOR (e:Emotion) REQUIRE e.name IS UNIQUE")

    session.run("CREATE INDEX IF NOT EXISTS FOR (s:Song) ON (s.title)")
    session.run("CREATE INDEX IF NOT EXISTS FOR (al:Album) ON (al.name)") # If querying albums by name
    print("Neo4j constraints and basic indexes for music graph ensured.")

ingest_music_data_multithreaded(df.iloc[20:], NEO4J_URI, NEO4J_DB_USER, NEO4J_PASS)


Neo4j constraints and basic indexes for music graph ensured.
Starting multithreaded data ingestion for 19980 songs with 8 workers...
Processed 100/19980 songs.
Processed 200/19980 songs.
Processed 300/19980 songs.
...
Processed 3100/19980 songs.
[output truncated]
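
The ingestion above sends several `session.run` calls per song, which means many network round trips. A common alternative is to send a batch of rows in a single query with Cypher's `UNWIND`. The sketch below only illustrates the batching side and runs without a database. `chunked`, `to_song_param`, and `SONG_BATCH_QUERY` are hypothetical names introduced for illustration, the query covers `Song` nodes only, and it has not been run against the notebook's database.

```python
from itertools import islice
from typing import Any, Dict, Iterable, Iterator, List

# Hypothetical batch query: one round trip creates or updates many Song nodes.
SONG_BATCH_QUERY = """
UNWIND $rows AS row
MERGE (s:Song {id: row.id})
SET s += row.props
"""


def chunked(items: Iterable[Any], size: int) -> Iterator[List[Any]]:
    """Yield consecutive lists of at most `size` items."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


def to_song_param(song_id: str, row: Dict[str, Any]) -> Dict[str, Any]:
    """Shape one record the way SONG_BATCH_QUERY expects it."""
    return {
        "id": song_id,
        "props": {
            "title": row["song"],
            "lyrics": str(row["text"]),
            "time_length": row["Length"],
        },
    }


# Tiny in-memory stand-in for the DataFrame rows (made-up values).
records = [
    to_song_param(str(i), {"song": f"Song {i}", "text": "la la", "Length": "03:00"})
    for i in range(5)
]

for batch in chunked(records, 2):
    # With a live session this would be: session.run(SONG_BATCH_QUERY, rows=batch)
    print(len(batch), [r["id"] for r in batch])
```

Whether batching pays off here depends on the database tier and the batch size, so it is worth measuring on a small sample before re-ingesting all songs.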

## **Retrieval**

Some text

In [None]:
# Some code

## **Generation**

Some text

In [None]:
# Some code

## **Challenges & Future Development**

Some text

In [None]:
# Some code