### Prepare the Base Data
Add a descriptive text column to each record. This text becomes the retrievable knowledge snippet.

In [9]:
%pip install --upgrade --force-reinstall typing_extensions==4.12.2 pydantic==2.7.4 pydantic_core==2.23.3 openai



Collecting typing_extensions==4.12.2
  Using cached typing_extensions-4.12.2-py3-none-any.whl.metadata (3.0 kB)
Collecting pydantic==2.7.4
  Using cached pydantic-2.7.4-py3-none-any.whl.metadata (109 kB)
Collecting pydantic_core==2.23.3
  Using cached pydantic_core-2.23.3-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (6.6 kB)
Collecting openai
  Using cached openai-2.6.1-py3-none-any.whl.metadata (29 kB)
Collecting annotated-types>=0.4.0 (from pydantic==2.7.4)
  Using cached annotated_types-0.7.0-py3-none-any.whl.metadata (15 kB)
INFO: pip is looking at multiple versions of pydantic to determine which version is compatible with other requirements. This could take a while.
ERROR: Cannot install pydantic==2.7.4 and pydantic_core==2.23.3 because these package versions have conflicting dependencies.

The conflict is caused by:
    The user requested pydantic_core==2.23.3
    pydantic 2.7.4 depends on pydantic-core==2.18.4

To fix this you could try t

In [2]:
# Step 1: Read the main Gold table
df = spark.read.format("delta").load("Tables/gold_cf_deviation")

# Step 2: Create the textual summary for embedding
from pyspark.sql import functions as F
df = df.withColumn(
    "summary_text",
    F.concat_ws(" ",
        F.lit("On"), F.date_format("hour", "yyyy-MM-dd HH:mm"),
        F.lit("observed CF was"), F.round("observed_cf",3),
        F.lit(", baseline was"), F.round("historical_avg_cf",3),
        F.lit(", deviation"), F.round("deviation_cf",3)
    )
)
df.write.mode("overwrite").format("delta").save("Tables/gold_rag_ready")




Right now the `summary_text` is purely numeric ("on this date, CF was X, baseline was Y…").
We can enrich it with contextual knowledge from the capacity metadata and the Zenodo baseline, so that the LLM can generate explanations from the records it retrieves.


In [3]:
# Bring in the Metadata
meta = (
    spark.read.format("delta").load("Tables/silver_meta_capacity_fi")
    .groupBy("type")
    .agg(F.sum("total_capacity_mw").alias("installed_capacity_mw"))
)

# Compute a National-Level Context Snippet
meta_info = meta.collect()
context_text = "Finland's hydropower system includes: " + \
    ", ".join([f"{r['type']} ({int(r['installed_capacity_mw'])} MW)" for r in meta_info]) + "."
print(context_text)




Finland's hydropower system includes: HDAM (1345 MW), HROR (1289 MW).


In [4]:
# Add Seasonal / Baseline Context

baseline_month = (
    spark.read.format("delta").load("Tables/gold_zenodo_baseline_monthly")
    .select("month", "baseline_avg_cf")
)

# Build the Enriched Summary Text

df = spark.read.format("delta").load("Tables/gold_cf_deviation") \
    .withColumn("month", F.month("hour"))

df = df.join(baseline_month, on="month", how="left")

df = df.withColumn(
    "summary_text",
    F.concat_ws(" ",
        F.lit(context_text),
        F.lit("On"), F.date_format("hour", "yyyy-MM-dd HH:mm,"),
        F.lit("the observed capacity factor was"), F.round("observed_cf",3),
        F.lit(", compared to a baseline of"), F.round("baseline_avg_cf",3),
        F.lit(". The deviation was"), F.round("deviation_cf",3),
        F.lit(". This measurement falls in month"), F.col("month"),
        F.lit(", historically characterized by seasonal patterns of inflow variation.")
    )
)

df.write.mode("overwrite").format("delta").save("Tables/gold_rag_ready_enriched")




Now each record carries:

- Numeric values (hour, observed_cf, deviation_cf)

- Seasonal context (month + baseline CF)

- Structural context (installed capacity + typology summary)
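
To make the record structure concrete, here is a minimal plain-Python sketch of the same sentence template, outside Spark. The function name `build_summary` is hypothetical, and the values are copied from one of the retrieved rows shown later in this notebook. Unlike `concat_ws(" ", ...)`, the f-string does not leave a stray space before commas and periods.

```python
def build_summary(context, hour, observed_cf, baseline_cf, deviation_cf, month):
    """Format one enriched record as a single retrievable text snippet."""
    return (
        f"{context} On {hour}, the observed capacity factor was {observed_cf:.3f}, "
        f"compared to a baseline of {baseline_cf:.3f}. "
        f"The deviation was {deviation_cf:.3f}. "
        f"This measurement falls in month {month}, historically characterized "
        "by seasonal patterns of inflow variation."
    )


example = build_summary(
    context="Finland's hydropower system includes: HDAM (1345 MW), HROR (1289 MW).",
    hour="2025-10-14 07:00",
    observed_cf=0.759,
    baseline_cf=0.557,
    deviation_cf=0.267,
    month=10,
)
print(example)
```
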

Why this matters for RAG:
- The retriever can match queries against seasonal and typological context, not only against raw numbers.

- The LLM can produce explanations like:

    - "October 2025 showed lower run-of-river output, typical during low inflow months."

This makes the AI assistant’s answers context-aware, accurate, and explainable, which is crucial for domain trust.

Next Steps:

- Generate embeddings from gold_rag_ready_enriched.

- Build Vector Index using that enriched table.

- Connect RAG agent (LLM endpoint) to this richer dataset.

### Generating Embeddings

In [None]:
from openai import AzureOpenAI
import os

# --- VERIFY THESE THREE VALUES ---
# 1. Found on the "Keys and Endpoint" page in your Azure OpenAI resource
AZURE_OPENAI_ENDPOINT = "<endpoint_url>"  # e.g., "https://your-resource-name.openai.azure.com/"

# 2. Found on the "Keys and Endpoint" page (use KEY 1 or KEY 2)
AZURE_OPENAI_API_KEY = "<api_key>"  

# 3. Found on the "Deployments" page in Azure AI Studio. This is YOUR name for the deployment.
DEPLOYMENT_NAME = "text-embedding-3-small" # model deployment name

# --- Client Initialization ---
client = AzureOpenAI(
    azure_endpoint=AZURE_OPENAI_ENDPOINT,
    api_key=AZURE_OPENAI_API_KEY,
    api_version="2024-02-01" # Or any other supported API version
)

# --- Making the API call ---
try:
    response = client.embeddings.create(
        input="This is a test embedding.",
        model=DEPLOYMENT_NAME   # Use the deployment name variable here
    )
    print("Embedding created successfully!")
    print("Embedding length:", len(response.data[0].embedding))

except Exception as e:
    print(f"An error occurred: {e}")



Embedding created successfully!
Embedding length: 1536


In [21]:
# --- DATA PROCESSING ---
from pyspark.sql import functions as F

# Load the enriched table
df = spark.read.format("delta").load("Tables/gold_rag_ready_enriched")
#df.show(3)

# With Azure OpenAI, `model` is the deployment name (here identical to the model name)
model_name = "text-embedding-3-small"
pandas_df = df.select("summary_text").toPandas()
embeddings = []
# One API request per row: simple, but slow for thousands of rows
for text in pandas_df["summary_text"]:
    if text and text.strip():
        try:
            response = client.embeddings.create(model=model_name, input=text)
            embeddings.append(response.data[0].embedding)
        except Exception as e:
            print("Error embedding text:", e)
            embeddings.append(None)
    else:
        embeddings.append(None)

pandas_df["embedding_vector"] = embeddings

# Convert back to Spark DataFrame
df_embedded = spark.createDataFrame(pandas_df)

# Save. Later overwriteSchema can be changed to mergeSchema
df_embedded.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .format("delta") \
    .save("Tables/gold_embeddings_enriched")
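
The loop above sends one request per row. The embeddings endpoint also accepts a list of strings as `input`, so the texts can be sent in batches. The sketch below shows only the batching logic: `embed_all` and `fake_embed_batch` are hypothetical names, and the stand-in function replaces the real API call so the block runs without credentials. Blank or missing texts keep a `None` placeholder, as in the original loop.

```python
def embed_all(texts, embed_batch, batch_size=64):
    """Embed texts in batches, keeping None for blank or missing entries."""
    results = [None] * len(texts)
    # Remember each valid text's original position so results stay aligned
    valid = [(i, t) for i, t in enumerate(texts) if t and t.strip()]
    for start in range(0, len(valid), batch_size):
        chunk = valid[start:start + batch_size]
        vectors = embed_batch([t for _, t in chunk])
        for (pos, _), vec in zip(chunk, vectors):
            results[pos] = vec
    return results


def fake_embed_batch(batch):
    """Stand-in for the real API call: one tiny vector per input text."""
    return [[float(len(t))] for t in batch]


demo = embed_all(["a", "", "abc", None, "de"], fake_embed_batch, batch_size=2)
print(demo)
```

With the client defined earlier, `embed_batch` could be something like `lambda batch: [d.embedding for d in client.embeddings.create(model=model_name, input=batch).data]`. This assumes the response items come back in input order, which is worth confirming against the `index` field of each item.
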



In [23]:
spark.read.format("delta").load("Tables/gold_embeddings_enriched").printSchema()



root
 |-- summary_text: string (nullable = true)
 |-- embedding_vector: array (nullable = true)
 |    |-- element: double (containsNull = true)



### Step 2 – Create a Vector Index in Fabric

Now that the embeddings have been created, we can build the index.

Since I could not find a graphical or low-code way of doing this in Fabric, I used a local FAISS index as a valid substitute:

- Source: choose the Lakehouse → Tables/gold_embeddings_enriched.

- Key column: hour

- Vector column: embedding_vector

- Similarity metric: L2 (Euclidean) distance

- Save the index.

In [24]:
%pip install faiss-cpu


Collecting faiss-cpu
  Downloading faiss_cpu-1.12.0-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl.metadata (5.1 kB)
Downloading faiss_cpu-1.12.0-cp311-cp311-manylinux_2_27_x86_64.manylinux_2_28_x86_64.whl (31.4 MB)
Installing collected packages: faiss-cpu
Successfully installed faiss-cpu-1.12.0

[notice] A new release of pip is available: 24.0 -> 25.2
[notice] To update, run: python -m pip install --upgrade pip
Note: you may need to restart the kernel to use updated packages.



In [13]:
# Load embeddings from delta table

import numpy as np

# Load the table that has your embedding column
df = spark.read.format("delta").load("Tables/gold_embeddings_enriched")

# Collect embeddings as numpy array
embeddings = [row.embedding_vector for row in df.select("embedding_vector").collect() if row.embedding_vector is not None]
embeddings = np.array(embeddings, dtype="float32")
print(embeddings.shape)



(5040, 1536)


In [17]:
# Build FAISS Index

import faiss

dimension = embeddings.shape[1]  # 1536 for text-embedding-3-small
index = faiss.IndexFlatL2(dimension)  # L2 distance (can also use cosine)

index.add(embeddings)
print("Number of vectors in index:", index.ntotal)



Number of vectors in index: 5040


In [None]:
import os
import numpy as np
from openai import AzureOpenAI

client = AzureOpenAI(
    api_key="<api_key>",
    azure_endpoint="<endpoint_url>",
    api_version="2024-05-01-preview"
)
model_name = "text-embedding-3-small"

# Get embedding for the query
#query = "Which days had strong hydropower generation?"
#query = "Periods where observed capacity factor was above baseline"
query = "When did storage plants perform below expectations?"

query_embedding = np.array(
    client.embeddings.create(model=model_name, input=query).data[0].embedding,
    dtype="float32"
)

# Search top 5 matches
distances, indices = index.search(np.array([query_embedding]), k=5)

print("Nearest indices:", indices)
print("Distances:", distances)



Nearest indices: [[4706 4698 3266 4688 4708]]
Distances: [[1.4307094 1.4317044 1.4319755 1.4320235 1.4327888]]
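
The distances above are easier to read as cosine similarities. The sketch below rests on two assumptions that should be verified in your environment: `IndexFlatL2` reports squared L2 distances, and the embedding vectors are unit-length (a quick check is the norm of one vector). Under those assumptions, ||a - b||² = 2 - 2·cos(a, b), so ranking by L2 and ranking by cosine give the same order. The helper name `cosine_from_squared_l2` is hypothetical.

```python
import math


def cosine_from_squared_l2(d2: float) -> float:
    """For unit-length vectors: ||a - b||^2 = 2 - 2*cos(a, b)."""
    return 1.0 - d2 / 2.0


# Sanity check on two hand-made unit vectors
a, b = (1.0, 0.0), (0.6, 0.8)
d2 = sum((x - y) ** 2 for x, y in zip(a, b))
cos_direct = sum(x * y for x, y in zip(a, b))
assert math.isclose(cosine_from_squared_l2(d2), cos_direct)

# Distances reported by the search above
reported = [1.4307094, 1.4317044, 1.4319755, 1.4320235, 1.4327888]
similarities = [round(cosine_from_squared_l2(d), 3) for d in reported]
print(similarities)
```

All five hits land at a cosine similarity of roughly 0.28, and they differ from each other only in the third decimal. That is a weak and nearly undifferentiated match, which is plausible when every snippet shares the same long context sentence.
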


In [31]:
# Map Results Back to the Original Data (top 5)
rows = df.select("summary_text").collect()
for rank, i in enumerate(indices[0]):
    print(f"{rank+1}. {rows[i].summary_text[:200]}...")



1. Finland's hydropower system includes: HDAM (1345 MW), HROR (1289 MW). On 2025-10-14 07:00, the observed capacity factor was 0.759 , compared to a baseline of 0.557 . The deviation was 0.267 . This mea...
2. Finland's hydropower system includes: HDAM (1345 MW), HROR (1289 MW). On 2025-10-15 09:00, the observed capacity factor was 0.683 , compared to a baseline of 0.547 . The deviation was 0.192 . This mea...
3. Finland's hydropower system includes: HDAM (1345 MW), HROR (1289 MW). On 2025-10-15 09:00, the observed capacity factor was 0.683 , compared to a baseline of 0.557 . The deviation was 0.192 . This mea...
4. Finland's hydropower system includes: HDAM (1345 MW), HROR (1289 MW). On 2025-10-13 07:00, the observed capacity factor was 0.672 , compared to a baseline of 0.557 . The deviation was 0.181 . This mea...
5. Finland's hydropower system includes: HDAM (1345 MW), HROR (1289 MW). On 2025-10-13 07:00, the observed capacity factor was 0.672 , compared to a baseline of 0.647 . The

In [None]:
# Load and prepare gold table

from pyspark.sql import functions as F

# Load the enriched gold table
df = spark.read.format("delta").load("Tables/gold_rag_ready_enriched")

# Aggregate to one record per hour
df_unique = (
    df.groupBy("hour", "month")
      .agg(
          F.first("observed_cf").alias("observed_cf"),
          F.avg("historical_avg_cf").alias("historical_avg_cf"),
          F.avg("deviation_cf").alias("deviation_cf"),
          F.avg("baseline_avg_cf").alias("baseline_avg_cf"),
          F.first("summary_text").alias("summary_text")
      )
)

# Clean summary text
df_unique = df_unique.withColumn("summary_text", F.trim(F.col("summary_text")))

display(df_unique)


In [48]:
# Add a sequential numeric index (0, 1, 2, …, N-1) for FAISS alignment.
# Ordering by "hour" (unique after the aggregation above) makes the numbering
# deterministic; monotonically_increasing_id() depends on partitioning and can
# change between recomputations.

from pyspark.sql.window import Window
from pyspark.sql import functions as F

w = Window.orderBy("hour")
df_unique_indexed = df_unique.withColumn("index", F.row_number().over(w) - 1)


# Save this so it’s always available
df_unique_indexed.write.mode("overwrite").format("delta").save("Tables/gold_rag_indexed")



In [36]:
# Build FAISS index from the above indexed table

import numpy as np
import faiss

# Load embeddings table
df_embeddings = spark.read.format("delta").load("Tables/gold_embeddings_enriched")

# Collect embeddings in table order (not yet aligned with df_unique_indexed; see the checks below)
rows = df_embeddings.select("embedding_vector").collect()
embeddings = np.array([r.embedding_vector for r in rows], dtype="float32")

# Create FAISS index
dimension = embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(embeddings)
print("Index size:", index.ntotal)



Index size: 5040


In [37]:
# Check what each table contains
print("df_unique_indexed:", df_unique_indexed.count())
print("df_embeddings:", df_embeddings.count())

display(df_unique_indexed.select("summary_text").limit(3))
display(df_embeddings.select("summary_text").limit(3))



df_unique_indexed: 168
df_embeddings: 5040
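
The two counts differ by exactly a factor of 30 (5040 = 168 × 30). One possible explanation, which is an assumption and not confirmed by the notebook, is that the earlier left join on `month` matched several baseline rows per month and multiplied each hourly record. The retrieved snippets, where the same hour appears with different baseline values, point in the same direction. The plain-Python sketch below illustrates that kind of fan-out and a duplicate-key check. The three baseline values are taken from the retrieved snippets; the table layout is hypothetical.

```python
from collections import Counter

# Two hourly records, both in month 10
hours = [("2025-10-14 07:00", 10), ("2025-10-15 09:00", 10)]

# Hypothetical baseline table with several rows for the same month
baseline = [(10, 0.557), (10, 0.547), (10, 0.647)]

# Left-join semantics on "month": every hourly row pairs with every matching baseline row
joined = [(hour, month, cf) for hour, month in hours for b_month, cf in baseline if b_month == month]
print(len(hours), "rows before join ->", len(joined), "rows after join")

# Check the join key for duplicates before joining
key_counts = Counter(month for month, _ in baseline)
duplicate_keys = {month: n for month, n in key_counts.items() if n > 1}
print("Duplicate join keys:", duplicate_keys)
```

If the same check on `gold_zenodo_baseline_monthly` shows more than one row per month, aggregating the baseline to one row per month before the join would keep the enriched table at one row per hour.
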



In [38]:
common = df_unique_indexed.join(df_embeddings, on="summary_text", how="inner").count()
print("Common summary_text rows:", common)



Common summary_text rows: 168


In [39]:
# Fix misalignment: keep only the embeddings whose text survived the hourly
# aggregation, and attach the numeric index from df_unique_indexed

df_embeddings_aligned = (
    df_embeddings.join(
        df_unique_indexed.select("summary_text", "index"),
        on="summary_text",
        how="inner"
    )
)

# Verify counts again
print("Aligned embeddings count:", df_embeddings_aligned.count())



Aligned embeddings count: 168


In [40]:
# Collect a few rows from both datasets and compare indexes
pdf_unique = df_unique_indexed.select("index", "summary_text").orderBy("index").limit(5).toPandas()
pdf_embeds = df_embeddings_aligned.select("index", "summary_text").orderBy("index").limit(5).toPandas()
print(pdf_unique)
print(pdf_embeds)




   index                                       summary_text
0      0  Finland's hydropower system includes: HDAM (13...
1      1  Finland's hydropower system includes: HDAM (13...
2      2  Finland's hydropower system includes: HDAM (13...
3      3  Finland's hydropower system includes: HDAM (13...
4      4  Finland's hydropower system includes: HDAM (13...
   index                                       summary_text
0      0  Finland's hydropower system includes: HDAM (13...
1      1  Finland's hydropower system includes: HDAM (13...
2      2  Finland's hydropower system includes: HDAM (13...
3      3  Finland's hydropower system includes: HDAM (13...
4      4  Finland's hydropower system includes: HDAM (13...


The index values and the (truncated) summary texts now match across both tables.

In [42]:
df_embeddings_aligned.write.mode("overwrite").format("delta").save("Tables/gold_embeddings_aligned")



In [43]:
df_embeddings = spark.read.format("delta").load("Tables/gold_embeddings_aligned")



In [None]:
# 4️⃣ Run a FAISS Search and Collect Top-k Results
import os
import numpy as np
from openai import AzureOpenAI

client = AzureOpenAI(
    api_key="<api_key>",
    azure_endpoint="<endpoint_url>",
    api_version="2024-05-01-preview"
)
model_name = "text-embedding-3-small"

# Get embedding for the query
#query = "Which days had strong hydropower generation?"
#query = "Periods where observed capacity factor was above baseline"
#query = "When did storage plants perform below expectations?"
query = "When was hydropower output below baseline?"

query_embedding = np.array(
    client.embeddings.create(model=model_name, input=query).data[0].embedding,
    dtype="float32"
)

# Search top 5 matches
distances, indices = index.search(np.array([query_embedding]), k=5)

print("Nearest indices:", indices)
print("Distances:", distances)


Nearest indices: [[1842 1838 1851 4418 1830]]
Distances: [[0.89200574 0.89233136 0.8925607  0.8926093  0.89261746]]


In [45]:
# 5️⃣ Build the Results DataFrame

results = []
for rank, (idx, dist) in enumerate(zip(indices[0], distances[0]), start=1):
    results.append({
        "rank": rank,
        "index": int(idx),        # position in the FAISS index; must match df_unique_indexed["index"]
        "distance": float(dist)
    })

result_df = spark.createDataFrame(results)



In [46]:
# 6️⃣ Join with Metadata Using the Numeric Index

result_df = result_df.join(df_unique_indexed, on="index", how="left")
display(result_df)
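
This join only returns metadata if the positions returned by FAISS are valid values of the `index` column. In the search output above, positions such as 1842 and 4418 exceed the 168 rows of `df_unique_indexed`, which suggests the search still ran against the earlier 5040-vector index. The sketch below is a hedged illustration of a defensive mapping step: it keeps an explicit list of row IDs in the same order as the vectors were added, and drops positions that fall outside it. `map_hits` and `row_ids` are hypothetical names.

```python
def map_hits(positions, distances, row_ids):
    """Translate FAISS positions into row IDs, skipping invalid positions."""
    hits = []
    for rank, (pos, dist) in enumerate(zip(positions, distances), start=1):
        # FAISS uses -1 for "no result"; positions beyond row_ids mean the
        # index and the metadata table are out of sync
        if pos < 0 or pos >= len(row_ids):
            continue
        hits.append({"rank": rank, "index": row_ids[pos], "distance": float(dist)})
    return hits


# row_ids[i] is the ID of the i-th vector added to the index
row_ids = [0, 1, 2, 3]
hits = map_hits([2, 0, 1842], [0.1, 0.2, 0.3], row_ids)
print(hits)
```

A quick guard before searching is to compare `index.ntotal` with the row count of the metadata table and stop if they differ.
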



In [32]:
# Save the index for reuse
faiss.write_index(index, "/mnt/data/hydro_index.faiss")
# Reload anytime
# index = faiss.read_index("/mnt/data/hydro_index.faiss")




RuntimeError: Error in faiss::FileIOWriter::FileIOWriter(const char*) at /project/third-party/faiss/faiss/impl/io.cpp:103: Error: 'f' failed: could not open /mnt/data/hydro_index.faiss for writing: No such file or directory
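
The error says that `/mnt/data` does not exist in this environment. A minimal sketch of one way around it is to create the parent directory before writing. The helper `save_with_parent` is hypothetical, and the demo writer below stands in for `faiss.write_index` so the block runs without FAISS installed.

```python
import os
import tempfile


def save_with_parent(path, writer):
    """Create the parent directory if needed, then call writer(path)."""
    parent = os.path.dirname(path)
    if parent:
        os.makedirs(parent, exist_ok=True)
    writer(path)
    return path


def demo_writer(path):
    """Stand-in for faiss.write_index: writes a few bytes to the target path."""
    with open(path, "wb") as fh:
        fh.write(b"demo")


with tempfile.TemporaryDirectory() as tmp:
    target = os.path.join(tmp, "indexes", "hydro_index.faiss")
    save_with_parent(target, demo_writer)
    saved_ok = os.path.exists(target)

print("Saved:", saved_ok)
```

In the notebook, the call could look like `save_with_parent("/lakehouse/default/Files/faiss/hydro_index.faiss", lambda p: faiss.write_index(index, p))`. That path assumes a default lakehouse is attached to the notebook and mounted there, so adjust it to your workspace.
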

Since `monotonically_increasing_id` alone did not fix the alignment problem, we switched to sequential unique IDs and now rebuild the aligned embeddings table.

In [51]:
df_embeddings.columns


['summary_text', 'embedding_vector']

In [54]:
# -------------------------------------------------
# 1. Drop any stray "index" from df_embeddings
# -------------------------------------------------
if "index" in df_embeddings.columns:
    df_embeddings = df_embeddings.drop("index")

# -------------------------------------------------
# 2. Pull the column from df_unique_indexed and give it a temp name
# -------------------------------------------------
df_right = (
    df_unique_indexed
    .select("summary_text", "index")
    .withColumnRenamed("index", "right_index")
)

# optional: cache the lookup so it is reused by the join and the write below
# (cache() is lazy; the data is stored on the first action)
df_right.cache()

# -------------------------------------------------
# 3. Join
# -------------------------------------------------
df_embeddings_aligned = (
    df_embeddings
    .join(df_right, on="summary_text", how="inner")
)

# -------------------------------------------------
# 4. Rename back to the desired name
# -------------------------------------------------
df_embeddings_aligned = df_embeddings_aligned.withColumnRenamed("right_index", "index")

# -------------------------------------------------
# 5. Verify (optional)
# -------------------------------------------------
print("Final columns :", df_embeddings_aligned.columns)

# -------------------------------------------------
# 6. Write – force schema overwrite
# -------------------------------------------------
df_embeddings_aligned.write \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .format("delta") \
    .save("Tables/gold_embeddings_aligned")


Final columns : ['summary_text', 'embedding_vector', 'index']


In [56]:
# Build FAISS index from gold_embeddings_aligned

df_embeddings_aligned = (
    spark.read.format("delta").load("Tables/gold_embeddings_aligned")
    .withColumn("index", F.col("index").cast("long"))
)
rows = df_embeddings_aligned.orderBy("index").select("embedding_vector").collect()
embeddings = np.array([r.embedding_vector for r in rows], dtype="float32")


dimension = embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(embeddings)



In [57]:
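# Step 5: Run retrieval again against the rebuilt index, then join
# (reuses query_embedding from the search cell above)
distances, indices = index.search(np.array([query_embedding]), k=5)
hits = [(rank, int(i), float(d)) for rank, (i, d) in enumerate(zip(indices[0], distances[0]), start=1)]
result_df = spark.createDataFrame(hits, ["rank", "index", "distance"])
result_df = result_df.join(df_unique_indexed, on="index", how="left").orderBy("rank")
display(result_df)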



### Step 3: Set Up the LLM Client