In [20]:
!pip install faker numpy pandas python-box scikit-learn
import numpy as np
import pandas as pd
from faker import Faker
from box import Box
import json
import random



In [21]:
config = Box({
    "num_students": 2000,
    "schools": ["KV_"+str(i) for i in range(1, 21)],
    "classes": ["11A", "11B", "12A", "12B"],
    "concepts": {
        "core": ["molarity", "stoichiometry", "acid_base", "indicators"],
        "secondary": ["dilution", "ph_scale", "neutralization"]
    },
    "error_types": {
        "procedural": ["wrong_indicator", "burette_reading", "endpoint_missed"],
        "conceptual": ["molarity_calculation", "ph_interpretation", "stoichiometry_error"]
    }
})

experiment_config = {
    "titration": {
        "procedural_errors": ["wrong_indicator", "burette_handling"],
        "conceptual_errors": ["molarity_calc", "equivalence_point"],
        "related_concepts": ["acid_base", "indicators", "stoichiometry"]
    },
    "calorimetry": {
        "procedural_errors": ["insulation_missing", "thermometer_placement"],
        "conceptual_errors": ["heat_loss_calculation", "specific_heat"],
        "related_concepts": ["thermodynamics", "energy_transfer"]
    }
}

In [22]:
fake = Faker()
np.random.seed(42)

def generate_student():
    student = {
        "student_id": f"STU_{fake.unique.bothify(text='????####')}",
        "school": np.random.choice(config.schools),
        "class": np.random.choice(config.classes),
        "error_profile": np.random.choice(["novice", "theory_weak", "careless"], p=[0.3, 0.4, 0.3]),
        "experiment_history": []  # Always initialize as empty list
    }

    # Generate 2-3 random experiments per student
    experiments = np.random.choice(
        list(experiment_config.keys()),
        size=np.random.randint(2, 4),  # Adjusted to 2-3 experiments
        replace=True  # Allow sampling with replacement
    )

    for exp in experiments:
        student["experiment_history"].append(
            generate_experiment_data(exp, student["error_profile"])
        )

    return student

def generate_experiment_data(exp_type, profile):
    num_errors = np.random.poisson(lam=2) + 1
    errors = []

    for _ in range(num_errors):
        # Use experiment-specific error config
        if np.random.rand() < 0.6 if profile=="novice" else 0.3:
            error_type = "procedural"
            specific_error = np.random.choice(experiment_config[exp_type]["procedural_errors"])
        else:
            error_type = "conceptual"
            specific_error = np.random.choice(experiment_config[exp_type]["conceptual_errors"])

        errors.append({
            "error_type": error_type,
            "specific_error": specific_error,
            "concepts": get_related_concepts(exp_type, specific_error),
            "severity": np.random.randint(1,5)
        })

    return {
        "experiment_type": exp_type,
        "errors": errors
    }

def get_related_concepts(exp_type, specific_error):
    # Map specific errors to related concepts
    if specific_error in experiment_config[exp_type]["procedural_errors"]:
        return experiment_config[exp_type]["related_concepts"][:1]  # First concept
    else:
        return experiment_config[exp_type]["related_concepts"][1:]  # Remaining concepts

In [23]:
students = [generate_student() for _ in range(config.num_students)]

# Validate student records
for student in students:
    if "experiment_history" not in student:
        print(f"Invalid student record: {student['student_id']}")
        student["experiment_history"] = []  # Fix missing key

# Add peer cluster patterns
for school in config.schools:
    # 1 common error per experiment type
    common_errors = {
        exp: np.random.choice(experiment_config[exp]["procedural_errors"])
        for exp in experiment_config
    }

    # Apply to 30% students from this school
    for student in students:
        if student["school"] == school and np.random.rand() < 0.3:
            if "experiment_history" not in student:
                student["experiment_history"] = []  # Ensure key exists
            for exp_history in student["experiment_history"]:
                if np.random.rand() < 0.4:  # 40% chance to add common error
                    exp_type = exp_history["experiment_type"]
                    exp_history["errors"].append({
                        "error_type": "procedural",
                        "specific_error": common_errors[exp_type],
                        "concepts": experiment_config[exp_type]["related_concepts"][:1],
                        "severity": 3
                    })

