In [1]:
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml import Pipeline
from pyspark.sql.functions import col, trim, lower

# Create (or reuse) the Spark session that the rest of the notebook refers to as `spark`.
spark = SparkSession.builder.appName("NewsSentimentAnalysis").getOrCreate()


In [2]:
from google.colab import files

def load_df():
    """Ask for a CSV upload and return it as a cleaned Spark DataFrame.

    The first uploaded file is read with pandas (latin1 encoding) and reduced
    to the ``text`` and ``sentiment`` columns. ``sentiment`` is lower-cased and
    trimmed, and only rows labelled ``positive`` or ``negative`` are kept.
    The first rows of the result are shown before it is returned.
    """
    uploads = files.upload()
    first_name = next(iter(uploads))
    pdf = pd.read_csv(first_name, encoding='latin1')
    # Keep only the two columns the model uses; the names must match the CSV header.
    pdf = pdf[["text", "sentiment"]]

    label = col("sentiment")
    # Drop missing / blank / literal "nan" labels ...
    has_label = label.isNotNull() & (label != "") & (label != "nan")
    # ... then keep only the two classes the classifier is trained on.
    is_binary = (label == "positive") | (label == "negative")

    cleaned = (
        spark.createDataFrame(pdf)
        .withColumn("sentiment", trim(lower(label)))
        .filter(has_label)
        .filter(is_binary)
    )
    cleaned.show()
    return cleaned

In [3]:
# Each call opens its own upload prompt: provide the training CSV first, then the test CSV.
train_df = load_df()
test_df = load_df()


Saving train.csv to train (2).csv
+--------------------+---------+
|                text|sentiment|
+--------------------+---------+
| Sooo SAD I will ...| negative|
|my boss is bullyi...| negative|
| what interview! ...| negative|
| Sons of ****, wh...| negative|
|2am feedings for ...| positive|
| Journey!? Wow......| positive|
|I really really l...| positive|
|My Sharpie is run...| negative|
|i want to go to m...| negative|
|Uh oh, I am sunbu...| negative|
| S`ok, trying to ...| negative|
|i`ve been sick fo...| negative|
|is back home now ...| negative|
|Playing Ghost Onl...| positive|
|the free fillin` ...| positive|
|          I`m sorry.| negative|
|On the way to Mal...| negative|
|juss came backk f...| positive|
|Went to sleep and...| negative|
|I`m going home no...| positive|
+--------------------+---------+
only showing top 20 rows



Saving test.csv to test (2).csv
+--------------------+---------+
|                text|sentiment|
+--------------------+---------+
| Shanghai is also...| positive|
|Recession hit Ver...| negative|
|         happy bday!| positive|
| http://twitpic.c...| positive|
| that`s great!! w...| positive|
|I THINK EVERYONE ...| negative|
| soooooo wish i c...| negative|
|My bike was put o...| negative|
|I`m in VA for the...| negative|
|Its coming out th...| negative|
|So hot today =_= ...| negative|
|            Miss you| negative|
|        Cramps . . .| negative|
| you guys didn`t ...| positive|
|Stupid storm. No ...| negative|
|My dead grandpa p...| negative|
|... need retail t...| negative|
| you are lame  go...| negative|
|       thats so cool| positive|
| look who I found...| positive|
+--------------------+---------+
only showing top 20 rows



In [4]:
# Row counts after cleaning, displayed as (train, test).
train_df.count(), test_df.count()


(16363, 2104)

In [5]:
# Class balance of the cleaned training set, then of the cleaned test set.
train_df.groupBy("sentiment").count().show()
test_df.groupBy("sentiment").count().show()


+---------+-----+
|sentiment|count|
+---------+-----+
| positive| 8582|
| negative| 7781|
+---------+-----+

+---------+-----+
|sentiment|count|
+---------+-----+
| positive| 1103|
| negative| 1001|
+---------+-----+



In [7]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import Tokenizer, StopWordsRemover, HashingTF, IDF
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.feature import StringIndexer
from pyspark.ml.feature import Word2Vec



# Build ML Pipeline
print(f"Training samples: {train_df.count()}, Test: {test_df.count()}")

