In [1]:
import datetime as dt
import os
import re
import pathlib
import requests
import pandas as pd
import numpy as np
from bs4 import BeautifulSoup
from joblib import load
import sklearn
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
from nltk.tag import pos_tag
from nltk.tokenize import word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
import json
import dill

In [1]:
import pandas as pd
import pathlib
import os
from joblib import load
import cloudpickle
from main import main

In [2]:
with open(f"./models/tfidf_vectorizer_300.pkl", "rb") as f:
    print("Vectorizer loaded.")
    t = cloudpickle.load(f)


Vectorizer loaded.


  t = cloudpickle.load(f)


In [3]:
print(type(t))
print(hasattr(t, "vocabulary_"))
print(len(t.vocabulary_))
print(t.__class__)

<class 'sklearn.feature_extraction.text.TfidfVectorizer'>
True
300
<class 'sklearn.feature_extraction.text.TfidfVectorizer'>


In [4]:
import numpy, sklearn, cloudpickle
print(numpy.__version__, sklearn.__version__, cloudpickle.__version__)


2.1.3 1.6.1 3.0.0


In [4]:
# Load and clean the dataset
df = pd.read_csv(f"./data/2025-04/2025-04-04_news_file.txt")
try:
    df.drop('Unnamed: 0', axis=1, inplace=True)
except KeyError:
    pass
df.dropna(inplace=True)

display(df.head())

print("Vectorizing content...")

Unnamed: 0,date,content
0,2025-04-04,As stock markets continue to tumble after the ...
1,2025-04-04,Bad news for Nintendo fans in the US who plann...
2,2025-04-04,\n New tariff policies have prompted a huge...
3,2025-04-04,Car companies are in panic mode as they scramb...
4,2025-04-04,Apple stock dropped 4% in early Friday trading...


Vectorizing content...


In [5]:
df.content

0     As stock markets continue to tumble after the ...
1     Bad news for Nintendo fans in the US who plann...
2     \n    New tariff policies have prompted a huge...
3     Car companies are in panic mode as they scramb...
4     Apple stock dropped 4% in early Friday trading...
                            ...                        
93    Deal Alerts\nPost a Deal\nGo Mobile\nSign Up\n...
94    The move comes in response to tariffs imposed ...
95    We recently published a list of Jim Cramer Say...
96    Deal Alerts\nPost a Deal\nGo Mobile\nSign Up\n...
97                                                   \n
Name: content, Length: 93, dtype: object

In [None]:
# Load vectorizer and transform content
char_array = t.transform(df.content.astype(str))
#frequency_matrix = pd.DataFrame(char_array, columns=tfidf_loaded.get_feature_names_out())


In [None]:
print("Classifying articles...")
# Load the model and predict labels
model = load(f"{model_path}/lgb.joblib")
model_pred = model.predict(frequency_matrix)

In [None]:
def get_full_article(url):
    try:
        response = requests.get(url, timeout=5)
        if response.status_code == 200:
            soup = BeautifulSoup(response.text, 'html.parser')
            paragraphs = soup.find_all('p')
            full_text = "\n".join([p.get_text() for p in paragraphs])
            return full_text
    except requests.exceptions.RequestException:
        return None  # Return None if request fails

    return None

# Can't search everything
# This URL "url = 'https://newsapi.org/v2/everything?apiKey='+api_key" gave error "the scope of your search is too broad."

