# 1. Information Retrieval Basics

In [2]:
import numpy as np
from collections import Counter
import pandas as pd

## Create Vocabulary

In [3]:
# all_docs=documents['document'].str.cat(sep=' ').split(' ')
# vocabulary = Counter(all_docs)

## Term Frequency

In [4]:
def get_tf(documents, vocabulary):
    """
    It creates the term-frequency matrix with rows the terms of the vocabulary and columns the number of documents.
    Each value of the matrix represents the frequency (normalized to document max frequecy) of a term (row) 
    in a document (column).
    Example:
    
    > INPUT:
    documents:
    Doc1: hello hello world
    Doc2: hello friend
    
    voc: 
    [hello, world, friend]
    
    > OUPUT:    
    [[1, 1],
    [0.5, 0],
    [0, 1]]
    
    :param documents: list of list of str, with the tokenized documents.
    :param vocabulary: dict with the vocabulary (computed in 1.1) and each term's frequency.
    :return: np.array with the document-term frequencies
    """
    document_term_freq = np.zeros(shape=(len(vocabulary), len(documents)))
    # --------------
    # YOUR CODE HERE
    for j, doc in enumerate(documents):
        counts = Counter(doc.split(' '))
        for i,term in enumerate(vocabulary):
            document_term_freq[i][j] = counts[term]
    # --------------
    
    return document_term_freq

## Inverse Document Frequency

In [5]:
def get_idf(documents, vocabulary):
    """
    It computes IDF scores, storing idf values in a dictionary.
    
    :param documents: list of list of str, with the document text.
    :param vocabulary: dict with the vocabulary (computed in 1.1) and each term's frequency.
    :return: dict with the terms as keys and values the idf for each term.
    """
    # --------------
    # YOUR CODE HERE
    idf = {}
    num_documents = len(documents)
    for i, term in enumerate(vocabulary):
        idf[term] = np.log(num_documents / np.sum([term in doc for doc in documents]))
    # --------------
    return idf

## Vectorization

In [6]:
def vectorize_tfidf(documents, vocabulary, tf, idf):
    """
    It takes the input text and vectorizes it based on the tf-idf formula.
    
    :param documents: list of list of str, with the document text.
    :param vocabulary: dict, with keys: the terms of all the documents and values: the number of times they appear in the corpus.
    :param idf: dict, with the terms as keys and values the idf for each term.
    :return: np.array, with the vectorized documents
    """
    # --------------
    # YOUR CODE HERE
    tfidf_vectors=np.zeros((len(documents),len(vocabulary)))

    for doc_id, document in enumerate(documents):
        unique_words = set(document.split(' '))
        max_count = np.max(tf[:,doc_id])
        for word_id, word in enumerate(vocabulary):
            if word in unique_words:
                tfidf_vectors[doc_id][word_id] = (tf[word_id,doc_id] / max_count) * idf[word]
    # --------------
    return np.array(tfidf_vectors)

def vectorize_query(query, vocabulary, idf):
    """
    It generates the vector for an input document (with normalization).
    
    :param document: list of str with the tokenized documents.
    :param vocabulary: list of str, with the unique tokens of the vocabulary.
    :param idf: dict with the idf values for each vocabulary word.
    :return: list of floats
    """
    vector = [0]*len(vocabulary)
    counts = Counter(query)
    max_count = counts.most_common(1)[0][1]
    word_count = Counter(query)
    for i,term in enumerate(vocabulary):
        vector[i] = (word_count[term] / max_count) * idf[term]
    return vector

In [90]:
documents = [['a','a', 'b', 'c'], ['a', 'b'], ['c']]

vocabulary = Counter([doc for document in documents for doc in document])

documents = ['a a b c', 'a b', 'c']
tf = get_tf(documents, vocabulary)
idf = get_idf(documents, vocabulary)

tfidf = vectorize_tfidf(documents, vocabulary, tf, idf)

query = 'a b'
vector = vectorize_query(query, vocabulary, idf)

In [94]:
for i in range(len(documents)):
    print(cosine_similarity(vector, tfidf[i]))

0.8660254037844388
1.0
0


## Rocchio's Relevance Feedback