# Feature stages: split the text into words, hash the words into a fixed-size
# term-frequency vector, then rescale it by inverse document frequency.
tokenizer = Tokenizer(inputCol="text", outputCol="words")
hashingTF = HashingTF(
    inputCol="words",
    outputCol="rawFeatures",
    numFeatures=2000,
)
idf = IDF(inputCol="rawFeatures", outputCol="features")

# Label stage: turn the sentiment strings into numeric indices for the
# classifier; invalid labels are skipped rather than raising.
labelIndexer = StringIndexer(
    inputCol="sentiment",
    outputCol="indexedLabel",
    handleInvalid="skip",
)

# Classifier trained on the TF-IDF features against the indexed label.
lr = LogisticRegression(
    featuresCol="features",
    labelCol="indexedLabel",
    maxIter=20,
    regParam=0.1,
)

pipeline = Pipeline().setStages([tokenizer, hashingTF, idf, labelIndexer, lr])

# Fit every stage on the training split loaded earlier.
model = pipeline.fit(train_df)

Training samples: 16363, Test: 2104


In [37]:
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

def evaluate(test_df):
    """Score the fitted ``model`` on ``test_df`` and print accuracy and F1.

    Prints the number of rows, shows the ``text`` and ``probability`` columns
    of the predictions, then prints each metric with four decimals.
    Nothing is returned.
    """
    print(f"evaluation df with rows: {test_df.count()}")
    scored = model.transform(test_df)
    scored.select('text', 'probability').show()

    # (metricName passed to the evaluator, label used in the printed line)
    for metric_name, label in (("accuracy", "Accuracy"), ("f1", "F1-score")):
        scorer = MulticlassClassificationEvaluator(
            labelCol="indexedLabel",
            predictionCol="prediction",
            metricName=metric_name,
        )
        print(f"{label}: {scorer.evaluate(scored):.4f}")


In [38]:
# Metrics on the separately uploaded test set.
evaluate(test_df)


