### Database Initialization

##### Mongo Initialization

In [1]:
# DONE
from pymongo import MongoClient
from pymongo.errors import BulkWriteError, DuplicateKeyError

# Read the MongoDB URI from the environment so real credentials never have to be
# pasted into (and committed with) the notebook. The original placeholder is kept
# as the fallback, so behaviour is unchanged when the variable is not set.
import os

connection_string = os.environ.get("MONGO_CONNECTION_STRING", "<Mongo Connection URL>")
client = MongoClient(connection_string)

db = client['bigData']

# article Collection
# The unique index on "uri" makes a second insert of the same article fail with a
# duplicate-key error instead of silently storing a copy.
collection = db['articles']
collection.create_index("uri", unique=True)
"""
Schema:
'_id' : '_id'
"date": datetime_obj,
"url": "url",
"title": "title",
"summary": "summary",
"keywords": "concepts",
"uri" : 'uri',
"source" : 'source_location',
"embedding" : 'embeddings'
"""



'\nSchema:\n\'_id\' : \'_id\'\n"date": datetime_obj,\n"url": "url",\n"title": "title",\n"summary": "summary",\n"keywords": "concepts",\n"uri" : \'uri\',\n"source" : \'source_location\',\n"embedding" : \'embeddings\'\n'

In [2]:
# User Collection
# DONE
user_collection = db['user']
# Unique index: at most one user document per e-mail address.
user_collection.create_index("email", unique=True)

# Schema of a `user` document (kept in sync with `new_user` below and with the
# fields the recommendation / feedback cells read and write):
#   _id                     : ObjectId
#   email                   : str, unique
#   userName                : str
#   userSelectedPreferences : [ {keyword: str, score: number, default 0} ]
#   hiddenPreferences       : [ {keyword: str, score: number, default 0} ]
#   userHistory             : [ {article_id, feedback_score, date} ]
#   userVector              : vector embedding (stored as an empty list for a new user)

new_user = {
    'email' : 'kinjalk.00@gmail.com',
    'userName' : "Kinjalk Srivastava",
    'userSelectedPreferences' : [{
        'keyword' : "New York City",
        'score' : 0
    },{
        'keyword' : "Google",
        'score' : 0
    } ],
    'hiddenPreferences' : [{
        'keyword' : "Football",
        'score' : 0
    }, {
        'keyword' : "Miami",
        'score' : 0
    },{
        'keyword' : "SanFransisco",
        'score' : 0
    } ],
    'userHistory' : [],
    'userVector' : []
}

# try:
#     result = user_collection.insert_one(new_user)
# except DuplicateKeyError as dke:
#     email = dke.details['keyValue']['email']
#     print(f'Email Already present: {email}')


In [3]:
# Keyword Collection
# DONE
# Handle to the `keywords` collection used by the popularity recommendation.
keywords_collection = db['keywords']
# Unique index so each keyword string is stored at most once.
keywords_collection.create_index("keyword", unique=True)
"""
Schema:
_id : _id
keyword: keyword
last_24_hours : [{
date : date
score : float / default 0
} * 25] => array index 0 to 24 
"""

'\nSchema:\n_id : _id\nkeyword: keyword\nlast_24_hours : [{\ndate : date\nscore : float / default 0\n} * 25] => array index 0 to 24 \n'

In [4]:
# This code got all the keywords from articles and inserted into Keywords

# pipeline = [
#     {"$unwind": "$keywords"},  # Unwind the keywords array
#     {"$group": {"_id": None, "allKeywords": {"$addToSet": "$keywords"}}},  # Group and get unique keywords
#     {"$project": {"_id": 0, "allKeywords": 1}}  # Exclude the _id field
# ]

# # Execute the aggregation query
# result = list(collection.aggregate(pipeline))

# # Extract keywords from the result
# if result:
#     result = result[0]["allKeywords"]

# from datetime import datetime
# today = datetime.combine(datetime.now().date(), datetime.min.time())

# documents = []
# for keyword in result:
#     document = {
#         "keyword": keyword,
#         "last_24_hours": [{"date": today, "score": 0} for _ in range(25)] 
#     }
#     documents.append(document)

# # Insert the documents into MongoDB
# insert_result = keywords_collection.insert_many(documents)

#### Qdrant Initialization

In [5]:
# DONE
from qdrant_client import QdrantClient
import uuid
# Endpoint and API key come from the environment so they are never stored in the
# notebook; the original placeholders remain as fallbacks.
import os

qdrant_client = QdrantClient(
    url=os.environ.get("QDRANT_URL", "Qdrant Connection URL"),
    api_key=os.environ.get("QDRANT_API_KEY", "<Qdrant API Key>"),
)

# Listing the collections doubles as a connectivity check.
print(qdrant_client.get_collections())

collections=[CollectionDescription(name='articles_collection'), CollectionDescription(name='sparse_charts'), CollectionDescription(name='bigData_collection')]


In [244]:
# This code creates or updates the collection in Qdrant

# from qdrant_client.http.models import Distance, VectorParams

# if not qdrant_client.collection_exists("bigData_collection"):
#     qdrant_client.create_collection(
#             collection_name="bigData_collection",
#             vectors_config=VectorParams(size=1024, distance=Distance.COSINE),
#     )

### API

In [6]:
# DONE
from eventregistry import EventRegistry, QueryArticlesIter, ReturnInfo, ArticleInfoFlags, SourceInfoFlags, QueryEventsIter, QueryItems
# The Event Registry API key is read from the environment (placeholder kept as
# the fallback) so the secret is not committed with the notebook.
import os

er = EventRegistry(apiKey = os.environ.get("EVENT_REGISTRY_API_KEY", '<API Key>')) # => NewAPI Key

In [60]:
# New Events Streaming
"""
Patern:
This Query should Run to get concepts on which we can further fetch articles from.
This Quey will run at a given interval.
"""
# country  = er.getLocationUri('New_York')

# Events reported by sources located in the United States, English only.
q = QueryEventsIter(
    lang = 'eng',
    sourceLocationUri = "http://en.wikipedia.org/wiki/United_States", # => Get Events in United States
)

# q.setRequestedResult(RequestEventsConceptAggr(conceptCount = 10, 
#     returnInfo= ReturnInfo(conceptInfo= Conc)))

# The same query is executed twice and both result streams land in one list:
# first the 20 most recent events, then the 10 with the highest social score.
# An event may therefore appear in both passes.
events_result = []
events_result.extend(q.execQuery(er, sortBy='date', sortByAsc=False, maxItems=20)) # => Latest / New Events
events_result.extend(q.execQuery(er, sortBy='socialScore', sortByAsc=False, maxItems=10)) # -> Events sorted by a popularity score


In [73]:
# Cleaning the received data

# Unique English labels of every concept whose score within its event exceeds 50.
# Collected through a set so a label shared by several events is kept once,
# then exposed as a list for the chunked article query.
concepts = list({
    concept['label']['eng']
    for event in events_result
    for concept in event['concepts']
    if concept['score'] > 50
})