In [7]:
def expand_query(relevant_doc_vecs, non_relevant_doc_vecs, query_vector, alpha, beta, gamma):
    """Expand the query using Rocchio algorithm.

    :param relevant_doc_vecs: np.array with the relevant document vectors
    :param non_relevant_doc_vecs: np.array with the non relevant document vectors
    :param query_vector: np.array with the original query vector
    :param alpha: float representing the alpha parameter
    :param beta: float representing the beta parameter
    :param gamma: float representing the gamma parameter
    :return: np.array with the expanded query vector
    """
    
    num_rel = len(relevant_doc_vecs)
    num_non_rel = len(non_relevant_doc_vecs)
    
    # Compute the first term in the Rocchio equation
    norm_query_vector = query_vector
    
    # Compute the second term in the Rocchio equation
    norm_sum_relevant = 1 / num_rel * np.sum(relevant_doc_vecs, axis=0)
    
    # Compute the last term in the Rocchio equation
    norm_sum_non_relevant = 1 / num_non_rel * np.sum(non_relevant_doc_vecs, axis=0)
    
    # Sum all the terms
    modified_query_vector = alpha * norm_query_vector + beta * norm_sum_relevant - gamma * norm_sum_non_relevant
    
    # Ignore negative elements
    # modified_query_vector = ...
    return modified_query_vector

# 4. Latent Semantic Indexing

In [8]:
def truncated_svd(term_doc_matrix, num_val):
    """Constructs the truncated SVD of a term-document matrix.
    
    :param term_doc_matrix: np.array with the term-document matrix
    :param num_val: int with the number of singular values to consider
    :return: tuple with the truncated SVD matrices
    """
    K, S, Dt = np.linalg.svd(term_doc_matrix, full_matrices=False)

    K_sel = K[:,0:num_val]

    S_sel = S[:num_val]
    S_sel = np.diag(S_sel)

    Dt_sel = Dt[0:num_val,:]

    return K_sel, S_sel, Dt_sel

def construct_query_vector(q, K_s, S_s):
    """Constructs the query vector using the truncated SVD matrices.

    :param q: np.array with the original query vector
    :param K_s: np.array with the left singular vectors
    :param S_s: np.array

    :return: np.array with the query vector
    """

    q_star = np.dot(q, np.dot(K_s, np.linalg.inv(S_s)))
    
    return q_star

Then, given q_star, compute the similarity with the document representation (given by the columns of Dt_sel)

# 5. Recommender Systems

In [9]:
from sklearn.metrics.pairwise import pairwise_distances, cosine_similarity
from tqdm import tqdm

def create_data_matrix(data,n_users,n_items):
    """This function should return a numpy matrix with a shape (n_users, n_items). 
        Each entry is the rating given by the user to the item
    
    Args:
        :data: pandas dataframe with the data
        :n_users: number of users
        :n_items: number of items

    Returns:
        :data_matrix: numpy matrix with shape (n_users, n_items)
    """
    data_matrix = np.zeros((n_users, n_items))
    
    for _, row in data.iterrows():
        user_id = row['user_id'] - 1
        item_id = row['item_id'] - 1
        rating = row['rating']

        data_matrix[user_id][item_id] = rating
    return data_matrix


# item_similarity = cosine_similarity(train_data_matrix.T)
# user_similarity = cosine_similarity(train_data_matrix)

def item_based_predict(ratings, similarity):
    """Compute the item-based prediction using the cosine similarity.
    
    Args:
        :ratings: numpy matrix with the ratings
        :similarity: numpy matrix with the item-item similarity
        
    Returns:
        :pred: numpy matrix with the predicted ratings
    """
    n_users, n_items = ratings.shape

    weighted_sum = np.dot(ratings, similarity.T)

    sim_sum = np.dot((ratings > 0), np.abs(similarity.T))

    pred = np.where(sim_sum > 0, weighted_sum / sim_sum, np.random.randint(1, 6, size=(n_users, n_items)))

    return pred

