## Vector DB Ingestion
<img src="https://docs.langchain4j.dev/img/logo.svg" alt="LangChain4J" width="200" height="200">

In [4]:
%use dataframe
%useLatestDescriptors
import com.fasterxml.jackson.module.kotlin.jacksonObjectMapper
import com.fasterxml.jackson.module.kotlin.readValue
import dev.langchain4j.data.segment.TextSegment
import dev.langchain4j.store.embedding.EmbeddingStore
import dev.langchain4j.store.embedding.pgvector.PgVectorEmbeddingStore
import dev.langchain4j.model.embedding.*
import dev.langchain4j.model.embedding.onnx.allminilml6v2.AllMiniLmL6V2EmbeddingModel
import dev.langchain4j.data.embedding.*
import dev.langchain4j.store.embedding.*
import dev.langchain4j.store.embedding.filter.MetadataFilterBuilder.metadataKey
import dev.langchain4j.data.document.Document
import dev.langchain4j.data.document.Metadata
import java.util.Arrays
import java.util.concurrent.atomic.AtomicInteger
import org.jetbrains.kotlinx.dataframe.codeGen.generateCode
import org.jetbrains.kotlinx.dataframe.api.*
import org.jetbrains.kotlinx.dataframe.io.*
import java.sql.DriverManager
import kotlin.Metadata as KotlinMetadata
import dev.langchain4j.model.openai.OpenAiEmbeddingModel
import dev.langchain4j.model.openai.OpenAiEmbeddingModelName
import java.util.UUID

import io.kotest.matchers.comparables.shouldBeGreaterThanOrEqualTo
import io.kotest.matchers.shouldBe
import io.kotest.matchers.shouldNotBe
import io.kotest.matchers.string.shouldStartWith
import dev.langchain4j.data.document.splitter.DocumentSplitters


In [2]:
// Connection settings for the local PostgreSQL (pgvector) database that backs the embedding store.
val host = "localhost"
val port = 5430
val user = "user"
val password = "password"
val database = "vector_store"
val jdbcUrl = "jdbc:postgresql://$host:$port/$database"
val dbConfig = DatabaseConfiguration(jdbcUrl, user, password)
// Table that will hold the ingested talk segments and their embeddings.
val tableName = "talks"
val mapper = jacksonObjectMapper()

// Connection smoke test: the catalog query throws if the database is unreachable
// (the row-count assertion itself can never fail).
DataFrame.readSqlQuery(dbConfig, "SELECT table_name FROM information_schema.tables")
    .rowsCount() shouldBeGreaterThanOrEqualTo 0
println("Connection to database successful")

Connection to database successful


## 📘 Ingestion for exercises
#### Prepare a DataFrame from [dataset-jfall.json](./data/dataset-jfall.json).


In [22]:
val df = DataFrame.readJson("./data/dataset-jfall.json")
// One row per session: explode the nested `sessions` list, keep only that column,
// rename it to `session`, then flatten it so each session field becomes a top-level column.
val sessions = df
    .explode("sessions")
    .select("sessions")
    .rename("sessions").into("session")
    .flatten()
// NOTE(review): the code string returned by generateCode() is not displayed here (it is not the
// cell's last expression); confirm whether typed column access relies on this call or on the
// notebook's own DataFrame integration.
sessions.generateCode()
// Last expression of the cell, so it is rendered: the column schema of `sessions`.
sessions.schema()

title: String
startsAt: String
endsAt: String
category: List<String>
room: String
speakers: List<String>
description: String

#### Execute Ingestion procedure

In [14]:
// Start of the first session as epoch milliseconds - a quick sanity check that `startsAt` parses as an ISO instant.
// Only the first row is parsed; there is no need to map (and parse) every session just to keep one value.
// NOTE(review): this top-level Long shares its name with the `startsAt` column accessor that later cells
// use inside row lambdas; consider a distinct name to avoid confusion.
val startsAt = java.time.Instant.parse(sessions.first().startsAt).toEpochMilli()
startsAt

1762412400000

In [11]:
// Fail fast with a clear message if the OpenAI key is not available in the environment.
val openAIApiToken = checkNotNull(System.getenv("OPENAI_API_KEY")) { "OPENAI_API_KEY environment variable not set" }

// OpenAI text-embedding-3-small embeds both the ingested segments and, later, the search queries.
val embeddingModel: EmbeddingModel = OpenAiEmbeddingModel.builder()
    .modelName(OpenAiEmbeddingModelName.TEXT_EMBEDDING_3_SMALL)
    .apiKey(openAIApiToken)
    .build()

