In [4]:
import pandas as pd
import numpy as np

import matplotlib.pyplot as plt

import warnings
warnings.filterwarnings('ignore')

import tldextract
import urllib

import sys
sys.path.append("/scratch/venia/web2wiki/helpers")


In [5]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
import pyspark
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql import Row
import pyarrow.parquet as pq
import pyarrow as pa

from bs4 import BeautifulSoup

import os


from settings import DATA_DIR, EMBEDDING_DIR


# Point PySpark at the local Spark and JDK installs before any context is created.
os.environ['SPARK_HOME'] = "/home/veselovs/spark-3.2.1-bin-hadoop2.7"
os.environ['JAVA_HOME'] = "/home/veselovs/jdk-13.0.2"

# Settings shared by the SparkContext and SparkSession created below.
# Per the Spark configuration docs, maxResultSize '0' means "no limit" and
# autoBroadcastJoinThreshold '-1' disables automatic broadcast joins.
config = pyspark.SparkConf().setAll([('spark.executor.memory', '50g'),
                                 ('spark.executor.cores', '40'),
                                 ('spark.driver.memory','50g'),
                                 ('spark.driver.maxResultSize','0'),
                                 ('spark.python.worker.memory', '5g'),
                                 ('spark.reducer.maxSizeInFlight','5g'),
                                 ('spark.rpc.message.maxSize', '1000'),
                                 ('spark.sql.autoBroadcastJoinThreshold','-1'),
                                 ('spark.local.dir', '/tmp/')])
# NOTE(review): the configuration dump recorded further down lists
# spark.app.name as 'pyspark-shell', so appName('Venia') does not appear to take
# effect -- presumably because the context already exists when the builder
# runs; confirm and, if the name matters, set it on `config` instead.
sc = pyspark.SparkContext(conf=config)
spark = SparkSession.builder.appName('Venia').config(conf=config).getOrCreate()



22/08/18 20:37:46 WARN Utils: Your hostname, iccluster039 resolves to a loopback address: 127.0.1.1; using 10.90.38.15 instead (on interface ens786f0)
22/08/18 20:37:46 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/18 20:37:48 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
22/08/18 20:37:48 WARN SparkConf: Note that spark.local.dir will be overridden by the value set by the cluster manager (via SPARK_LOCAL_DIRS in mesos/standalone/kubernetes and LOCAL_DIRS in YARN).
22/08/18 20:37:50 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
22/08/18 20:37:50 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


In [6]:
# Display the effective Spark configuration to check which settings were applied.
spark.sparkContext.getConf().getAll()

[('spark.reducer.maxSizeInFlight', '5g'),
 ('spark.driver.memory', '50g'),
 ('spark.rpc.message.maxSize', '1000'),
 ('spark.sql.warehouse.dir',
  'file:/scratch/venia/web2wiki/iterative_coding/spark-warehouse'),
 ('spark.driver.port', '45807'),
 ('spark.local.dir', '/tmp/'),
 ('spark.executor.id', 'driver'),
 ('spark.driver.host', '10.90.38.15'),
 ('spark.app.name', 'pyspark-shell'),
 ('spark.app.id', 'local-1660847870325'),
 ('spark.app.startTime', '1660847867976'),
 ('spark.executor.cores', '40'),
 ('spark.python.worker.memory', '5g'),
 ('spark.rdd.compress', 'True'),
 ('spark.serializer.objectStreamReset', '100'),
 ('spark.driver.maxResultSize', '0'),
 ('spark.master', 'local[*]'),
 ('spark.submit.pyFiles', ''),
 ('spark.submit.deployMode', 'client'),
 ('spark.sql.autoBroadcastJoinThreshold', '-1'),
 ('spark.ui.showConsoleProgress', 'true'),
 ('spark.executor.memory', '50g')]

- Curlie data
- ORES topics
- Contexts
- Number of times the domain shares the article


In [7]:
@F.udf
def extract_subdomain(x):
    """
    Return the host of URL `x` as "<subdomain>.<registered domain>".

    When the subdomain part is missing (or is a single character, see the
    note below) only the registered domain is returned.

    x: URL string
    """
    # Parse the URL once instead of once per attribute access.
    parts = tldextract.extract(x)
    # NOTE(review): `> 1` also drops one-character subdomains such as "m";
    # kept as in the original -- confirm whether `> 0` was intended.
    if len(parts.subdomain) > 1:
        return parts.subdomain + "." + parts.registered_domain
    return parts.registered_domain