def connect_to_api_csv(**context):
    
    # I want the DAG to be idempotent so that the model will use the same data to train the model or to do anything in any case 
    # need to backfill
    
    
    api_key=os.getenv("NEWS_API_KEY")
    
    exec_datetime=dt.datetime.strptime('2025-2-9', '%Y-%m-%d')
    exec_date=exec_datetime.date()

    # One idea I had to implement branching had to do with saving the csvs on sunday only but this affects atomicity. If some 
    # one decided to run the train model task only, how can he do that when they data is from teh past and not abailable?
    # I was thinking that

    # dropping csv if exists
    path="/mnt/c/Users/User/News-Project/"
    file_name=str(exec_date)+'_news_file.txt'

    # dropping file if exists
    if os.path.exists(path+file_name):
        os.remove(path+file_name)

    # creating dirs needed
    pathlib.Path(path).mkdir(parents=True, exist_ok=True)

    url=(f"https://newsapi.org/v2/everything?from={exec_date}&to={exec_date}&language=en&q=(market OR stock)&apiKey={api_key}")
    response = requests.get(url)
    print(url, response)
    resp_dict=response.json()
    articles=resp_dict['articles']

    empty_json={'date':[], 'content':[]}

    for article in articles:
        published_date=article['publishedAt']
        published_date=re.findall('[\d-]+', published_date)[0]
        content_url=article['url']
        content=get_full_article(content_url)

        #if content is none, get the description
        if content==None:
            content_descrp=article['description']
            empty_json['content'].append(content_descrp)
        else:
            empty_json['content'].append(content)
        empty_json['date'].append(published_date)
            
    news_df = pd.DataFrame(empty_json)
    news_df.to_csv(path+file_name)

'''
def connect_to_api_csv():
    
    # I want the DAG to be idempotent so that the model will use the same data to train the model or to do anything in any case 
    # need to backfill
    api_key=os.getenv("NEWS_API_KEY")
    
    exec_datetime=dt.datetime.strptime('2025-01-10', '%Y-%m-%d')
    exec_date=exec_datetime.date()
    
    # One idea I had to implement branching had to do with saving the csvs on sunday only but this affects atomicity. If some 
    # one decided to run the train model task only, how can he do that when they data is from teh past and not abailable?
    # I was thinking that
    
    # dropping csv if exists
    path="/mnt/c/Users/User/News-Project/"
    file_name=str(exec_date)+'_news_file.txt'

    # dropping file if exists
    if os.path.exists(path+file_name):
        os.remove(path+file_name)

    # creating dirs needed
    pathlib.Path(path).mkdir(parents=True, exist_ok=True)

    # Generally, it seems there's constituents of the url, my brian is trying to remember the basic web concepts I learned before.
    url=('https://newsapi.org/v2/everything?from='+str(exec_date)+'&to='+str(exec_date)+'&language=en&q=(market OR stock)&apiKey='+api_key)
    response = requests.get(url)
    resp_dict=response.json()
    #print(url, resp_dict)
    articles=resp_dict['articles']

    empty_json={'date':[], 'content':[]}

    for article in articles:
        published_date=article['publishedAt']
        #print(published_date)
        published_date=re.findall('[\d-]+', published_date)[0]
        #print(published_date)
        content_url=article['url']
        #print(content_url)
        content=get_full_article(content_url)
        #print(article.keys())

        #if content is none, get the description
        if content==None:
            content_descrp=article['description']
            empty_json['content'].append(content_descrp)
        #print(published_date+'->'+content+'\n\n')
        #append to dataset or csv file
        #news_csv+='\n"'+published_date+'"|"'+content+'"'
        else:
            empty_json['content'].append(content)
        empty_json['date'].append(published_date)
            
            #print(news_csv)
    #print(empty_json)
    #Reading into a pandas df before writing as csv
    news_df = pd.DataFrame(empty_json)
    news_df.to_csv(path+file_name)
    display(news_df)'''

In [None]:
parent_path="C:/Users/User/News-Project/"
data_path=f"{parent_path}data/"
new_path=f"{parent_path}labeled_data/"
model_path=f"{parent_path}models/"

