### Install dependencies

In [0]:
%python
%pip install nltk

In [0]:
%python
%restart_python

### Import Required Libraries

In [0]:
%python
import requests
from bs4 import BeautifulSoup
from pyspark.sql.functions import col, current_timestamp, when, udf, length, abs, struct, lit
from pyspark.sql.types import StringType, StructType, StructField, FloatType, DoubleType
from nltk.sentiment.vader import SentimentIntensityAnalyzer
import nltk

# Make sure the VADER lexicon is present before SentimentIntensityAnalyzer is
# built: nltk.data.find raises LookupError when the resource is not found, and
# only in that case is the download triggered.
try:
    nltk.data.find('sentiment/vader_lexicon.zip')
except LookupError:
    nltk.download('vader_lexicon')

### Read Parameters from Databricks Widgets

In [0]:
%python
# Declare the notebook widgets as (name, default value) pairs.
for widget_name, widget_default in (
    ("catalog", "stock_analytics_dev"),
    ("start_date", ""),
    ("end_date", ""),
):
    dbutils.widgets.text(widget_name, widget_default)

# Read the current widget values.
CATALOG = dbutils.widgets.get("catalog")
START_DATE = dbutils.widgets.get("start_date")
END_DATE = dbutils.widgets.get("end_date")

# The source table lives in the catalog chosen through the widget.
SOURCE_TABLE = f"{CATALOG}.silver.int_news_union"

# NOTE(review): both date widgets default to an empty string; confirm how
# between() behaves on audit_loaded_at when the bounds are left empty.
loaded_in_window = col('audit_loaded_at').between(START_DATE, END_DATE)

# Rows already present in the scores table for the requested load window.
target_df = spark.table("raw.finviz.news_scores_data").filter(loaded_in_window)

### Filter Source Data and Identify New Records

In [0]:
%python
# Restrict the source to the requested load window.
date_window = col('audit_loaded_at').between(START_DATE, END_DATE)
source_df = spark.table(SOURCE_TABLE).filter(date_window)

# left_anti keeps only source rows that have no match in target_df on the
# (sk_news_article, audit_loaded_at) pair.
df = source_df.alias('src').join(
    target_df,
    on=['sk_news_article', 'audit_loaded_at'],
    how='left_anti',
)

### User Defined Functions

#### Fetch and Parse Article Content UDF

In [0]:
%python
def fetch_article(url):
    """Download a page and return the text of its ``<p>`` elements.

    Args:
        url: Address of the article. ``None`` or an empty string is treated
            as "nothing to fetch".

    Returns:
        The text of every ``<p>`` tag joined with single spaces, or ``None``
        when the URL is missing, the request fails or times out (10 s), the
        server answers with an HTTP error status, or parsing raises.
    """
    # No URL means no request; skip the network round trip entirely.
    if not url:
        return None
    try:
        r = requests.get(url, timeout=10)
        # Error responses (403/404/5xx) carry HTML as well; without this check
        # the error page would be returned as if it were the article body.
        r.raise_for_status()
        soup = BeautifulSoup(r.text, "html.parser")
        return " ".join(p.get_text() for p in soup.find_all("p"))
    except Exception:
        # Deliberate best effort: any failure is reported as "no text" so a
        # single bad URL does not abort the whole job.
        return None
    
# The function performs an HTTP request, so its result is not a pure function
# of its input. Declaring the UDF non-deterministic tells the optimizer not to
# duplicate or re-evaluate the call when the derived column is referenced by
# several downstream expressions.
fetch_udf = udf(fetch_article, StringType()).asNondeterministic()

#### VADER Sentiment Analysis UDF

In [0]:
%python
# Single analyzer instance, created once at module level and referenced by
# vader_sentiment below.
analyzer = SentimentIntensityAnalyzer()

