# Data Extraction

Let's get going with the heavy lifting. Carrying over from the `1_eda` notebook: 

1. Run through the corpus with our classified English domain list
2. Filter out `other` domains that are most likely not `news` channels.
3. From each web page, extract `title`, `description`, `keywords` (if there are any), and the `body` content.
4. Create a Data Frame to do more NLP
5. Do all this in a parallel format with `Spark`
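
Steps 1 and 2 boil down to a lookup on the URL's host. Here is a minimal sketch with made-up labels; the `labels` dict and the `news_label` helper are illustrative and not part of the notebook. Note that `urlparse(...).netloc` keeps any `www.` prefix, so a URL only matches when its host appears in the domain list exactly as written.

```python
from urllib.parse import urlparse

# Hypothetical domain -> label mapping standing in for the labeled CSV
labels = {
    "www.thesun.co.uk": "entertainment",
    "parade.com": "entertainment",
    "example-shop.com": "other",
}

def news_label(url, labels):
    """Return the label for the URL's host, or None if it is unknown or `other`."""
    host = urlparse(url).netloc
    label = labels.get(host)
    return None if label == "other" else label

print(news_label("https://parade.com/some-story", labels))      # known news domain
print(news_label("https://example-shop.com/cart", labels))      # filtered out as `other`
print(news_label("https://www.parade.com/some-story", labels))  # host not in the mapping
```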


In [1]:
import os
import pandas as pd
import sys

from loguru import logger
from lxml.html.clean import Cleaner
from lxml.html import fromstring, HTMLParser
from pathlib import Path
from time import time, strftime, gmtime
from urllib.parse import urlparse
from warcio.archiveiterator import ArchiveIterator


In [2]:
# Configuring the logger
config = {"handlers": [{"sink": sys.stdout,"colorize": True,
          "format": "<green>{time:YYYY-MM-DD HH:mm:ss}</green> | <level>{level: <8}</level> | <cyan>{name}</cyan>:<cyan>{function}</cyan>:<cyan>{line}</cyan> - <level>{message}</level>"}]}
logger.configure(**config)

[1]

In [3]:
# Data Folder
data_folder = Path.home() / 'Data' / 'cc_news'

In [4]:
# Loading our domains
df = pd.read_csv(data_folder / 'domains' / 'domains_labeled_complete.csv')
print(f"Shape of the file: {df.shape}\n")
print(f"Class Table:\n {df.label.value_counts()}\n")
df.head()

Shape of the file: (5063, 7)

Class Table:
 news              3688
other              462
financial_news     341
tech_news          200
sports             191
entertainment      181
Name: label, dtype: int64



Unnamed: 0,label,domain,title,description,city,country,title_desc
0,entertainment,www.thesun.co.uk,"News, sport, celebrities and gossip | The Sun","Get the latest news, exclusives, sport, celebr...",,,"News, sport, celebrities and gossip | The Sun...."
1,entertainment,parade.com,Parade,"Your daily destination for celebrity news, vid...",,,Parade. Your daily destination for celebrity n...
2,entertainment,usweekly.com,USweekly,,,,USweekly.
3,entertainment,www.whatsontv.co.uk,TV guide to top UK TV and soap spoilers | What...,"What’s on TV is the best guide to UK TV, news,...",,,TV guide to top UK TV and soap spoilers | What...
4,entertainment,www.eonline.com,"Entertainment News, Celebrity Gossip, Celebrit...","Your source for entertainment news, celebritie...",,,"Entertainment News, Celebrity Gossip, Celebrit..."


In [5]:
# Get file names
files = [str(x) for x in data_folder.glob('*') if x.is_file()]

We loaded our domains, set up the data folder, and got our file names.

Before we get into parallelization with Spark, let's write the function and apply it to one file:

In [7]:
def extractor_test(uri, doms):
    # Specify parser to remove blanks
    parser = HTMLParser(remove_blank_text=True)
    # Instantiate Cleaner to strip scripts, styles and links for our purposes
    cleaner = Cleaner(scripts=True, javascript=True, style=True, links=True)
    
    # Keep every domain that is not labeled `other`
    news_doms = doms[doms.label!='other'].domain.tolist()
    with open(uri, 'rb') as stream:
        for i, record in enumerate(ArchiveIterator(stream)):
            if record.rec_type == 'response':
                warc_uri = record.rec_headers.get_header('WARC-Target-URI')
                domain = urlparse(warc_uri).netloc
                
                if domain in news_doms:
                    domain_cat = doms[doms.domain == domain].label
                    content = record.content_stream().read()
                    try:
                        tree = fromstring(content, parser=parser)
                        title = tree.xpath("//title/text()")
                        title = title[0] if title else None
                        description = tree.xpath("//meta[@name='description']/@content")
                        description = description[0] if description else None
                        keywords = tree.xpath("//meta[@name='keywords']/@content")
                        news_keywords = tree.xpath("//meta[@name='news_keywords']/@content")
                        
                        tree = cleaner.clean_html(tree)
                        text = tree.text_content().strip()
                        text = " ".join(text.split())
                    
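                    except TypeError:
                        # Skip pages that cannot be parsed
                        pass
                    else:
                        # Print only when parsing succeeded, so no stale values leak through
                        print(f"{warc_uri}")
                        print(f"{domain_cat}")
                        print(f"{title}")
                        print(f"{description}")
                        print(f"{keywords}")
                        print(f"{news_keywords}\n")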

            if i == 10:
                break

In [8]:
extractor_test(files[0], df)

https://chicago.cbslocal.com/2019/12/18/man-stabbed-after-his-granddaughter-three-other-teens-enter-rob-his-home/
1111    sports
Name: label, dtype: object
Man Stabbed After His Granddaughter And Three Other Teens Enter, Rob His Home – CBS Chicago
A man's granddaughter is one of four teens accused of stabbing him in the neck and stealing his car before crashing it on Monday night.
['grandfather, granddaughter, teen, teens, home invasion, house, stabbed, assault, assaulted, robbery, car, wallet, cellphone, hazel dell, washington, seaside, oregon, police, pursuit']
['grandfather, granddaughter, teen, teens, home invasion, house, stabbed, assault, assaulted, robbery, car, wallet, cellphone, hazel dell, washington, seaside, oregon, police, pursuit']

http://www.releasewire.com/press-releases/release-9742.htm
31    news
Name: label, dtype: object
Commence Industrial CRM Achieves Sales Optimization | Dec 21, 2006 - ReleaseWire
None
[]
[]

https://www.kark.com/news/national-news/1-dead-2-inju

#### Commentary
Not bad. There is one slight problem: on the eonline.com entertainment page, we got some Spanish. As expected, some news sites operate in several countries and publish news in several languages accordingly. That page's language tag is `es-Ar`, so instead of running full-blown language detection one more time, let's just check the `lang` attribute of the `html` tag.
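
The check itself only needs the primary subtag of the language tag. A small sketch, assuming the value comes from something like `tree.get("lang")`; the helper name `looks_english` and its `default` parameter are hypothetical:

```python
def looks_english(lang, default=True):
    """Decide from an html `lang` attribute value whether a page is English.

    `default` is returned when the attribute is missing or empty.
    """
    if not lang:
        return default
    # Tags look like 'en', 'en-US' or 'es-AR'; the primary subtag comes first
    primary = lang.strip().replace("_", "-").split("-")[0].lower()
    return primary == "en"

for tag in ("es-Ar", "en-US", "EN", None):
    print(tag, looks_english(tag))
```

Treating a missing tag as English keeps more pages at the cost of letting some non-English ones through; passing `default=False` makes the filter stricter.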

In [9]:
def extractor_test(uri, doms):
    # Specify parser to remove blanks
    parser = HTMLParser(remove_blank_text=True)
    # Instantiate Cleaner to strip scripts, styles and links for our purposes
    cleaner = Cleaner(scripts=True, javascript=True, style=True, links=True)
    
    # Keep every domain that is not labeled `other`
    news_doms = doms[doms.label!='other'].domain.tolist()
    with open(uri, 'rb') as stream:
        for i, record in enumerate(ArchiveIterator(stream)):
            if record.rec_type == 'response':
                warc_uri = record.rec_headers.get_header('WARC-Target-URI')
                domain = urlparse(warc_uri).netloc
                
                if domain in news_doms:
                    #df.at[0,'A']
                    domain_cat = doms[doms.domain == domain].label.values[0]
                    content = record.content_stream().read()
                    tree = fromstring(content, parser=parser)
                    try:
                        parsed_lang = tree.get("lang")
                    except TypeError:
                        parsed_lang = None
                    if parsed_lang is None or parsed_lang[:2].lower() ==  'en':
                        
                        try:
                            title = tree.xpath("//title/text()")
                            title = title[0] if title else None
                            description = tree.xpath("//meta[@name='description']/@content")
                            description = description[0] if description else None
                            keywords = tree.xpath("//meta[@name='keywords']/@content")
                            news_keywords = tree.xpath("//meta[@name='news_keywords']/@content")

                            tree = cleaner.clean_html(tree)
                            text = tree.text_content().strip()
                            text = " ".join(text.split())
                    
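                        except Exception:
                            # Skip pages whose metadata or body cannot be extracted
                            pass
                        else:
                            # Print only for English pages that parsed cleanly
                            print(f"{warc_uri}")
                            print(f"{domain_cat}")
                            print(f"{title}")
                            print(f"{description}")
                            print(f"{keywords}")
                            print(f"{news_keywords}\n")
                            print(f"{text}\n")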

            if i == 5:
                break

In [10]:
extractor_test(files[0], df)

https://chicago.cbslocal.com/2019/12/18/man-stabbed-after-his-granddaughter-three-other-teens-enter-rob-his-home/
sports
Man Stabbed After His Granddaughter And Three Other Teens Enter, Rob His Home – CBS Chicago
A man's granddaughter is one of four teens accused of stabbing him in the neck and stealing his car before crashing it on Monday night.
['grandfather, granddaughter, teen, teens, home invasion, house, stabbed, assault, assaulted, robbery, car, wallet, cellphone, hazel dell, washington, seaside, oregon, police, pursuit']
['grandfather, granddaughter, teen, teens, home invasion, house, stabbed, assault, assaulted, robbery, car, wallet, cellphone, hazel dell, washington, seaside, oregon, police, pursuit']

Man Stabbed After His Granddaughter And Three Other Teens Enter, Rob His Home – CBS Chicago Menu Videos Drugstore Fencing Ring Stolen GoodsMerchandise recovered from a fencing ring that took in stolen goods from drugstores and sold them online. (Credit: Wilmette Police)1 hour a

Fixed, and looking much better. Let's write a generator to stream the results:

In [14]:
def extractor_test_yield(uri, doms):
    # Specify parser to remove blanks
    parser = HTMLParser(remove_blank_text=True)
    # Instantiate Cleaner to strip scripts, styles and links for our purposes
    cleaner = Cleaner(scripts=True, javascript=True, style=True, links=True)
    
    # Keep every domain that is not labeled `other`
    news_doms = doms[doms.label!='other'].domain.tolist()
    with open(uri, 'rb') as stream:
        for i, record in enumerate(ArchiveIterator(stream)):
            if record.rec_type == 'response':
                warc_uri = record.rec_headers.get_header('WARC-Target-URI')
                domain = urlparse(warc_uri).netloc
                
                if domain in news_doms:
                    #df.at[0,'A']
                    domain_cat = doms[doms.domain == domain].label.values[0]
                    content = record.content_stream().read()
                    tree = fromstring(content, parser=parser)
                    try:
                        parsed_lang = tree.get("lang")
                    except TypeError:
                        parsed_lang = None
                    if parsed_lang is None or parsed_lang[:2].lower() ==  'en':
                        
                        try:
                            title = tree.xpath("//title/text()")
                            title = title[0] if title else None
                            description = tree.xpath("//meta[@name='description']/@content")
                            description = description[0] if description else None
                            keywords = tree.xpath("//meta[@name='keywords']/@content")
                            news_keywords = tree.xpath("//meta[@name='news_keywords']/@content")

                            tree = cleaner.clean_html(tree)
                            text = tree.text_content().strip()
                            text = " ".join(text.split())
                    
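                        except Exception:
                            # Skip pages whose metadata or body cannot be extracted
                            pass
                        else:
                            # Yield only English pages that parsed cleanly
                            yield [warc_uri, domain_cat, title, description, keywords, news_keywords, text]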

            if i == 10:
                break

In [15]:
some_results = [result for result in extractor_test_yield(files[0], df)]

In [16]:
some_results

[['https://chicago.cbslocal.com/2019/12/18/man-stabbed-after-his-granddaughter-three-other-teens-enter-rob-his-home/',
  'sports',
  'Man Stabbed After His Granddaughter And Three Other Teens Enter, Rob His Home – CBS Chicago',
  "A man's granddaughter is one of four teens accused of stabbing him in the neck and stealing his car before crashing it on Monday night.",
  ['grandfather, granddaughter, teen, teens, home invasion, house, stabbed, assault, assaulted, robbery, car, wallet, cellphone, hazel dell, washington, seaside, oregon, police, pursuit'],
  ['grandfather, granddaughter, teen, teens, home invasion, house, stabbed, assault, assaulted, robbery, car, wallet, cellphone, hazel dell, washington, seaside, oregon, police, pursuit'],
  "Man Stabbed After His Granddaughter And Three Other Teens Enter, Rob His Home – CBS Chicago Menu Videos Drugstore Fencing Ring Stolen GoodsMerchandise recovered from a fencing ring that took in stolen goods from drugstores and sold them online. (Cred

#### Commentary
Looks good. Now let's run it on the whole file.

In [17]:
def extractor(uri, doms):
    # Specify parser to remove blanks
    parser = HTMLParser(remove_blank_text=True)
    # Instantiate Cleaner to strip scripts, styles and links for our purposes
    cleaner = Cleaner(scripts=True, javascript=True, style=True, links=True)
    
    # Keep every domain that is not labeled `other`
    news_doms = doms[doms.label!='other'].domain.tolist()
    with open(uri, 'rb') as stream:
        for i, record in enumerate(ArchiveIterator(stream)):
            if record.rec_type == 'response':
                warc_uri = record.rec_headers.get_header('WARC-Target-URI')
                domain = urlparse(warc_uri).netloc
                
                if domain in news_doms:
                    domain_cat = doms[doms.domain == domain].label.values[0]
                    content = record.content_stream().read()
                    tree = fromstring(content, parser=parser)
                    try:
                        parsed_lang = tree.get("lang")
                    except TypeError:
                        parsed_lang = None
                    if parsed_lang is None or parsed_lang[:2].lower() ==  'en':
                        
                        try:
                            title = tree.xpath("//title/text()")
                            title = title[0] if title else None
                            description = tree.xpath("//meta[@name='description']/@content")
                            description = description[0] if description else None
                            keywords = tree.xpath("//meta[@name='keywords']/@content")
                            news_keywords = tree.xpath("//meta[@name='news_keywords']/@content")

                            tree = cleaner.clean_html(tree)
                            text = tree.text_content().strip()
                            text = " ".join(text.split())
                    
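                        except Exception:
                            # Skip pages whose metadata or body cannot be extracted
                            pass
                        else:
                            # Yield only English pages that parsed cleanly
                            yield [warc_uri, domain_cat, title, description, keywords, news_keywords, text]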

In [18]:
start_time = time()
first_file_data = [result for result in extractor(files[0], df)]
elapsed = strftime("%H:%M:%S", gmtime(time() - start_time))
logger.info(f'It took {elapsed} to run this script!')
os.system("play /usr/share/sounds/sound-icons/trumpet-1.wav")

2020-01-08 17:27:20 | INFO     | __main__:<module>:4 - It took 00:01:34 to run this script!


In [None]:
len(first_file_data)

In [20]:
first_file_data[0]

['https://chicago.cbslocal.com/2019/12/18/man-stabbed-after-his-granddaughter-three-other-teens-enter-rob-his-home/',
 'sports',
 'Man Stabbed After His Granddaughter And Three Other Teens Enter, Rob His Home – CBS Chicago',
 "A man's granddaughter is one of four teens accused of stabbing him in the neck and stealing his car before crashing it on Monday night.",
 ['grandfather, granddaughter, teen, teens, home invasion, house, stabbed, assault, assaulted, robbery, car, wallet, cellphone, hazel dell, washington, seaside, oregon, police, pursuit'],
 ['grandfather, granddaughter, teen, teens, home invasion, house, stabbed, assault, assaulted, robbery, car, wallet, cellphone, hazel dell, washington, seaside, oregon, police, pursuit'],
 "Man Stabbed After His Granddaughter And Three Other Teens Enter, Rob His Home – CBS Chicago Menu Videos Drugstore Fencing Ring Stolen GoodsMerchandise recovered from a fencing ring that took in stolen goods from drugstores and sold them online. (Credit: Wil

## Leveraging Spark 
The first file took a minute and a half. We could easily parallelize this function with `Pool` from the `multiprocessing` package, which should bring the run under 5 minutes or so. However, we will leverage `Spark`'s power to do even more.

Here we will run Spark on a single machine in local mode. To do that, we have to change our extraction function and wrap it with a decorator.
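
The wrapping pattern can be sketched with toy data before applying it to WARC files. In this sketch the names `per_file` and `keep_even` are made up for illustration, and plain lists of numbers stand in for files and records:

```python
def per_file(record_fn):
    """Turn a per-record generator into one that walks an iterable of 'files'."""
    def run(files):
        for records in files:          # in the notebook: one WARC file
            for record in records:     # in the notebook: one WARC record
                yield from record_fn(record)
    return run

@per_file
def keep_even(record):
    # Yield zero or one result per record, like the extractor does
    if record % 2 == 0:
        yield record, record * record

toy_files = [[1, 2, 3], [4, 5, 6]]
print(list(keep_even(toy_files)))
```

The decorated function accepts an iterable and returns a generator, which is the shape Spark's `mapPartitions` expects: it hands the function an iterator over the elements of one partition and consumes whatever the function yields.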

In [21]:
# Decorator
# Wraps the data extractor function and returns a function that takes an
# iterable of WARC file paths and applies the extractor to each matching record
from warcio.exceptions import ArchiveLoadFailed  # needed by the except clause below

def get_data_yield(f):
    def func(warc_files):
        for warc_file in warc_files:
            with open(warc_file, 'rb') as stream:
                try:
                    for record in ArchiveIterator(stream):
                        warc_uri = record.rec_headers.get_header('WARC-Target-URI')
                        domain = urlparse(warc_uri).netloc
                        if domain in doms_dict_bc.value.keys():
                            yield from f(record)
                except ArchiveLoadFailed:
                    pass
    return func

In [22]:
@get_data_yield
def extractor_spark(record):
    warc_url = record.rec_headers.get_header('WARC-Target-URI')
    dom_cat = doms_dict_bc.value[urlparse(warc_url).netloc]
    parser = HTMLParser(remove_blank_text=True)
    cleaner = Cleaner(scripts=True, javascript=True, style=True, links=True)
    content = record.content_stream().read()
    
    try:
        tree = fromstring(content, parser=parser)
        parsed_lang = tree.get("lang")
    
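        # Note: a missing lang attribute makes the slice below raise TypeError,
        # so pages without a language tag are skipped by the outer except
        if parsed_lang[:2].lower() ==  'en':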
            try:
                title = tree.xpath("//title/text()")
                title = str(title[0]) if title else None
                description = tree.xpath("//meta[@name='description']/@content")
                description = str(description[0]) if description else None
                keywords = tree.xpath("//meta[@name='keywords']/@content")
                keywords = str(keywords) if keywords else None
                news_keywords = tree.xpath("//meta[@name='news_keywords']/@content")
                news_keywords = str(news_keywords) if news_keywords else None
                try:
                    tree = cleaner.clean_html(tree)
                    text = tree.text_content().strip()
                    text = " ".join(text.split())[:2500]  # We truncate because a) we don't need
                                                          # the whole document for training our models
                                                          # b) we can get Java heap size errors
                except EOFError:
                    text = None
                yield warc_url, dom_cat, title, description, keywords, news_keywords, text
            except Exception:
                pass
                # We could catch these guys but I decided to pass them for the time being 
                #yield warc_url, None, None, None, None, None, None
    except Exception:
        pass
        #yield warc_url, None, None, None, None, None, None
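
The body-text clean-up used above (collapse whitespace, then truncate) is easy to check in isolation. A small sketch; `clean_text` is a hypothetical helper name, and the 2500-character limit is the one from the extractor:

```python
def clean_text(raw, max_chars=2500):
    """Collapse runs of whitespace to single spaces, then truncate."""
    return " ".join(raw.split())[:max_chars]

sample = "  Breaking\n\n news:\t markets   rally  "
print(clean_text(sample))
print(len(clean_text("word " * 1000)))
```

Because the cut is made on characters, the last word of a long document can be split in the middle.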
        

In [24]:
# Kill Spark Context if we have one running already
try:
    sc
except NameError:
    pass
else:
    sc.stop()

In [25]:
# Instantiate our Spark Context and SQL Sessions and configure them
import pyspark
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType

conf = SparkConf().setAppName("cc_news").set("spark.driver.memory","8g")
SparkContext.setSystemProperty('spark.executor.memory', '2g')
SparkContext.setSystemProperty('spark.driver.maxResultSize', '12g')
sc = pyspark.SparkContext(conf=conf)
sql_context = SparkSession(sc)

In [26]:
# Get our current configuration 
sc._conf.getAll()

[('spark.executor.memory', '2g'),
 ('spark.app.name', 'cc_news'),
 ('spark.rdd.compress', 'True'),
 ('spark.driver.memory', '8g'),
 ('spark.driver.port', '44009'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.app.id', 'local-1578523088650'),
 ('spark.master', 'local[*]'),
 ('spark.executor.id', 'driver'),
 ('spark.submit.deployMode', 'client'),
 ('spark.driver.maxResultSize', '12g'),
 ('spark.driver.host', 'ozanix'),
 ('spark.ui.showConsoleProgress', 'true')]

In [27]:
# We will broadcast our domains and domain categories to all executors
domains = pd.read_csv(data_folder / "domains" / 'domains_labeled_complete.csv')
doms_dict = domains[domains.label != 'other'].set_index('domain').to_dict()['label']
doms_dict_bc = sc.broadcast(doms_dict)

In [28]:
# Parallelize Warc Files to our Cores
warcs = sc.parallelize([str(f) for f in files])

In [29]:
# Map our function to our different datasets
results = warcs.mapPartitions(extractor_spark)

In [30]:
# Create a Schema manually so that Spark does not have to guess
schema = StructType([
        StructField("url", StringType(), True),
        StructField("dom_cat", StringType(), True),
        StructField("title", StringType(), True),
        StructField("description", StringType(), True),
        StructField("keywords", StringType(), True),
        StructField("news_keywords", StringType(), True),
        StructField("text", StringType(), True)
    ])

In [31]:
# And run!
start_time = time()
df_final = sql_context.createDataFrame(results, schema)
df_final = df_final.toPandas()
elapsed = strftime("%H:%M:%S", gmtime(time() - start_time))
logger.info(f'It took {elapsed} to run this script!')
os.system("play /usr/share/sounds/sound-icons/trumpet-1.wav ") # Beeps an alert. May not run on your machine

2020-01-08 17:45:15 | INFO     | __main__:<module>:6 - It took 00:03:49 to run this script!


0

#### Commentary
It ran in under 4 minutes and we got 160K+ documents!

In [36]:
# Inspect
for text in df_final.sample(2)['text']:
    print(text)

G Sosa- 'Timing' Top StoriesNewsRumorsVideosFeaturesMusicOpinionIC: TheReasonMusic NewsA3CAllHipHopNewsRumorsVideosFeaturesMusicOpinionIC: TheReasonMusic NewsA3CThe ScumfrogLoginHomeMusicG Sosa- 'Timing'byAllHipHop Staff3 days-editedNorth Carolina's own G Sosa returns with 'Timing' EP. (AllHipHop Music) Gregory Barnes, better known as G Sosa, was born and raised in eastern North Carolina and currently resides in the Raleigh, North Carolina area.The young star's music interest began at the age of 15 as he freestyled over industry beats. He truly started to take his craft seriously as an adult.He released his first single “Tears Away” earlier this year, and he released two additional singles in the summer of 2019, to be followed up by his latest EP, 'Timing.'G Sosa plans to be one of the next big break out artists by 2020. His new single “LLNU” is a pain record dedicated to his best friend that passed in 2019 and is growing on his fans as one of his best works. G's passion for music grew

In [37]:
df_final.head()

Unnamed: 0,url,dom_cat,title,description,keywords,news_keywords,text
0,https://chicago.cbslocal.com/2019/12/18/man-st...,sports,Man Stabbed After His Granddaughter And Three ...,A man's granddaughter is one of four teens acc...,"['grandfather, granddaughter, teen, teens, hom...","['grandfather, granddaughter, teen, teens, hom...",Man Stabbed After His Granddaughter And Three ...
1,http://www.releasewire.com/press-releases/rele...,news,Commence Industrial CRM Achieves Sales Optimiz...,,,,Commence Industrial CRM Achieves Sales Optimiz...
2,https://www.kark.com/news/national-news/1-dead...,news,"1 dead, 2 injured in Oregon shopping center st...",Multiple people were stabbed at the Murray Hil...,,,"1 dead, 2 injured in Oregon shopping center st..."
3,https://www.ii.co.uk/etfs/sg-wti-x2-daily-long...,financial_news,LSE:SG30 ETF Share Price | SG WTI X2 Daily Lon...,Real-time share price updates and latest news ...,,,LSE:SG30 ETF Share Price | SG WTI X2 Daily Lon...
4,http://www.peicanada.com/island_deaths/frances...,news,Frances Salsman | Island Deaths | peicanada.com,"SALSMAN, Frances (Fran) Townsend At the Prince...","['island_deaths, death_notices']","['island_deaths, death_notices']",Frances Salsman | Island Deaths | peicanada.co...


In [None]:
# Strip brackets and single quotes from keywords
df_final['keywords'] = df_final.keywords.str.replace("'", "")
df_final['keywords'] = df_final.keywords.str.strip('[]')

df_final['news_keywords'] = df_final.news_keywords.str.replace("'", "")
df_final['news_keywords'] = df_final.news_keywords.str.strip('[]')
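
Removing every single quote also removes apostrophes inside the keywords themselves. One alternative, sketched here under the assumption that it would replace the `str(keywords)` call inside the extractor, is to join the list of `content` strings before it ever becomes a string; `join_keywords` is a hypothetical helper, not something the notebook defines:

```python
def join_keywords(values):
    """Join the content strings of matching meta tags into one string, or None."""
    cleaned = [v.strip() for v in values if v and v.strip()]
    return ", ".join(cleaned) if cleaned else None

print(join_keywords(["island_deaths, death_notices"]))
print(join_keywords(["people's choice, awards", "tv"]))
print(join_keywords([]))
```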

In [68]:
# Save to disk
# Persist the output under Home directory -> Data -> cc_news -> spark_output
file_path = Path.home() / 'Data' / 'cc_news' / 'spark_output'
# mkdir with exist_ok=True is a no-op when the folder is already there
file_path.mkdir(parents=True, exist_ok=True)
df_final.to_csv(file_path / 'df_final.csv', header=True, index=False, na_rep="")

## End of Notebook
We could have instead saved the Spark `DataFrame` as a `parquet` file, which would have been a lot more efficient and would have let us keep more of each document's text, but we will continue in the `pandas` world for the next steps.

To recap, working mainly in Spark, we:

1. Ran through the corpus with our classified English domain list
2. Filtered out `other` domains that are most likely not `news` channels.
3. From each web page, extracted `title`, `description`, `keywords` (if there are any), and the `body` content.
4. Created a Data Frame to do more NLP

All this is not trivial work, but there is more to come. For the next steps, we will:

1. Create our final labels
2. Do some feature engineering
3. Train BERT-like models
4. And classify our documents into their final categories.