def filter_news():
    exec_datetime=dt.datetime.strptime('2025-1-12', '%Y-%m-%d')
    exec_date=exec_datetime.date()

    file_name=str(exec_date)+'_news_file.txt'

    # creating dirs needed
    pathlib.Path(new_path).mkdir(parents=True, exist_ok=True)
    pathlib.Path(model_path).mkdir(parents=True, exist_ok=True)

    # vectorize the dataset for the day
    # loading the dataset
    df=pd.read_csv(data_path+file_name)
    df.drop('Unnamed: 0', axis=1, inplace=True)
    df.dropna(inplace=True)
    display(df)
    # Load the saved vectorizerrue
    #tfidf_loaded = load(f"{model_path}/tfidf_vectorizer_300.joblib")
    
    # Save the vectorizer
    with open(f"{model_path}/tfidf_vectorizer_300.dill", "rb") as f:
        tfidf_loaded=dill.load(f)
    #with open("tfidf_vocab.json", "r") as f:
    #    vocab = json.load(f)
    
    # Recreate TF-IDF Vectorizer with the same vocabulary
    #tfidf_loaded = TfidfVectorizer(vocabulary=vocab)

    # Transform using the loaded vectorizer
    char_array = tfidf_loaded.transform(df.content).toarray()
    frequency_matrix = pd.DataFrame(char_array, columns= tfidf_loaded.get_feature_names_out())

    # load the model
    model=load(f"{model_path}/lgb.joblib")
    model_pred=model.predict(frequency_matrix)
    df['label']=model_pred
    #df_np = df.to_numpy()
    #delimiter='<|>'
    #np.savetxt(f"{parent_path}labeled_data/"+file_name.replace('.txt', '_labeled1.txt'), df_np, delimiter=delimiter, fmt="%s")
    # replacing delimiter with \delimiter to escape while loading in vertica
    #df.content.replace('~', '\~', inplace=True)
    #df.to_csv(f"{parent_path}labeled_data/"+file_name.replace('.txt', '_labeled.txt'), index=False, sep="~", header=False, 
              #encoding="UTF-8")
    # Replace newlines inside the 'content' field with space
    df["content"] = df["content"].str.replace("\n", " ", regex=True)

    # Save as line-delimited JSON (.jsonl)
    df.to_json(f"{parent_path}labeled_data/"+file_name.replace('.txt', '_labeled.jsonl'), orient="records", lines=True)

In [None]:
# Save
tfidf=load(f"{model_path}/tfidf_vectorizer_300.joblib")

# Save the vectorizer
with open(f"{model_path}/tfidf_vectorizer_300.dill", "wb") as f:
    dill.dump(tfidf, f)

In [None]:
#testing multiple delimiters
import pandas as pd
import numpy as np

# Sample DataFrame
df = pd.DataFrame({
    "col1": ["A", "B", "C"],
    "col2": [1, 2, 3],
    "col3": ["X", "Y", "Z"]
})
display(df)
# Convert DataFrame to NumPy array
data = df.to_numpy()
#print(data, data.shape)

# Define a custom delimiter format
delimiter = " | "  # Multiple delimiters can be simulated with spaces

# Save using numpy.savetxt
np.savetxt("output.txt", data, delimiter=delimiter, fmt="%s")

print("File saved successfully!")

In [None]:
#86 rows
filter_news()

In [None]:
connect_to_api_csv()

In [None]:
def compress_data(**context):
    exec_datetime=dt.datetime(2025,1,12)
    print(exec_datetime)
    #exec_date=exec_datetime.day
    #print(exec_date)
    #exec_date=exec_datetime.date()
    #exec_datetime = context["execution_date"]
    exec_date=exec_datetime.strftime("%Y-%m")
    print(exec_date)
    #if 

In [None]:
compress_data()

In [None]:
!pip install -q -U google-genai

In [None]:
response.text

In [None]:
news='hello'
contents=["""These are the recent stock news for today. Please in a summarized way, tell me 1. The stocks mentioned today 2. Which of these stocks had a bad sentiment 3. Which had good sentiment 4. Which would you advice me to keep an eye on"""+news]
print(contents)

In [None]:
ind=["dd", "d"]
" ".join(ind)

ind.join("\n")+