def user_based_predict(ratings, similarity):
    """Compute the user-based prediction using the cosine similarity.

    Args:
        :ratings: numpy matrix with the ratings
        :similarity: numpy matrix with the user-user similarity

    Returns:
        :pred: numpy matrix with the predicted ratings
    """
    n_users, n_items = ratings.shape  # Infer dimensions dynamically
    pred = np.zeros((n_users, n_items))

    # Compute user mean ratings, ignoring zero entries
    ratings_cpy = ratings.copy()
    ratings_cpy[ratings_cpy == 0] = np.nan
    usr_mean = np.nanmean(ratings_cpy, axis=1, keepdims=True)  # (n_users, 1)

    # Normalize ratings by subtracting user means
    norm_ratings = ratings_cpy - usr_mean
    norm_ratings = np.nan_to_num(norm_ratings)  # Replace NaNs with 0 for calculations

    for i in tqdm(range(n_items)):
        ratings_item_i = ratings[:, i]  # All users' ratings for item i
        user_rated_i = np.where(ratings_item_i > 0)[0]  # Users who rated item i

        if len(user_rated_i) == 0:
            pred[:, i] = usr_mean[:, 0]  # If no users rated, fallback to user mean
            continue

        # Get relevant similarity scores and normalized ratings
        sim_matrix = similarity[:, user_rated_i]  # (n_users, |user_rated_i|)
        ratings_vector = norm_ratings[user_rated_i, i]  # (|user_rated_i|,)

        # Compute prediction using matrix multiplication
        numerator = sim_matrix @ ratings_vector  # (n_users,)
        denominator = np.sum(np.abs(sim_matrix), axis=1)  # Sum of absolute similarities

        # Avoid division by zero
        mask = denominator > 0
        pred[:, i] = usr_mean[:, 0]  # Start with user mean
        pred[mask, i] += numerator[mask] / denominator[mask]  # Add weighted contribution

    return pred

# 6. Document Classification

## Knn Classifier

In [10]:
def knn(doc_vectors, query_vector, k=10):
    """ It finds the `k` nearest documents to the given query (based on euclidean distance).
    :param doc_vectors: An array of document vectors (np.array(np.array)).
    :param query_vector: Query representation (np.array)
    :return: List of document indices (list(int))
    """
    # --------------
    # YOUR CODE HERE
    scores = [[euclidean_distance(np.array(query_vector), np.array(doc_vectors[d])), d] for d in range(len(doc_vectors))]
    scores.sort(key=lambda x: x[0])
    top_k_docs = []
    for i in range(k):
        top_k_docs.append(scores[i][1])
    # --------------
    return top_k_docs

def knn_weighting_estimate(doc_vectors, doc_labels, query_vector, k=10):
    """ Weighting estimation for kNN classification

    Args:
        :param doc_vectors: Document vectors (np.array(np.array))
        :param doc_labels: Document labels/topics (list)
        :param query_vector: Query vector (np.array)
        :param k: Number of nearest neighbors to retrieve
    
    Returns:
        :return: A dictionary containing the estimation score for each label/topic (dict)
    """
    # --------------
    # YOUR CODE HERE
    query_vector = np.array(query_vector)
    scores = {}

    top_k_docs = knn(doc_vectors, query_vector, k)
    distances = np.array([cosine_similarity(query_vector, doc_vectors[doc]) for doc in top_k_docs])

    for i in range(min(doc_labels),max(doc_labels)+1):
        retrieved_labels = np.array([int(doc_labels[idx] == i) for idx in top_k_docs])

        scores[i] = np.dot(distances, retrieved_labels)
    # --------------
    return scores

def knn_probabilistic_estimate(doc_vectors, doc_labels, query_vector, k=10):
    """ Probabilistic estimation for kNN classification

    Args:
        :param doc_vectors: Document vectors (np.array(np.array))
        :param doc_labels: Document labels/topics (list)
        :param query_vector: Query vector (np.array)
        :param k: Number of nearest neighbors to retrieve
    Returns:
        :return: A dictionary containing the estimation score for each label/topic (dict)
    """
    # --------------
    # YOUR CODE HERE
    scores = {}
    top_k_docs = knn(doc_vectors, query_vector, k)

    for i in range(min(doc_labels),max(doc_labels)+1):
        scores[i] = np.sum(np.array([int(doc_labels[idx] == i) for idx in top_k_docs])) / k
    # --------------
    return scores


## Rocchio's Classifier

In [11]:
def rocchio_estimate(doc_vectors, doc_labels, query_vector):
    """ 
    Rocchio classification
    :param doc_vectors: Document vectors (np.array(np.array))
    :param doc_labels: Document labels/topics (list)
    :param query_vector: Query vector (np.array)
    
    :return: A dictionary containing the estimation score for each label/topic (dict)
    """
    # --------------
    # YOUR CODE HERE
    doc_labels = np.array(doc_labels).reshape(-1,1)
    scores = {}
    for i in range(min(doc_labels)[0],max(doc_labels)[0]+1):
        nb_docs_class = np.sum(doc_labels == i)
        centroid = np.sum((doc_vectors *(doc_labels == i)), axis=0) / nb_docs_class
        scores[i] = euclidean_distance(centroid, query_vector)
    # --------------
    return scores

## Naive Bayes Classifier