@F.udf
def extract_domain(x):
    """Return the `registered_domain` that tldextract reports for URL `x`."""
    return tldextract.extract(x).registered_domain

@F.udf
def normalise_title(title):
    """
    Turn a Wikipedia URL (or raw title) into a normalised article title.

    Steps: keep the last "/"-separated segment, drop any "#anchor",
    percent-decode, strip surrounding whitespace, upper-case the first
    character and replace "_" with a space.

    title: URL or title string; None is passed through as None
    returns: the normalised title ("" when nothing is left after cleaning)
    """
    # Import the submodule explicitly: a bare `import urllib` does not
    # guarantee that `urllib.parse` has been loaded in the worker process.
    from urllib.parse import unquote

    if title is None:
        return None
    title = title.split("/")[-1]
    title = title.split("#")[0]
    title = unquote(title).strip()
    if title:
        title = title[0].upper() + title[1:]
    return title.replace("_", " ")


def clean_web_content(web_content,drop_dups = True):
    """
    Clean the raw web content and add the derived title/domain columns.

    Run this once... To generate the metadata file.

    web_content: Spark DataFrame with at least the "url" and "wiki_url" columns
    drop_dups: when truthy, keep a single row per (url, wiki_url) pair
    returns: a new Spark DataFrame with "title", "subdomain" and "domain"
        columns added and rows with an empty title removed
    """
    web_content = web_content.dropna(subset = ["url","wiki_url"])
    if drop_dups:
        # Spark DataFrames are immutable: the de-duplicated frame has to be
        # re-bound, otherwise the duplicates silently stay in.
        web_content = web_content.dropDuplicates(subset=["url","wiki_url"])
    web_content = web_content.withColumn("title", normalise_title("wiki_url"))
    web_content = web_content.withColumn("subdomain", extract_subdomain("url"))
    web_content = web_content.withColumn("domain", extract_domain("url"))
    web_content = web_content.filter(F.col("title") != "")
    return web_content


# this fn is necessary for reversing the host names
@F.udf
def reverse_reverse_host(x):
    """Return `x` with the order of its dot-separated labels flipped."""
    labels = x.split(".")
    return ".".join(reversed(labels))



In [14]:
## importing files
# Flags selecting which optional inputs are loaded in this cell.
# NOTE(review): `harmonic` is a plain bool here, but a later cell calls
# `harmonic.head(...)` / `harmonic.tail(...)` as if it were a DataFrame.
harmonic = True
all_crawl = True
wiki_mirror = False
# NOTE(review): `all_links` is not referenced anywhere else in the visible notebook.
all_links = True


# Sampled web content, cleaned and enriched with title/subdomain/domain columns.
web_content = spark.read.load("/scratch/venia/web2wiki/data/web_content/iterative_coding_sample/wiki_content.parquet")
web_content = clean_web_content(web_content)

# `mirrors` only exists when wiki_mirror is True.
if wiki_mirror == True:
    mirrors = spark.read.load("/scratch/venia/web2wiki/data/all_wikimirror_shares.parquet")

# `crawl_dump` only exists when harmonic is True; a later cell joins on it unconditionally.
if harmonic == True:
    
    if all_crawl:
        # Tab-separated, gzip-compressed domain ranks file with a header row.
        crawl_dump =spark.read.option("delimiter", "\t").csv(DATA_DIR + "common_crawl_dump/cc-main-2021-feb-apr-may-domain-ranks.txt.gz", header = True)
        # Build a "domain" join key by flipping the label order of "#host_rev".
        crawl_dump = crawl_dump.withColumn("domain",reverse_reverse_host("#host_rev"))
    else:
        crawl_dump = spark.read.load("/scratch/venia/web2wiki/data/top_domains_pr.parquet")

# en_shares = spark.read.load("/scratch/venia/web2wiki/data/en_shares_merged.parquet")
# en_curlie = spark.read.load("/scratch/venia/web2wiki/data/en_curlie_metadata.parquet")
    
