# prereqs

In [None]:
%pip install tinydb ollama pandas pandera bokeh nltk gnews wordcloud

# db setup

In [None]:
from tinydb import TinyDB
from pandas import DataFrame

def save_to_db(df, db_path, table_name):
    """Replace the contents of a TinyDB table with the rows of ``df``.

    Parameters:
    - df (DataFrame): Rows to store; each row becomes one document.
    - db_path (str): Path of the TinyDB file.
    - table_name (str): Name of the table to overwrite.
    Returns:
    - DataFrame: ``df`` itself, so the call can be chained.
    """
    # The context manager closes the database even if an insert fails.
    with TinyDB(db_path) as db:
        table = db.table(table_name)
        table.truncate()  # clear the table
        # Insert all rows in a single call rather than row by row.
        table.insert_multiple(df.to_dict(orient='records'))
    print(f"Saved {len(df)} rows to {db_path}/{table_name}")
    return df

def load_from_db(db_path, table_name):
    """Load every document of a TinyDB table into a DataFrame.

    Parameters:
    - db_path (str): Path of the TinyDB file.
    - table_name (str): Name of the table to read.
    Returns:
    - DataFrame: One row per stored document.
    """
    # The context manager closes the database even if reading fails.
    with TinyDB(db_path) as db:
        df = DataFrame(db.table(table_name).all())
    print(f"Loaded {len(df)} rows from {db_path}/{table_name}")
    return df

# llm setup

In [None]:
import ollama

# to help find which llm to use:
# ollama.list()

In [None]:
# setup the bot personas

# Ollama client pointed at a server on localhost:11434; used by ask() below.
news_bot = ollama.Client(host='http://localhost:11434')
# System prompt prepended to every headline request.
# NOTE(review): the prompt text contains typos ('subltle', 'parenthases'); it is
# left untouched here because it is sent to the model verbatim.
headline_bot_init = [
        {
            'role': 'system',
            'content': 'You are a journalist writing a news headline. Include only content, no explanation. Include subltle random biases and opinions. Do not ask follow-up questions or include annotations or parenthases.',
        },
]
# System prompt prepended to every description request (same typos as above).
description_bot_init = [
        {
            'role': 'system',
            'content': 'You are a journalist writing a news description based off of a headline. Include only content, no explanation. Include subltle random biases and opinions. Do not ask follow-up questions or include annotations or parenthases.',
        },
]


def ask(question, context=None, model='llama3.1'):
    """Send one user question to the chat model and return its reply text.

    Parameters:
    - question (str): Text sent as the user message.
    - context (list[dict]): Optional. Messages placed before the question.
      Defaults to a single generic system message.
    - model (str): Optional. Name of the model to query.
    Returns:
    - str: The 'content' field of the response message.
    """
    # Build the default per call instead of sharing one mutable list
    # between every invocation.
    if context is None:
        context = [{'role': 'system', 'content': 'You are a helpful knowledge sharer'}]

    messages = [*context, {'role': 'user', 'content': question}]
    response = news_bot.chat(model=model, messages=messages)
    return response['message']['content']

In [None]:
# functions to generate test news headlines and descriptions

from datetime import datetime

def generate_news_headline():
    """Ask the headline persona for one headline and return the reply text."""
    prompt = 'generate a single random news headline?'
    return ask(prompt, headline_bot_init)

def generate_news_description(headline:str):
    """Ask the description persona for a story matching ``headline``."""
    prompt = f'generate a single random news story based on the headline "{headline}"?'
    return ask(prompt, description_bot_init)

def generate_news():
    """Generate one article as a (headline, description, date) tuple.

    The date is the current local time as an ISO-8601 string, taken after
    both model calls have returned.
    """
    headline = generate_news_headline()
    story = generate_news_description(headline)
    timestamp = datetime.now().isoformat()
    return (headline, story, timestamp)

def generate_news_batch(n):
    """Return a list of ``n`` generated (headline, description, date) tuples."""
    batch = []
    for _ in range(n):
        batch.append(generate_news())
    return batch


# get news headlines

In [None]:
from gnews import GNews