// pgvector-backed store in the database configured earlier; the vector column is sized to the
// embedding model's output dimension. dropTableFirst(true) drops `tableName` before (re)creating it,
// so every run of this cell starts from an empty store.
val embeddingStore: EmbeddingStore<TextSegment> = PgVectorEmbeddingStore.builder()
    .host(host)
    .port(port)
    .database(database)
    .user(user)
    .password(password)
    .table(tableName)
    .dimension(embeddingModel.dimension())
    .dropTableFirst(true)
    .build()



In [15]:

// Conference days are derived in the JVM's default time zone.
// NOTE(review): day boundaries, dayIndex and phaseOfDay therefore depend on the machine running
// this notebook; consider pinning the conference's own ZoneId.
val zone = java.time.ZoneId.systemDefault()
// Distinct local dates on which sessions start, in ascending order.
val conferenceDaysSorted = sessions
    .map { row -> java.time.Instant.parse(row.startsAt).atZone(zone).toLocalDate() }
    .toSortedSet()
    .toList()
// 1-based day number per conference date (earliest date -> 1).
val dayIndexByDate = conferenceDaysSorted
    .withIndex()
    .associate { (index, date) -> date to (index + 1) }

/**
 * Buckets a session into a coarse part of the day, stored as document metadata.
 *
 * @param hour local start hour of the session (0-23).
 * @param durationMinutes session length in minutes.
 * @return "ALL_DAY" when the session lasts 8 hours or more; otherwise "MORNING" for hours 5-11,
 *   "AFTERNOON" for hours 12-17 and "EVENING" for every other hour (including 0-4).
 */
fun phaseOfDayFrom(hour: Int, durationMinutes: Long): String {
    val allDayThresholdMinutes = 8L * 60
    if (durationMinutes >= allDayThresholdMinutes) return "ALL_DAY"
    return when (hour) {
        in 5..11 -> "MORNING"
        in 12..17 -> "AFTERNOON"
        else -> "EVENING"
    }
}



// Alternative local embedding model (no API key needed): AllMiniLmL6V2EmbeddingModel().

// The ingestor splits each Document into overlapping segments (at most 200 characters,
// up to 40 characters of overlap), embeds every segment with `embeddingModel`
// and writes the result to `embeddingStore`.
val ingestor: EmbeddingStoreIngestor = EmbeddingStoreIngestor.builder()
    .embeddingStore(embeddingStore)
    .embeddingModel(embeddingModel)
    .documentSplitter(DocumentSplitters.recursive(200, 40))
    .build()

// Build one Document per session. The document text (title + description) is what gets split and
// embedded; the metadata is attached to every resulting segment for filtering and display.
val documents: List<Document> = sessions.map {
    val startInstant = java.time.Instant.parse(startsAt)
    val endInstant = java.time.Instant.parse(endsAt)
    val startZoned = startInstant.atZone(zone)
    val endZoned = endInstant.atZone(zone)
    val startLocal = startZoned.toLocalDateTime()
    val endLocal = endZoned.toLocalDateTime()
    val conferenceDay = startZoned.toLocalDate()
    val durationMinutes = java.time.Duration.between(startZoned, endZoned).toMinutes()
    val phaseOfDay = phaseOfDayFrom(startLocal.hour, durationMinutes)

    Document.from(
        "title:$title, description:$description",
        dev.langchain4j.data.document.Metadata(
            mapOf<String, Any>(
                "id" to UUID.randomUUID().toString().replace("-", ""),
                "title" to title,
                "room" to room,
                "category" to category.joinToString(),
                // Own key: storing speakers under "category" would overwrite the categories above.
                "speakers" to speakers.joinToString(),
                // Extra derived metadata in local date-time (ISO-8601, no offset) for `zone`.
                "startsAtLocalDateTime" to java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(startLocal),
                "endsAtLocalDateTime" to java.time.format.DateTimeFormatter.ISO_LOCAL_DATE_TIME.format(endLocal),
                // Epoch milliseconds of the original instants.
                "startsAt" to startInstant.toEpochMilli(),
                "endsAt" to endInstant.toEpochMilli(),
                "conferenceDay" to conferenceDay.toString(),
                "dayIndex" to (dayIndexByDate[conferenceDay] ?: 1),
                "phaseOfDay" to phaseOfDay,
                "durationMinutes" to durationMinutes.toInt(),
            ),
        ),
    )
}

// Split, embed and store all documents; the result reports the embedding token usage.
val ingestionResult = ingestor.ingest(documents)

println("Ingested a total of ${documents.size} documents with a total token count of: ${ingestionResult.tokenUsage().totalTokenCount()}")