# These two are loaded with pandas (not Spark); process_sample merges them
# into pandas samples.
en_shares = pd.read_parquet("/scratch/venia/web2wiki/data/en_shares_merged2.parquet")
en_curlie = pd.read_parquet("/scratch/venia/web2wiki/data/en_curlie_metadata.parquet")
    

# url counts, use this for limiting to sites that only share one url
url_counts = spark.read.load("/scratch/venia/web2wiki/data/all_url_counts.parquet")



In [15]:
# Peek at three rows of the per-URL counts to check the columns.
url_counts.take(3)

[Stage 12:>                                                         (0 + 1) / 1]                                                                                

[Row(url='http://www.hadice-prumyslove.cz/p/3142/hadice-flexibilni-pur-lehka-ag-tl-07-mm-dn-165-mm', count=1),
 Row(url='http://www.rvb-hassberge.de/meine-bank/geschaeftsstellen/berater/berater-zeil.html', count=1),
 Row(url='http://www.pages.mi.it/2014/12/02/ied-201415-sociologia-della-comunicazione-visuale-parte-duetre', count=6)]

In [None]:
# Collect every URL count onto the driver as a pandas DataFrame.
# NOTE(review): `dist` is not used again in the visible notebook -- confirm it is still needed.
dist = url_counts.toPandas()

                                                                                

In [13]:
# # if it's a bold, emphasize, ... then we will map it to the second level location. 
# web_content.loc[web_content["a1"].isin(word_changes), "a1"] = web_content.loc[web_content["a1"].isin(word_changes), "a2"]

# # replace any hx with header
# web_content.loc[web_content["a1"].str.match("h[1-6]"), "a1"] = "header"

In [14]:
# ensure english
from langdetect import detect
from whatthelang import WhatTheLang

# Language model shared by all detect_lang calls in one Python process.
# It must stay None on the driver so that the UDF can still be serialised.
_WTL_MODEL = None


def _get_wtl_model():
    """Return the shared WhatTheLang instance, creating it on first use."""
    global _WTL_MODEL
    if _WTL_MODEL is None:
        _WTL_MODEL = WhatTheLang()
    return _WTL_MODEL


@F.udf
def detect_lang(x):
    """
    Predict the language of text `x` with WhatTheLang.

    x: text to classify; None yields None
    returns: whatever `predict_lang` returns for `x`, or None when the
        prediction raises
    """
    if x is None:
        return None
    try:
        # Reuse one detector instead of constructing a new one for every row.
        return _get_wtl_model().predict_lang(x)
    except Exception:
        # Best effort: a row that cannot be classified gets no language.
        # `Exception` (rather than a bare except) lets interrupts propagate.
        return None
    
def sample_files(web_content, with_lang = True, prop = 0.015, seed = None):
    """
    Draw a random fraction of the web content, optionally keeping English only.

    web_content: Spark DataFrame; needs a "text1" column when with_lang is set
    with_lang: when truthy, add a "lang" column from detect_lang("text1") and
        keep only the rows where it equals "en"
    prop: fraction of rows to sample (passed to DataFrame.sample)
    seed: optional seed for the sampler; pass a fixed value to make the draw
        reproducible, leave as None for the previous unseeded behaviour
    returns: the sampled Spark DataFrame
    """
    web_content_sample = web_content.sample(fraction=prop, seed=seed)
    if with_lang:
        web_content_sample = web_content_sample.withColumn("lang", detect_lang("text1"))
        web_content_sample = web_content_sample.filter(F.col("lang") == "en")
    return web_content_sample

# Draw the sample with the defaults of sample_files (prop=0.015, English rows only).
web_content_sample = sample_files(web_content)

In [15]:
@F.udf
def extract_wiki_text(row):
    """
    Strip the markup from `row` and return its text content.

    The input is parsed by BeautifulSoup with the "xml" parser; when fewer
    than two characters of text remain, the string "None" is returned instead.
    """
    extracted = BeautifulSoup(row, "xml").get_text()
    return extracted if len(extracted) >= 2 else "None"