# Module-level client: language 'en', country 'US', period '14d', max_results set to 100.
# NOTE(review): NewsGatherer constructs its own GNews client and nothing in the
# visible code reads this instance — confirm it is still needed.
gnews = GNews(language='en', country='US', period='14d')
gnews.max_results = 100

# data modeling and processing

In [None]:
from pandas import DataFrame, concat, to_datetime
from pandera import Column, DataFrameSchema, Index, String, Object

In [None]:
# Schema for raw article frames: five non-nullable columns and an integer index.
# 'publisher' is typed as Object; the driver code later treats dict values in
# this column as carrying a 'title' key.
raw_data_schema = DataFrameSchema({
    'headline': Column(String, nullable=False),
    'description': Column(String, nullable=False),
    'url': Column(String, nullable=False),
    'published date': Column(String, nullable=False),
    'publisher': Column(Object, nullable=False),
},
    index=Index(int),
)

In [None]:
class NewsGatherer:
    """Fetch articles through a GNews client and hold them in a DataFrame.

    ``self.news`` always holds the result of the most recent ``get_news``
    call; each call replaces the previous contents.
    """

    # Topic names accepted by get_news (compared case-insensitively).
    TOPICS = ('WORLD', 'NATION', 'BUSINESS', 'TECHNOLOGY', 'ENTERTAINMENT', 'SPORTS', 'SCIENCE', 'HEALTH')

    def __init__(self, language='en', country='US', period='7d', max_results=100):
        self.gnews = GNews(language=language, country=country, period=period, exclude_websites=[], max_results=max_results, start_date=None, end_date=None)
        self.news = DataFrame()
        # Schema that clean_news validates self.news against.
        self.news_schema = raw_data_schema

    def get_news(self, keyword=None, top=False, location=None, topic=None, site=None):
        """
        Retrieves news articles based on specified parameters.
        Parameters:
        - keyword (str): Optional. Retrieves news articles containing the specified keyword.
        - top (bool): Optional. If True, retrieves top news articles.
        - location (str): Optional. Retrieves news articles from the specified location.
        - topic (str): Optional. Retrieves news articles from the specified topic, in any letter case. Valid topics are 'WORLD', 'NATION', 'BUSINESS', 'TECHNOLOGY', 'ENTERTAINMENT', 'SPORTS', 'SCIENCE', 'HEALTH'. Unknown topics are skipped.
        - site (str): Optional. Retrieves news articles from the specified site.
        Returns:
        - DataFrame: A DataFrame containing the retrieved news articles (empty when no source was selected).
        """
        frames = []
        if keyword:
            frames.append(DataFrame(self.gnews.get_news(keyword)))
        if top:
            frames.append(DataFrame(self.gnews.get_top_news()))
        if location:
            frames.append(DataFrame(self.gnews.get_news_by_location(location)))
        if topic:
            # Normalise the case before validating so 'technology' is accepted.
            topic = topic.upper()
            if topic in self.TOPICS:
                frames.append(DataFrame(self.gnews.get_news_by_topic(topic)))
        if site:
            frames.append(DataFrame(self.gnews.get_news_by_site(site)))

        # concat() raises on an empty list, so fall back to an empty frame.
        self.news = concat(frames) if frames else DataFrame()
        self.news.rename(columns={'title':'headline'}, inplace=True)
        return self.news

    def return_news(self):
        """Return the articles stored by the most recent get_news call."""
        return self.news

    def clean_news(self):
        """Validate self.news against self.news_schema and return the result."""
        self.news = self.news_schema.validate(self.news)
        return self.news

In [None]:
class NewsGenerator:
    """Build frames of synthetic articles shaped like the raw news schema."""

    def __init__(self, language='en', generator='ollama', gen_url='http://localhost:11434/api/chat'):
        self.language, self.generator, self.gen_url = language, generator, gen_url
        self.fake_news = DataFrame()
        self.schema = raw_data_schema

    def generate_news(self, n):
        """Generate ``n`` articles, store them on self.fake_news and return them."""
        rows = generate_news_batch(n)
        self.fake_news = DataFrame(rows, columns=['headline', 'description', 'date'])
        self.clean_news()
        return self.fake_news

    def clean_news(self):
        """Add the remaining schema columns, validate, and return the frame."""
        frame = self.fake_news
        frame['published date'] = frame['date']
        # Constant columns: every generated row shares publisher and url.
        for column, value in (('publisher', 'Ollama'), ('url', self.gen_url)):
            frame[column] = value
        self.fake_news = self.schema.validate(frame)
        return self.fake_news
    
    