In [4]:
import smtplib
# creates SMTP session
s = smtplib.SMTP('smtp.gmail.com', 587)
# start TLS for security
s.starttls()
# Authentication
s.login("udohchigozie2017@gmail.com", "")
# message to be sent
with open('ai_analysis/2025-02/2025-02-09_llm_advice.txt', 'r', encoding='utf-8') as f:
    body=f.read()
#print(tt)
subject = "Your AI Analysis Report"
message = f"Subject: {subject}\n\n{body}"

# sending the mail
s.sendmail("udohchigozie2017@gmail.com", "udohchigozie2017@gmail.com", message.encode('utf-8'))
# terminating the session
s.quit()


(221,
 b'2.0.0 closing connection 3f1490d57ef6-e5dae0d9aadsm606071276.35 - gsmtp')

In [1]:
import shutil
shutil.rmtree('test')

### Directions
1. I want to save the news incrementally i.e I want a single file to have news for only it's date. In the prediction or summarization stage, we would the dates data to get the current stock sentiment.
2. I would need to incrementally push this to my datawarehouse which would serve as the storage for historical data. The conditional branch would be that if it's the last day of the month, confirm that the minimum and maximum date of the month are in the datawarehouse after which the csv files for the month can be dropped.
2. To use the branch operator, it'll be in the downstream tasks perhaps to only analyze sentiment if it's english also if news isn't about stock save in a separate dataset. The branch operator will be the end of month thing. If it's last day of month, we drop all the data otherwise, run a dummy operator.
4. After the data is loaded into the datawarehouse another thing could be to use DBT to move all the non-stock data to a different table everyday. It makes sense to store this data historically as in the free version, I can only access articles that are a month old.
3. I plan to train a naive bayes model that will be pretrained to know if a news is related to stocks
4. I'll wrap this code and everything in a docker image after I'm done

### Clearer Steps - Check Task excel file
1. Create task for getting data - DONE
2. Create task for sending email for now with just an indication that data extraction is complete - Pending
3. Create DAG and let just these two tasks run (this will be a good way to test what happens when we add tasks to our DAG)
4. Pass the data to deepseek and ask it to label with 1 and 0 if content has to do with stock insights. Seems I might use a model fom hugging face. I need to read the insights on using this model from deep seek. - DONE (used hugging face)
5. Pretrain normal ML model - DONE
6. Create task to generate labelled data - DONE
7. Create task to copy this data to the vertica docker container and same task should load the data to the table (Bash operator I guess with docker exec and vsql tool to copy the data). Need to create a container that'll map a dir on local to the vertica container so the data is automatically accessible in docker. - In progress
8. Create branch operator where original function which check if execution data is last day of month and return two diff ids
9. First task should compress the original csv for all days in the month and then delete the originals. Second task should be a dummy for when branch goes there.
10. Next task should connect to deepseeks API to vertica and query the ones with label of 1 then use a prompt to summarize the details into an xcom
11. Use DBT to separate both the data
12. The final task should take the summary and send via email.
13. Prepare a dockerfile for the entire process and generate an image to be sure it's working.
14. Prepare a readme and push to github.
15. Use project as deemed fit.

In [2]:
"""
Main orchestration script for GitHub Actions workflow
"""
import os
import sys
from datetime import datetime
from api_data_fetcher import fetch_news_data
from news_classifier import classify_news
from ai_stock_advisor import extract_stock_news, generate_ai_advice
from email_notifier import send_email_notification
import pandas as pd

In [None]:
import cloudpickle

with open(f"./models/tfidf_vectorizer_300.pkl", "rb") as f:
    t =  cloudpickle.load(f)

# Load and clean the dataset
df = pd.read_csv(f"./data/2025-04/2025-04-04_news_file.txt")
try:
    df.drop('Unnamed: 0', axis=1, inplace=True)
except KeyError:
    pass
df.dropna(inplace=True)

display(df.head())

print("Vectorizing content...")
# Load vectorizer and transform content

char_array = t.transform(df.content).toarray()
frequency_matrix = pd.DataFrame(char_array, columns=tfidf_loaded.get_feature_names_out())