In [16]:
# we will also sample the explicit mentions of Wikipedia
web_content = web_content_sample.withColumn("text", extract_wiki_text(F.col("text1")))
# sample_explicit = sample_files(a, with_lang= False)


In [17]:
# Inner join: only rows whose "domain" also appears in crawl_dump are kept.
# NOTE(review): crawl_dump is only defined when the `harmonic` flag was True at load time.
web_content = web_content.join(crawl_dump, on ="domain",how="inner")

In [18]:
# Collect the joined Spark frame onto the driver; from here on
# `web_content_sample` is a pandas DataFrame rather than a Spark one.
web_content_sample = web_content.toPandas()

                                                                                

In [30]:
# Cast "#pr_pos" to int so that the sort below is numeric (the column comes
# from a CSV read without an explicit schema).
web_content_sample["#pr_pos"] = web_content_sample["#pr_pos"].astype(int)
# Take the 2000 rows with the smallest "#pr_pos", keep one row per domain,
# then draw 50 of them at random.
# NOTE(review): sample(50) raises if fewer than 50 distinct domains remain, and
# the draw is unseeded, so re-running the cell gives a different sample.
top_pr = web_content_sample.sort_values(by = "#pr_pos").head(2000).drop_duplicates(subset = ["domain"]).sample(50)

In [72]:
# Articles that are sampled repeatedly to see the different contexts they
# appear in. Titles must match the normalised "title" column exactly
# (first letter upper-cased, spaces instead of underscores).
subset_articles = ['Cloud computing', 'ISO 3166-1 alpha-2', 'WordPress', 'Coronavirus disease 2019',
       'Dunning–Kruger effect']

In [21]:

class sample_articles():
    """
    This class is used to call various sampling techniques for
    sampling the articles for the iterative coding.

    The instance works on its own copy of the content (a pandas DataFrame with
    at least the "title", "domain", "a1" and "a2" columns) and remembers the
    result of the bucket and HTML-structure samplers, each of which may only
    be drawn once per instance.
    """
    def __init__(self, web_contentt):
        """
        web_contentt: this is the cleaned web2wiki content; it is copied, so
            the caller's frame is not modified by the samplers.
        """
        self.web_content = web_contentt.copy()
        self.sampled = None
        self.sampled_html = None

    def map_html_structure(self, word_changes):
        """
        Coarsen the "a1" (direct parent tag) column of the stored content.

        Rows whose "a1" is listed in `word_changes` take their "a2" value
        instead, and afterwards any "a1" starting with h1..h6 becomes "header".
        The stored frame is modified in place and also returned.

        word_changes: collection of tag names to be replaced by "a2"
        """
        web_content = self.web_content
        use_grandparent = web_content["a1"].isin(word_changes)
        web_content.loc[use_grandparent, "a1"] = web_content.loc[use_grandparent, "a2"]
        # na=False: rows without a tag are simply not headers; without it a
        # missing value makes the mask non-boolean and the assignment fails.
        is_header = web_content["a1"].str.match("h[1-6]", na=False)
        web_content.loc[is_header, "a1"] = "header"
        return web_content


    def article_sample(self, sample_col = "title", buckets = 5, sample_number = 15):
        """
        This method samples articles based on buckets.
        We drop duplicates on domains to get many different domains that invoke articles
        instead of the dominant several.

        Adds the "article_count" and "bucket" columns to the stored content.

        sample_col: this will be the column that we use to define buckets over
        buckets: determines the number of buckets we use to sample over
        sample_number: how many samples we want from each bucket.
        returns: the sampled rows (also stored on `self.sampled`)
        raises: AssertionError when a sample was already drawn; ValueError
            (from pandas) when a bucket holds fewer than `sample_number` rows
        """
        # `is None`, not `== None`: comparing a stored DataFrame with `==`
        # is element-wise and cannot be used as an assert condition.
        assert self.sampled is None, "Already sampled"
        web_content = self.web_content
        web_content["article_count"] = web_content.groupby([sample_col])[sample_col].transform("count")
        web_content["bucket"] = pd.cut(web_content["article_count"].rank(pct=True), buckets, labels=False)
        web_content = web_content.drop_duplicates(subset = ["domain"])
        # Collect the per-bucket draws and concatenate once; DataFrame.append
        # is gone in pandas 2 and copied the whole frame on every iteration.
        per_bucket = [
            web_content[web_content["bucket"] == i].sample(sample_number)
            for i in range(buckets)
        ]
        sampled = pd.concat(per_bucket) if per_bucket else pd.DataFrame()

        self.sampled = sampled
        return sampled

    def sample_html_structure(self, tags,sample_number = 7, word_changes = None):
        """
        Sample over the HTML structure, i.e. per value of "a1" among `tags`.

        Only tags with more than 9 rows (counted before de-duplicating on
        domain) are sampled. Sampling is done with replacement and repeated
        rows are dropped, so a tag may yield fewer than `sample_number` rows.

        tags: a list of HTML tags we wish to restrict to
        sample_number: the number of rows we wish to sample
        word_changes: sometimes, a1 may be a bold or some other feature, in this case we want to look up one level.
        returns: (sampled rows, per-tag counts); the sample is also stored on
            `self.sampled_html`
        raises: AssertionError when a sample was already drawn
        """
        assert self.sampled_html is None, "Already sampled"
        if word_changes is not None:
            web_content = self.map_html_structure(word_changes)
        else:
            web_content = self.web_content

        web_content = web_content[web_content["a1"].isin(tags)]
        tag_count = web_content.groupby("a1")["title"].count().reset_index()
        tag_count = tag_count[tag_count["title"] > 9]
        web_content = web_content.drop_duplicates(subset = ["domain"])
        web_content= web_content[web_content["a1"].isin(tag_count.a1)]
        sampled = web_content.groupby("a1").sample(sample_number, replace = True)
        sampled =  sampled.drop_duplicates()

        self.sampled_html= sampled

        return sampled, tag_count

    def list_append(self, subset_articles, sample_number = 5):
        """
        We sample one article many times to see the different contexts it can arise in.

        subset_articles: a list of articles that we want to sample over.
        sample_number: rows drawn per article, after keeping one row per domain
        returns: the sampled rows (not stored on the instance)
        """
        web_content = self.web_content
        web_content = web_content[web_content["title"].isin(subset_articles)]
        web_content = web_content.drop_duplicates(subset = "domain")
        appended_vals = web_content.groupby("title").sample(sample_number)
        return appended_vals