# gathering and generation

In [None]:
# Generate 100 synthetic articles (two model calls per article) and preview them.
ngen = NewsGenerator()
ngen.generate_news(100)
ngen.fake_news.head()

In [None]:
# NOTE(review): get_news assigns its result to nget.news on every call, so each
# call below replaces what the previous one stored; after this cell nget.news
# holds only the result of the final call. Confirm whether the results were
# meant to be combined.
nget = NewsGatherer()
nget.get_news(keyword='space', topic='technology', top=True, site='nasa.gov')
nget.get_news(keyword='science', topic='science', top=True, site='home.cern')

# One query per topic.
nget.get_news(topic='WORLD')
nget.get_news(topic='NATION')
nget.get_news(topic='BUSINESS')
nget.get_news(topic='TECHNOLOGY')
nget.get_news(topic='ENTERTAINMENT')
nget.get_news(topic='SPORTS')
nget.get_news(topic='SCIENCE')
nget.get_news(topic='HEALTH')

# One query per location.
nget.get_news(location='US')
nget.get_news(location='UK')
nget.get_news(location='Canada')
nget.get_news(location='Australia')
nget.get_news(location='India')
nget.get_news(location='China')
nget.get_news(location='Russia')
nget.get_news(location='Brazil')
nget.get_news(location='Mexico')
nget.get_news(location='Japan')

# Space and science sites.
nget.get_news(site='nasa.gov')
nget.get_news(site='home.cern')
nget.get_news(site='www.esa.int')
nget.get_news(site='www.spacex.com')
nget.get_news(site='www.nasa.gov')

# General news sites.
nget.get_news(site='www.bbc.com')
nget.get_news(site='www.cnn.com')
nget.get_news(site='www.foxnews.com')
nget.get_news(site='www.nytimes.com')
nget.get_news(site='www.washingtonpost.com')
nget.get_news(site='www.huffpost.com')
nget.get_news(site='www.reuters.com')
nget.get_news(site='www.npr.org')
nget.get_news(site='www.apnews.com')
nget.get_news(site='www.aljazeera.com')

nget.news.head()

In [None]:
# The generated articles are currently left out of the analysis:
# df = concat([ngen.fake_news, nget.news])
# df is the same object as nget.news (no copy), so columns added to df before
# the later df.copy() also appear on nget.news.
df = nget.news

In [None]:
df

# analysis

In [None]:
from nltk.corpus import stopwords
from nltk.sentiment import SentimentIntensityAnalyzer
from nltk.stem import WordNetLemmatizer

from nltk.corpus import wordnet
from nltk.tokenize import word_tokenize
from nltk import pos_tag
from string import punctuation

# Initialize NLTK components
# NOTE(review): these presumably require the matching NLTK data packages
# (stopwords, wordnet, the sentiment lexicon) to be downloaded beforehand; no
# download call is visible in this notebook — confirm.
lemmatizer = WordNetLemmatizer()
stop_words = set(stopwords.words('english'))
sia = SentimentIntensityAnalyzer()


In [None]:


def get_wordnet_pos(treebank_tag):
    """Translate a Treebank POS tag into the WordNet POS constant.

    The decision is made on the tag's first letter (J, V, N, R); any other
    tag is treated as a noun.
    """
    prefix_to_pos = (
        ('J', wordnet.ADJ),
        ('V', wordnet.VERB),
        ('N', wordnet.NOUN),
        ('R', wordnet.ADV),
    )
    for prefix, wordnet_pos in prefix_to_pos:
        if treebank_tag.startswith(prefix):
            return wordnet_pos
    return wordnet.NOUN  # fallback for every other tag