In [12]:
def navie_bayes_classifier(tf_dict, query, topics_probs):
    """ Naive Bayes classification
    
    Args:
        :param tf_dict: A dictionary, with keys the topics/labels and values a dictionary of word frequencies  
        :param query: Query vector
        :param topics_probs: Probaility distribution of each topic/label (dict)

    Returns:
        :return: A dictionary containing the log probability estimation for each topic
    """
    # --------------
    # YOUR CODE HERE
    log_probabilities = {t:0 for t in tf_dict}

    for topic in tf_dict:
        prob = 0
        for word in query:
            if word in tf_dict[topic]:
                p_wc = (tf_dict[topic][word] + 1) / (sum(tf_dict[t][word] for t in tf_dict) + 1)
                prob += np.log(p_wc)
        prob += np.log(topics_probs[topic])

        log_probabilities[topic] = prob
    # --------------
    return log_probabilities

# 7. Link Based Ranking

## PageRank

In [None]:
def pagerank_eigen(L, R=None):
    """
    Compute the PageRank vector using the eigenvalue method.
    
    Args:
        L: Link matrix
        R: Transition probability matrix

    Returns:
        R: Transition probability matrix
        p: PageRank vector
    """
    if R is None:
        # Construct transition probability matrix from L
        R = L / np.where(L.sum(axis=0) == 0, 1, L.sum(axis=0))
    
    # Compute eigenvalues and eigenvectors of R
    eigenvalues, eigenvectors = np.linalg.eig(R)
    
    # Take the eigenvector corresponding to the largest eigenvalue
    dominant_vector = eigenvectors[:, np.argmax(np.absolute(eigenvalues))]
    
    # Ensure the eigenvector is positive and normalize to sum to 1
    p = np.absolute(dominant_vector)
    p = p / p.sum()
    
    return R, p

def pagerank_iterative(L, R=None):
    """Compute the PageRank vector using the iterative method.
    
    Args:
        L: Link matrix
        R: Transition probability matrix
        
    Returns:
        R: Transition probability matrix
        p: PageRank vector
    """
    if R is None:  # We might want to compute R outside this function to avoid recomputing large matrix
        R = np.multiply(L, 1 / np.sum(L,axis=0))

    N = L.shape[0]
    e = np.ones(shape=(N,1))
    q = 0.9

    p = e
    delta = 1
    epsilon = 0.001
    i = 0
    while delta > epsilon:
        p_prev = p
        p = q * np.dot(R,p_prev)
        p = p + ((1-q) / N) * e
        delta = np.linalg.norm(x=(p-p_prev), ord=1)
        i += 1

    print("Converged after {0} iterations. Ranking vector: p={1}".format(i, p[:,0]))
    return R,p

def hits_iterative(A, k=10):
    """Compute the HITS score using the iterative method.

    Args:
        A: Adjacency matrix
        k: Number of iterations
    
    Returns:
        x: Authority vector
        y: Hub vector
    """
    N = A.shape[0]
    x0, y0 = 1 / (N**0.5) * np.ones(N), 1 / (N**0.5) * np.ones(N)
    xprev, yprev = x0, y0
    delta1 = delta2 = 1
    epsilon = 0.001 # We can strictly check for convergence rate of HITS algorithm
    l = 0
    
    while l < k and delta1 > epsilon and delta2 > epsilon:
        y = np.matmul(A, xprev)
        x = np.matmul(np.transpose(A), y) 
        x = x / np.linalg.norm(x,2)
        y = y / np.linalg.norm(y,2)
        delta1 = np.linalg.norm(x-xprev,1)
        delta2 = np.linalg.norm(y-yprev,1)
        xprev = x
        yprev = y
        l += 1
        
    return xprev, yprev

# 8. Graph Mining

In [54]:
import networkx as nx

def compute_modularity(G: nx.Graph) -> float:
    """Compute the modularity of a graph
    
    Args:
        * G (nx.Graph): the graph to analyze
        
    Returns:
        * float: the modularity of the graph
    """
    m = len(G.edges)
    Q = 0
    # your code here
    for i in range(m-1):
        for j in range(m-1):
            A_ij = G.number_of_edges(i,j)
            k_i = G.degree[i]
            k_j = G.degree[j]
            if G.nodes[i]['community'] == G.nodes[j]['community']:
                Q += (A_ij - (k_i * k_j) / (2*m))
    return Q/(2*m)