evaluation df with rows: 2104
+--------------------+--------------------+
|                text|         probability|
+--------------------+--------------------+
| Shanghai is also...|[0.93316695997353...|
|Recession hit Ver...|[0.26749427348193...|
|         happy bday!|[0.76834022193378...|
| http://twitpic.c...|[0.63226015806229...|
| that`s great!! w...|[0.55404217667697...|
|I THINK EVERYONE ...|[0.45991358141430...|
| soooooo wish i c...|[0.25990075547439...|
|My bike was put o...|[0.48170222051407...|
|I`m in VA for the...|[0.32529340563400...|
|Its coming out th...|[0.06289082270963...|
|So hot today =_= ...|[0.10702940040824...|
|            Miss you|[0.29387978869182...|
|        Cramps . . .|[0.62230557314881...|
| you guys didn`t ...|[0.67494332341428...|
|Stupid storm. No ...|[0.64765724736036...|
|My dead grandpa p...|[0.19960006523781...|
|... need retail t...|[0.07536261418514...|
| you are lame  go...|[0.41535526132874...|
|       thats so cool|[0.69728494532706...|
| 

In [39]:
# Same metrics on the data the model was fitted on, for comparison with the test-set run.
evaluate(train_df)

evaluation df with rows: 16363
+--------------------+--------------------+
|                text|         probability|
+--------------------+--------------------+
| Sooo SAD I will ...|[0.09433190221133...|
|my boss is bullyi...|[0.46573050821159...|
| what interview! ...|[0.40911909718575...|
| Sons of ****, wh...|[0.50823170262448...|
|2am feedings for ...|[0.81257661484290...|
| Journey!? Wow......|[0.82171134447466...|
|I really really l...|[0.86764598961562...|
|My Sharpie is run...|[0.27167667389436...|
|i want to go to m...|[0.30261777556025...|
|Uh oh, I am sunbu...|[0.47454770797212...|
| S`ok, trying to ...|[0.23250682550946...|
|i`ve been sick fo...|[0.33084405362576...|
|is back home now ...|[0.31963645369293...|
|Playing Ghost Onl...|[0.72015270196714...|
|the free fillin` ...|[0.66048136696424...|
|          I`m sorry.|[0.35755840920519...|
|On the way to Mal...|[0.48266269984661...|
|juss came backk f...|[0.71867004301543...|
|Went to sleep and...|[0.12943843611876...|
|

In [13]:
# Save model
# Persist the fitted pipeline; overwrite() replaces anything already stored at this path.
model.write().overwrite().save("/content/news_sentiment_model")

In [14]:
import os
from getpass import getpass

import requests

# Quick connectivity check against NewsAPI.
# The key must not live in the notebook: read it from the environment, or
# prompt for it once and keep it in this process's environment so anything
# launched from this kernel can read it too.
# NOTE(review): a key was previously committed in this cell; it should be revoked.
API_KEY = os.environ.get("NEWSAPI_KEY") or getpass("NewsAPI key: ")
os.environ["NEWSAPI_KEY"] = API_KEY

url = 'https://newsapi.org/v2/everything'
response = requests.get(
    url,
    # Let requests build and URL-encode the query string.
    params={'q': 'finance', 'pageSize': 5},
    # Header authentication keeps the key out of the URL (and out of error text).
    headers={'X-Api-Key': API_KEY},
    # Fail instead of hanging the cell if the service does not answer.
    timeout=10,
)
print("Status:", response.status_code)
if response.status_code == 200:
    data = response.json()
    articles = data.get('articles') or []
    print("Sample Articles:", len(articles))
    if articles:
        print("Sample Title:", articles[0].get('title'))
else:
    print("Error Details:", response.text)

Status: 200
Sample Articles: 5
Sample Title: Robinhood Is Building a Social Network for Following Market Movers’ Trades


In [40]:
%%writefile app.py
import streamlit as st
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.functions import when, col, udf
from pyspark.ml import PipelineModel
import plotly.express as px
import requests
from pyspark.ml.feature import Tokenizer, HashingTF, IDF
from pyspark.ml.linalg import DenseVector

# Initialize Spark
@st.cache_resource
def _load_runtime():
    """Create the Spark session and load the saved pipeline model once.

    Streamlit re-executes this script on every widget interaction; caching the
    pair means the model is not re-read from disk on each rerun.

    Returns:
        A ``(spark_session, pipeline_model)`` tuple.
    """
    session = SparkSession.builder.appName("Dashboard").getOrCreate()
    loaded = PipelineModel.load("/content/news_sentiment_model")
    return session, loaded


spark, model = _load_runtime()

# Fetch function with debugging
def fetch_news_headlines(query='finance', num_articles=20):
    """Fetch recent headlines for ``query`` from NewsAPI's ``/v2/everything``.

    The API key is read from the ``NEWSAPI_KEY`` environment variable so that
    no credential is stored in this file.

    Args:
        query: Search term sent as the ``q`` parameter.
        num_articles: Requested ``pageSize``.

    Returns:
        A pandas DataFrame with columns ``title``, ``source`` and
        ``publishedAt``. An empty DataFrame is returned (and an error is shown
        on the page) when the key is missing, the request fails, the body is
        not valid JSON, or no article carries a title.
    """
    import os  # local import keeps this function self-contained

    api_key = os.environ.get('NEWSAPI_KEY')
    if not api_key:
        st.error("NewsAPI key missing: set the NEWSAPI_KEY environment variable before starting the app.")
        return pd.DataFrame()

    url = 'https://newsapi.org/v2/everything'
    params = {'q': query, 'sortBy': 'publishedAt', 'pageSize': num_articles}
    # Authenticate via header, not query string: the text of an HTTP error
    # includes the request URL, and that text is echoed to the page below.
    headers = {'X-Api-Key': api_key}
    try:
        # A timeout stops a stalled connection from freezing the dashboard.
        response = requests.get(url, params=params, headers=headers, timeout=10)
        response.raise_for_status()
        data = response.json()
        st.write("API Response Status:", response.status_code)
        articles = data.get('articles') or []
        if not articles:
            st.error("No articles found. Try broader query like 'news'.")
            return pd.DataFrame()
        headlines = [
            {
                'title': article['title'],
                # `source` may be present but null; treat that like a missing source.
                'source': (article.get('source') or {}).get('name', 'Unknown'),
                'publishedAt': article.get('publishedAt', ''),
            }
            for article in articles
            if article.get('title')
        ]
        if not headlines:
            st.error("No valid titles extracted.")
            return pd.DataFrame()
        df = pd.DataFrame(headlines)
        st.write("Fetched Headlines:", df.head())  # Debug: Show fetched data
        return df
    except requests.exceptions.RequestException as e:
        st.error(f"API Request Failed: {e}")
        return pd.DataFrame()
    except ValueError as e:
        st.error(f"JSON Decode Error: {e}")
        return pd.DataFrame()

# UDFs to extract positive and negative probabilities
from pyspark.ml.feature import StringIndexerModel


def _label_positions(pipeline_model):
    """Return ``(positive_index, negative_index)`` into the probability vector.

    The positions are looked up in the labels stored by the pipeline's fitted
    StringIndexerModel instead of being hard-coded: by default StringIndexer
    assigns indices by label frequency, so which slot means "positive" depends
    on the training data.

    Raises:
        ValueError: if the pipeline has no StringIndexerModel stage, or its
            labels do not contain both "positive" and "negative".
    """
    for stage in pipeline_model.stages:
        if isinstance(stage, StringIndexerModel):
            labels = list(stage.labels)
            return labels.index("positive"), labels.index("negative")
    raise ValueError("Saved pipeline has no StringIndexerModel; cannot map probabilities to labels.")


def _prob_at(index):
    """Build a UDF returning element ``index`` of a probability vector (0.0 for null)."""
    return udf(lambda v: float(v[index]) if v is not None else 0.0, "double")


_POS_IDX, _NEG_IDX = _label_positions(model)
extract_pos_prob = _prob_at(_POS_IDX)
extract_neg_prob = _prob_at(_NEG_IDX)

# Predict function (using both probabilities)
def predict_sentiment(headlines_df):
    """Classify headlines with the trained pipeline.

    Args:
        headlines_df: pandas DataFrame with a ``title`` column.

    Returns:
        A pandas DataFrame with columns ``text``, ``sentiment`` ("Positive" or
        "Negative") and ``probability`` (the classifier's probability vector).
        An empty frame with those columns is returned when ``headlines_df`` is
        empty or has no ``title`` column.
    """
    if headlines_df.empty or 'title' not in headlines_df.columns:
        return pd.DataFrame(columns=['text', 'sentiment', 'probability'])
    # Create Spark DataFrame
    titles = headlines_df['title'].astype(str).tolist()
    spark_df = spark.createDataFrame(titles, "string").withColumnRenamed("value", "text")
    # Run the headlines through the *fitted* stages of the saved pipeline so the
    # features match training. In particular the trained IDF weights are reused;
    # fitting a new IDF on a handful of headlines would produce features the
    # classifier was never trained on.
    preds = spark_df
    for stage in model.stages:
        if isinstance(stage, StringIndexerModel):
            # Label indexing reads the 'sentiment' column, which live headlines do not have.
            continue
        preds = stage.transform(preds)
    # Add extracted probabilities as columns
    preds = preds.withColumn("pos_prob", extract_pos_prob(preds["probability"]))
    preds = preds.withColumn("neg_prob", extract_neg_prob(preds["probability"]))
    preds_pd = preds.select("text", "prediction", "probability", "pos_prob", "neg_prob").toPandas()
    # Debug: Show sample probabilities
    st.write("Sample Probabilities:", preds_pd[['text', 'pos_prob', 'neg_prob']].head())
    # The more probable class wins; ties count as Positive.
    is_positive = preds_pd['pos_prob'] >= preds_pd['neg_prob']
    preds_pd['sentiment'] = is_positive.map({True: "Positive", False: "Negative"})
    result = preds_pd[['text', 'sentiment', 'probability']]
    if result.empty:
        st.warning("No predictions generated—check model compatibility.")
    return result

# Streamlit app
st.title("🚀 Real-Time News Sentiment Dashboard")
st.markdown("Powered by PySpark ML – Analyzing Live News!")

# Sidebar: topic input plus the button that triggers a fetch-and-score run.
query = st.sidebar.text_input("News Topic", "finance")
fetch_clicked = st.sidebar.button("Fetch & Predict")
if fetch_clicked:
    with st.spinner("Analyzing sentiments..."):
        news_df = fetch_news_headlines(query)
        if news_df.empty:
            st.error("No news fetched. Check API key or query.")
        else:
            preds_df = predict_sentiment(news_df)
            # Keep the result across reruns so the charts survive later interactions.
            st.session_state.preds = preds_df


def _as_percent(prob_vector):
    """Format the largest entry of a probability vector as a percentage string."""
    return f"{max(prob_vector)*100:.1f}%"


if 'preds' in st.session_state:
    df = st.session_state.preds

    st.subheader("Latest News Headlines")
    df['confidence'] = df['probability'].apply(_as_percent)
    st.dataframe(df[['text', 'sentiment', 'confidence']], use_container_width=True)

    st.subheader("Sentiment Distribution")
    sentiment_colors = {
        'Positive': '#00CC96',
        'Negative': '#EF553B',
        'Neutral': '#AB63FA',
    }
    fig = px.pie(
        df,
        names='sentiment',
        title="Sentiment Distribution",
        color='sentiment',
        color_discrete_map=sentiment_colors,
    )
    st.plotly_chart(fig)

    st.subheader("Sentiment Bar Chart")
    sentiment_counts = df['sentiment'].value_counts()
    st.bar_chart(sentiment_counts)

st.markdown("---")
st.info("Data from NewsAPI.org ")

Overwriting app.py


In [16]:
!pip install streamlit pyngrok



Collecting streamlit
  Downloading streamlit-1.50.0-py3-none-any.whl.metadata (9.5 kB)
Collecting pyngrok
  Downloading pyngrok-7.4.0-py3-none-any.whl.metadata (8.1 kB)
Collecting pydeck<1,>=0.8.0b4 (from streamlit)
  Downloading pydeck-0.9.1-py2.py3-none-any.whl.metadata (4.1 kB)
Downloading streamlit-1.50.0-py3-none-any.whl (10.1 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m10.1/10.1 MB[0m [31m64.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading pyngrok-7.4.0-py3-none-any.whl (25 kB)
Downloading pydeck-0.9.1-py2.py3-none-any.whl (6.9 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m6.9/6.9 MB[0m [31m77.0 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: pyngrok, pydeck, streamlit
Successfully installed pydeck-0.9.1 pyngrok-7.4.0 streamlit-1.50.0


In [41]:
!ngrok authtoken 2OGvdUYAA80CrlzQV3s7kENO0wq_4vtn4GDqhrpi7LfW38XSr

Authtoken saved to configuration file: /root/.config/ngrok/ngrok.yml


In [42]:
# Kill every process whose command line matches "ngrok" before opening a new tunnel.
!pkill -f ngrok


In [43]:
from pyngrok import ngrok

# Kill any existing tunnels
ngrok.kill()

# Start Streamlit in the background
# (the trailing `&` detaches the server so this cell does not block)
get_ipython().system_raw('streamlit run /content/app.py &')

# Connect ngrok (explicitly set proto to http)
# Port 8501 is where the Streamlit server is expected to listen (its default port).
public_url = ngrok.connect(addr=8501, proto="http")
print("🔗 Public URL:", public_url)




🔗 Public URL: NgrokTunnel: "https://9a2ff79d5250.ngrok-free.app" -> "http://localhost:8501"


In [45]:
# Archive the saved model directory for download; the path is relative to the current working directory.
!zip -r news_sentiment_model.zip news_sentiment_model/


  adding: news_sentiment_model/ (stored 0%)
  adding: news_sentiment_model/stages/ (stored 0%)
  adding: news_sentiment_model/stages/0_Tokenizer_a3c2e0091d7b/ (stored 0%)
  adding: news_sentiment_model/stages/0_Tokenizer_a3c2e0091d7b/metadata/ (stored 0%)
  adding: news_sentiment_model/stages/0_Tokenizer_a3c2e0091d7b/metadata/._SUCCESS.crc (stored 0%)
  adding: news_sentiment_model/stages/0_Tokenizer_a3c2e0091d7b/metadata/.part-00000.crc (stored 0%)
  adding: news_sentiment_model/stages/0_Tokenizer_a3c2e0091d7b/metadata/_SUCCESS (stored 0%)
  adding: news_sentiment_model/stages/0_Tokenizer_a3c2e0091d7b/metadata/part-00000 (deflated 33%)
  adding: news_sentiment_model/stages/3_StringIndexer_3e5ee1261095/ (stored 0%)
  adding: news_sentiment_model/stages/3_StringIndexer_3e5ee1261095/data/ (stored 0%)
  adding: news_sentiment_model/stages/3_StringIndexer_3e5ee1261095/data/._SUCCESS.crc (stored 0%)
  adding: news_sentiment_model/stages/3_StringIndexer_3e5ee1261095/data/part-00000-9832d494-