def preprocess_text(text):
    """Tokenize, lowercase, filter, lemmatize and de-stopword ``text``.

    Tokens are kept when they are purely alphabetic or occur inside
    ``string.punctuation``. Lemmatization uses each token's POS tag, and stop
    words are dropped after lemmatization. Returns the surviving tokens
    joined by single spaces.
    """
    lowered = [token.lower() for token in word_tokenize(text)]
    kept = [token for token in lowered if token.isalpha() or token in punctuation]
    lemmas = (
        lemmatizer.lemmatize(token, get_wordnet_pos(tag))
        for token, tag in pos_tag(kept)
    )
    return " ".join(lemma for lemma in lemmas if lemma not in stop_words)

def get_sentiment_scores(text):
    """Return the polarity scores of ``text`` from the shared analyzer ``sia``."""
    scores = sia.polarity_scores(text)
    return scores

def analyze_sentiment(headline):
    """Return the polarity scores for ``headline``.

    Uses the module-level analyzer ``sia`` rather than constructing a new
    SentimentIntensityAnalyzer on every call; this function is applied once
    per row by the DataFrame helpers.
    """
    return sia.polarity_scores(headline)

def clean_text(text):
    """Normalize a headline or description for analysis.

    Steps, in order: keep the first line, drop stop words, drop the
    ' - Publisher' suffix, lemmatize each word, turn hyphens into spaces,
    strip punctuation, and collapse whitespace.

    Parameters:
    - text (str): Raw headline or description.
    Returns:
    - str: The cleaned text.
    """
    import re  # local import: only this function needs regular expressions

    # Retain only the first line. This has to run before any split()/join,
    # because split() also consumes newlines.
    text = text.strip().split('\n')[0]
    # Remove stopwords
    text = ' '.join(word for word in text.split() if word.lower() not in stop_words)
    # Remove publisher title from headline
    text = text.split(' - ')[0]
    # Lemmatize words
    text = ' '.join(lemmatizer.lemmatize(word) for word in text.split())
    # Replace hyphens with spaces before stripping punctuation, so that
    # hyphenated words are separated rather than fused together.
    text = text.replace('-', ' ')
    # Remove punctuation. str.replace matches literally, so a regex
    # substitution is required for a character class.
    text = re.sub(r'[^\w\s]', '', text)
    # Remove extra spaces left behind by the steps above
    return ' '.join(text.split())


In [None]:
# Add cleaned text columns, then score each cleaned description with the
# shared analyzer and keep only its 'compound' value.
df['cleaned_headline'] = df['headline'].apply(clean_text)
df['cleaned_description'] = df['description'].apply(clean_text)
df['sentiment'] = df['cleaned_description'].apply(lambda x: sia.polarity_scores(x)['compound'])

df.head()

In [None]:
# Continue on a copy so the columns added below are not written to the
# frame that df was taken from.
df = df.copy()

# Dict-valued publisher entries are reduced to their 'title'; anything else becomes None.
df['publisher name'] = df['publisher'].apply(lambda x: str(x['title']) if isinstance(x, dict) else None)

def apply_sentiment_analysis(df, column_name, prefix):
    """Add sentiment columns for ``column_name`` to ``df`` in place.

    Creates ``{prefix}_sentiment`` (the full score dict) plus float columns
    ``{prefix}_positive``, ``_negative``, ``_neutral`` and ``_compound``.
    """
    scores_column = f'{prefix}_sentiment'
    df[scores_column] = df[column_name].apply(analyze_sentiment)

    # (column suffix, key in the score dict), in the order the columns are added
    parts = (('positive', 'pos'), ('negative', 'neg'), ('neutral', 'neu'), ('compound', 'compound'))
    for suffix, key in parts:
        df[f'{prefix}_{suffix}'] = df[scores_column].apply(lambda scores, key=key: float(scores[key]))

def apply_wordcount(df, column_name, prefix):
    """Add ``{prefix}_wordcount`` to ``df``: whitespace-separated words per row."""
    def count_words(text):
        return len(text.split())

    target = f'{prefix}_wordcount'
    df[target] = df[column_name].apply(count_words)