def vader_sentiment(text):
    """Score a text with the module-level VADER analyzer.

    Args:
        text: The text to score, or ``None``.

    Returns:
        A 4-tuple ``(neg, neu, pos, compound)`` taken from
        ``analyzer.polarity_scores(text)``; all zeros when ``text`` is ``None``.
    """
    if text is not None:
        polarity = analyzer.polarity_scores(text)
        # Order must match the fields of the struct returned by the UDF.
        return tuple(polarity[key] for key in ("neg", "neu", "pos", "compound"))
    return (0.0, 0.0, 0.0, 0.0)

# Return type of the sentiment UDF: four double fields, in the same order as
# the tuple produced by vader_sentiment (negative, neutral, positive, compound).
schema = StructType([
    StructField(field_name, DoubleType())
    for field_name in (
        "rt_negative_sentiment_score",
        "rt_neutral_sentiment_score",
        "rt_positive_sentiment_score",
        "rt_compound_sentiment_score",
    )
])

# Wrap vader_sentiment as a Spark UDF that returns the struct above.
sentiment_udf = udf(vader_sentiment, schema)

### Fetch Article Content and Compute Sentiment Metrics

In [0]:
%python
# A fetch counts as failed when no text came back or the text is shorter than
# 1000 characters.
article_text = col("desc_article_text")
fetch_failed = article_text.isNull() | (length(article_text) < 1000)

df = (
    df
    # Download and extract the article body for every news URL.
    .withColumn("desc_article_text", fetch_udf(col("desc_news_url")))
    .withColumn("audit_fetch_status", when(fetch_failed, "Failed").otherwise("Success"))
    .withColumn("audit_fetched_at", current_timestamp())
)

In [0]:
%python
# Handles for the struct fields that the derived columns are computed from.
compound_score = col("sentiment.rt_compound_sentiment_score")
neutral_score = col("sentiment.rt_neutral_sentiment_score")

# All-zero struct used for rows whose fetch status is not 'Success'.
zero_sentiment = struct(
    lit(0.0).alias("rt_negative_sentiment_score"),
    lit(0.0).alias("rt_neutral_sentiment_score"),
    lit(0.0).alias("rt_positive_sentiment_score"),
    lit(0.0).alias("rt_compound_sentiment_score"),
)

# Scores only successfully fetched articles; everything else gets the zero struct.
sentiment_struct = (
    when(col("audit_fetch_status") == 'Success', sentiment_udf(col("desc_article_text")))
    .otherwise(zero_sentiment)
)

# Label from the compound score: below -0.05 Negative, above 0.05 Positive,
# otherwise Neutral.
sentiment_label = (
    when(compound_score < -0.05, "Negative")
    .when(compound_score > 0.05, "Positive")
    .otherwise("Neutral")
)

# Certainty = |compound| * (1 - neutral).
sentiment_certainty = abs(compound_score) * (1 - neutral_score)

# Final projection; the struct fields are flattened into top-level columns and
# the freshly stamped creation time is exposed as audit_created_at.
output_columns = [
    'sk_news_article',
    'desc_news_title',
    'desc_news_source',
    'dt_news_posting',
    'desc_news_url',
    'desc_news_category',
    'desc_news_related_tickers',
    'desc_article_text',
    'sentiment.rt_negative_sentiment_score',
    'sentiment.rt_neutral_sentiment_score',
    'sentiment.rt_positive_sentiment_score',
    'sentiment.rt_compound_sentiment_score',
    'desc_sentiment_score',
    'rt_sentiment_score_certainty',
    'audit_fetch_status',
    'audit_fetched_at',
    'audit_loaded_at',
    col('audit_created_at_new').alias('audit_created_at'),
]

df = (
    df
    .withColumn("sentiment", sentiment_struct)
    .withColumn("desc_sentiment_score", sentiment_label)
    .withColumn("rt_sentiment_score_certainty", sentiment_certainty)
    .withColumn("audit_created_at_new", current_timestamp())
    .select(*output_columns)
)

### Write Data to Delta Table

In [0]:
%python
# Append the scored rows to the Delta table.
(
    df.write
    .format("delta")
    .mode("append")
    .saveAsTable("raw.finviz.news_scores_data")
)