Ingested a total of 71 documents with a total token count of: 4178


#### Verify Ingestion


In [17]:
// Read the stored rows back from the embedding table and preview the first 10 segments.
val dbDf = DataFrame.readSqlTable(dbConfig, tableName)
dbDf
    .select("text", "embedding", "embedding_id", "metadata")
    .head(10)

text,embedding,embedding_id,metadata
title:Revving Up with Java: A Beginne...,"[-0.020378873,0.027413664,-0.00345909...",6c2ede9a-458a-40cf-90b5-cc6d8d0098aa,"{  ""phaseOfDay"" : ""MORNING"",  ""star..."
This beginner‑friendly talk introduce...,"[-0.0069729052,0.016010482,0.01309274...",630b9ed1-a353-4732-be96-5d3a7791e66d,"{  ""phaseOfDay"" : ""MORNING"",  ""star..."
title:Catching the 137-Killer: A Java...,"[0.0011748782,0.059644945,-0.00058037...",ec9dc6f0-2d07-4177-a500-50e97cb9c040,"{  ""phaseOfDay"" : ""MORNING"",  ""star..."
"Learn about JVM memory areas, contain...","[0.022722434,0.049607567,0.067292884,...",65dca831-85b7-4e27-a584-110108d138bd,"{  ""phaseOfDay"" : ""MORNING"",  ""star..."
title:Documentation Is a Team Sport –...,"[0.011785803,0.048529778,-0.006459527...",4d5a5aca-f918-4956-86b0-5134fe60ba40,"{  ""phaseOfDay"" : ""MORNING"",  ""star..."
This talk explores what different rol...,"[0.007829867,0.02853109,0.031095402,-...",25c73a31-d569-4cad-a7f1-0360cd7997c8,"{  ""phaseOfDay"" : ""MORNING"",  ""star..."
title:Be more productive with Intelli...,"[-0.035801046,0.06568833,-0.007295544...",57ad747a-b2d2-4fe7-bace-911957a2bf97,"{  ""phaseOfDay"" : ""MORNING"",  ""star..."
"Gradle, Spring, Git and databases.","[-0.032662544,0.045115143,0.09803867,...",4eaf6945-5fbd-4a32-a12c-d14b8bfa9ce5,"{  ""phaseOfDay"" : ""MORNING"",  ""star..."
"Gradle, Spring, Git and databases. Se...","[-0.0069827777,0.02162491,0.06995544,...",cddbad8b-5d9a-411c-89aa-bc8334c74190,"{  ""phaseOfDay"" : ""MORNING"",  ""star..."
title:When ORM Becomes OMG: Performan...,"[-0.0034041237,0.066089325,0.05124473...",c6ae5dd9-4a71-4c71-9037-2235c212559c,"{  ""phaseOfDay"" : ""MORNING"",  ""star..."


In [20]:
// Verify the ingestion with a semantic search against the store.
val searchQuery: String = "Sessions about Agentic AI and MCP Model context Protocol"
// Embed the query with the same model that embedded the stored segments.
val searchQueryEmbedding = embeddingModel.embed(searchQuery).content()

// Search request: at most 5 matches with a score of at least 0.5.
// The metadata filter is currently disabled; uncomment it to restrict results to sessions whose
// `category` contains "Server-side".
val request: EmbeddingSearchRequest = EmbeddingSearchRequest.builder()
    .queryEmbedding(searchQueryEmbedding)
    .maxResults(5)
    .minScore(0.5)
    //.filter(metadataKey("category").containsString("Server-side"))
    .build()

// Perform the search via the EmbeddingStore.
println("Looking for talks similar to: $searchQuery...\n")
val relevant: List<EmbeddingMatch<TextSegment>> = embeddingStore.search(request).matches()

// A talk can match through several of its segments; keep only the best-scoring match per title.
val res = relevant
    .groupBy { match -> match.embedded().metadata().getString("title") }
    .map { (_, sameDocs) -> sameDocs.maxBy { match -> match.score() } }
    .also { bestPerTitle ->
        val titleList = bestPerTitle.joinToString("\n-\t") { match ->
            match.embedded().metadata().getString("title").toString()
        }
        println("Unique titles found: \n-\t$titleList")
    }



Looking for talks similar to: Sessions about Agentic AI and MCP Model context Protocol...

Unique titles found: 
-	Supercharge your LLM with Java: Model Context Protocol (MCP) in Action
-	Level Up Your LangChain4j Apps for Production
-	From Scratch to Scalable: Building Smarter AI Agents with Frameworks
-	Enterprise Gen AI with Embabel
-	How to build your own fun and absurd pair programmer
