In [7]:
## 📂 Upload Required Files (ZIPs)
# Run this cell to upload:
# 1. MasterDictionary.zip
# 2. StopWords.zip
# 3. URL Text Analysis.zip (with .txt article files)

from google.colab import files
import zipfile
import os

# === Upload MasterDictionary.zip ===
print("Upload MasterDictionary.zip")
uploaded = files.upload()
md_zip = list(uploaded.keys())[0]
zipfile.ZipFile(md_zip, 'r').extractall("MasterDictionary")

# === Upload StopWords.zip ===
print("Upload StopWords.zip")
uploaded = files.upload()
sw_zip = list(uploaded.keys())[0]
zipfile.ZipFile(sw_zip, 'r').extractall("StopWords")

# === Upload URL Text Analysis.zip ===
print("Upload URL Text Analysis.zip")
uploaded = files.upload()
uta_zip = list(uploaded.keys())[0]
zipfile.ZipFile(uta_zip, 'r').extractall("URL_Text_Analysis")

# === Set paths for use in the notebook ===
master_dict_path = "/content/MasterDictionary"
stopwords_path   = "/content/StopWords"
articles_path    = "/content/URL_Text_Analysis"

print("\n✅ Paths set:")
print("Master Dictionary Path:", master_dict_path)
print("StopWords Path:", stopwords_path)
print("Articles Path:", articles_path)


Upload MasterDictionary.zip


Saving MasterDictionary.zip to MasterDictionary (1).zip
Upload StopWords.zip


Saving StopWords.zip to StopWords (1).zip
Upload URL Text Analysis.zip


Saving URL Text Analysis.zip to URL Text Analysis (1).zip

✅ Paths set:
Master Dictionary Path: /content/MasterDictionary
StopWords Path: /content/StopWords
Articles Path: /content/URL_Text_Analysis



# Blackcoffer Data Extraction & NLP Analysis

This notebook implements the **Data Extraction and NLP** assignment as per the Blackcoffer requirements.

## **Objective**
1. **Extract** article titles and content from URLs provided in `Input.xlsx`.
2. **Analyze** each article to compute:
   - Sentiment metrics (Positive Score, Negative Score, Polarity, Subjectivity)
   - Readability metrics (Average Sentence Length, % Complex Words, Fog Index)
   - Other metrics (Word Count, Syllables per Word, Personal Pronouns, Avg Word Length)
3. **Save** results in the exact format of `Output Data Structure.xlsx`.

## **Outputs**
- `output.xlsx` and `output.csv` containing all computed metrics.
- `{URL_ID}.txt` for each article's text.
- `failed_urls.txt` containing any URLs that could not be processed.

You can run this notebook locally or in **Google Colab**.


## 1. Install Required Packages

In [8]:

!pip install pandas openpyxl requests beautifulsoup4 nltk tqdm




## 2. Import Libraries and Setup

In [13]:

import os
import re
import time
import logging
from pathlib import Path

import pandas as pd
import requests
from bs4 import BeautifulSoup
import nltk
from nltk.tokenize import word_tokenize, sent_tokenize
from nltk.corpus import stopwords
from tqdm import tqdm

# Download all required NLTK data
nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)
nltk.download('wordnet', quiet=True)  # Often needed for text processing
nltk.download('averaged_perceptron_tagger', quiet=True)
nltk.download('punkt_tab', quiet=True)  # Specifically for the missing resource

# Verify downloads
try:
    nltk.data.find('tokenizers/punkt')
    nltk.data.find('tokenizers/punkt_tab/english')
    print("✅ NLTK resources successfully downloaded")
except LookupError as e:
    print("❌ NLTK resources still missing:", str(e))

# Logging setup
logging.basicConfig(level=logging.INFO, format='%(asctime)s [%(levelname)s] %(message)s')
logger = logging.getLogger('blackcoffer_nlp')


✅ NLTK resources successfully downloaded


In [14]:
# NLTK Resource Management
def setup_nltk():
    import nltk
    required_nltk = ['punkt', 'stopwords', 'wordnet', 'averaged_perceptron_tagger', 'punkt_tab']

    for resource in required_nltk:
        try:
            nltk.data.find(f'tokenizers/{resource}' if resource == 'punkt' else resource)
        except LookupError:
            nltk.download(resource, quiet=True)

    print("NLTK resources verified")

setup_nltk()

NLTK resources verified


## 3. Load Wordlists and Stopwords

In [15]:

def load_wordlists(master_dir='MasterDictionary', stopwords_dir='StopWords'):
    pos = set()
    neg = set()
    sw = set()

    mdir = Path(master_dir)
    if mdir.exists():
        for f in mdir.glob('*.txt'):
            name = f.stem.lower()
            try:
                text = f.read_text(encoding='utf-8', errors='ignore')
                words = [w.strip().lower() for w in re.split(r'\s+', text) if w.strip()]
                if 'positive' in name or 'pos' in name:
                    pos.update(words)
                elif 'negative' in name or 'neg' in name:
                    neg.update(words)
            except Exception as e:
                logger.warning(f'Could not read {f}: {e}')

    sdir = Path(stopwords_dir)
    if sdir.exists():
        for f in sdir.glob('*.txt'):
            try:
                text = f.read_text(encoding='utf-8', errors='ignore')
                words = [w.strip().lower() for w in re.split(r'\s+', text) if w.strip()]
                sw.update(words)
            except Exception as e:
                logger.warning(f'Could not read {f}: {e}')
    else:
        sw.update(stopwords.words('english'))

    if not pos:
        pos.update(['good','positive','growth','gain','increase','benefit','improve','strong','bullish','profit'])
    if not neg:
        neg.update(['bad','negative','loss','decline','drop','fall','weak','bearish','loss','risk'])

    return pos, neg, sw

pos_set, neg_set, stopword_set = load_wordlists()
logger.info(f"Loaded POS={len(pos_set)}, NEG={len(neg_set)}, Stopwords={len(stopword_set)}")


## 4. Helper Functions for Scraping and Analysis

In [16]:

HEADERS = {
    'User-Agent': 'Mozilla/5.0 (X11; Linux x86_64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/115.0 Safari/537.36'
}

def fetch_url(url, timeout=10):
    try:
        resp = requests.get(url, headers=HEADERS, timeout=timeout)
        resp.raise_for_status()
        return resp.text
    except Exception as e:
        logger.warning(f'Failed to fetch {url}: {e}')
        return None

def extract_main_text(html):
    soup = BeautifulSoup(html, 'html.parser')
    title_tag = soup.find('h1')
    title = title_tag.get_text(strip=True) if title_tag else ''

    candidates = []
    selectors = ['article', 'main']
    for tag in soup.find_all(True, attrs={'class': re.compile(r'(article|post|content|story|main)', re.I)}):
        candidates.append(tag)
    for tag in soup.find_all(True, attrs={'id': re.compile(r'(article|post|content|story|main)', re.I)}):
        candidates.append(tag)
    for sel in selectors:
        found = soup.find(sel)
        if found:
            candidates.insert(0, found)

    seen = set()
    uniq = []
    for c in candidates:
        if id(c) not in seen:
            uniq.append(c); seen.add(id(c))

    best = None
    best_len = 0
    for c in uniq:
        p_text = ' '.join([p.get_text(separator=' ', strip=True) for p in c.find_all('p')])
        if len(p_text) > best_len:
            best_len = len(p_text); best = c

    if best is None:
        body = soup.body or soup
        paras = []
        for p in body.find_all('p'):
            text = p.get_text(separator=' ', strip=True)
            if len(text) < 40:
                continue
            if p.find_parent(['nav','footer','header']):
                continue
            paras.append(text)
        article_text = '\n'.join(paras)
    else:
        article_text = ' '.join([p.get_text(separator=' ', strip=True) for p in best.find_all('p')])

    if not article_text or len(article_text) < 50:
        divs = soup.find_all('div')
        long_texts = sorted([d.get_text(separator=' ', strip=True) for d in divs if len(d.get_text(strip=True))>200], key=len, reverse=True)
        if long_texts:
            article_text = long_texts[0]

    if not title:
        og = soup.find('meta', property='og:title') or soup.find('meta', attrs={'name':'title'})
        if og and og.get('content'):
            title = og.get('content').strip()

    return title.strip(), article_text.strip()

VOWEL_RE = re.compile(r'[aeiouy]+', re.I)

def count_syllables(word):
    word = word.lower()
    groups = re.findall(VOWEL_RE, word)
    syll = len(groups)
    if word.endswith('e') and not word.endswith(('le','ue')) and syll>1:
        syll -= 1
    if (word.endswith('es') or word.endswith('ed')) and syll>1:
        syll -= 1
    return max(1, syll)

def is_complex_word(word):
    return count_syllables(word) > 2

def clean_tokens(text, stopword_set):
    tokens = [t for t in word_tokenize(text)]
    cleaned = []
    for t in tokens:
        if re.fullmatch(r'\W+', t):
            continue
        lw = t.lower()
        if lw in stopword_set:
            continue
        lw = re.sub(r'^[^a-zA-Z0-9]+|[^a-zA-Z0-9]+$', '', lw)
        if lw:
            cleaned.append(lw)
    return cleaned

def personal_pronouns_count(text):
    matches = re.findall(r'\b(I|we|my|ours|us)\b', text, flags=re.I)
    filtered = [m for m in matches if m.lower() != 'us']
    return len(filtered)