In [None]:
from joblib import load
model = load(f"./models/lgb.joblib")

In [None]:
labeled_path = classify_news("./data", "./labeled_data", "./models", "2025-04-04")

Unnamed: 0,date,content
0,2025-04-04,As stock markets continue to tumble after the ...
1,2025-04-04,Bad news for Nintendo fans in the US who plann...
2,2025-04-04,\n New tariff policies have prompted a huge...
3,2025-04-04,Car companies are in panic mode as they scramb...
4,2025-04-04,Apple stock dropped 4% in early Friday trading...


In [None]:



def main():
    """Main execution function"""
    # Get environment variables
    NEWS_API_KEY = os.environ.get('NEWS_API_KEY')
    GEMINI_API_KEY = os.environ.get('GEMINI_API_KEY')
    SMTP_URL = os.environ.get('SMTP_URL')
    SMTP_USER = os.environ.get('SMTP_USER')
    SMTP_PASSWORD = os.environ.get('SMTP_PASSWORD')
    TO_EMAIL = os.environ.get('TO_EMAIL', 'udohchigozie2017@gmail.com')
    
    # Validate required environment variables
    required_vars = {
        'NEWS_API_KEY': NEWS_API_KEY,
        'GEMINI_API_KEY': GEMINI_API_KEY,
        'SMTP_URL': SMTP_URL,
        'SMTP_USER': SMTP_USER,
        'SMTP_PASSWORD': SMTP_PASSWORD
    }
    
    missing_vars = [key for key, value in required_vars.items() if not value]
    if missing_vars:
        print(f"Error: Missing required environment variables: {', '.join(missing_vars)}")
        sys.exit(1)
    
    # Define paths
    parent_path = "./"
    data_path = f"{parent_path}data/"
    labeled_data_path = f"{parent_path}labeled_data/"
    model_path = f"{parent_path}models/"
    ai_content_path = f"{parent_path}ai_analysis/"
    
    # Get current date
    exec_date = datetime.now().strftime("%Y-%m-%d")
    
    print(f"Starting workflow for {exec_date}")
    
    # Step 1: Fetch news data
    print("Step 1: Fetching news data from API...")
    try:
        csv_path = fetch_news_data(NEWS_API_KEY, exec_date, data_path)
        print(f"✓ News data saved to {csv_path}")
    except Exception as e:
        print(f"✗ Error fetching news data: {e}")
        sys.exit(1)
    
    # Step 2: Classify news
    print("Step 2: Classifying news articles...")
    try:
        labeled_path = classify_news(data_path, labeled_data_path, model_path, exec_date)
        print(f"✓ Labeled data saved to {labeled_path}")
    except Exception as e:
        print(f"✗ Error classifying news: {e}")
        sys.exit(1)
    
    # Step 3: Extract stock news
    print("Step 3: Extracting stock-related news...")
    try:
        stock_news = extract_stock_news(labeled_data_path, exec_date)
        print(f"✓ Found {len(stock_news)} stock-related articles")
    except Exception as e:
        print(f"✗ Error extracting stock news: {e}")
        sys.exit(1)
    
    # Step 4: Generate AI advice
    print("Step 4: Generating AI stock advice...")
    try:
        advice_path = generate_ai_advice(GEMINI_API_KEY, stock_news, ai_content_path, exec_date)
        print(f"✓ AI advice saved to {advice_path}")
    except Exception as e:
        print(f"✗ Error generating AI advice: {e}")
        sys.exit(1)
    
    # Step 5: Send email notification
    print("Step 5: Sending email notification...")
    try:
        send_email_notification(SMTP_URL, SMTP_USER, SMTP_PASSWORD, TO_EMAIL, advice_path, exec_date)
        print(f"✓ Email sent successfully to {TO_EMAIL}")
    except Exception as e:
        print(f"✗ Error sending email: {e}")
        sys.exit(1)
    
    print(f"\n✓ Workflow completed successfully for {exec_date}")


if __name__ == "__main__":
    main()