['Tariff', 'Same-sex marriage', 'Pardon', 'NBC', 'Saint', 'Corey Anderson (fighter)', 'Ultimate Fighting Championship', 'Same-sex relationship', 'Newsmax', 'Thailand', 'Florida Attorney General', 'New Jersey', 'Bangkok', 'France', 'Chicago P.D. (TV series)', 'Ukraine', 'Germany', 'Pam Bondi', 'Hunter Biden', 'Test cricket', 'Hammerstein Ballroom', 'Heavyweight', 'Russia', 'Donald Trump', 'South Korea', 'Bishop', 'Federal Communications Commission', 'Boxing', 'Volodymyr Zelenskyy', 'Mohammed Amer', 'Chicago (franchise)', 'City of Greater Geelong', 'Michael Waltz', 'National Security Advisor (United States)', 'Bashar al-Assad', 'Venezuela', 'Australia Day', 'Cryptocurrency', 'Chicago Med', 'United States Congress', 'United States Attorney General', 'Canada', 'Abortion', 'Catholic Church', 'Puri', 'Yogi Adityanath', 'The Weeknd', 'Refugee', 'Gensler', 'Rose Bowl (stadium)', 'Game Changer Wrestling', 'Syria', 'U.S. Securities and Exchange Commission', 'Justin Trudeau', 'Houston', 'Michel B

In [23]:
# Concept Based API HIT



"""
Find News Articles Based on Concepts we ge from user from Event Query Hit
"""
# Upper bound on the number of concept URIs sent in one article query.
max_concept = 15

result = [] # -> Final Answer
result_uri = set()  # URIs already collected, used to skip repeats across chunks

# The concept list is processed in slices of at most `max_concept` entries
# (the API accepts 1-15 concepts per request).

concepts = ["Ceasefire"]

for idxs in range(0, len(concepts), max_concept):

    # Concept labels become Wikipedia URIs: spaces are replaced by underscores.
    chunk = [
        'http://en.wikipedia.org/wiki/' + label.replace(" ", "_")
        for label in concepts[idxs:idxs + max_concept]
    ]
    print(chunk)

    # [f'http://en.wikipedia.org/wiki/{concept}']

    q = QueryArticlesIter(
        conceptUri = QueryItems.OR(chunk),
        sourceLocationUri = "http://en.wikipedia.org/wiki/United_States",
        ignoreSourceGroupUri = "paywall/paywalled_sources",
        lang = 'eng',
        isDuplicateFilter = 'skipDuplicates',
        dataType = ["news", "pr", "blog"]
    )

    # Newest articles first; extra article and source details are requested
    # because the dataset cell reads concepts and the source location.
    newest_first = q.execQuery(
        er,
        sortBy = "date",
        sortByAsc = False,
        returnInfo = ReturnInfo(
            articleInfo = ArticleInfoFlags(concepts = True, categories = True, location=True, eventUri=True, socialScore=True),
            sourceInfo = SourceInfoFlags(location=True)),
        maxItems = 1)

    for article in newest_first:
        if article['uri'] in result_uri:
            continue
        result.append(article)
        result_uri.add(article['uri'])

['http://en.wikipedia.org/wiki/Ceasefire']


In [27]:
# Keyword Based API HIT

# Free-text keyword search, restricted like the concept query: US sources,
# English, no paywalled sources, duplicates skipped.
q = QueryArticlesIter(
    keywords = "Putin",
    sourceLocationUri = "http://en.wikipedia.org/wiki/United_States",
    ignoreSourceGroupUri = "paywall/paywalled_sources",
    lang = 'eng',
    isDuplicateFilter = 'skipDuplicates',
    dataType = ["news", "pr", "blog"]
)

# Newest article first; only one item is requested.
result_2 = []
result_2.extend(
    q.execQuery(
        er,
        sortBy = "date",
        sortByAsc = False,
        returnInfo = ReturnInfo(
            articleInfo = ArticleInfoFlags(concepts = True, categories = True, location=True, storyUri=True),
            sourceInfo = SourceInfoFlags(location=True)),
        maxItems = 1)
)

[{'uri': '8460737739',
  'lang': 'eng',
  'isDuplicate': False,
  'date': '2024-12-16',
  'time': '23:31:58',
  'dateTime': '2024-12-16T23:31:58Z',
  'dateTimePub': '2024-12-16T23:30:35Z',
  'dataType': 'news',
  'sim': 0.6666666865348816,
  'url': 'https://lacrossetribune.com/news/nation-world/government-politics/trump-adams-polio-vaccine/article_f9d87f6c-cf32-530a-bd53-ec7eefc14687.html',
  'title': 'Trump weighs in on NY mayor, vaccines and drones in news conference',
  'source': {'uri': 'lacrossetribune.com',
   'dataType': 'news',
   'title': 'La Crosse Tribune',
   'location': {'type': 'place',
    'label': {'eng': 'La Crosse, Wisconsin'},
    'country': {'type': 'country', 'label': {'eng': 'United States'}}},
   'locationValidated': False},
  'authors': [{'uri': 'colleen_long@lacrossetribune.com',
    'name': 'Colleen Long',
    'type': 'author',
    'isAgency': False}],
  'concepts': [{'uri': 'http://en.wikipedia.org/wiki/Presidency_of_Joe_Biden',
    'type': 'wiki',
    'score

### LLM 

#### Transform into Dataset

In [32]:
# DONE
from datasets import Dataset
from datetime import datetime
def _source_country(article):
    """Return the English label of the source's country, or None when unknown.

    For a source located at a 'place' the label of the enclosing country is
    used; otherwise the label of the location itself is used.
    NOTE(review): missing `source` / `location` / `label` entries are treated
    as "unknown" instead of raising - confirm the API can actually omit them.
    """
    location = (article.get('source') or {}).get('location') or {}
    if location.get('type') == 'place':
        location = location.get('country') or {}
    return (location.get('label') or {}).get('eng')


def _select_concepts(article, min_score = 4, fallback_score = 3):
    """Return the English labels of the article's concepts with score >= min_score.

    When no concept reaches `min_score`, the threshold is relaxed once to
    `fallback_score` so the article is not left without keywords if possible.
    """
    labels = [c['label']['eng'] for c in article['concepts'] if c['score'] >= min_score]
    if not labels:
        labels = [c['label']['eng'] for c in article['concepts'] if c['score'] >= fallback_score]
    return labels


# Column-oriented buffer that becomes the Hugging Face dataset.
data = {
    'article' : [],
    'date' : [],
    'url' : [],
    'title': [],
    'source_location' : [],
    'concepts' : [],
    'uri' : []
}

# One query finds every fetched URI that is already stored, instead of one
# `find_one` round trip per article.
fetched_uris = [at['uri'] for at in result]
already_stored = set()
if fetched_uris:
    already_stored = {
        doc['uri']
        for doc in collection.find({'uri': {'$in': fetched_uris}}, {'_id': 0, 'uri': 1})
    }

for at in result:
    if at['uri'] in already_stored:
        continue  # article is already in MongoDB; do not summarise it again
    data['article'].append(at['body'])
    data['date'].append(datetime.strptime(at['date'], "%Y-%m-%d").date())
    data['url'].append(at['url'])
    data['title'].append(at['title'])
    data['uri'].append(at['uri'])
    data['source_location'].append(_source_country(at))
    # Selected into the column directly so the notebook-level `concepts` list
    # used by the article-query cell is no longer overwritten.
    data['concepts'].append(_select_concepts(at))

dataset = Dataset.from_dict(data)

#### Summary / Embedding Model

In [33]:
# DONE
from transformers import pipeline, BartTokenizer, BartModel
import torch
from transformers import logging
# Pipeline-style device index: first GPU when CUDA is available, otherwise CPU (-1).
device = 0 if torch.cuda.is_available() else -1
logging.set_verbosity_error()
# `device` is now actually passed on, so the cell also runs on a CPU-only
# machine instead of failing on the hard-coded 'cuda'.
summarizer = pipeline("summarization", model="facebook/bart-large-cnn", device=device)
tokenizer = BartTokenizer.from_pretrained("facebook/bart-large-cnn")
model = BartModel.from_pretrained('facebook/bart-large').to("cuda" if device == 0 else "cpu")

def split_text_into_chunks(text, max_tokens):
    """Split `text` into pieces of at most `max_tokens` tokenizer tokens.

    The text is tokenized once with the module-level `tokenizer`; consecutive
    windows of token ids are decoded back to strings with special tokens removed.

    Returns:
        list[str]: the decoded chunks, in document order.
    """
    token_ids = tokenizer(text, return_tensors="pt", padding= True)['input_ids'][0]
    return [
        tokenizer.decode(token_ids[start:start + max_tokens], skip_special_tokens=True)
        for start in range(0, len(token_ids), max_tokens)
    ]

def summarize_article(article):
    """Summarize `article` chunk by chunk and join the partial summaries.

    The article is split into 728-token chunks; each chunk is summarized
    deterministically (`do_sample=False`, at most 150 tokens) and the partial
    summaries are joined with single spaces.
    """
    partial_summaries = [
        summarizer(piece, max_length= 150, do_sample=False)[0]['summary_text']
        for piece in split_text_into_chunks(article, 728)
    ]
    return " ".join(partial_summaries)

def summary_tokenizer(summary):
    """Embed `summary` as the mean of the model's last hidden states.

    The summary is truncated to 128 tokens, run through the module-level
    `model`, and the token vectors are averaged over the sequence dimension.

    Returns:
        numpy.ndarray: the pooled embedding, with a leading batch dimension.
    """
    inputs = tokenizer(summary, return_tensors='pt', truncation=True, padding=True, max_length=128)
    # Follow the model's device instead of a hard-coded 'cuda', so inputs and
    # weights always live on the same device.
    inputs = {key: value.to(model.device) for key, value in inputs.items()}
    # Inference only: no autograd graph is needed, which saves memory.
    with torch.no_grad():
        outputs = model(**inputs)
    embeddings = outputs.last_hidden_state
    vector = embeddings.mean(dim=1).detach().cpu().numpy()
    return vector

def batch_summarize(batch):
    """`Dataset.map` callback: add a 'summary' column built from batch['article']."""
    return {'summary': list(map(summarize_article, batch['article']))}

def batch_embedding(batch):
    """`Dataset.map` callback: add an 'embeddings' column built from batch['summary']."""
    return {'embeddings' : list(map(summary_tokenizer, batch['summary']))}

# Two passes over the dataset in batches of 10: first add the 'summary' column,
# then embed those summaries into the 'embeddings' column.
# NOTE: `dataset` is rebound in place, so re-running this cell recomputes both
# columns on the already-mapped dataset.
print("Generating Summary...")
dataset = dataset.map(batch_summarize, batched= True, batch_size= 10)
print('Generated Summary, Generating Embeddings...')
dataset = dataset.map(batch_embedding, batched= True, batch_size= 10)

Generating Summary...


Map: 100%|██████████| 1/1 [00:32<00:00, 32.22s/ examples]


Generated Summary, Generating Embeddings...


Map: 100%|██████████| 1/1 [00:02<00:00,  2.42s/ examples]


#### Inserting into Both Databases

In [119]:
# inserting into MongoDB
# DONE
documents = []
# keyword -> keyword document. A dict de-duplicates keywords shared by several
# articles, which would otherwise collide on the unique "keyword" index within
# this very batch.
keyword_docs = {}
today = datetime.combine(datetime.now().date(), datetime.min.time())
for row in dataset:
    documents.append({
        # stored as a datetime at midnight of the article's date
        "date": datetime.combine(row['date'], datetime.min.time()),
        "url": row["url"],
        "title": row["title"],
        "summary": row["summary"],
        "keywords": row["concepts"],
        "uri" : row['uri'],
        "source" : row['source_location'],
        "embedding" : row['embeddings']
    })
    for key in row["concepts"]:
        if key not in keyword_docs:
            keyword_docs[key] = {
                "keyword" : key,
                # 25 independent slots (index 0 to 24), not 25 references to one dict
                "last_24_hours" : [{"date": today, "score": 0} for _ in range(25)]
            }
keywords = list(keyword_docs.values())

# inserting into Article Collection
# `retrieved_documents` is always defined (possibly empty) so the Qdrant cell
# can run even when nothing new was inserted or a duplicate was rejected.
retrieved_documents = []
inserted_count = 0
if documents:  # insert_many() rejects an empty list
    try:
        inserted_count = len(collection.insert_many(documents, ordered=False).inserted_ids)
    except BulkWriteError as bwe:
        # With ordered=False the non-duplicate documents are still written.
        inserted_count = bwe.details.get('nInserted', 0)
        # Only the number of rejected writes is printed; the full details repeat
        # each rejected document.
        print(f"Documents Already present: {len(bwe.details.get('writeErrors', []))}")
    # Only the documents that were actually stored match their generated _id,
    # so this picks up exactly the articles inserted by this run.
    attempted_ids = [doc['_id'] for doc in documents if '_id' in doc]
    retrieved_documents = list(
        collection.find({'_id': {'$in': attempted_ids}}, {'_id': 1, 'embedding': 1, 'date' : 1})
    )

# inserting into Keywords Collection
if keywords:
    try:
        keywords_collection.insert_many(keywords, ordered=False)
    except BulkWriteError:
        # Keywords that already exist are rejected by the unique index; new ones are stored.
        print('Documents Already present')

print(f"{inserted_count} documents inserted into MongoDB.")

490 documents inserted into MongoDB.


In [120]:
# inserting into Qdrant-DB

from qdrant_client.http.models import PointStruct
# One point per freshly inserted article. The point id is derived from the
# Mongo _id (uuid5), so upserting the same article again overwrites its point
# instead of adding a duplicate under a new random id.
data_points = [
    PointStruct(
        id=str(uuid.uuid5(uuid.NAMESPACE_OID, str(value['_id']))),
        vector=value['embedding'][0],  # embeddings are stored with a leading batch dimension
        payload={
            "_id": str(value['_id']),                   # link back to the Mongo document
            "date": int(value['date'].timestamp()),     # unix seconds, used by the date-range filter
        },
    )
    for value in retrieved_documents
]

# Nothing new to index: skip the request instead of sending an empty batch.
operation_info = None
if data_points:
    operation_info = qdrant_client.upsert(
        collection_name="bigData_collection",
        wait=True,
        points=data_points
    )

In [73]:
from bson import ObjectId
# Spot check of one stored article. Bound to its own name so the `result` list
# of fetched articles that the dataset cell iterates over is not overwritten.
sample_article = collection.find_one({"_id" : ObjectId('675cab521a218adf94f0ca95')})
print(sample_article)


{'_id': ObjectId('675cab521a218adf94f0ca95'), 'date': datetime.datetime(2024, 12, 13, 0, 0), 'url': 'https://natlawreview.com/article/december-2024-legal-news-law-firm-news-and-mergers-industry-awards-and-recognition', 'title': 'December 2024 Legal News: Law Firm News and Mergers, Industry Awards and Recognition, DEI and Women in Law', 'summary': "Ogletree, Deakins, Smoak & Stewart, P.C. announced that they are opening the firm's 58th office in Baltimore. Ward and Smith attorneys Grant Osborne and Ken Gray are now recognized by the North Carolina State Bar Board of Legal Specialization as specialists in Employment Law. Ronald A. Christaldi, a partner at Shumaker, Loop & Kendrick, LLP, was awarded the 2024 H.L. Culbreath Jr. Profile in Leadership Award by the Tampa Bay Chamber. Catherine D. Little was named by Hart Energy's Oil & Gas Investor Magazine as one of", 'keywords': ['Practice of law', 'Law firm', 'Latin honors', 'Labour law', 'Insurance', 'Baltimore'], 'uri': 'b-8456647382', '

### Recommendation

In [7]:
# Shared accumulator: each recommendation strategy below appends article _ids
# to it, and the "Final List" cell de-duplicates the result.
final_candidate_list = []

#### 1: Keyword Based Recommendation

In [377]:
# DONE
from bson import ObjectId
user_id = ObjectId('675b80f0b49e0719e0ac4be5')


def _top_by_score(array_field, limit = 10):
    """Aggregation expression: the `limit` highest-scored entries of `array_field`."""
    return {'$slice': [{'$sortArray': {'input': array_field, 'sortBy': {'score': -1}}}, limit]}


# Named specifically (not `pipeline`) so it does not replace the
# `transformers.pipeline` function imported by the summarization cell.
user_preferences_pipeline = [
    {'$match': {'_id': user_id}},
    {'$project': {
        'userSelectedPreferences': _top_by_score('$userSelectedPreferences'),
        'hiddenPreferences': _top_by_score('$hiddenPreferences'),
    }},
]

# Execute the aggregation query. The cursor is consumed directly so the
# notebook-level `result` (fetched articles) is left untouched.
top_user_keyword_list = []
for doc in user_collection.aggregate(user_preferences_pipeline):
    # explicit preferences first, then the hidden ones
    for field in ('userSelectedPreferences', 'hiddenPreferences'):
        top_user_keyword_list.extend(pref['keyword'] for pref in doc[field])

print("Top User Preferences:", top_user_keyword_list)



Top User Preferences: ['NYU', 'New York City', 'Google', 'Basketball', 'LA', 'Amazon']


In [378]:
# Articles tagged with at least one of the user's top keywords.
mongo_query = {
    'keywords': {'$in': top_user_keyword_list}
}

# Newest first, at most 30 candidates.
mongo_get_articles = collection.find(mongo_query).sort('date', -1).limit(30)

# NOTE: this appends to the shared list, so re-running the cell adds the same
# ids again; the "Final List" cell removes duplicates.
final_candidate_list.extend([article['_id'] for article in mongo_get_articles])


#### 2. Vector Database Recommendation

In [13]:
# DONE
from qdrant_client.models import Filter, FieldCondition, Range
from datetime import datetime, timedelta
from bson import ObjectId
user_id = ObjectId('675b80f0b49e0719e0ac4be5')

# TODO : Change this query to get history based on weighted (recency_score + feedback score)
# Fetch only the last 10 entries of the user's history.
user_data = user_collection.find(
    {"_id": user_id},
    {"userHistory": {"$slice": -10}, "_id": 0}
)

# Raises StopIteration when no user document matches `user_id`.
user = next(iter(user_data))['userHistory']

user_history = [article['article_id'] for article in user]
recommendations = []
# Only articles dated within the last 4 days are eligible (payload "date" is unix seconds).
date_minus_4 = datetime.now() - timedelta(days=4)
unix_timestamp_minus_4 = int(date_minus_4.timestamp())
# hyper-parameter => keep only hits whose score is at most 0.82 (see filter below)
if len(user_history) > 0:
    articles_data = collection.find({"_id" : { "$in" : user_history}}, {"embedding" : 1, "_id" : 1})

    # One nearest-neighbour search (10 hits) per article in the history.
    for article in articles_data: 
        embedding = article['embedding'][0]
        search_results = qdrant_client.search(
            collection_name="bigData_collection",
            query_vector=embedding,
            limit=10,
            query_filter = Filter(
                must = [
                    FieldCondition(
                        key = "date",
                        range = Range(
                            gte = unix_timestamp_minus_4
                        )
                    )
                ]
            )
        )
        # NOTE(review): hits scoring above 0.82 are dropped - presumably to skip
        # the article itself / near-duplicates; confirm the intent.
        recommendations.extend([(result.payload['_id'], result.score) for result in search_results if result.score <= 0.82])

    # NOTE(review): ascending sort puts the LOWEST scores first. If the collection
    # uses cosine similarity (as in the commented-out creation cell), the 30 ids
    # kept below are the least similar candidates - confirm whether a descending
    # sort was intended. The same article id can also appear more than once.
    recommendations = sorted(recommendations, key = lambda x: x[1])
    articles = [ObjectId(article) for article, _ in recommendations]
    final_candidate_list.extend(articles[:30])

[ObjectId('675ac0b610a4574847b8f347'), ObjectId('675ac0b610a4574847b8f346'), ObjectId('675ac0b610a4574847b8f345'), ObjectId('675ac0b610a4574847b8f344'), ObjectId('675a48d7b3e0fc35c8b26611'), ObjectId('675a48d7b3e0fc35c8b265f2')]


#### 3. User Vector Recommendation

In [380]:
# update user-vector and add to history
# given: article_id, user_id, feedback
# DONE
import numpy as np

def update_user_embeddings(user_embeddings, article_embeddings, feedback_score, alpha = 0.1):
    """Blend an article embedding into the user vector (exponential moving average).

    new_user = (1 - alpha) * user + alpha * (article * feedback_score)

    Args:
        user_embeddings: current user vector (array-like). May be empty for a
            user who has no vector yet; it is then treated as the zero vector.
        article_embeddings: embedding of the article the user interacted with.
        feedback_score: scalar weight applied to the article embedding.
        alpha: learning rate; the share of the result taken from the article.

    Returns:
        list: the updated user vector as nested Python lists, with the same
        shape as the broadcast of both inputs.
    """
    article_embeddings = np.asarray(article_embeddings)
    user_embeddings = np.asarray(user_embeddings)
    if user_embeddings.size == 0:
        # New users are stored with `userVector: []`, which cannot be broadcast
        # against a real embedding; start from zeros of the article's shape.
        user_embeddings = np.zeros_like(article_embeddings, dtype=float)
    adjusted_article_embedding = article_embeddings * feedback_score
    updated_user_embeddings = ((1 - alpha) * user_embeddings) + (alpha * adjusted_article_embedding)
    return updated_user_embeddings.tolist()

def update_user(user_id, article_id, feedback):
    """Record one piece of feedback: refresh the user vector and extend the history.

    Args:
        user_id: id of the user (anything `ObjectId(...)` accepts).
        article_id: id of the article the feedback refers to.
        feedback: feedback score; it weights the article embedding and is stored
            in the history entry.

    Raises:
        StopIteration: when the user or the article does not exist.
    """
    user_oid = ObjectId(user_id)
    user_doc = next(iter(user_collection.find({'_id' : user_oid})))

    # this can change into cumulative update if kafka allows it
    article_doc = next(iter(collection.find({'_id' : ObjectId(article_id)})))

    new_vector = update_user_embeddings(
        np.array(user_doc['userVector']),
        np.array(article_doc['embedding']),
        feedback,
    )
    history_entry = {
        'article_id': article_doc['_id'],
        'feedback_score': feedback, # => got the score from below
        'date' : datetime.now()
    }

    # Both changes go into a single update of the user document.
    user_collection.update_one(
        {'_id': user_oid},
        {
            '$set': {'userVector': new_vector},
            '$push': {'userHistory': history_entry}
        }
    )

# Sample inputs for `update_user` / `user_vector_recommendation`.
# NOTE: here `user_id` is a plain string (both functions wrap it in ObjectId),
# while other cells bind `user_id` to an ObjectId.
user_id = '675b80f0b49e0719e0ac4be5'
article_id = '675a48d7b3e0fc35c8b265f2'
feedback = 5

# update_user(user_id, article_id, feedback)
# DONE
# Get Recommendations
def user_vector_recommendation(user_id, skip = 0):
    """Return the Qdrant hits closest to the user's stored vector.

    Args:
        user_id: id of the user (anything `ObjectId(...)` accepts).
        skip: added to the base limit of 20. It enlarges the result; it does
            not drop the first `skip` hits.

    Returns:
        The search results for `userVector[0]`, at most `20 + skip` of them.

    Raises:
        StopIteration: when the user does not exist.
    """
    matches = user_collection.find({'_id' : ObjectId(user_id)})
    user_doc = next(iter(matches))
    return qdrant_client.search(
        collection_name="bigData_collection",
        query_vector=user_doc['userVector'][0],
        limit=20 + skip,
    )

# Convert the payload ids back to ObjectId. The [:30] slice is only an upper
# bound: with the default skip=0 the search asks for 20 hits.
recommendations = [ObjectId(article.payload['_id']) for article in user_vector_recommendation(user_id)][:30]

# Appends to the shared candidate list (re-running the cell appends again).
final_candidate_list.extend(recommendations)


#### 4. Recently Popular Keywords Recommendation (1-Day)

In [381]:
from datetime import datetime, timedelta
# DONE
today = datetime.combine(datetime.now().date(), datetime.min.time())
yesterday = today - timedelta(days=1)

# Sum the per-slot scores dated since yesterday (midnight) for every keyword and
# keep the 10 highest totals. Named specifically (not `pipeline`) so it does not
# replace the `transformers.pipeline` function imported by the summarization cell.
popular_keywords_pipeline = [
    {"$unwind": "$last_24_hours"},
    {"$match": {"last_24_hours.date": {"$gte": yesterday}}},
    {"$group": {
        "_id": "$keyword",
        "total_score": {"$sum": "$last_24_hours.score"}
    }},
    {"$sort": {"total_score": -1}},
    {"$limit": 10},
]
recommendation_keywords = [
    keyword['_id'] for keyword in keywords_collection.aggregate(popular_keywords_pipeline)
]

# Articles tagged with any of those keywords, newest first, at most 30.
mongo_query = {
    'keywords': {'$in': recommendation_keywords}
}

mongo_get_articles = collection.find(mongo_query).sort('date', -1).limit(30)
articles = [article['_id'] for article in mongo_get_articles]

final_candidate_list.extend(articles)


#### Final List

In [382]:
# Size before and after removing ids suggested by more than one strategy.
print(len(final_candidate_list))
# dict.fromkeys de-duplicates while keeping first-seen order, so the list is
# reproducible across kernel restarts (a set gives an arbitrary order).
final_candidate_list = list(dict.fromkeys(final_candidate_list))
print(len(final_candidate_list))
print(final_candidate_list)

70
55
[ObjectId('675cab521a218adf94f0ca22'), ObjectId('675a48d7b3e0fc35c8b265f6'), ObjectId('675a48d7b3e0fc35c8b265ec'), ObjectId('675cab521a218adf94f0c94f'), ObjectId('675cab521a218adf94f0ca96'), ObjectId('675cab521a218adf94f0c95e'), ObjectId('675ac0b610a4574847b8f343'), ObjectId('675cab521a218adf94f0c98f'), ObjectId('675cab521a218adf94f0ca1d'), ObjectId('675cab521a218adf94f0c8ff'), ObjectId('675cab521a218adf94f0cab2'), ObjectId('675cab521a218adf94f0c977'), ObjectId('675cab521a218adf94f0ca2c'), ObjectId('675a48d7b3e0fc35c8b265fb'), ObjectId('675a48d7b3e0fc35c8b26616'), ObjectId('675a48d7b3e0fc35c8b26612'), ObjectId('675cab521a218adf94f0ca68'), ObjectId('675cab521a218adf94f0cab4'), ObjectId('675cab521a218adf94f0ca3e'), ObjectId('675cab521a218adf94f0ca1f'), ObjectId('675cab521a218adf94f0c989'), ObjectId('675cab521a218adf94f0cab9'), ObjectId('675cab521a218adf94f0ca66'), ObjectId('675cab521a218adf94f0ca1e'), ObjectId('675cab521a218adf94f0c996'), ObjectId('675ac0b610a4574847b8f341'), Objec

### Metrics

#### 1. Recency Scores
based on article release time

In [61]:
from datetime import datetime
from bson import ObjectId
import math

def logistic_decay(days_passed, midpoint = 7, steepness = 0.2):
    """Logistic recency weight: 0.5 at `midpoint` days, falling towards 0 after it.

    Args:
        days_passed: age in days.
        midpoint: age at which the weight is exactly 0.5.
        steepness: how sharply the weight drops around the midpoint.

    Returns:
        float: 1 / (1 + exp(steepness * (days_passed - midpoint))).
    """
    exponent = steepness * (days_passed - midpoint)
    try:
        return 1 / (1 + math.exp(exponent))
    except OverflowError:
        # exp() overflows for very old items; the weight's limit there is 0.
        return 0.0

def exponential_decay(days_passed, decay_rate = 0.15):
    """Exponential recency weight exp(-decay_rate * days_passed); 1.0 at age 0."""
    negative_rate = -decay_rate
    return math.exp(negative_rate * days_passed)

def recency_score(date, decay = 'logistic'):
    """Recency weight of `date` relative to now, in whole days.

    Args:
        date: datetime of the event being scored.
        decay: 'logistic' selects `logistic_decay`; any other value selects
            `exponential_decay`.
    """
    days_passed = (datetime.now() - date).days
    decay_function = logistic_decay if decay == 'logistic' else exponential_decay
    return decay_function(days_passed)

def python_receny_function():
    """Print the exponential recency score of every history entry of the first user found."""
    first_user = next(iter(user_collection.find({}).limit(1)))

    scores = []
    for entry in first_user['userHistory']:
        scores.append(recency_score(entry['date'], 'exponential'))
    print(scores)

# python_receny_function()

# pipeline for exponential decay
# added feedback score to get the more accurate scores
# DONE
def find_recency_score_for_user(user_id, decay_rate = -0.15, limit = 10):
    """Score a user's history entries server-side and return the best `limit`.

    Each entry scores feedback_score * exp(decay_rate * age_in_days).
    `decay_rate` is used as given, so it must already be negative for the score
    to decay with age.

    Args:
        user_id: `_id` of the user document, matched as passed.
        decay_rate: multiplier of the age in days inside the exponent.
        limit: number of entries kept after sorting by score, highest first.

    Returns:
        list of {'article_id': ..., 'score': ...} objects.

    Raises:
        StopIteration: when no user document matches `user_id`.
    """
    now = datetime.now()

    # Whole days between the history entry and now.
    age_in_days = {
        "$dateDiff" : {"startDate" : "$$hist.date", "endDate" : now, "unit" : "day"}
    }
    # feedback_score * exp(decay_rate * age)
    weighted_score = {
        "$multiply" : [
            '$$hist.feedback_score',
            {"$exp" : {"$multiply" : [decay_rate, age_in_days]}}
        ]
    }
    # Map every history entry to {article_id, score}.
    scored_history = {
        "$map" : {
            "input" : "$userHistory",
            "as" : 'hist',
            "in" : {
                "$let" : {
                    "vars" : {"recency_score" : weighted_score},
                    "in" : {
                        'article_id' : "$$hist.article_id",
                        'score' : '$$recency_score'
                    }
                }
            }
        }
    }
    # Highest score first, truncated to `limit` entries.
    best_entries = {
        "$slice" : [
            {"$sortArray": {"input": "$newField", "sortBy": { "score": -1 }}},
            limit
        ]
    }

    stages = [
        {"$match": {"_id": user_id}},
        {"$addFields" : {"newField" : scored_history}},
        {"$set": {"newField": best_entries}},
        {"$project": {"newField": 1, "_id": 0}},
    ]

    cursor = user_collection.aggregate(stages)
    return next(iter(cursor))['newField']


# Recency score for article funnelling => depends only on time.
# Use it in the re_rank function to favour newer articles during final filtering.
# DONE
def find_recency_score_for_articles(articles, decay_rate = -0.15):
    """Compute exp(decay_rate * age_in_days) for the given articles, server-side.

    `decay_rate` is used as given, so it must already be negative for the score
    to decay with age.

    Args:
        articles: list of article `_id` values to score.
        decay_rate: multiplier of the article age (in days) inside the exponent.

    Returns:
        The aggregation cursor; each document holds `_id` and `recency_score`.
    """
    now = datetime.now()
    # Whole days between the article's `date` and now.
    age_in_days = {
        "$dateDiff" : {"startDate" : "$date", "endDate" : now, "unit" : "day"}
    }
    stages = [
        {"$match" : {"_id" : {"$in" : articles}}},
        {"$addFields" : {"recency_score" : {"$exp" : {"$multiply" : [decay_rate, age_in_days]}}}},
        {"$project": {"_id": 1, "recency_score": 1}},
    ]
    return collection.aggregate(stages)

# Demo 1: ids of the user's history entries with the best feedback-weighted recency score.
user_id = ObjectId('675b80f0b49e0719e0ac4be5')
articles = [i['article_id'] for i in find_recency_score_for_user(user_id)]
print(articles)

# Demo 2: pure time-based recency score for two fixed articles
# (`articles` is deliberately rebound to this sample list).
articles = [
    ObjectId('675ac0b610a4574847b8f347'),
    ObjectId('675ac0b610a4574847b8f346')
]
for i in find_recency_score_for_articles(articles):
    print(i['_id'], i['recency_score'])

675ac0b610a4574847b8f346 0.6376281516217733
675ac0b610a4574847b8f347 0.6376281516217733


#### 2. Per User Keyword Score
Based on the feedback score

In [234]:
from bson import ObjectId

# Sample inputs for `update_keyword_scores`: one feedback value per article,
# paired by position (feedback[i] belongs to articles[i]).
userId = ObjectId('675baa4fb49e0719e0ac4bf3')
articles = [
    ObjectId('675ac0b610a4574847b8f347'),
    ObjectId('675ac0b610a4574847b8f346')
]

feedback = [
    -1,
    0.1
]

# get article keywords
def getKeywords(articles = [], feedback = []):
    """Return the keywords of `articles`, each paired with its article's feedback.

    `feedback[i]` belongs to `articles[i]`. The pairing is done by article `_id`,
    not by cursor position: a `$in` query does not return documents in the order
    of the id list, and ids without a stored document are simply absent.

    Args:
        articles: list of article `_id` values (the default list is never mutated).
        feedback: list of feedback scores, parallel to `articles`.

    Returns:
        tuple(list, list): (keywords, keyword_feedback), two parallel lists
        where keyword_feedback[k] is the feedback of the article that
        contributed keywords[k]. Articles are visited in the order given.
    """
    # First feedback wins if an article id is listed more than once.
    feedback_by_article = {}
    for article_id, score in zip(articles, feedback):
        feedback_by_article.setdefault(article_id, score)

    found = collection.find(
        {'_id' : {"$in": list(feedback_by_article)}},
        {'_id' : 1, 'keywords' : 1}
    )
    keywords_by_article = {doc['_id']: doc.get('keywords', []) for doc in found}

    keywords = []
    keyword_feedback = []
    for article_id, score in feedback_by_article.items():
        article_keywords = keywords_by_article.get(article_id, [])
        keywords.extend(article_keywords)
        keyword_feedback.extend([score] * len(article_keywords))

    return (keywords, keyword_feedback)

# add new keywords
def update_user_hidden_keywords(user_id, keywords):
    """Add unseen `keywords` to the user's hidden preferences, then prune weak ones.

    One pipeline update on the document of `user_id`:
      1. append every keyword that is in neither `hiddenPreferences` nor
         `userSelectedPreferences`;
      2. turn the appended plain strings into {keyword, score: 0.5} objects;
      3. drop hidden preferences whose score is below 0.3.

    All three steps run against `user_id`; the pruning step no longer targets
    the notebook-level `userId`.

    Args:
        user_id: `_id` of the user document to update.
        keywords: iterable of keyword strings; duplicates are ignored.
    """
    # The filter below only compares against what is already stored, so a
    # keyword repeated in the input would otherwise be appended repeatedly.
    unique_keywords = list(dict.fromkeys(keywords))

    def _not_listed_in(array_path):
        """Expression: true when $$newKeyword is absent from the array at `array_path`."""
        return {"$not": {"$in": ["$$newKeyword", array_path]}}

    user_collection.update_one(
        { "_id": user_id },  # Match the user document by its _id
        [
            # 1. append the keywords the user does not know yet
            {
                "$set": {
                    'hiddenPreferences': {
                        "$concatArrays": [
                            "$hiddenPreferences",  # Existing hidden preferences
                            {
                                "$filter": {
                                    # $literal keeps a keyword that starts with "$"
                                    # from being read as a field path
                                    "input": {"$literal": unique_keywords},
                                    'as': "newKeyword",
                                    "cond": {
                                        "$and" : [
                                            _not_listed_in("$hiddenPreferences.keyword"),
                                            _not_listed_in("$userSelectedPreferences.keyword"),
                                        ]
                                    }
                                }
                            }
                        ]
                    }
                }
            },
            # 2. wrap the appended strings into preference objects
            {
                "$set": {
                    'hiddenPreferences': {
                        "$map": {
                            "input": "$hiddenPreferences",
                            "as": "item",
                            "in": {
                                "$cond": {
                                    "if": {
                                        "$eq": [{ "$type": "$$item" }, "object"]  # already an object
                                    },
                                    "then": "$$item",  # Keep the existing object as is
                                    "else": {
                                        "keyword": "$$item",  # For strings, create the object
                                        "score": 0.5
                                    }
                                }
                            }
                        }
                    }
                }
            },
            # 3. prune hidden preferences that fell below the threshold
            {
                "$set": {
                    "hiddenPreferences": {
                        "$filter": {
                            "input": "$hiddenPreferences",
                            "as": "item",
                            "cond": {
                                "$gte": ["$$item.score", 0.3]  # Filter out objects where score < 0.3
                            }
                        }
                    }
                }
            }
        ]
    )

# update keywords score
def update_score(prev_score, update, alpha = 10**-3):
    """Move `prev_score` by `alpha * update` and cap the result at 1.

    There is no lower bound: a negative `update` can push the score below 0.
    """
    candidate = prev_score + alpha * update
    return candidate if candidate < 1 else 1

def _apply_keyword_feedback(preferences, score_by_keyword):
    """Return ``preferences`` with feedback folded into every matching keyword.

    Entries whose keyword has no feedback are passed through unchanged; matched
    entries are rebuilt with only the ``keyword`` and ``score`` keys.
    """
    return [
        {
            'keyword': pref['keyword'],
            'score': update_score(pref['score'], score_by_keyword[pref['keyword']]),
        }
        if pref['keyword'] in score_by_keyword
        else pref
        for pref in preferences
    ]


def update_keyword_scores(user_id, articles, feedback, min_hidden_score=0.3, max_hidden_keywords=40):
    """Fold article feedback into a user's hidden and selected keyword scores.

    The keywords and their per-keyword feedback come from ``getKeywords`` and
    new keywords are registered through ``update_user_hidden_keywords`` (both
    defined elsewhere in this notebook). The user's ``hiddenPreferences`` and
    ``userSelectedPreferences`` are then re-scored with ``update_score`` and
    written back in a single update.

    Args:
        user_id: ``_id`` of the user document to update.
        articles: Articles the feedback refers to; forwarded to ``getKeywords``.
        feedback: Feedback for those articles; forwarded to ``getKeywords``.
        min_hidden_score: Hidden keywords scoring below this value are dropped.
        max_hidden_keywords: Maximum number of hidden keywords kept.

    Raises:
        ValueError: If no user document exists for ``user_id``.
    """
    article_keywords, keyword_score = getKeywords(articles, feedback)

    update_user_hidden_keywords(user_id, article_keywords)

    user = user_collection.find_one({"_id" : user_id})
    if user is None:
        raise ValueError(f"No user found with _id {user_id!r}")

    # One lookup table instead of rescanning the keyword list per preference.
    # setdefault keeps the first score when a keyword is listed more than once.
    score_by_keyword = {}
    for concept, score in zip(article_keywords, keyword_score):
        score_by_keyword.setdefault(concept, score)

    updated_preferences = _apply_keyword_feedback(user['hiddenPreferences'], score_by_keyword)
    user_selected_preference = _apply_keyword_feedback(user['userSelectedPreferences'], score_by_keyword)

    updated_preferences = [pref for pref in updated_preferences if pref['score'] >= min_hidden_score]
    # Sort highest score first so the cap keeps the strongest keywords; an
    # ascending sort would retain the weakest ones and discard the best.
    updated_preferences = sorted(
        updated_preferences, key=lambda pref: pref['score'], reverse=True
    )[:max_hidden_keywords]

    user_collection.update_one(
        { "_id": user_id },
        { "$set": {
          "hiddenPreferences": updated_preferences,
          "userSelectedPreferences" : user_selected_preference
            } }
    )

# Runs when the cell executes, using the notebook-level userId, articles and feedback.
update_keyword_scores(userId, articles, feedback) # => Call function 




#### 3. Per User News Feedback Score
Based on read time, like/dislike reaction, and URL click.

In [211]:
# NOTE(review): user_id is a one-element list here and is not passed to calcuate_user_feedback.
user_id = [ObjectId('675baa4fb49e0719e0ac4bf3')]

# One tuple per article, in the order calcuate_user_feedback unpacks them:
# (article_id, read_time, reaction, clicked_url, length).
articles_details = [
    ('675ac0b610a4574847b8f347', 3, 1, 0, 277),
    ('675ac0b610a4574847b8f346', 1, 0, 0, 1005)
]

# NOTE: the article length is meant to be calculated in the frontend.
def calcuate_user_feedback(articles_details, w1 = 0.3, w2 = 0.25, w3 = 0.45):
    """Combine read time, reaction and URL click into one feedback score per article.

    Args:
        articles_details: Iterable of 5-tuples
            ``(article_id, read_time, reaction, clicked_url, length)``.
            ``article_id`` is passed to ``ObjectId``. ``read_time`` is compared
            against bounds derived from ``length`` (10 and 40 per 100 length
            units, divided by 60 - presumably seconds per 100 characters
            converted to minutes; TODO confirm the units with the frontend).
        w1: Weight of the read-time signal.
        w2: Weight of the reaction signal.
        w3: Weight of the URL-click signal.

    Returns:
        A list with one ``{'article_id': ObjectId, 'feedback_score': float}``
        dict per input tuple; each score is clamped to ``[-1, 1]``.
    """
    # Guards the division when min_time == max_time (length 0). The previous
    # `1**-7` evaluates to 1, which skewed every interpolated read-time score.
    epsilon = 1e-7

    result = []
    for article, read_time, reaction, clicked_url, length in articles_details:

        min_time = (10 * (length / 100)) / 60
        max_time = (40 * (length / 100)) / 60

        if read_time < min_time:
            f_readtime = -1
        elif read_time > max_time:
            f_readtime = 1
        else:
            # Linear map of [min_time, max_time] onto [-1, 1].
            f_readtime = 2 * ((read_time - min_time) / (max_time - min_time + epsilon)) - 1

        feedback_score = (
            w1 * f_readtime +
            w2 * reaction +
            w3 * clicked_url
        )

        feedback_score = max(-1, min(1, feedback_score))

        result.append({ 'article_id' :ObjectId(article), 'feedback_score': feedback_score})

    return result

# Score the sample interactions; as the cell's last expression, the returned list is shown as output.
calcuate_user_feedback(articles_details)

[{'article_id': ObjectId('675ac0b610a4574847b8f347'), 'feedback_score': 0.55},
 {'article_id': ObjectId('675ac0b610a4574847b8f346'), 'feedback_score': -0.3}]

In [193]:
# Fetch the two sample articles and print len() of each summary; the printed
# values (277 and 1005) match the `length` entries used in articles_details.
articles = [
    ObjectId('675ac0b610a4574847b8f347'),
    ObjectId('675ac0b610a4574847b8f346')
]
result = collection.find({
    "_id" :{"$in" : articles}
})
for i in result:
    print(len(i['summary']))

277
1005


In [10]:
import numpy as np 
from bson import ObjectId
# Overwrite one user's userVector with a random 1024-value placeholder.
# NOTE(review): no seed is set in this cell, so the stored vector differs on every run.
user_vector = np.random.rand(1024).tolist()
id_to_update = ObjectId('675baa4fb49e0719e0ac4bf3')
result = user_collection.update_one(
    {'_id': id_to_update},  # Filter by _id
    {'$set': {'userVector': [user_vector]}}  # Set the userVector field; the vector is wrapped in an outer list
)

#### 4. Similarity Penalty
For re-ranking

In [49]:
from sklearn.metrics.pairwise  import cosine_similarity
import numpy as np 
def get_top_recommendations(user_id, recommendations, top_n=10, history_size=10):
    """Re-rank candidate articles by recency, penalising similarity to the user's history.

    Each candidate that is not already in the user's history window gets the
    score ``recency_score / mean cosine similarity to the history embeddings``.
    Without any history embeddings the similarity is taken as 1, so the score
    equals the recency score.

    Args:
        user_id: ``_id`` of the user document.
        recommendations: List of candidate article ``_id`` values.
        top_n: Number of ``(article_id, score)`` pairs to return.
        history_size: How many entries from the end of ``userHistory`` to compare against.

    Returns:
        Up to ``top_n`` ``(article_id, score)`` tuples, highest score first.

    Raises:
        ValueError: If no user document exists for ``user_id``.
    """
    # Step 1: article ids from the last `history_size` entries of the user's history.
    user_data = db['user'].find_one(
        {"_id": user_id},
        {"userHistory": {"$slice": -history_size}, "_id": 0},
    )
    if user_data is None:
        raise ValueError(f"No user found with _id {user_id!r}")
    user_history = [entry['article_id'] for entry in user_data.get('userHistory', [])]

    # Step 2: embeddings of those history articles (each is stored wrapped in an outer list).
    user_embeddings = np.array([
        doc['embedding'][0]
        for doc in collection.find({"_id" : {"$in" : user_history}}, {"embedding": 1, "_id" : 0})
    ])

    # Step 3: embeddings of the candidates, excluding articles already in the history window.
    # Ids and embeddings are collected together so they stay index-aligned.
    print(len(recommendations))
    recommended_embeddings = []
    filtered_recommendations = []
    for doc in collection.find(
        {"_id" : {"$in" : recommendations, "$nin" : user_history}},
        {"embedding": 1, "_id" : 1},
    ):
        recommended_embeddings.append(doc['embedding'][0])
        filtered_recommendations.append(doc['_id'])
    print(len(filtered_recommendations))

    if not filtered_recommendations:
        # Nothing left to rank; skip the recency lookup entirely.
        return []
    recommended_embeddings = np.array(recommended_embeddings)

    # Step 4: mean cosine similarity of each candidate to the history embeddings.
    if user_embeddings.size == 0:
        average_similarity_scores = np.ones(len(filtered_recommendations))
    else:
        similarity_scores = cosine_similarity(recommended_embeddings, user_embeddings)
        average_similarity_scores = np.mean(similarity_scores, axis=1)

    # Step 5: recency score per candidate.
    rec_scores = {
        item['_id']: item['recency_score']
        for item in find_recency_score_for_articles(filtered_recommendations)
    }

    # Step 6: final score = recency / similarity.
    # NOTE(review): candidates with non-positive similarity are left out of the
    # ranking altogether - confirm that this is intended.
    final_scores = {}
    for doc_id, cosine_score in zip(filtered_recommendations, average_similarity_scores):
        if cosine_score > 0:
            final_scores[doc_id] = rec_scores[ObjectId(doc_id)] / cosine_score

    # Step 7: keep the `top_n` best-scoring candidates.
    return sorted(final_scores.items(), key=lambda pair: pair[1], reverse=True)[:top_n]
    
#Example usage
user_id = ObjectId('675b80f0b49e0719e0ac4be5')  # Replace with the actual user ID
# NOTE(review): this append mutates the notebook-level final_candidate_list, so
# every re-run of the cell adds the same id again.
final_candidate_list.append(ObjectId('675ac0b610a4574847b8f347'))
# set() drops duplicate candidate ids before re-ranking.
top_10 =get_top_recommendations(user_id,list(set(final_candidate_list)))
# Each item is an (article_id, score) tuple.
for i in top_10:
    print(i)

27
26
(ObjectId('675cab521a218adf94f0c95e'), 1.24740527062591)
(ObjectId('675cab521a218adf94f0ca95'), 1.2412987489043612)
(ObjectId('675cab521a218adf94f0ca22'), 1.2182529843843477)
(ObjectId('675cab521a218adf94f0c916'), 1.1990655993233756)
(ObjectId('675cab521a218adf94f0ca1f'), 1.1516692799503536)
(ObjectId('675cab521a218adf94f0c996'), 1.1465236295877004)
(ObjectId('675cab521a218adf94f0ca4e'), 1.1382542173305186)
(ObjectId('675cab521a218adf94f0ca2c'), 1.1341498966947563)
(ObjectId('675cab521a218adf94f0c8fa'), 1.1334585248352975)
(ObjectId('675cab521a218adf94f0cac9'), 1.1250118982855755)


In [28]:
# Number of entries currently in final_candidate_list (duplicates are counted).
print(len(final_candidate_list))

60


In [32]:
from transformers import pipeline, BartTokenizer, BartModel
import torch
from transformers import logging
from sklearn.metrics.pairwise import cosine_similarity
from bson import ObjectId
from qdrant_client import QdrantClient
from qdrant_client.http.models import Filter, FieldCondition, Range

import os

# Initialize tokenizer and model
# Use the GPU when PyTorch can see one; otherwise keep the model on the CPU.
embedding_device = "cuda" if torch.cuda.is_available() else "cpu"
tokenizer = BartTokenizer.from_pretrained("facebook/bart-large-cnn")
# NOTE(review): the tokenizer is loaded from "facebook/bart-large-cnn" while the
# embedding model is "facebook/bart-large" - confirm the pairing is intended.
model = BartModel.from_pretrained('facebook/bart-large').to(embedding_device)
# NOTE(review): "facebook/bart-large-cnn" is a summarization checkpoint; confirm
# that the "text-generation" task and trust_remote_code=True are really wanted.
llm_pipeline = pipeline("text-generation", model="facebook/bart-large-cnn", trust_remote_code=True)

# Initialize Qdrant client
# The API key is read from the environment so it is never stored in the
# notebook; set QDRANT_API_KEY before running this cell. The key that used to
# be hard-coded here should be treated as leaked and rotated.
qdrant_client = QdrantClient(
    url=os.environ.get(
        "QDRANT_URL",
        "https://ebc8b276-71dc-4b06-9303-2ced3445a1ec.us-east4-0.gcp.cloud.qdrant.io:6333",
    ),
    api_key=os.environ["QDRANT_API_KEY"],
)

# Function to embed the query into a single vector (1024-dimensional for the BART-large model used here)
def embed_query(query, tokenizer, model):
    """Embed ``query`` by mean-pooling the model's last hidden state.

    Args:
        query: Text to embed.
        tokenizer: Callable that turns ``query`` into a mapping of PyTorch tensors.
        model: Model whose output exposes ``last_hidden_state``.

    Returns:
        A NumPy array holding the mean of ``last_hidden_state`` over dimension 1,
        i.e. one row per input sequence.
    """
    tokens = tokenizer(query, return_tensors='pt', truncation=True, padding=True, max_length=1024)

    # Send the inputs to wherever the model actually lives instead of assuming
    # CUDA, so the function also works when the model is on the CPU.
    device = getattr(model, "device", None)
    if device is None:
        device = next(model.parameters()).device
    tokens = {key: value.to(device) for key, value in tokens.items()}

    # Inference only: skip building the autograd graph.
    with torch.no_grad():
        outputs = model(**tokens)

    embeddings = outputs.last_hidden_state
    query_vector = embeddings.mean(dim=1).detach().cpu().numpy()
    return query_vector

# Function to fetch top documents using Qdrant
def fetch_documents_and_generate_response(query, limit=5):
    """Retrieve the closest articles for ``query`` and generate a response from their summaries.

    The query is embedded with ``embed_query``, the nearest vectors are looked
    up in the Qdrant collection ``bigData_collection``, the matching articles
    are loaded from MongoDB, and their summaries are passed as context to
    ``llm_pipeline``.

    Args:
        query: Free-text user query.
        limit: Maximum number of search hits to request from Qdrant.

    Returns:
        The generated text followed by one markdown link per retrieved article.
    """
    # Step 1: Embed the query
    query_embedding = embed_query(query, tokenizer, model).flatten()

    # Step 2: Search for top documents in Qdrant
    search_results = qdrant_client.search(
        collection_name="bigData_collection",
        query_vector=query_embedding.tolist(),
        limit=limit
    )

    # Step 3: Load the matching articles, keeping the order of the search hits.
    # dict.fromkeys removes repeated ids while preserving that order.
    ranked_ids = list(dict.fromkeys(str(result.payload["_id"]) for result in search_results))
    docs_by_id = {
        str(doc["_id"]): doc
        for doc in collection.find(
            {"_id": {"$in": [ObjectId(doc_id) for doc_id in ranked_ids]}},
            {"_id": 1, "summary": 1, "url": 1},
        )
    }
    # A `$in` query does not return documents in the order of the id list, so
    # re-order them here; ids with no MongoDB document are skipped.
    documents = [docs_by_id[doc_id] for doc_id in ranked_ids if doc_id in docs_by_id]
    summaries = [doc["summary"] for doc in documents]
    urls = [doc["url"] for doc in documents]

    # Step 4: Prepare input for LLM
    selected_summaries = "\n".join(summaries)
    llm_input = f"Query: {query}\n\nContext:\n{selected_summaries}"

    # Step 5: Generate response from LLM
    response = llm_pipeline(llm_input, max_length=256, num_return_sequences=1)[0]["generated_text"]

    # Step 6: Append URLs to the response
    result_urls = [f"[For More Information, Click here]({url})" for url in urls]
    final_response = response + "\n\n" + "\n".join(result_urls)

    return final_response

# Example usage: run the retrieval + generation flow for one sample query and
# print the text it returns.
query ="Kumbh Mela"
response = fetch_documents_and_generate_response(query)
print(response)

Query: Kumbh Mela

Context:
Travis Kelce became the fastest tight end to reach 12,000 career receiving yards. He did so in only 172 games. Kelce says Tony Gonzalez's record will never be broken. The Kansas City Chiefs beat the Los Angeles Chargers 24-17 in Week 14. 6H w6:89ED Wo?6H96:89 ED9@HX k2 9C67lQ9EEADi^^EH:EE6C]4@>^?6E96: 89ED9 @H^DE2EFD^'gefe Thanks for the feedback. Was the information in this article useful? If so, please email us at jennifer.glanfield@mailonline.co.uk. If you have any questions, please contact us at: jennifer Glanfield at jlanfield @mailonline
Essential Utilities Inc. announced more than $770,000 in total contributions during the company's 2024-2025 United Way campaign. The total donation was achieved through individual pledges made by employees from its water and gas segments. Aqua and Peoples Natural Gas have partnered with the United Way for more than two decades E96 4@CA@C2E6 >2E49 H:== 36 5:C64E65 E@ E9@D6 D2>6 @C82?:K2E:@?D E9C@F89 E96 &?:E65 (2J  CE:4

### SPARK

In [1]:
import os
from pyspark.sql import SparkSession
from pyspark.sql import Row
from pyspark.sql.functions import explode
from pyspark.sql import functions as F
from pyspark.ml.feature import Word2Vec
from pyspark.sql.types import StringType
import numpy as np
from scipy.spatial.distance import cosine
from pyspark.sql.types import ArrayType, DoubleType

In [2]:
# Environment variables read by PySpark: the JDK location and the Python
# executables for the workers and the driver.
# NOTE(review): these are absolute paths on one Windows machine; adjust them
# before running the notebook anywhere else.
os.environ['JAVA_HOME'] = 'C:/Program Files/Java/jdk-11.0.2'
os.environ['PYSPARK_PYTHON'] = 'C:/Users/oswme/anaconda3/envs/bigData/python.exe'
os.environ['PYSPARK_DRIVER_PYTHON'] = 'C:/Users/oswme/anaconda3/envs/bigData/python.exe'
os.environ['PYSPARK_DRIVER_PYTHON_OPTS'] = 'notebook'

In [3]:
# Reuse the active Spark session if one exists; otherwise start one named "Test".
spark = (
    SparkSession.builder
    .appName("Test")
    .getOrCreate()
)

In [4]:
# Bare expression: the notebook renders the session object as this cell's output.
spark