# sample,wc2 = article_sample(web_content, "title")

In [22]:

# Sampler over the collected pandas sample (the class works on its own copy).
sa = sample_articles(web_content_sample)

In [43]:
# Draw both samples; the trailing .sample(frac=1) shuffles the row order.
# NOTE(review): article_sample() can only be called once per `sa` instance, so
# re-running this cell needs a fresh sample_articles object.
sample1 = sa.list_append(subset_articles).sample(frac = 1)
sample2 = sa.article_sample().sample(frac= 1)

In [35]:
def extract_neighbouring_text(x, text, num_chars = 300, chars_before = 200, extra_after = 50):
    """
    Extracts the neighbouring text of the Wikipedia mention.

    x: the text to cut the context out of
    text: the mention to look for in `x` (first occurrence is used)
    num_chars: size of the context after the start of the mention; matches
        that start within the first `num_chars` characters keep everything
        from the beginning of `x`
    chars_before: characters kept in front of the mention otherwise
    extra_after: additional characters kept after the `num_chars` window
    returns: the context string; the leading `num_chars + extra_after`
        characters of `x` when the mention is not found, and "" when `x` is
        not a string (e.g. a missing value)
    """
    if not isinstance(x, str):
        return ""

    ind = x.find(text) if isinstance(text, str) else -1
    if ind < 0:
        # Mention not found: fall back to the start of the text explicitly
        # instead of slicing around the -1 returned by str.find.
        return x[:num_chars + extra_after]

    # Clamp at 0 so that a small num_chars can never produce a negative start
    # index (which would wrap around to the end of the string).
    start = 0 if ind < num_chars else max(0, ind - chars_before)
    # Slicing past the end of the string is safe, so no upper clamp is needed.
    return x[start:ind + num_chars + extra_after]



