In [53]:
import pandas as pd
import requests
import pathlib
import re
from bs4 import BeautifulSoup

from xai_sdk import Client
from xai_sdk.chat import user
from xai_sdk.search import SearchParameters, web_source, news_source, x_source, rss_source

import os
import json
from datetime import datetime, timedelta
from sklearn.feature_extraction.text import TfidfVectorizer
from sklearn.metrics.pairwise import cosine_similarity

In [15]:
# Trying Grok - Initial Code
# First experiment: a single Grok chat request with live search switched on,
# printing the raw answer text and the citations returned with it.
# NOTE(review): the API key is blank here; supply it at runtime instead of
# committing a real key into the notebook.
client = Client(api_key="")

# The search window is pinned to the single calendar date 2025-09-28, even though
# the prompt below asks for "the last 8 hours".
search_config = SearchParameters(
        mode="on",
        return_citations=True,
        from_date=datetime(2025, 9, 28),
        to_date=datetime(2025, 9, 28),
        max_search_results=30,
        sources=[web_source(country="NG"), news_source(country="NG"), x_source(), rss_source()]
    )
        
# The prompt asks for the key "incident date" (with a space) and has no "status"
# field; the schema constants defined further down this notebook use
# "incidentDate" and require "status".
chat = client.chat.create(
    model="grok-4-fast-reasoning-latest",
    messages=[user("""I need news from the last 8 hours related to security issues such as banditry, gunmen or any kind on violent activities
    in Nigeria. I need the output json formated. For each news, return like this {"title": "<let here be a befitting title>", 
    "description":"<Summary of the incident from the news source>", "state":"<the state of occurence>", "lga":"<local government of occurence 
    if mentioned in the news>", "incident date":"<todays date or whatever date of incident>", "incidentTime":"<incident time if available>"}. 
    If any of the fields isn't available return Null but for date or time, return current date and current time.""")],
    search_parameters=search_config
)

# sample() performs the request; the answer text is read from .content and the
# source URLs from .citations.
response = chat.sample()
print("Full Response \n", response.content)

print("\n \n Citations \n", response.citations)