def apply_preprocessing_and_sentiment_analysis(df, column_name, prefix):
    """Add an ``abstracted_{prefix}`` column and its sentiment/word-count columns.

    The abstracted column is ``preprocess_text`` applied to ``column_name``;
    sentiment and word-count columns are then derived from it using the same
    ``abstracted_{prefix}`` name as their prefix.
    """
    abstracted = f'abstracted_{prefix}'
    df[abstracted] = df[column_name].apply(preprocess_text)
    for add_columns in (apply_sentiment_analysis, apply_wordcount):
        add_columns(df, abstracted, abstracted)

# All helpers below add their columns to df in place.
# Apply sentiment analysis and word count for headlines
apply_sentiment_analysis(df, 'cleaned_headline', 'headline')
apply_wordcount(df, 'cleaned_headline', 'headline')

# Apply sentiment analysis and word count for descriptions
apply_sentiment_analysis(df, 'cleaned_description', 'description')
apply_wordcount(df, 'cleaned_description', 'description')

# Apply preprocessing, sentiment analysis, and word count for abstracted headlines
# (creates 'abstracted_headline' and the 'abstracted_headline_*' columns)
apply_preprocessing_and_sentiment_analysis(df, 'cleaned_headline', 'headline')

# Apply preprocessing, sentiment analysis, and word count for abstracted descriptions
# (creates 'abstracted_description' and the 'abstracted_description_*' columns)
apply_preprocessing_and_sentiment_analysis(df, 'cleaned_description', 'description')

df

In [None]:
def apply_grammar_analysis(df, column_name, prefix):
    """Add part-of-speech columns for ``column_name`` to ``df`` in place.

    ``{prefix}_grammar`` holds the (word, tag) pairs of each text. One count
    column is added per tag group below, and ``{prefix}_grammar_count`` holds
    the total number of tagged tokens.
    """
    # Column suffix -> Treebank tags counted for it (insertion order = column order).
    tag_groups = {
        'noun': ('NN', 'NNS', 'NNP', 'NNPS'),
        'verb': ('VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ'),
        'adjective': ('JJ', 'JJR', 'JJS'),
        'adverb': ('RB', 'RBR', 'RBS'),
        'pronoun': ('PRP', 'PRP$', 'WP', 'WP$'),
        'conjunction': ('CC',),
        'preposition': ('IN',),
        'interjection': ('UH',),
    }

    grammar_column = f'{prefix}_grammar'
    df[grammar_column] = df[column_name].apply(lambda text: pos_tag(word_tokenize(text)))
    for suffix, tags in tag_groups.items():
        df[f'{prefix}_{suffix}'] = df[grammar_column].apply(
            lambda tagged, tags=tags: sum(1 for _, tag in tagged if tag in tags)
        )
    df[f'{prefix}_grammar_count'] = df[grammar_column].apply(len)

# Tag the cleaned and the abstracted variants of both text columns; each call
# adds its '<prefix>_grammar*' and per-tag-group count columns to df.
apply_grammar_analysis(df, 'cleaned_headline', 'headline')
apply_grammar_analysis(df, 'cleaned_description', 'description')
apply_grammar_analysis(df, 'abstracted_headline', 'abstracted_headline')
apply_grammar_analysis(df, 'abstracted_description', 'abstracted_description')

df.head()

# visuals

In [None]:
from bokeh.io import output_notebook, curdoc
from bokeh.plotting import figure, show
from bokeh.models import ColumnDataSource, Whisker
from bokeh.models import HoverTool, LinearColorMapper

# Render plots inline in the notebook and apply the 'night_sky' theme.
output_notebook()
curdoc().theme = 'night_sky'

# Shared hover tool: the sentiment plots further down all add this same
# instance. Only the headline fields are active; the commented entries are the
# remaining available columns.
hover = HoverTool(tooltips=[
        ('headline', '@headline'),
        ('headline_compound', '@headline_compound'),
        ('headline_positive', '@headline_positive'),
        ('headline_negative', '@headline_negative'),
        ('headline_neutral', '@headline_neutral'),
        ('headline_wordcount', '@headline_wordcount'),
        # ('description', '@description'),
        # ('description_compound', '@description_compound'),
        # ('description_positive', '@description_positive'),
        # ('description_negative', '@description_negative'),
        # ('description_neutral', '@description_neutral'),
        # ('description_wordcount', '@description_wordcount'),
        # ('abstracted_headline', '@abstracted_headline'),
        # ('abstracted_headline_compound', '@abstracted_headline_compound'),
        # ('abstracted_headline_positive', '@abstracted_headline_positive'),
        # ('abstracted_headline_negative', '@abstracted_headline_negative'),
        # ('abstracted_headline_neutral', '@abstracted_headline_neutral'),
        # ('abstracted_headline_wordcount', '@abstracted_headline_wordcount'),
        # ('abstracted_description', '@abstracted_description'),
        # ('abstracted_description_compound', '@abstracted_description_compound'),
        # ('abstracted_description_positive', '@abstracted_description_positive'),
        # ('abstracted_description_negative', '@abstracted_description_negative'),
        # ('abstracted_description_neutral', '@abstracted_description_neutral'),
        # ('abstracted_description_wordcount', '@abstracted_description_wordcount'),

    ])