In [24]:
with open("labmate_dataset.json", "w") as f:
    json.dump({"students": students}, f, indent=2)

!cp labmate_dataset.json "/content/drive/MyDrive/"  # Save to Google Drive

cp: cannot create regular file '/content/drive/MyDrive/': No such file or directory


In [25]:
print(f"Total Students: {len(students)}")
print(f"Total Errors: {sum(len(exp['errors']) for s in students for exp in s['experiment_history'])}")
print("\nSample Student:")
print(json.dumps(students[0], indent=2))

Total Students: 2000
Total Errors: 15851

Sample Student:
{
  "student_id": "STU_VkKy6496",
  "school": "KV_7",
  "class": "12B",
  "error_profile": "careless",
  "experiment_history": [
    {
      "experiment_type": "calorimetry",
      "errors": [
        {
          "error_type": "procedural",
          "specific_error": "insulation_missing",
          "concepts": [
            "thermodynamics"
          ],
          "severity": 4
        },
        {
          "error_type": "procedural",
          "specific_error": "insulation_missing",
          "concepts": [
            "thermodynamics"
          ],
          "severity": 4
        },
        {
          "error_type": "procedural",
          "specific_error": "thermometer_placement",
          "concepts": [
            "thermodynamics"
          ],
          "severity": 4
        }
      ]
    },
    {
      "experiment_type": "titration",
      "errors": [
        {
          "error_type": "procedural",
          "specific_error

In [26]:
!pip install --upgrade scikit-learn



In [27]:
from sklearn.preprocessing import OneHotEncoder

def build_feature_matrix(students):
    # Initialize feature matrix
    feature_matrix = []

    # Extract features for each student
    for student in students:
        features = {}

        # School and class
        features["school"] = student["school"]
        features["class"] = student["class"]

        # Error counts per experiment type
        for exp in experiment_config:
            for error_type in ["procedural", "conceptual"]:
                key = f"{exp}_{error_type}"
                features[key] = sum(
                    1 for exp_history in student["experiment_history"]
                    for error in exp_history["errors"]
                    if exp_history["experiment_type"] == exp and error["error_type"] == error_type
                )

        # Concept mastery scores
        for concept in config.concepts.core + config.concepts.secondary:
            features[concept] = sum(
                1 for exp_history in student["experiment_history"]
                for error in exp_history["errors"]
                if concept in error["concepts"]
            )

        feature_matrix.append(features)

    # Convert to DataFrame
    df = pd.DataFrame(feature_matrix)

    # One-hot encode categorical features
    encoder = OneHotEncoder()  # Remove `sparse=False`
    encoded_features = encoder.fit_transform(df[["school", "class"]]).toarray()  # Convert to dense array
    encoded_df = pd.DataFrame(encoded_features, columns=encoder.get_feature_names_out(["school", "class"]))

    # Combine with numerical features
    final_df = pd.concat([df.drop(columns=["school", "class"]), encoded_df], axis=1)

    return final_df

# Build feature matrix
feature_matrix = build_feature_matrix(students)
print(feature_matrix.head())

   titration_procedural  titration_conceptual  calorimetry_procedural  \
0                     2                     0                       3   
1                     0                     0                       6   
2                     4                     0                       0   
3                     7                     0                       3   
4                     3                     4                       0   

   calorimetry_conceptual  molarity  stoichiometry  acid_base  indicators  \
0                       0         0              0          2           0   
1                       0         0              0          0           0   
2                       0         0              0          4           0   
3                       0         0              0          7           0   
4                       0         0              4          3           4   

   dilution  ph_scale  ...  school_KV_4  school_KV_5  school_KV_6  \
0         0         0  ...   

In [28]:
!pip install lightfm
from lightfm import LightFM
from scipy.sparse import csr_matrix

def prepare_cf_data(feature_matrix):
    # Create user-item matrix (student x error_type)
    error_types = list({f"{exp}_{et}" for exp in experiment_config for et in ["procedural", "conceptual"]})
    user_item_matrix = np.zeros((len(feature_matrix), len(error_types)))

    for i, row in feature_matrix.iterrows():
        for j, error_type in enumerate(error_types):
            user_item_matrix[i][j] = row[error_type]

    return csr_matrix(user_item_matrix), error_types

# Prepare data
user_item_matrix, error_types = prepare_cf_data(feature_matrix)

# Train model
model = LightFM(loss='warp')  # Weighted Approximate-Rank Pairwise
model.fit(user_item_matrix, epochs=20)

# Predict procedural errors for titration
def predict_errors(student_id, experiment_type="titration"):
    student_idx = feature_matrix.index[feature_matrix.index == student_id].tolist()[0]
    scores = model.predict(student_idx, np.arange(len(error_types)))

    # Filter for procedural errors in titration
    target_errors = [et for et in error_types if et.startswith(f"{experiment_type}_procedural")]
    target_indices = [error_types.index(et) for et in target_errors]

    # Get top 3 predictions
    top_indices = np.argsort(scores[target_indices])[-3:]
    return [target_errors[i] for i in top_indices]

# Example usage
student_id = feature_matrix.index[0]  # First student
print(f"Predicted errors for student {student_id}: {predict_errors(student_id)}")

Predicted errors for student 0: ['titration_procedural']


In [29]:
from sklearn.model_selection import train_test_split
from sklearn.metrics import precision_score, recall_score
import numpy as np
from lightfm import LightFM
from scipy.sparse import csr_matrix

# Previous code...

# Split data into train/test sets
train_matrix, test_matrix = train_test_split(user_item_matrix, test_size=0.2, random_state=42)

# Train model on training data
model = LightFM(loss='warp')
model.fit(train_matrix, epochs=20)

# Evaluate on test data
def evaluate_model(model, test_matrix, error_types, experiment_type="titration"):
    precision_scores = []
    recall_scores = []

    for student_idx in range(test_matrix.shape[0]):
        # Get true errors
        true_errors_indices = test_matrix[student_idx].nonzero()[1]
        true_errors = [error_types[i] for i in true_errors_indices if error_types[i].startswith(f"{experiment_type}_procedural")]

        # Predict errors
        predicted_errors = predict_errors(student_idx, experiment_type)

        # Check if there are any true errors
        if not true_errors:
            # If no true errors, skip precision/recall calculation for this student
            continue

        # Calculate precision and recall
        # We need to convert the lists of strings into an array that the precision and recall functions understand
        true_array = np.array([1 if error in true_errors else 0 for error in error_types if error.startswith(f"{experiment_type}_procedural")])
        predicted_array = np.array([1 if error in predicted_errors else 0 for error in error_types if error.startswith(f"{experiment_type}_procedural")])

        precision = precision_score(true_array, predicted_array, average='micro', zero_division=0) # Added the zero_division parameter to resolve divide by zero warnings
        recall = recall_score(true_array, predicted_array, average='micro', zero_division=0) # Added the zero_division parameter to resolve divide by zero warnings

        precision_scores.append(precision)
        recall_scores.append(recall)

    return np.mean(precision_scores), np.mean(recall_scores)

# Evaluate
precision, recall = evaluate_model(model, test_matrix, error_types)
print(f"Precision: {precision:.2f}, Recall: {recall:.2f}")

Precision: 1.00, Recall: 1.00


In [30]:
# Check error type distribution
error_counts = feature_matrix[[f"{exp}_{et}" for exp in experiment_config for et in ["procedural", "conceptual"]]].sum()
print("Error Type Distribution:")
print(error_counts)

# Check concept mastery distribution
concept_counts = feature_matrix[config.concepts.core + config.concepts.secondary].sum()
print("\nConcept Mastery Distribution:")
print(concept_counts)

Error Type Distribution:
titration_procedural      6852
titration_conceptual       857
calorimetry_procedural    7246
calorimetry_conceptual     896
dtype: int64

Concept Mastery Distribution:
molarity             0
stoichiometry      857
acid_base         6852
indicators         857
dilution             0
ph_scale             0
neutralization       0
dtype: int64


In [31]:
#RAG Implementation
#Data Preprocessing
import json

# Load the JSON file (update the filename if needed)
with open("labmate_dataset.json", "r") as f:
    data = json.load(f)
    students = data.get("students", [])

documents = []
for student in students:
    student_id = student.get("student_id", "UnknownID")
    for experiment in student.get("experiment_history", []):
        experiment_type = experiment.get("experiment_type", "UnknownExperiment")
        for error in experiment.get("errors", []):
            error_type = error.get("error_type", "UnknownError")
            specific_error = error.get("specific_error", "NoDetail")
            concepts = ", ".join(error.get("concepts", []))
            severity = error.get("severity", "N/A")

            doc = (f"Student: {student_id}; Experiment: {experiment_type}; "
                   f"Error: {error_type} - {specific_error}; Concepts: {concepts}; "
                   f"Severity: {severity}")
            documents.append(doc)

print(f"Total Documents Created: {len(documents)}")


Total Documents Created: 15851


In [32]:
!pip install faiss-cpu




In [33]:
import faiss
print("FAISS version:", faiss.__version__)


FAISS version: 1.10.0


In [34]:
!pip install sentence-transformers




In [36]:
#Retreival
import numpy as np
import faiss
from sentence_transformers import SentenceTransformer
import json

# Load data from labmate_data.json
# Changed filename from 'labmate_dataset.json' to 'labmate_data.json'
with open("labmate_dataset.json", "r") as f:
    labmate_data = json.load(f)
    students = labmate_data.get("students", [])

# Create a list of documents by iterating over each student's experiment history
documents = []
for student in students:
    student_id = student.get("student_id", "UnknownID")
    for experiment in student.get("experiment_history", []):
        experiment_type = experiment.get("experiment_type", "UnknownExperiment")
        for error in experiment.get("errors", []):
            error_type = error.get("error_type", "UnknownError")
            specific_error = error.get("specific_error", "NoDetail")
            concepts = ", ".join(error.get("concepts", []))
            severity = error.get("severity", "N/A")
            # Construct a document string capturing key details
            doc = (f"Student: {student_id}; Experiment: {experiment_type}; "
                   f"Error Type: {error_type}; Specific Error: {specific_error}; "
                   f"Concepts: {concepts}; Severity: {severity}")
            documents.append(doc)

print(f"Total Documents Created: {len(documents)}")

# Load a pretrained SentenceTransformer model
model = SentenceTransformer('all-MiniLM-L6-v2')

# Generate embeddings for all documents
doc_embeddings = model.encode(documents)

# Create a FAISS index with L2 distance
dimension = doc_embeddings.shape[1]
index = faiss.IndexFlatL2(dimension)
index.add(np.array(doc_embeddings, dtype=np.float32))

# Save the FAISS index for later use (optional)
faiss.write_index(index, "faiss_index.bin")
print("FAISS index created and saved.")

# Function to retrieve context given a student query
def retrieve_context(query, top_k=3):
    query_vec = model.encode([query])
    distances, indices = index.search(np.array(query_vec, dtype=np.float32), top_k)
    return [documents[i] for i in indices[0]]

# Example usage
query = "Why did my titration fail?"
context_docs = retrieve_context(query)
print("Retrieved Context:", context_docs)

Total Documents Created: 15851
FAISS index created and saved.
Retrieved Context: ['Student: STU_jdIY8865; Experiment: titration; Error Type: procedural; Specific Error: wrong_indicator; Concepts: acid_base; Severity: 4', 'Student: STU_TqDW5488; Experiment: titration; Error Type: procedural; Specific Error: wrong_indicator; Concepts: acid_base; Severity: 1', 'Student: STU_xwLa9843; Experiment: titration; Error Type: procedural; Specific Error: wrong_indicator; Concepts: acid_base; Severity: 3']


In [37]:
def retrieve_context(query, top_k=3):
    query_vec = model.encode([query])
    distances, indices = index.search(np.array(query_vec, dtype=np.float32), top_k)
    # Retrieve the corresponding documents
    return [documents[i] for i in indices[0]]

# Example query
query = "Why did my titration fail?"
context_docs = retrieve_context(query)
print("Retrieved Context:")
for doc in context_docs:
    print(doc)


Retrieved Context:
Student: STU_jdIY8865; Experiment: titration; Error Type: procedural; Specific Error: wrong_indicator; Concepts: acid_base; Severity: 4
Student: STU_TqDW5488; Experiment: titration; Error Type: procedural; Specific Error: wrong_indicator; Concepts: acid_base; Severity: 1
Student: STU_xwLa9843; Experiment: titration; Error Type: procedural; Specific Error: wrong_indicator; Concepts: acid_base; Severity: 3


In [38]:
!pip install langchain




In [39]:
!pip install openai




In [41]:
!pip install langchain_community

Collecting langchain_community
  Downloading langchain_community-0.3.18-py3-none-any.whl.metadata (2.4 kB)
Collecting dataclasses-json<0.7,>=0.5.7 (from langchain_community)
  Downloading dataclasses_json-0.6.7-py3-none-any.whl.metadata (25 kB)
Collecting pydantic-settings<3.0.0,>=2.4.0 (from langchain_community)
  Downloading pydantic_settings-2.8.1-py3-none-any.whl.metadata (3.5 kB)
Collecting httpx-sse<1.0.0,>=0.4.0 (from langchain_community)
  Downloading httpx_sse-0.4.0-py3-none-any.whl.metadata (9.0 kB)
Collecting marshmallow<4.0.0,>=3.18.0 (from dataclasses-json<0.7,>=0.5.7->langchain_community)
  Downloading marshmallow-3.26.1-py3-none-any.whl.metadata (7.3 kB)
Collecting typing-inspect<1,>=0.4.0 (from dataclasses-json<0.7,>=0.5.7->langchain_community)
  Downloading typing_inspect-0.9.0-py3-none-any.whl.metadata (1.5 kB)
Collecting python-dotenv>=0.21.0 (from pydantic-settings<3.0.0,>=2.4.0->langchain_community)
  Downloading python_dotenv-1.0.1-py3-none-any.whl.metadata (23 kB

In [43]:
from langchain.chains import RetrievalQA
from langchain.chat_models import ChatOpenAI
from langchain.vectorstores import FAISS as LangchainFAISS


In [49]:
# Define a wrapper class for SentenceTransformer
class SentenceTransformerWrapper:
    def __init__(self, model):
        self.model = model
    def embed_documents(self, texts):
        return self.model.encode(texts).tolist()
    def embed_query(self, text):
        return self.model.encode([text])[0].tolist()

# Create an instance of the wrapper
embedding_wrapper = SentenceTransformerWrapper(model)

# Now create the vector store using the wrapper
from langchain.vectorstores import FAISS as LangchainFAISS
vector_store = LangchainFAISS.from_texts(documents, embedding=embedding_wrapper)

# Initialize the LLM (e.g., GPT-4)
from langchain.chat_models import ChatOpenAI
llm = ChatOpenAI(model="gpt-4")

# Create a RetrievalQA chain
from langchain.chains import RetrievalQA
qa_chain = RetrievalQA.from_chain_type(llm=llm, retriever=vector_store.as_retriever(), chain_type="stuff")

# Example query
query = "How can I improve my titration accuracy?"
final_response = qa_chain.run(query)
print("Final Response:", final_response)


  llm = ChatOpenAI(model="gpt-4")


ValidationError: 1 validation error for ChatOpenAI
  Value error, Did not find openai_api_key, please add an environment variable `OPENAI_API_KEY` which contains it, or pass `openai_api_key` as a named parameter. [type=value_error, input_value={'model_kwargs': {}, 'nam...ne, 'http_client': None}, input_type=dict]
    For further information visit https://errors.pydantic.dev/2.10/v/value_error