Full Response 
 [
  {
    "title": "Bandits Abduct Businessman In Midnight Attack On Kwara Community",
    "description": "Armed bandits carried out a midnight raid on a community in Kwara State, abducting a prominent businessman. The attack highlights ongoing security challenges in the region, as reported by Sahara Reporters.",
    "state": "Kwara",
    "lga": null,
    "incident date": "2025-09-28",
    "incidentTime": "midnight"
  }
]

 
 Citations 
 ['https://saharareporters.com/2025/09/28/breaking-bandits-abduct-businessman-midnight-attack-kwara-community', 'https://modernghana.com/news/1432967/nigeria-fights-back-against-banditry-and-insecurit.amp', 'https://thewillnews.com/opinion-nigeria-fights-back-against-banditry-and-insecurity', 'https://defenceweb.co.za/security/civil-security/armed-banditry-is-becoming-a-crisis-in-nigeria-why-fixing-the-police-is-key', 'https://link.springer.com/article/10.1057/s41284-025-00477-1', 'https://www.tandfonline.com/doi/full/10.1080/17419166.20

In [29]:
import os
import json  # Add this for parsing

from datetime import datetime, timedelta
from xai_sdk import Client
from xai_sdk.chat import user
from xai_sdk.search import SearchParameters, web_source, news_source, x_source, rss_source

# Second experiment: same Grok request, but with a rolling 8-hour search window,
# a Google News RSS feed as an extra source, and an attempt to parse the answer
# as JSON.
client = Client(api_key="...")  # Replace with your real key

# Dynamic dates for last 8 hours
# (datetime.now() is naive local time and is sampled twice, so the window is
# marginally longer than 8 hours.)
from_date = datetime.now() - timedelta(hours=8)
to_date = datetime.now()

search_config = SearchParameters(
    mode="on",
    return_citations=True,
    from_date=from_date,
    to_date=to_date,
    max_search_results=30,
    sources=[web_source(country="NG"), news_source(country="NG"), x_source(), rss_source(links=['https://news.google.com/rss/search?q=nigeria+security&hl=en-NG&gl=NG&ceid=NG:en'])]
)
    
# Compared with the first experiment the prompt now uses the key "incidentDate"
# and adds a "status" severity field.
chat = client.chat.create(
    model="grok-4-fast-reasoning-latest",
    messages=[user("""I need news from the last 8 hours related to security issues such as banditry, gunmen or any kind on violent activities
    in Nigeria. I need the output json formated. For each news, return like this {"title": "<let here be a befitting title>", 
    "description":"<Summary of the incident from the news source>", "state":"<the state of occurence>", "lga":"<local government of occurence 
    if mentioned in the news>", "incidentDate":"<todays date or whatever date of incident>", "incidentTime":"<incident time if available>",
    "status":"High" or "Medium" or "Low" depending on the severity of the news}. 
    If any of the fields isn't available return Null but for date or time, return current date and current time.""")],
    search_parameters=search_config
)

response = chat.sample()
#print("Full Response Content:\n", response.content)

#print("\nCitations:\n", response.citations)

# Parse the JSON news items if present in content
# NOTE(review): the failure message points at "the raw output above", but the
# raw-output print is commented out, so nothing is shown when parsing fails.
try:
    news_data = json.loads(response.content)  # Assumes content is a JSON array/object
    print("\nParsed News Items:\n", json.dumps(news_data, indent=2))
except json.JSONDecodeError:
    print("\nContent is not valid JSON—check the raw output above.")

# Debug: See everything in response
print("\nFull Response Object Attributes:\n", dir(response))

Full Response Content:
 [
  {
    "title": "Bandits Abduct Businessman in Midnight Attack on Kwara Community",
    "description": "Armed bandits launched a midnight raid on a community in Kwara State, abducting a prominent businessman during the attack.",
    "state": "Kwara",
    "lga": null,
    "incident date": "2025-09-28",
    "incidentTime": "00:00"
  },
  {
    "title": "Bandits Kill 10 Vigilantes and Abduct Several in Oke Ode Community, Kwara",
    "description": "Gunmen suspected to be bandits attacked Oke Ode community in Kwara State, killing 10 vigilantes and abducting several residents, despite government claims of repelling the assault.",
    "state": "Kwara",
    "lga": "Ifelodun",
    "incident date": "2025-09-28",
    "incidentTime": null
  },
  {
    "title": "NSCDC Launches Manhunt for Attackers of Jigawa Polytechnic Students",
    "description": "The Nigeria Security and Civil Defence Corps has initiated a search for assailants who attacked students at a polytechnic 

In [7]:
# Threat-record schema: the payload fields, followed by one bookkeeping column.
REQUIRED_FIELDS = ["title", "description", "state", "lga", "status"]
OPTIONAL_FIELDS = ["incidentDate", "incidentTime", "lat", "lng"]
ALL_FIELDS = [*REQUIRED_FIELDS, *OPTIONAL_FIELDS, "is_duplicate"]


def normalize_news(news_data):
    """
    Build a DataFrame from a list of news dicts with exactly the ALL_FIELDS columns.

    Columns absent from the input are created and filled with None; columns
    that are not part of ALL_FIELDS are dropped; the result follows the
    ALL_FIELDS order.
    """
    frame = pd.DataFrame(news_data)
    absent = [name for name in ALL_FIELDS if name not in frame.columns]
    for name in absent:
        frame[name] = None  # placeholder for a field the source did not provide
    return frame[ALL_FIELDS]


def save_news_to_csv(news_data, folder="news_data"):
    """
    Save the current run's news into a timestamped CSV.

    The file is written to ``<folder>/<YYYY-MM>/<YYYYmmdd_HH>.csv`` using the
    local clock, so two runs within the same hour overwrite each other.

    Args:
        news_data: List of news dicts; normalised with ``normalize_news``.
        folder (str): Root directory for the per-month sub-folders.

    Returns:
        str: Path of the CSV that was written.
    """
    curr_dt = datetime.now()
    month_dir = os.path.join(folder, curr_dt.strftime("%Y-%m"))
    # Create the month sub-folder itself, not just `folder`: the CSV lives one
    # level down and to_csv() does not create missing parent directories.
    os.makedirs(month_dir, exist_ok=True)

    file_stem = curr_dt.strftime("%Y%m%d_%H")
    filepath = os.path.join(month_dir, f"{file_stem}.csv")

    df = normalize_news(news_data)
    df.to_csv(filepath, index=False)
    print(f"✅ Saved {len(df)} news to {filepath}")
    return filepath


def load_recent_news(folder="news_data", days=5):
    """
    Load news from the last N days of timestamped CSVs.

    Files are expected at ``<folder>/<YYYY-MM>/<YYYYmmdd_HH>.csv``. Every month
    folder that overlaps the look-back window is scanned, so the window is not
    cut short at the start of a month. Missing month folders are skipped.

    Args:
        folder (str): Root directory containing the per-month sub-folders.
        days (int): Size of the look-back window, in days, from the local "now".

    Returns:
        pandas.DataFrame: Concatenation of all matching files, or an empty
        DataFrame when nothing matches.
    """
    now = datetime.now()
    cutoff = now - timedelta(days=days)

    # Enumerate every "YYYY-MM" folder name between the cutoff month and the
    # current month, inclusive.
    month_names = []
    year, month = cutoff.year, cutoff.month
    while (year, month) <= (now.year, now.month):
        month_names.append(f"{year:04d}-{month:02d}")
        month += 1
        if month > 12:
            year, month = year + 1, 1

    dfs = []
    for month_name in month_names:
        month_dir = os.path.join(folder, month_name)
        if not os.path.isdir(month_dir):
            continue  # nothing saved for that month (e.g. very first run)
        for fname in sorted(os.listdir(month_dir)):
            if not fname.endswith(".csv"):
                continue
            try:
                file_date = datetime.strptime(fname.split(".")[0], "%Y%m%d_%H")
            except ValueError:
                continue  # a CSV that is not one of our timestamped snapshots
            if file_date < cutoff:
                continue
            path = os.path.join(month_dir, fname)
            try:
                dfs.append(pd.read_csv(path))
            except (OSError, ValueError) as err:
                # Best effort: one unreadable snapshot must not abort the run,
                # but say so instead of dropping it silently.
                # (pandas' EmptyDataError / ParserError subclass ValueError.)
                print(f"⚠️ Skipping unreadable file {path}: {err}")
    return pd.concat(dfs, ignore_index=True) if dfs else pd.DataFrame()


def deduplicate_news(current_news, past_news, threshold=0.85):
    """
    Flag current items whose title+description closely matches a past item.

    Both frames are turned into "title description" strings (missing values
    become empty strings), vectorised with one TF-IDF model fitted on the
    union, and each current item is compared against every past item by
    cosine similarity. An item is a duplicate when its best match reaches
    ``threshold``.

    The ``is_duplicate`` column is written onto ``current_news`` in place and
    the same frame is returned. With no past news, every item is marked False.
    """
    def _as_documents(frame):
        return (frame["title"].fillna("") + " " + frame["description"].fillna("")).tolist()

    if past_news.empty:
        current_news["is_duplicate"] = False
        return current_news

    past_docs = _as_documents(past_news)
    current_docs = _as_documents(current_news)

    tfidf = TfidfVectorizer().fit(past_docs + current_docs)
    past_matrix = tfidf.transform(past_docs)
    current_matrix = tfidf.transform(current_docs)

    # One sparse row per current item; keep only its best score against the past.
    current_news["is_duplicate"] = [
        cosine_similarity(row, past_matrix).max() >= threshold
        for row in current_matrix
    ]
    return current_news

def publish_news(api_key: str, news_items: list, timeout: float = 30):
    """
    Publishes news items to the Convex threats API.

    Args:
        api_key (str): Your Convex API key (string starting with stmp_...).
                       It is sent as the ``api_key`` query parameter.
        news_items (list): A list of dicts containing threat data.
                           Must include required fields:
                           title, description, state, lga, status
                           Optional fields: incidentDate, incidentTime, lat, lng
        timeout (float): Seconds to wait for the server before giving up.
                         Without a timeout ``requests`` can block indefinitely.

    Returns:
        The decoded JSON response body on success, otherwise None (the error
        is printed rather than raised).
    """
    url = f"https://fantastic-mammoth-699.convex.site/api/threats?api_key={api_key}"
    headers = {"Content-Type": "application/json"}

    def _redact(message) -> str:
        # requests embeds the full URL (including the api_key query parameter)
        # in its exception text; mask the key before it reaches cell output.
        text = str(message)
        return text.replace(api_key, "***") if api_key else text

    try:
        response = requests.post(url, headers=headers, json=news_items, timeout=timeout)
        response.raise_for_status()
        print("✅ Successfully published news!")
        return response.json()
    except requests.exceptions.HTTPError as http_err:
        print(f"❌ HTTP error occurred: {_redact(http_err)} - {response.text}")
    except Exception as err:
        print(f"❌ Other error occurred: {_redact(err)}")
    return None

def fetch_security_news(api_key: str):
    """
    Fetch security-related news in Nigeria from the last 8 hours.

    Sends one Grok chat request with live search enabled (web, news, X and a
    Google News RSS feed) and parses the answer text as JSON.

    Args:
        api_key (str): Your XAI API key.

    Returns:
        list | dict: Parsed JSON object with news items if successful,
                     or None if the answer is empty or cannot be parsed.
    """
    client = Client(api_key=api_key)

    # Dynamic dates for last 8 hours; sample the clock once so the window is
    # exactly 8 hours wide.
    to_date = datetime.now()
    from_date = to_date - timedelta(hours=8)

    search_config = SearchParameters(
        mode="on",
        return_citations=True,
        from_date=from_date,
        to_date=to_date,
        max_search_results=30,
        sources=[
            web_source(country="NG"),
            news_source(country="NG"),
            x_source(),
            rss_source(links=[
                'https://news.google.com/rss/search?q=nigeria+security&hl=en-NG&gl=NG&ceid=NG:en'
            ])
        ]
    )

    chat = client.chat.create(
        model="grok-4-fast-reasoning-latest",
        messages=[user(
            """I need news from the last 8 hours related to security issues such as banditry, gunmen or any kind on violent activities
            in Nigeria. I need the output json formatted. For each news, return like this:
            {"title": "<title>", 
             "description":"<summary>", 
             "state":"<state of occurrence>", 
             "lga":"<local government if mentioned>", 
             "incidentDate":"<date>", 
             "incidentTime":"<time if available>",
             "status":"High" or "Medium" or "Low"}.
            If any of the fields isn't available return Null, but for date or time, 
            return current date and current time."""
        )],
        search_parameters=search_config
    )

    response = chat.sample()

    # Guard against a missing answer, then unwrap a Markdown code fence
    # (```json ... ```), which chat models commonly add around JSON output.
    raw_text = (response.content or "").strip()
    fenced = re.match(r"^```(?:json)?\s*(.*?)\s*```$", raw_text, flags=re.DOTALL | re.IGNORECASE)
    if fenced:
        raw_text = fenced.group(1)

    try:
        return json.loads(raw_text)
    except json.JSONDecodeError:
        print("⚠️ Content is not valid JSON—check the raw output instead.")
        return None


def filter_and_publish(news_data, api_key, folder="news_data"):
    """
    Workflow to save, deduplicate, and publish only unique news.

    Steps:
        1. Load the snapshots saved during the past 5 days.
        2. Save the current run as a new snapshot.
        3. Flag current items that closely match a past item.
        4. Publish the items that are not flagged.

    Args:
        news_data: List of news dicts from the current run.
        api_key: Convex API key passed through to ``publish_news``.
        folder (str): Root directory of the CSV snapshots.
    """
    # Load history BEFORE saving this run. If the current snapshot were written
    # first it would be read back as "past" news, every item would match itself
    # with similarity 1.0, and nothing would ever be published.
    past_news = load_recent_news(folder, days=5)

    # Save current
    save_news_to_csv(news_data, folder)

    # Deduplicate
    current_df = normalize_news(news_data)
    deduped = deduplicate_news(current_df, past_news)

    # Filter unique
    unique_news = deduped[~deduped["is_duplicate"].astype(bool)]

    print(f"📊 Found {len(unique_news)} unique news out of {len(current_df)}")

    if not unique_news.empty:
        # A key missing from only some items becomes NaN in the frame, and NaN
        # is not valid JSON; send it as null instead.
        records = (
            unique_news.astype(object)
            .where(unique_news.notna(), None)
            .to_dict(orient="records")
        )
        publish_news(api_key, records)
    else:
        print("⚠️ No unique news to publish.")


In [40]:


# Example integration after you parse your API response
# Driver cell: read both API keys from Grok/key.json, fetch the latest news with
# Grok, and hand a non-empty list to filter_and_publish.
# NOTE(review): only json.JSONDecodeError is caught, and fetch_security_news
# already handles its own parse failure (it returns None). In practice this
# handler therefore fires for a malformed key.json, so its message is
# misleading. A missing file or a missing key in the config is not handled here.
try:
    with open("Grok/key.json", "r") as f:
        config = json.load(f)
    
    grok_api_key = config["grok_api_key"]
    convex_api_key = config["convex_api_key"]
    
    news_data = fetch_security_news(grok_api_key)
    
    # Anything other than a non-empty list (None, a dict, []) is treated as
    # "no news".
    if isinstance(news_data, list) and news_data:
        filter_and_publish(news_data, api_key=convex_api_key)
    else:
        print("⚠️ No news items returned.")
except json.JSONDecodeError:
    print("❌ Could not parse response content as JSON.")


✅ Saved 3 news to news_data\2025-09\20250928_17.csv
📊 Found 3 unique news out of 3


NameError: name 'publish_news' is not defined

In [8]:
# Manual smoke test of publish_news with two hand-written threat records.
# NOTE(review): this is a hard-coded API key committed in the notebook; it should
# be rotated and loaded from a config file or environment variable instead.
api_key = "stmp_owv8v4cd4rfl9uqqqc5hp"

# The first record carries the optional lat/lng fields, the second omits them.
news_items = [
    {
        "title": "Food Security Crisis - Northeast Region",
        "description": "Acute malnutrition affecting ~1.8 million children under 5 in Borno State due to US funding cuts. Displaced persons in camps increasingly vulnerable.",
        "state": "Borno",
        "lga": "Dikwa",
        "status": "High",
        "lat": 11.1,
        "lng": 13.2,
        "incidentDate": "2025-09-28",
        "incidentTime": "09:00"
    },
    {
        "title": "Healthcare Worker Violence",
        "description": "Female healthcare worker assaulted at Specialist Hospital Damaturu, Yobe State. Part of wider pattern of workplace violence.",
        "state": "Yobe",
        "lga": "Damaturu",
        "status": "Medium",
        "incidentDate": "2025-09-27",
        "incidentTime": "09:00"
    }
]

# publish_news returns the decoded JSON response, or None when the request fails.
result = publish_news(api_key, news_items)
print(result)


✅ Successfully published news!
{'success': True, 'processed': 2, 'successful': 2, 'failed': 0, 'results': [{'index': 1, 'id': 'k17959fe0e8kn25k21j8gszmy57recqg', 'success': True}, {'index': 2, 'id': 'k174b5d94ysg5wdsgvwck084w97rewwn', 'success': True}], 'message': 'Processed 2 threats: 2 successful, 0 failed'}


In [2]:
def get_full_article(url):
    """
    Download a news page and return the text of its paragraphs.

    Parameters:
        url (str): URL of the news article.

    Returns:
        str: Text of every <p> tag, joined with newlines, when the page
             answers with HTTP 200.
        None: When the request raises or the status is anything but 200.
    """
    try:
        page = requests.get(url, timeout=5)
        if page.status_code != 200:
            return None
        parsed = BeautifulSoup(page.text, 'html.parser')
        return "\n".join(tag.get_text() for tag in parsed.find_all('p'))
    except requests.exceptions.RequestException:
        # Network-level failure (timeout, DNS, connection reset, ...)
        return None
    
def connect_to_api_csv(**context):
    """
    Query NewsAPI for Nigerian security news and save the articles as a CSV file.

    The function:
    - Uses the current local date to name the output file
      ``news_data/<YYYY-MM>/<YYYY-MM-DD>_news_file.txt``.
    - Removes an existing file if it exists and creates the month directory.
    - Calls the NewsAPI "everything" endpoint with a fixed query
      (articles from 2025-09-20 onwards, newest first).
    - Scrapes full article content (or uses description if not available).
    - Saves the data in CSV format (index column included); when no article
      is returned a single "No data for this day" row is written.

    Parameters:
        **context: Accepted for call compatibility; not used by this version.

    Raises:
        requests.exceptions.RequestException: When the API request fails,
            times out, or answers with an HTTP error status.
    """
    data_path = "news_data/"
    exec_datetime = datetime.now()
    exec_date = exec_datetime.strftime("%Y-%m-%d")
    exec_month = exec_datetime.strftime("%Y-%m")

    file_name = str(exec_date) + '_news_file.txt'
    output_path = f"{data_path}{exec_month}/{file_name}"

    # Drop file if it exists
    if os.path.exists(output_path):
        os.remove(output_path)

    # Create required directory for the month
    pathlib.Path(f"{data_path}{exec_month}").mkdir(parents=True, exist_ok=True)

    # Build the news API URL
    # NOTE(review): the API key and the `from` date are hard-coded in this URL;
    # the key should be rotated and read from configuration instead.
    url = (f"https://newsapi.org/v2/everything?q=(nigeria AND (security OR terrorism OR insurgency OR banditry OR kidnapping OR conflict OR military OR police))&from=2025-09-20&sortBy=publishedAt&language=en&apiKey=11a5a1f11b614295a2c034acd5e2b046")
    # A timeout keeps a stalled connection from hanging the cell; an HTTP error
    # is surfaced here instead of as a KeyError on 'articles' further down.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    articles = response.json()['articles']

    collected = {'date': [], 'content': []}

    # Process each article in the response
    for article in articles:
        # publishedAt looks like an ISO timestamp; keep the leading date part.
        published_date = re.findall(r'[\d-]+', article['publishedAt'])[0]
        content = get_full_article(article['url'])

        # If full content is unavailable, use the description
        if content is None:
            content = article['description']
        collected['content'].append(content)
        collected['date'].append(published_date)

    news_df = pd.DataFrame(collected)
    if news_df.shape[0] == 0:
        # Placeholder row so the output file is never empty.
        news_df.loc[len(news_df)] = [exec_date, "No data for this day"]
    news_df.to_csv(output_path)

In [24]:
connect_to_api_csv()

In [26]:
news_df = pd.read_csv('news_data/2025-09/2025-09-24_news_file.txt')

In [28]:
news_df

Unnamed: 0.1,Unnamed: 0,date,content
0,0,2025-09-23,The Reds aim to maintain their winning run as ...
1,1,2025-09-23,As a recent project to connect thousands of ho...
2,2,2025-09-23,"First Lady of Nigeria, Oluremi Tinubu\nFirst L..."
3,3,2025-09-23,\nPLOS Neglected Tropical Diseases is the top ...
4,4,2025-09-23,Germany’s Autentic Distribution is to shop 80 ...
5,5,2025-09-23,The World Health Organization estimates that a...
6,6,2025-09-23,The schedule for the second leg of the first p...
7,7,2025-09-23,"Large, selective colleges enroll the greatest ..."
8,8,2025-09-22,"“If you want a state, kill some Jews,” grumble..."
9,9,2025-09-22,Business Insider Edition \nThis as-told-to ess...


In [2]:
from convex import ConvexClient
from datetime import datetime

# NOTE(review): hard-coded credential committed in the notebook; rotate it and
# load it from configuration instead.
key = "eyJ2MiI6IjAzZWM2YjgzMGU1MDQyOTBhMDM2ZTVhMzRiMGY0N2MzIn0="

# Connect to your Convex deployment
client = ConvexClient("https://fantastic-mammoth-699.convex.cloud")
# Authentication is disabled below, so `key` is never used; the recorded output
# of this cell ends with "Uncaught Error: Not authenticated".
#client.set_auth(key)

news_items = [
    {
        "title": "Food Security Crisis - Northeast Region",
        "description": (
            "Acute malnutrition affecting ~1.8 million children under 5. "
            "US funding cuts have impacted displaced populations in camps, "
            "including Fulatari camp in Dikwa, Borno State. "
            "Increased vulnerability of IDPs fleeing Boko Haram terrorism."
        ),
        "states": ["Borno"]
    },
    {
        "title": "Healthcare Worker Violence",
        "description": (
            "Physical assault on female healthcare worker at Specialist Hospital Damaturu. "
            "Part of broader pattern of violence against healthcare workers. "
            "Surveys show 64% of Kaduna and 88% of Abia health workers experience workplace violence."
        ),
        "states": ["Yobe", "Kaduna", "Abia"]
    },
    {
        "title": "Criminal Activity",
        "description": (
            "Suspects arrested in Kano for attempted murder of Nigerian singer Lil Kesh. "
            "Involved theft of ~₦2 million in Lagos."
        ),
        "states": ["Kano", "Lagos"]
    }
]

# Insert into Convex threats table
# One threats:addThreat mutation is sent per (item, state) pair, so an item that
# lists several states is inserted once for each of them. Coordinates, LGA and
# status are placeholders; the incident date/time is the insertion time in UTC.
from datetime import timezone  # this cell only imports `datetime` itself

for item in news_items:
    for state in item["states"]:
        # datetime.utcnow() is deprecated (see the warning in this cell's
        # recorded output); an aware UTC timestamp formats identically.
        now = datetime.now(timezone.utc)
        client.mutation("threats:addThreat", {
            "description": item["description"],
            "lat": 0.0,
            "lng": 0.0,
            "lga": "unknown",
            "state": state,
            "status": "High",
            "title": item["title"],
            # auto-filled timestamps
            "incidentDate": now.strftime("%Y-%m-%d"),
            "incidentTime": now.strftime("%H:%M:%S"),
        })

print("✅ News inserted into Convex threats table with timestamps")


  now = datetime.utcnow()  # use UTC for consistency


Exception: [Request ID: 275c2a8db5ce375b] Server Error
Uncaught Error: Not authenticated
    at handler (../convex/threats.ts:70:4)


In [None]:
# Imports

# Airflow Modules
import airflow
from airflow.models import Variable
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.python import BranchPythonOperator
from airflow.operators.bash import BashOperator
from airflow.providers.common.sql.operators.sql import SQLExecuteQueryOperator
from airflow.operators.dummy import DummyOperator
from airflow.utils.email import send_email

# Trad NLP Modules
from joblib import load
import nltk
# Download necessary NLTK resources quietly
nltk.download("wordnet", quiet=True)
nltk.download('averaged_perceptron_tagger_eng', quiet=True)
from nltk.tag import pos_tag
from nltk.tokenize import word_tokenize
from nltk.corpus import stopwords
from nltk.stem import WordNetLemmatizer
import cloudpickle

# Useful Functionality Modules
import pandas as pd
import datetime as dt
import os
from bs4 import BeautifulSoup
import markdown
import shutil
import requests
import re
import datetime as dt
import pathlib
import json

# Google Gemini Modules
from google import genai
from google.genai import types

# Retrieve credentials from Airflow Variables
# These lookups run at module import time.
# NOTE(review): Airflow re-imports DAG files frequently, so top-level
# Variable.get calls presumably hit the metadata store on every parse — confirm
# and consider moving them into the tasks.
api_key = Variable.get("NEWS_API_KEY")
# "email_vars" is stored as JSON and deserialised into a dict here.
airflow_vars = Variable.get("email_vars", deserialize_json=True)
#airflow_vars_str = Variable.get("email_vars", deserialize_json=True)
#airflow_vars = json.loads(str(airflow_vars_str))
gemini_api_key = Variable.get("GEMINI_API_KEY")

# SMTP settings read from the "email_vars" dict.
SMTP_URL = airflow_vars['smtp_url']
SMTP_USER = airflow_vars['smtp_user']
SMTP_PASSWORD = airflow_vars['smtp_password']

# Default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': dt.datetime(2025, 4, 1),
}

# Defining path variables
# Every directory path below ends with "/", and the task functions build file
# paths from them by plain string concatenation.
parent_path = "/app/News-Api-Project/"
data_path = f"{parent_path}data/"
labeled_data_path = f"{parent_path}labeled_data/"
model_path = f"{parent_path}models/"
compressed_data_path = f"{parent_path}compressed_data/"
ai_content = f"{parent_path}ai_analysis/"

# DBT file paths
stock_news_path = "/app/News-Api-Project/My_DBT/Airflow_Stock_Sentiment_Project/models/airflow_stock_sentiment_models/Stock_News.sql"
non_stock_news_path = "/app/News-Api-Project/My_DBT/Airflow_Stock_Sentiment_Project/models/airflow_stock_sentiment_models/Non_Stock_News.sql"
dbt_path = "/app/News-Api-Project/My_DBT/Airflow_Stock_Sentiment_Project"

# DBT file content for stock news and non-stock news
# Both strings are dbt model sources: an incremental model keyed on `content`
# that selects from News_DB.Full_News_Table and, on incremental runs, filters by
# label and date.
# NOTE(review): `{{ ds }}` is rendered without quotes, so the date comparison
# presumably relies on how the template is rendered before dbt runs — confirm.
stock_news_dbt_file_content = """{{ config(materialized='incremental',
unique_key='content') }}

select * from News_DB.Full_News_Table 

{% if is_incremental() %}
    where label =1 and date={{ ds }}
{% endif %}"""

# NOTE(review): this template is identical to the stock-news one, including
# `label =1`. If label 1 marks stock news, the non-stock model presumably
# needs a different label filter — confirm against the classifier's labels.
non_stock_news_dbt_file_content = """{{ config(materialized='incremental',
unique_key='content') }}

select * from News_DB.Full_News_Table 

{% if is_incremental() %}
    where label =1 and date={{ ds }}
{% endif %}"""


def get_full_article(url):
    """
    Fetch an article page and extract its paragraph text.

    Parameters:
        url (str): URL of the news article.

    Returns:
        str: The text of all <p> elements joined by newlines, for an
             HTTP 200 response.
        None: For any other status code, or when the request itself fails.
    """
    try:
        reply = requests.get(url, timeout=5)
        if reply.status_code != 200:
            return None
        document = BeautifulSoup(reply.text, 'html.parser')
        paragraph_texts = [node.get_text() for node in document.find_all('p')]
        return "\n".join(paragraph_texts)
    except requests.exceptions.RequestException:
        # Timeouts, connection errors and similar transport failures
        return None


def connect_to_api_csv(**context):
    """
    Connects to the news API, retrieves news articles for a given day,
    processes the articles, and saves them as a CSV file.

    The function:
    - Formats the execution date taken from ``context["execution_date"]``.
    - Removes an existing file if it exists.
    - Creates necessary directories.
    - Builds the API URL (query ``(market OR stock)``, English, that single
      day) and retrieves data.
    - Scrapes full article content (or uses description if not available).
    - Saves the data in CSV format to
      ``<data_path><YYYY-MM>/<YYYY-MM-DD>_news_file.txt`` (index column
      included); when no article is returned a single
      "No data for this day" row is written.

    Raises:
        requests.exceptions.RequestException: When the API request fails,
            times out, or answers with an HTTP error status.
    """
    exec_datetime = context["execution_date"]
    exec_date = exec_datetime.strftime("%Y-%m-%d")
    exec_month = exec_datetime.strftime("%Y-%m")

    file_name = str(exec_date) + '_news_file.txt'
    output_path = f"{data_path}{exec_month}/{file_name}"

    # Drop file if it exists
    if os.path.exists(output_path):
        os.remove(output_path)

    # Create required directory for the month
    pathlib.Path(f"{data_path}{exec_month}").mkdir(parents=True, exist_ok=True)

    # Build the news API URL
    url = (f"https://newsapi.org/v2/everything?from={exec_date}&to={exec_date}&language=en&q=(market OR stock)&apiKey={api_key}")
    # A timeout keeps a stalled connection from blocking the task forever; an
    # HTTP error is surfaced here instead of as a KeyError on 'articles' below.
    response = requests.get(url, timeout=30)
    response.raise_for_status()
    articles = response.json()['articles']

    collected = {'date': [], 'content': []}

    # Process each article in the response
    for article in articles:
        # publishedAt looks like an ISO timestamp; keep the leading date part.
        published_date = re.findall(r'[\d-]+', article['publishedAt'])[0]
        content = get_full_article(article['url'])

        # If full content is unavailable, use the description
        if content is None:
            content = article['description']
        collected['content'].append(content)
        collected['date'].append(published_date)

    news_df = pd.DataFrame(collected)
    if news_df.shape[0] == 0:
        # Placeholder row so the output file is never empty.
        news_df.loc[len(news_df)] = [exec_date, "No data for this day"]
    # The index is written on purpose: filter_news drops the resulting
    # 'Unnamed: 0' column when it reads this file back.
    news_df.to_csv(output_path)


# Load pre-trained TF-IDF vectorizer using cloudpickle
# This runs at module import time, so the pickle file must exist whenever the
# module is loaded. model_path already ends with "/", so the path contains a
# double slash.
# NOTE(review): unpickling executes code stored in the file; only load model
# artefacts from a trusted location.
with open(f"{model_path}/tfidf_vectorizer_300.pkl", "rb") as f:
    tfidf_loaded = cloudpickle.load(f)


def filter_news(**context):
    """
    Label the day's news with the pre-trained classifier and store the result.

    Steps:
    - Read the daily news CSV for ``context["execution_date"]``.
    - Drop the saved index column and any row with a missing value.
    - Vectorise the article text with the module-level TF-IDF vectorizer.
    - Predict a label per article with the model stored at ``lgb.joblib``.
    - Replace newlines in the text with spaces and write the labeled rows
      as line-delimited JSON (.jsonl).
    """
    run_time = context["execution_date"]    # <datetime> with timezone
    day_tag = run_time.strftime("%Y-%m-%d")
    month_tag = run_time.strftime("%Y-%m")

    source_name = str(day_tag) + '_news_file.txt'
    target_name = source_name.replace('.txt', '_labeled.jsonl')

    # Output and model directories must exist before anything is written
    pathlib.Path(f"{labeled_data_path}{month_tag}").mkdir(parents=True, exist_ok=True)
    pathlib.Path(model_path).mkdir(parents=True, exist_ok=True)

    # Read the raw file, then discard the index column and incomplete rows
    raw_news = pd.read_csv(f"{data_path}{month_tag}/{source_name}")
    news = raw_news.drop(columns=['Unnamed: 0']).dropna()

    # TF-IDF features, one column per vocabulary term
    features = pd.DataFrame(
        tfidf_loaded.transform(news["content"]).toarray(),
        columns=tfidf_loaded.get_feature_names_out(),
    )

    # Predict on the original text first, then flatten newlines for storage
    classifier = load(f"{model_path}/lgb.joblib")
    labeled = news.assign(label=classifier.predict(features))
    labeled["content"] = labeled["content"].str.replace("\n", " ", regex=True)

    # One JSON record per line
    labeled.to_json(f"{labeled_data_path}{month_tag}/" + target_name, orient="records", lines=True)


def compress_choice(**context):
    """
    Pick the branch to follow from the day of the next execution date.

    Returns:
        str: 'compress_and_remove_files' when ``context["next_execution_date"]``
             falls on the 1st of a month, 'do_nothing_start_dbt' otherwise.
    """
    falls_on_first = context["next_execution_date"].day == 1
    return 'compress_and_remove_files' if falls_on_first else 'do_nothing_start_dbt'


def compress_and_remove_files_(**context):
    """
    Archive the month's raw news folder when the next run starts a new month.

    The function:
    - Ensures the compressed-data directory exists (on every call).
    - Does nothing further unless ``context["next_execution_date"]`` is the
      1st of a month.
    - Skips archiving when ``<YYYY-MM>_raw_news_data.zip`` already exists.
    - Otherwise zips ``<data_path><YYYY-MM>`` (month of
      ``context["execution_date"]``) and deletes the original folder.
    """
    is_month_rollover = context["next_execution_date"].day == 1
    month_tag = context["execution_date"].strftime("%Y-%m")

    # Archive path without the ".zip" suffix, as shutil.make_archive expects
    archive_base = f"{compressed_data_path}{month_tag}_raw_news_data"

    if not os.path.exists(f"{compressed_data_path}"):
        os.makedirs(f"{compressed_data_path}")

    if not is_month_rollover:
        return
    if os.path.exists(f"{archive_base}.zip"):
        # Already archived on an earlier run
        return

    month_folder = f"{data_path}{month_tag}"
    shutil.make_archive(archive_base, 'zip', month_folder)
    # The raw folder is removed only after the archive has been written
    shutil.rmtree(month_folder)


def LLM_advice(**context):
    """
    Generates AI-based stock sentiment advice using Google Gemini based on extracted news.

    The function:
    - Retrieves news content from XCom (task ``extract_stock_news_info``).
    - Sends the news content to Google Gemini for summarization and stock advice.
    - Saves the AI-generated advice to
      ``<ai_content><YYYY-MM>/<YYYY-MM-DD>_llm_advice.txt`` (UTF-8).
    """
    exec_datetime = context["execution_date"]
    exec_date = exec_datetime.strftime("%Y-%m-%d")
    exec_month = exec_datetime.strftime("%Y-%m")

    llm_output = f"{ai_content}{exec_month}/{exec_date}_llm_advice.txt"

    # System instruction for the language model
    sys_instruct = "You are a Stock Sentiment Analyst. Your work is to summarize news data and give stock investment advice in a nicely formated way."
    client = genai.Client(api_key=gemini_api_key)
    news = context["task_instance"].xcom_pull(
        task_ids="extract_stock_news_info", key="return_value"
    )

    # Build one numbered section per news item. The join has to happen outside
    # the f-string literal: written inline it was emitted as the text
    # "'\n'.join[...]" followed by a Python list repr.
    # xcom_pull yields None when the upstream task pushed nothing.
    # NOTE(review): assumes each item is an iterable of fields (e.g. a result
    # row) — confirm against the upstream task.
    news_as_text = ''
    for ind, cont in enumerate(news or []):
        item_text = '\n'.join(str(a) for a in cont)
        news_as_text += f"News_{ind+1} \n {item_text} \n"

    response = client.models.generate_content(
        model="gemini-2.0-flash",
        config=types.GenerateContentConfig(
            system_instruction=sys_instruct),
        contents=["""These are the recent stock news for today. Please in a summarized way, tell me 1. The stocks mentioned today 2. Which of these stocks had a bad sentiment 3. Which had good sentiment 4. Which would you advice me to keep an eye on \n""" + news_as_text]
    )
    # Create directory for AI content if it doesn't exist
    os.makedirs(f"{ai_content}{exec_month}", exist_ok=True)
    # Save the AI-generated advice to file. UTF-8 is explicit because
    # format_file_to_html reads the file back as UTF-8; response.text may be
    # None when the model returns no text.
    with open(llm_output, "w", encoding="utf-8") as f:
        f.write(response.text or "")


def format_file_to_html(file_path):
    """
    Render a Markdown text file as a complete, styled HTML email document.

    Parameters:
        file_path (str): Path to the UTF-8 Markdown file to render.

    Returns:
        str: A full HTML page: an embedded style sheet followed by the
        converted Markdown placed inside the "container" div.
    """
    # Everything that precedes the converted Markdown. This is a plain
    # (non-f) string, so the CSS braces need no escaping.
    page_open = """
    <!DOCTYPE html>
    <html>
      <head>
        <meta charset="utf-8">
        <title>AI Generated Stock News & Investment Advice</title>
        <style>
          body {
            font-family: Arial, sans-serif;
            line-height: 1.6;
            color: #333;
            margin: 0;
            padding: 0;
            background-color: #f9f9f9;
          }
          .container {
            max-width: 800px;
            margin: 30px auto;
            background-color: #fff;
            padding: 20px;
            border: 1px solid #ddd;
            box-shadow: 0 2px 4px rgba(0,0,0,0.1);
          }
          h1 {
            color: #00539C;
          }
          h2 {
            color: #0073e6;
            border-bottom: 1px solid #ddd;
            padding-bottom: 5px;
          }
          ul {
            list-style-type: disc;
            margin-left: 20px;
          }
          .good {
            color: green;
            font-weight: bold;
          }
          .bad {
            color: red;
            font-weight: bold;
          }
          .disclaimer {
            font-size: 0.9em;
            color: #666;
            margin-top: 20px;
            border-top: 1px solid #ccc;
            padding-top: 10px;
          }
        </style>
      </head>
      <body>
        <div class="container">
          <h2>AI Analysis on Stock News Provided</h2>
          """
    # Everything that follows the converted Markdown.
    page_close = """
        </div>
      </body>
    </html>
    """

    # Load the Markdown source, then convert it to an HTML fragment
    with open(file_path, 'r', encoding='utf-8') as source:
        markdown_source = source.read()
    rendered_body = markdown.markdown(markdown_source)

    return page_open + rendered_body + page_close


def notify_email(**context):
    """
    Emails the run date's AI advice file as a styled HTML message.

    Steps:
    - Derives the advice file path from the run's execution date.
    - Renders that file to HTML with format_file_to_html.
    - Sends the HTML as the message body, with the advice file itself
      attached, through Airflow's send_email utility.
    """
    run_moment = context["execution_date"]
    day_stamp = run_moment.strftime("%Y-%m-%d")
    month_stamp = run_moment.strftime("%Y-%m")

    # Same location LLM_advice writes to for this run date
    advice_file = f"{ai_content}{month_stamp}/{day_stamp}_llm_advice.txt"

    send_email(
        to=["udohchigozie2017@gmail.com"],
        subject=f"AI Analysis Report on Stock News Today {day_stamp}",
        html_content=format_file_to_html(advice_file),
        # The advice file also goes out as an attachment
        files=[advice_file]
    )


# Define the DAG and its tasks using a context manager
with DAG(dag_id="Stock_sentiment_analysis", default_args=default_args,
         schedule_interval="@daily", catchup=False) as dag:
    # Daily pipeline:
    #   get news -> filter/label -> load to Vertica -> branch (compress or not)
    #   -> regenerate dbt models and run dbt -> extract stock news
    #   -> LLM advice -> email notification

    # Task: Retrieve news data from the API and save as CSV
    get_data = PythonOperator(
        task_id='get_news_data',
        python_callable=connect_to_api_csv
    )

    # Task: Filter and label news data for stock relevance
    filter_data = PythonOperator(
        task_id='get_only_stock_news',
        python_callable=filter_news
    )

    # Task: Load the labeled JSONL file for the run date into the database (Vertica).
    # The path is templated: <labeled_data_path><YYYY-MM>/<YYYY-MM-DD>_news_file_labeled.jsonl
    load_to_table = SQLExecuteQueryOperator(
        task_id="load_to_vertica",
        conn_id="vertica",
        sql=r"""COPY News_DB.Full_News_Table from LOCAL '{{ params.labeled_data_path }}{{ ds[:7] }}/{{ ds }}_news_file_labeled.jsonl' PARSER fjsonparser();""",
        params={'labeled_data_path': labeled_data_path}
    )

    # Task: Decide whether to compress raw data files
    compress_or_not = BranchPythonOperator(
        task_id='compress_choice',
        python_callable=compress_choice
    )

    # Set task dependencies: get data -> filter data -> load data -> branch decision
    get_data >> filter_data >> load_to_table >> compress_or_not

    # Task: Compress and remove files if the branch condition is met
    compress_and_remove_files = PythonOperator(
        task_id='compress_and_remove_files',
        python_callable=compress_and_remove_files_
    )

    # Dummy task: No action needed if compression is not required.
    # No explicit dag= argument: like every other task here, it is attached
    # to the DAG by the enclosing context manager.
    do_nothing = DummyOperator(task_id='do_nothing_start_dbt')

    # Task: Rewrite the two dbt model files (stock / non-stock news) for the run
    # date, then run dbt.
    # - `rm -f` is used so a missing model file (first run, or a previous run
    #   that failed part-way) does not abort the `&&` chain before dbt runs.
    # - {% raw %} blocks keep the dbt Jinja ({{ config(...) }}, {% if ... %})
    #   from being rendered by Airflow; only params and ds are rendered here.
    # - trigger_rule="none_failed" lets this task run although one of the two
    #   branch targets upstream is always skipped.
    start_dbt = BashOperator(
        task_id="separate_stock_news",
        bash_command="""
        rm -f {{ params.stock_path }} {{ params.non_stock_path }} &&

        echo "{% raw %}{{ config(materialized='incremental', unique_key='content') }} 
        select * from News_DB.Full_News_Table 
        {% if is_incremental() %}
            where label = {% endraw %}{{ params.one }}{% raw %} and date = '{% endraw %}{{ ds }}{% raw %}'
        {% endif %}{% endraw %}" >> {{ params.stock_path }} &&

        echo "{% raw %}{{ config(materialized='incremental', unique_key='content') }} 
        select * from News_DB.Full_News_Table 
        {% if is_incremental() %}
            where label = {% endraw %}{{ params.zero }}{% raw %} and date = '{% endraw %}{{ ds }}{% raw %}'
        {% endif %}{% endraw %}" >> {{ params.non_stock_path }} &&

        cd {{ params.dbt_path }} && dbt run
    """,
        params={
            "stock_path": stock_news_path,
            "non_stock_path": non_stock_news_path,
            "dbt_path": dbt_path,
            "one": 1,
            "zero": 0
        },
        trigger_rule="none_failed"
    )

    # Define branch dependencies: compress or not -> start DBT
    compress_or_not >> [compress_and_remove_files, do_nothing] >> start_dbt

    # Task: Extract stock news content from the database for AI processing.
    # The query result is pushed to XCom, where LLM_advice pulls it.
    extract_news_info = SQLExecuteQueryOperator(
        task_id="extract_stock_news_info",
        conn_id="vertica",
        sql=r"""select content from News_DB.Stock_News where date='{{ ds }}' and label=1;""",
        do_xcom_push=True
    )

    # Task: Generate AI-based stock recommendations using LLM
    get_ai_recommendation = PythonOperator(
        task_id='LLM_advice',
        python_callable=LLM_advice
    )

    # Set dependency: after DBT, extract news info then generate AI recommendations
    start_dbt >> extract_news_info >> get_ai_recommendation

    # Task: Send the AI-generated stock insights via email
    notify = PythonOperator(
        task_id="send_ai_stock_insight",
        python_callable=notify_email
    )

    # Set final dependency: after AI recommendations, send notification email
    get_ai_recommendation >> notify