In [None]:
# valid_df.plot(kind='kde')
# valid_df.plot(kind='hist')


In [None]:
from wordcloud import WordCloud, get_single_color_func
import numpy as np
import pandas as pd
from nltk import pos_tag

class GroupedColorFunc:
    """Word-cloud color function that colors words by group.

    Every word listed under a color gets a shade of that color; all other
    words get a shade of the default color. Shades are produced by
    wordcloud.get_single_color_func.

    Parameters
    ----------
    color_to_words : dict(str -> list(str))
      Maps a color to the words that should use it.

    default_color : str
      Color for words that appear in none of the lists.
    """

    def __init__(self, color_to_words, default_color):
        # (color function, word set) pairs, in the mapping's iteration order.
        self.color_func_to_words = []
        for color, words in color_to_words.items():
            self.color_func_to_words.append((get_single_color_func(color), set(words)))

        self.default_color_func = get_single_color_func(default_color)

    def get_color_func(self, word):
        """Return the color function of the first group containing ``word``."""
        for color_func, words in self.color_func_to_words:
            if word in words:
                return color_func
        return self.default_color_func

    def __call__(self, word, **kwargs):
        chosen = self.get_color_func(word)
        return chosen(word, **kwargs)

# Color name -> Treebank POS tags drawn in that color. The tag groups are the
# same ones counted by apply_grammar_analysis.
color_to_pos = {
    'blue': ['NN', 'NNS', 'NNP', 'NNPS'],  # nouns
    'green': ['VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ'],  # verbs
    'red': ['JJ', 'JJR', 'JJS'],  # adjectives
    'orange': ['RB', 'RBR', 'RBS'],  # adverbs
    'purple': ['PRP', 'PRP$', 'WP', 'WP$'],  # pronouns
    'yellow': ['CC'],  # conjunctions
    'pink': ['IN'],  # prepositions
    'brown': ['UH'],  # interjections
}

# Long-format frame with one row per word of every cleaned headline.
words_df = df['cleaned_headline'].str.split(expand=True).stack().reset_index().rename(columns={'level_0': 'index', 0: 'word'})
# regex=False throughout: these are literal strings. Several of them
# ('.', '?', '(', '[', ...) are regex metacharacters and would match the wrong
# text or raise on pandas versions where str.replace defaults to regex matching.
words_df['word'] = words_df['word'].str.replace("'s", "", regex=False)
strings_to_replace = ['.', ',', '!', '?', ':', ';', '(', ')', '[', ']', '{', '}', '"', "'", '`']
for s in strings_to_replace:
    words_df['word'] = words_df['word'].str.replace(s, '', regex=False)

def categorize_words_to_dict(df, column_name):
    """Group the words found in ``df[column_name]`` by their POS tag.

    Each word is tagged on its own (without sentence context). Returns a
    dict mapping a POS tag to the list of words that received it.
    """
    tokens = df[column_name].str.split(expand=True).stack().reset_index()
    tokens = tokens.rename(columns={'level_0': 'index', 0: 'word'})
    # Keep only words longer than two characters.
    long_enough = tokens['word'].str.len() > 2
    tokens = tokens[long_enough]
    tokens['pos'] = tokens['word'].apply(lambda word: pos_tag([word])[0][1])
    grouped = tokens.groupby('pos')['word']
    return grouped.apply(list).to_dict()