def compute_betweenness(G: nx.Graph) -> dict:
    """Compute the betweenness centrality of a graph.
    
    Args:
        * G (nx.Graph): the graph to analyze
        
    Returns:
        * dict: the betweenness centrality of each node
    """

    betweenness_centralities = {n: 0.0 for n in G.nodes()}
    nodes = list(G.nodes())

    for n in nodes:
        for i in range(len(nodes)):
            s = nodes[i]
            for j in range(i + 1, len(nodes)):  # Ensure i < j to avoid redundant pairs
                t = nodes[j]

                paths = list(nx.all_shortest_paths(G, source=s, target=t))  # Convert to list once
                num_paths = sum(1 for path in paths if n in path and n not in {s, t})  # Count only valid paths
                
                if paths:  # Avoid division by zero
                    betweenness_centralities[n] += num_paths / len(paths)

    return betweenness_centralities

# A. Others

## Cosine similarity

In [13]:
import math

def cosine_similarity(v1, v2):
    """
    It computes cosine similarity.
    
    :param v1: list of floats, with the vector of a document.
    :param v2: list of floats, with the vector of a document.
    :return: float
    """
    sumxx, sumxy, sumyy = 0, 0, 0
    for i in range(len(v1)):
        x = v1[i]; y = v2[i]
        sumxx += x*x
        sumyy += y*y
        sumxy += x*y
    if sumxy == 0:
        sim = 0
    else:
        sim = sumxy/math.sqrt(sumxx*sumyy)
    return sim

## Euclidean Distance

In [14]:
def euclidean_distance(v1, v2):
    """ It computes the euclidean distance between to vectors.
    :param v1: First vector (numpy array).
    :param v2: Second vector (numpy array).
    :return: Euclidean distance (float)
    """
    return np.linalg.norm(v1 - v2)

## Precision, Recall, Map

In [15]:
def compute_recall_at_k(predict, gt, k):
    """
    It computes the recall score at a defined set of retrieved documents.
    
    :param predict: list of predictions
    :param gt: list of actual data
    :param k: int
    """
    return len(set(predict[:k]).intersection(set(gt))) / len(gt)

def compute_precision_at_k(predict, gt, k):
    """
    It computes the precision score at a defined set of retrieved documents.
    
    :param predict: list of predictions
    :param gt: list of actual data
    :param k: int
    """
    return len(set(predict[:k]).intersection(set(gt))) / k

def compute_map(queries, K = 10):
    """
    It computes mean average precision (MAP) for a set of queries.
    
    :param queries: list of str
    :param K: int
    """
    map_score = 0
    for i, query in enumerate(queries):
        precision_for_query = 0
        predict = search_vec(query, K)
        gt = search_vec_sklearn(query, features)
        precisions = []
        recalls = []
        for k in range(1, K+1):
            precisions.append(compute_precision_at_k(predict, gt, k))
            recalls.append(compute_recall_at_k(predict, gt, k))
        interpolate = [max(precisions[i:]) for i in range(len(precisions))]
        retrieved_idx = np.array([pred in gt for pred in predict])

        interpolate = interpolate * retrieved_idx

        map_score += 1 / len(gt) * np.sum(interpolate)
    map_score = 1 / len(queries) * map_score
    return map_score

## DCG

In [16]:
def dcg(k, scores):
    """Compute DCG@k for a given list of scores.
    
    :param k: int
    :param scores: list of floats
    :return: float
    """
    dcg_val = 0
    for i in range(1, k):
        dcg_val += scores[i] / math.log2(i+1)
    return dcg_val

## RMSE

In [17]:
def rmse(prediction, ground_truth):
    """ Compute the Root Mean Squared Error of the prediction.
    Args:
        :param prediction: np.array with the predicted values
        :param ground_truth: np.array
    
    Returns:
        :return: float
    """
    mask = ground_truth > 0
    N = np.sum(mask)

    prediction = prediction * mask

    squares = np.sqrt(1/N * np.sum((prediction - ground_truth)**2))
    return squares

## Pearson correlation

In [20]:
def pearson_correlation(vec1, vec2):
    """ Compute the Pearson correlation between two vectors.
    Args:
        :param vec1: np.array with the first vector
        :param vec2: np.array with the second vector
    
    Returns:
        :return: float
    """
    mean1 = np.mean(vec1)
    mean2 = np.mean(vec2)

    num = np.sum((vec1 - mean1) * (vec2 - mean2))
    den = np.sqrt(np.sum((vec1 - mean1)**2) * np.sum((vec2 - mean2)**2))

    return num / den

vec1 = [1,3]
vec2 = [1,4]

print(pearson_correlation(vec1, vec2))

1.0


## Calculate the number max of nodes

In [19]:
n_nodes = 10
n_edges_max = int(n_nodes * (n_nodes-1) / 2)
print(n_edges_max)

45