def analyze_text(text, pos_set, neg_set, stopword_set):
    sentences = sent_tokenize(text)
    cleaned_words = clean_tokens(text, stopword_set)
    word_count = len(cleaned_words)
    sentence_count = max(1, len(sentences))

    pos_score = sum(1 for w in cleaned_words if w in pos_set)
    neg_score = sum(1 for w in cleaned_words if w in neg_set)
    polarity = (pos_score - neg_score) / ((pos_score + neg_score) + 0.000001)
    subjectivity = (pos_score + neg_score) / (word_count + 0.000001)

    avg_sentence_len = sum(len(word_tokenize(s)) for s in sentences) / sentence_count
    complex_words = sum(1 for w in cleaned_words if is_complex_word(w))
    pct_complex_words = complex_words / (word_count + 0.000001)
    fog_index = 0.4 * (avg_sentence_len + pct_complex_words * 100)
    avg_words_per_sentence = word_count / sentence_count

    syllables_per_word = sum(count_syllables(w) for w in cleaned_words) / (word_count + 0.000001)
    pp = personal_pronouns_count(text)
    avg_word_length = sum(len(w) for w in cleaned_words) / (word_count + 0.000001)

    return {
        'POSITIVE SCORE': pos_score,
        'NEGATIVE SCORE': neg_score,
        'POLARITY SCORE': polarity,
        'SUBJECTIVITY SCORE': subjectivity,
        'AVG SENTENCE LENGTH': avg_sentence_len,
        'PERCENTAGE OF COMPLEX WORDS': pct_complex_words,
        'FOG INDEX': fog_index,
        'AVG NUMBER OF WORDS PER SENTENCE': avg_words_per_sentence,
        'COMPLEX WORD COUNT': complex_words,
        'WORD COUNT': word_count,
        'SYLLABLE PER WORD': syllables_per_word,
        'PERSONAL PRONOUNS': pp,
        'AVG WORD LENGTH': avg_word_length
    }


## 5. Main Pipeline Execution

In [17]:

input_file = 'Input.xlsx'
output_file = 'output.xlsx'
articles_dir = 'articles'
failed_log = 'failed_urls.txt'

os.makedirs(articles_dir, exist_ok=True)

df_in = pd.read_excel(input_file)

results = []
failed = []

for idx, row in tqdm(df_in.iterrows(), total=len(df_in), desc="Processing URLs"):
    url = str(row['URL']).strip()
    url_id = str(row['URL_ID']).strip()
    html = fetch_url(url)
    if not html:
        failed.append(url)
        continue
    title, article_text = extract_main_text(html)
    txt_path = Path(articles_dir) / f'{url_id}.txt'
    with open(txt_path, 'w', encoding='utf-8') as fw:
        fw.write(title + '\n\n' + article_text)
    full_text = (title + '\n' + article_text).strip()
    metrics = analyze_text(full_text, pos_set, neg_set, stopword_set)
    out_row = row.to_dict()
    out_row.update(metrics)
    results.append(out_row)
    time.sleep(0.3)

if failed:
    with open(failed_log, 'w', encoding='utf-8') as ff:
        ff.write('\n'.join(failed))

metric_cols = ['POSITIVE SCORE','NEGATIVE SCORE','POLARITY SCORE','SUBJECTIVITY SCORE','AVG SENTENCE LENGTH',
               'PERCENTAGE OF COMPLEX WORDS','FOG INDEX','AVG NUMBER OF WORDS PER SENTENCE','COMPLEX WORD COUNT',
               'WORD COUNT','SYLLABLE PER WORD','PERSONAL PRONOUNS','AVG WORD LENGTH']

out_df = pd.DataFrame(results)
final_cols = list(df_in.columns) + metric_cols
out_df = out_df.loc[:, final_cols]
out_df.to_excel(output_file, index=False)
out_df.to_csv(Path(output_file).with_suffix('.csv'), index=False)

logger.info(f'Output saved to {output_file} and CSV version. Failed URLs: {len(failed)}')
out_df.head()


Processing URLs: 100%|██████████| 100/100 [02:26<00:00,  1.46s/it]


Unnamed: 0,URL_ID,URL,POSITIVE SCORE,NEGATIVE SCORE,POLARITY SCORE,SUBJECTIVITY SCORE,AVG SENTENCE LENGTH,PERCENTAGE OF COMPLEX WORDS,FOG INDEX,AVG NUMBER OF WORDS PER SENTENCE,COMPLEX WORD COUNT,WORD COUNT,SYLLABLE PER WORD,PERSONAL PRONOUNS,AVG WORD LENGTH
0,blackassign0001,https://insights.blackcoffer.com/rising-it-cit...,7,0,1.0,0.012346,22.275862,0.195767,16.741033,19.551724,111,567,1.738977,5,5.160494
1,blackassign0002,https://insights.blackcoffer.com/rising-it-cit...,13,2,0.733333,0.009152,23.209877,0.218426,18.020985,20.234568,358,1639,1.818792,4,5.527761
2,blackassign0003,https://insights.blackcoffer.com/internet-dema...,3,1,0.5,0.003123,24.533333,0.300546,21.835191,21.35,385,1281,2.033568,13,6.128806
3,blackassign0004,https://insights.blackcoffer.com/rise-of-cyber...,2,2,0.0,0.003157,26.581818,0.286504,22.092869,23.036364,363,1267,1.981847,6,5.98895
4,blackassign0005,https://insights.blackcoffer.com/ott-platform-...,2,0,1.0,0.00223,23.604651,0.218506,18.182106,20.860465,196,897,1.810479,7,5.692308