def calculate_word_colors(df, column_name):
    """Map each color of ``color_to_pos`` to the words that should use it.

    Parameters:
    - df (DataFrame): Frame holding the words.
    - column_name (str): Column of ``df`` that contains the words.
    Returns:
    - dict: color -> list of words whose POS tag belongs to that color.
      Colors without any matching word are omitted.
    """
    words_by_pos = categorize_words_to_dict(df, column_name)
    color_to_words = {}
    for color, tags in color_to_pos.items():
        # words_by_pos is keyed by POS tag, so look the words up through the
        # tags of each color rather than through the color name.
        words = [word for tag in tags for word in words_by_pos.get(tag, [])]
        if words:
            color_to_words[color] = words
    return color_to_words

def word_cloud(df):
    """Draw a word cloud of ``df['word']`` frequencies on the current pyplot figure.

    Parameters:
    - df (DataFrame): Frame with a 'word' column, one word per row.
    Returns:
    - module: ``matplotlib.pyplot``, so the caller can save or show the figure.
    """
    import matplotlib.pyplot as plt

    wordcloud = WordCloud(width=1000, height=750, max_font_size=1000, max_words=500, background_color='darkgrey')
    wordcloud.generate_from_frequencies(frequencies=df['word'].value_counts().to_dict())

    # Color the words by part of speech. The default coloring is kept when no
    # word could be assigned to a color group.
    color_to_words = calculate_word_colors(df, 'word')
    if color_to_words:
        wordcloud.recolor(color_func=GroupedColorFunc(color_to_words, 'grey'))

    plt.imshow(wordcloud, interpolation='bilinear')
    plt.axis('off')
    return plt

plot_df = df.copy()
# NOTE(review): the groupby result is discarded, so this line does not change plot_df.
plot_df.groupby('publisher name')

# word_cloud returns the pyplot module, so `wordcloud` below is
# matplotlib.pyplot rather than a WordCloud object.
wordcloud = word_cloud(words_df)
wordcloud.savefig('wordcloud.png')
wordcloud.show()
words_df

In [None]:
plot_df

In [None]:
# import logscale
from bokeh.plotting import figure


def plot_word_counts(df):
    """Show a scatter plot of how often each word occurs in the cleaned headlines.

    Words are placed on a categorical x axis; both the y position and the
    marker size are the word's count.
    """
    tokens = df['cleaned_headline'].str.split(expand=True).stack()
    counts = tokens.value_counts().reset_index()
    counts = counts.rename(columns={'index': 'word', 0: 'count'})

    fig = figure(
        x_range=counts['word'],
        title='Word Frequency',
        toolbar_location=None,
        tools='',
        min_width=600,
        min_height=400,
    )
    for grid in (fig.xgrid, fig.ygrid):
        grid.grid_line_color = None

    fig.scatter(x='word', y='count', size='count', fill_color='blue', line_color='white', source=counts)
    fig.add_tools(HoverTool(tooltips=[('Word', '@word'), ('Count', '@count')]))

    show(fig)

plot_word_counts(plot_df)

In [None]:
from bokeh.plotting import figure, show
from bokeh.models import ColumnDataSource, HoverTool
from bokeh.palettes import Category20c

from bokeh.transform import cumsum

def pie_cart_wordcount(df):
    """Show a pie chart of word frequencies taken from ``df['word']``.

    Each wedge's angle is the word's share of the total count.
    """
    counts = df['word'].str.split(expand=True).stack().value_counts().reset_index()
    counts = counts.rename(columns={'index': 'word', 0: 'count'})
    total = counts['count'].sum()
    counts['angle'] = counts['count']/total * 2*np.pi

    fig = figure(height=350, title='Word Frequency', toolbar_location=None, tools='', min_width=600, min_height=400)
    fig.wedge(
        x=0,
        y=1,
        radius=0.4,
        start_angle=cumsum('angle', include_zero=True),
        end_angle=cumsum('angle'),
        line_color='white',
        legend_field='word',
        source=counts,
    )
    # A pie chart needs neither axes nor grid lines.
    fig.axis.axis_label = None
    fig.axis.visible = False
    fig.grid.grid_line_color = None

    fig.add_tools(HoverTool(tooltips=[('Word', '@word'), ('Count', '@count')]))

    show(fig)