def process_sample(sample, harmonic = False):
    """
    Turn a sample of shares into the table used for the iterative coding.

    The sample is merged with `en_shares` on "title", gets a "context" column
    cut out of "text2" around "wiki_url", is reduced to the coding columns
    (with "a1"/"a2" renamed to "parent_1"/"parent_2") and finally left-joined
    with the Curlie labels on "domain".

    sample: this is the sample of shares we want to process
    harmonic: if True, the "#harmonicc_pos" column is kept as well
    """
    merged = sample.merge(en_shares, on = "title")
    merged["context"] = merged.apply(
        lambda row: extract_neighbouring_text(row["text2"], row["wiki_url"]),
        axis = 1,
    )

    keep = ["url", "domain", "subdomain"]
    if harmonic == True:
        keep.append("#harmonicc_pos")
    keep += ["a1", "a2", "title", "context", "text", "total_count", "total_views", "topic_leaf"]

    trimmed = merged[keep].rename(columns = {"a1": "parent_1", "a2": "parent_2"})
    curlie_labels = en_curlie[["domain", "label", "label_leaf"]]
    return trimmed.merge(curlie_labels, on = "domain", how = "left")


In [33]:
# Keep only the rows detected as English.
# NOTE(review): sample_files already filters on lang == "en" with its default
# with_lang=True, so this looks redundant -- confirm.
web_content_sample = web_content_sample[web_content_sample["lang"] == "en"]

In [36]:
# Coding table for the top-PageRank sample, keeping the "#harmonicc_pos" column.
top_harmonic = process_sample(top_pr, harmonic = True)

In [38]:
# Export the top-PageRank coding table without the pandas index.
top_harmonic.to_csv("/scratch/venia/web2wiki/data/iterated_coding/4_top_pr.csv",index=False)

In [39]:
# Coding table for 200 rows drawn uniformly at random.
# NOTE(review): the draw is unseeded, so re-running gives a different sample.
rand_sample = process_sample(web_content_sample.sample(200))

In [40]:
# Export without the pandas index, like every other coding file written by
# this notebook (the index is just the row number left over from the merge).
rand_sample.to_csv("/scratch/venia/web2wiki/data/iterated_coding/4_random_articles.csv", index=False)

In [37]:
# Coding table for the sample restricted by URL count, then export.
# NOTE(review): `url_count_limited_sample` is never created in the visible
# notebook, so this cell fails on a fresh kernel -- the cell that built it
# seems to be missing.
url_count_limited_sample = process_sample(url_count_limited_sample)
url_count_limited_sample.to_csv("/scratch/venia/web2wiki/data/iterated_coding/one_share_per_url.csv", index=False)



In [59]:
# for the harmonic data
# Draw 60 rows from the first 100 and 60 from the last 500 rows of `harmonic`.
# NOTE(review): the only visible assignment to `harmonic` is the bool flag in
# the loading cell, which has no .head()/.tail(); the DataFrame this cell
# expects must come from a cell that is no longer in the notebook.
harmonic_head = harmonic.head(100).sample(60)
harmonic_tail = harmonic.tail(500).sample(60)

# Build the coding tables, keeping the "#harmonicc_pos" column.
harmonic_head = process_sample(harmonic_head, harmonic = True)
harmonic_tail = process_sample(harmonic_tail, harmonic = True)

In [60]:
# Export both harmonic coding tables without the pandas index.
harmonic_head.to_csv("/scratch/venia/web2wiki/data/iterated_coding/top_harmonic_sample.csv", index = False)
harmonic_tail.to_csv("/scratch/venia/web2wiki/data/iterated_coding/bottom_harmonic_sample.csv", index = False)



In [54]:
# explicit = explicit_pd.sample(50)
# Coding table for the explicit Wikipedia mentions.
# NOTE(review): `explicit_pd` is not defined anywhere in the visible notebook,
# so this cell fails on a fresh kernel.
explicit = process_sample(explicit_pd, harmonic = False)

In [60]:
# Export the explicit-mention coding table without the pandas index.
explicit.to_csv("/scratch/venia/web2wiki/data/iterated_coding/explicit_mentions.csv", index=False)

In [223]:
# Show up to 600 characters per cell when displaying DataFrames, so the
# context strings are readable.
pd.set_option('max_colwidth', 600)