pie_cart_wordcount(words_df)

In [None]:
def plot_scatter_sentiment(df):
    """Build a scatter of headline positivity (x) against negativity (y).

    Markers are colored by the compound score and sized by the headline
    word count. Returns the figure without showing it.
    """
    compound = df['headline_compound']
    mapper = LinearColorMapper(palette='Viridis256', low=min(compound), high=max(compound))

    fig = figure()
    fig.scatter(
        x='headline_positive',
        y='headline_negative',
        source=ColumnDataSource(df),
        fill_color={'field': 'headline_compound', 'transform': mapper},
        size='headline_wordcount',
        line_color='black',
        line_width=0.5,
    )
    fig.xaxis.axis_label = 'Positivity'
    fig.yaxis.axis_label = 'Negativity'
    fig.title.text = 'Word Counts'
    fig.add_tools(hover)
    return fig

show(plot_scatter_sentiment(plot_df))

In [None]:
def boxplot(df):
    """Build a per-headline chart of the sentiment scores.

    For every headline a whisker spans the negative to the positive score,
    and two bars are drawn: neutral-to-compound (blue) and
    negative-to-neutral (green). Returns the figure without showing it.
    """
    fig = figure(
        title='Boxplot for Sentiment Analysis of News titles',
        x_range=df['headline'].unique(),
        x_axis_label='headline',
        y_axis_label='Sentiment Score',
        min_width=1200,
    )
    fig.add_tools(hover)

    source = ColumnDataSource(df)

    whisker = Whisker(source=source, base='headline', upper='headline_positive', lower='headline_negative', line_color='black')
    fig.add_layout(whisker)

    # (top field, bottom field, fill color) for each bar layer, in drawing order
    bars = (
        ('headline_compound', 'headline_neutral', 'blue'),
        ('headline_neutral', 'headline_negative', 'green'),
    )
    for top_field, bottom_field, fill in bars:
        fig.vbar(x='headline', top=top_field, bottom=bottom_field, width=0.5, source=source, fill_color=fill, line_color='black')

    return fig

show(boxplot(plot_df))

In [None]:
def plot_scatter(df):
    """Build a scatter of the positive, negative and compound headline scores
    against the neutral score.

    Marker size is the abstracted headline word count. Returns the figure
    without showing it.
    """
    fig = figure(title='Sentiment Analysis', x_axis_label='Neutral', y_axis_label='Pos/Neg/Compound', min_width=800)
    fig.add_tools(hover)

    source = ColumnDataSource(df)
    # (y field, color, legend label) for each series, in drawing order
    series = (
        ('headline_positive', '#14346499', 'Positive'),
        ('headline_negative', '#34146499', 'Negative'),
        ('headline_compound', '#46413499', 'Compound'),
    )
    for y_field, color, label in series:
        fig.scatter(x='headline_neutral', y=y_field, source=source, color=color, size='abstracted_headline_wordcount', legend_label=label)
    return fig

show(plot_scatter(plot_df))

In [None]:
def plot_sentiment_scatter(df):
    """Build a scatter of headline word count (x) against compound score (y).

    Marker size is the abstracted headline word count. Returns the figure
    without showing it.
    """
    fig = figure(
        title='Sentiment Analysis',
        x_axis_label='Word Count',
        y_axis_label='Sentiment Score',
        min_width=800,
    )
    fig.add_tools(hover)
    source = ColumnDataSource(df)
    fig.scatter(x='headline_wordcount', y='headline_compound', source=source, size='abstracted_headline_wordcount', color='blue')
    return fig

show(plot_sentiment_scatter(plot_df))

# set analysis

In [None]:
from bokeh.layouts import gridplot

dfs = [plot_df, df]

def plot_all(dfs):
    """Lay out the three sentiment plots of every frame in a grid.

    Each frame contributes one row: boxplot, plot_scatter and
    plot_sentiment_scatter, in that order.
    """
    builders = (boxplot, plot_scatter, plot_sentiment_scatter)
    plots = [build(frame) for frame in dfs for build in builders]
    return gridplot(plots, ncols=3)

show(plot_all(dfs))