# Hacker News Data Processing

In [1]:
#!pip install google-cloud-bigquery
#!pip install textblob
#!pip install swifter

from google.cloud import bigquery
from textblob import TextBlob
import bokeh
import numpy as np # linear algebra
import pandas as pd # data processing, CSV file I/O (e.g. pd.read_csv)
import html 
import dask
import re
import dask.dataframe as dd
from tqdm import tqdm, tqdm_pandas
import swifter


'''import swifter
df.swifter.apply(func)'''

# Define the BigQuery Client
client = bigquery.Client.from_service_account_json("winterrose-nlp-49041459bd3c.json")

# A Google BigQuery Function
def querytodf(query):
    query_job = client.query(query)
    
    iterator = query_job.result(timeout=60)
    rows = list(iterator)

    # Transform the rows into a nice pandas dataframe
    df = pd.DataFrame(data=[list(x.values()) for x in rows], columns=list(rows[0].keys()))
    
    return df

# Query Section

In [None]:
%%time

# Using WHERE reduces the amount of data scanned / quota used
query = """
SELECT hnc.id, 
       hnc.by,
       hnc.author,
       hnc.text, 
       hnc.time, 
       hnc.ranking, 
       hnc.deleted, 
       hnc.dead, 
       hnc.parent as sid,
       hns.by as sauthor,
       hns.time as stime,
       hns.title as stitle,
       hns.deleted as sdeleted,
       hns.dead as sdead,
       hns.score as score,
       hns.text as stext,
       hns.url as surl
FROM `bigquery-public-data.hacker_news.comments` as hnc
INNER JOIN `bigquery-public-data.hacker_news.stories`as hns ON hns.id  = hnc.parent
"""
df = querytodf(query)

In [None]:
import dask.dataframe as dd

In [None]:
df.shape

In [None]:
df.head()

In [None]:
# Convert Pandas Dataframe to Dask Dataframe and then save to Disk as CSV.
#!mkdir data
ds = dd.from_pandas(df, npartitions=30)
ds.to_csv('data/export-*.csv').compute()

In [None]:
# Trash the old ones from memory. 
del ds, df
import gc
gc.collect()

# IMPORT FROM CSV 

In [None]:
import dask.dataframe as dd

In [None]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)

In [None]:
cluster

In [None]:
%%time
# IMPORT FROM CSV's
ds2 = dd.read_csv('data/export-*.csv').compute(scheduler='processes')

In [None]:
%%time
# IMPORT FROM CSV's
ds2 = dd.read_csv('data/export-*.csv').compute(scheduler='threads')

In [None]:
ds2.shape

In [None]:
ds2.head(3)

In [None]:
ds2.shape

In [None]:
nans = ds2.text.isna().sum()
nans
ds2 = ds2.dropna(subset=['author', 'text'])
ds2.shape

In [None]:
# Define sentiment Analysis function

def encode_decode(text):
    unescaped = html.unescape(text)
    return unescaped

def noHTML(text):
    cleanr = re.compile('<.*?>')
    cleantext = re.sub(cleanr, ' ', text)
    return cleantext

def get_sentiment(text):
    """
    Utility function to classify sentiment of passed text
    using textblob's sentiment method. Return the polarity
    score as a float within the range [-1.0, 1.0]
    """
    # create TextBlob object of passed text's polarity
    return TextBlob(text).sentiment.polarity

def noURLS(text):
    """
    Utility function to clean text by removing links
    using simple regex statements.
    """
    return ''.join(re.sub(r"http\S+", "", text))

In [None]:
tqdm_pandas(tqdm())
# via Pandas
ds2['cleaned_comment'] = ds2.text.progress_apply(lambda x: noURLS(noHTML(encode_decode(x))))

In [None]:
%%time
# via Swifter
ds2.text.swifter.apply(lambda x: noURLS(noHTML(encode_decode(x))))
print('done')

In [None]:
%%time
# via Dask
ds2.text.apply(lambda x: noURLS(noHTML(encode_decode(x)))).compute(scheduler='threads')
print(done)

In [None]:
ds2['comment_sentiment'] = ds2['cleaned_comment'].swifter.apply(lambda x: get_sentiment(x))

In [None]:
ds6 = ds2.loc[:, ~ds2.columns.str.match('Unnamed')]
ds6.head()

In [None]:
ds3['comment_sentiment_dask'] = ds3['cleaned_comment'].apply(lambda x: get_sentiment(x)).compute(scheduler='threads')

In [3]:
#ds2.to_csv('data/ds2export.csv',index=False)

NameError: name 'ds2' is not defined

In [2]:
%%time
# IMPORT FROM CSV's
d1 = pd.read_csv('data/ds2export.csv')
d1.shape

CPU times: user 38.4 s, sys: 3.21 s, total: 41.6 s
Wall time: 34.8 s


In [3]:
ds3 = d1.loc[:, ~d1.columns.str.match('Unnamed')]

In [4]:
commentorList = ds3.by.unique().tolist()
len(commentorList)

183926

In [5]:
cList = pd.DataFrame(commentorList)

In [6]:
cList.columns = ['commentor']
cList.head()

Unnamed: 0,commentor
0,jpeg_hero
1,barce
2,josephpmay
3,opendomain
4,darrellsilver


In [7]:
%%time
# Create testingDF for df format
x = 'eli'

# Select subdf for the selected author. 
subdf = ds3[ds3['by'].values == x]

# Commentor
commentor = x

# Create the first output, a float indicating commentor's mean sentiment score. 
commentor_sentiment = subdf['comment_sentiment'].mean() 

# Upvotes Mean
commentor_upvotes_mean = subdf['ranking'].mean() 

# Upvotes Total
commentor_upvotes_total = subdf['ranking'].sum()

# Total Happiness
commentor_total_happyness = subdf[subdf['comment_sentiment'] > 0.0].comment_sentiment.sum() 

# Total Saltiness
commentor_total_saltiness = subdf[subdf['comment_sentiment'] < 0.0].comment_sentiment.sum() 

# Third output, total number of commments
total_comments = len(subdf.index)

# Total salty comments
qty_salty_comments = (subdf.comment_sentiment < 0.0).sum()

# Total non-salty comments
qty_non_salty_comments = (subdf.comment_sentiment > 0.0).sum()

# Create the second output, a list of the commentor's saltiest comments. 
salty_comments = subdf[['time','comment_sentiment','ranking','cleaned_comment', 'stitle']][0:9].to_json(orient='records')

# Ten most positive comments
sweet_comments = subdf[['time','comment_sentiment','ranking','cleaned_comment', 'stitle']].tail(10).to_json(orient='records')

outputDF = pd.DataFrame.from_records([{ 'commentor': commentor, 
                                        'commentor_sentiment': commentor_sentiment, 
                                        'commentor_upvotes_mean': commentor_upvotes_mean,
                                        'commentor_upvotes_total': commentor_upvotes_total,
                                        'commentor_total_happyness': commentor_total_happyness,
                                        'commentor_total_saltiness': commentor_total_saltiness,
                                        'total_comments': total_comments,
                                        'qty_salty_comments': qty_salty_comments, 
                                        'qty_non_salty_comments': qty_non_salty_comments,
                                        'salty_comments': salty_comments, 
                                        'sweet_comments': sweet_comments} ])

testingDF = outputDF

CPU times: user 87.4 ms, sys: 8 ms, total: 95.4 ms
Wall time: 94.2 ms


In [8]:
display(testingDF.head())

Unnamed: 0,commentor,commentor_sentiment,commentor_total_happyness,commentor_total_saltiness,commentor_upvotes_mean,commentor_upvotes_total,qty_non_salty_comments,qty_salty_comments,salty_comments,sweet_comments,total_comments
0,eli,0.084931,147.795118,-51.737913,8.584439,9709,658,286,"[{""time"":1426095792,""comment_sentiment"":0.0031...","[{""time"":1307486234,""comment_sentiment"":0.075,...",1131


In [9]:
final_dataframe = testingDF

In [None]:
len(commentorList)

183926

In [None]:
def loopSum(i):  
    # Select subdf for the selected author.
    subdf = ds3[ds3['by'].values == i]
    # Commentor
    commentor = i
    # Create the first output, a float indicating commentor's mean sentiment score. 
    commentor_sentiment = subdf['comment_sentiment'].mean() 
    # Upvotes Mean
    commentor_upvotes_mean = subdf['ranking'].mean() 
    # Upvotes Total
    commentor_upvotes_total = subdf['ranking'].sum()
    # Total Happiness
    commentor_total_happyness = subdf[subdf['comment_sentiment'] > 0.0].comment_sentiment.sum() 
    # Total Saltiness
    commentor_total_saltiness = subdf[subdf['comment_sentiment'] < 0.0].comment_sentiment.sum() 
    # Third output, total number of commments
    total_comments = len(subdf.index)
    # Total salty comments
    qty_salty_comments = (subdf.comment_sentiment < 0.0).sum()
    # Total non-salty comments
    qty_non_salty_comments = (subdf.comment_sentiment > 0.0).sum()
    # Create the second output, a list of the commentor's saltiest comments. 
    salty_comments = subdf[['time','comment_sentiment','ranking','cleaned_comment', 'stitle']][0:9].to_json(orient='records')
    # Ten most positive comments
    sweet_comments = subdf[['time','comment_sentiment','ranking','cleaned_comment', 'stitle']].tail(10).to_json(orient='records')
    outputDF = pd.DataFrame([{ 'commentor': commentor, 
                                            'commentor_sentiment': commentor_sentiment, 
                                            'commentor_upvotes_mean': commentor_upvotes_mean,
                                            'commentor_upvotes_total': commentor_upvotes_total,
                                            'commentor_total_happyness': commentor_total_happyness,
                                            'commentor_total_saltiness': commentor_total_saltiness,
                                            'total_comments': total_comments,
                                            'qty_salty_comments': qty_salty_comments, 
                                            'qty_non_salty_comments': qty_non_salty_comments,
                                            'salty_comments': salty_comments, 
                                            'sweet_comments': sweet_comments} ])
    return outputDF

In [None]:
from tqdm import tqdm
    
results = []
for j in tqdm(commentorList[0:183926]):
    newDF = loopSum(j)
    results.append(newDF)

  2%|▏         | 4410/183926 [05:23<3:34:00, 13.98it/s]

In [None]:
finalTableResults = pd.concat(results)
finalTableResults.head()

In [None]:
finalTableResults.to_csv('data/commentor_data.csv',index=False)

In [None]:
finalTableResults.to_parquet('data/commentor_data.csv',index=False)

In [15]:
finalTableResults.shape

(183926, 11)

In [37]:
import os
#!pip install pandavro
import pandavro as pdx

#OUTPUT_PATH='{}/data/example.avro'.format(os.path.dirname(__file__))


dxx = pd.DataFrame({"Boolean": [True, False, True, False],
                   "Float64": np.random.randn(4),
                   "Int64": np.random.randint(0, 10, 4),
                   "String": ['foo', 'bar', 'foo', 'bar'],
                   "DateTime64": [pd.Timestamp('20190101'), pd.Timestamp('20190102'),
                                  pd.Timestamp('20190103'), pd.Timestamp('20190104')]})

pdx.to_avro('data/hn_commentors_db.avro', finalTableResults)
saved = pdx.read_avro('data/hn_commentors_db.avro')

In [36]:
display(saved.head(10))

Unnamed: 0,commentor,commentor_sentiment,commentor_total_happyness,commentor_total_saltiness,commentor_upvotes_mean,commentor_upvotes_total,qty_non_salty_comments,qty_salty_comments,salty_comments,sweet_comments,total_comments
0,jpeg_hero,0.042945,12.108628,-7.985893,23.125,2220,43,32,"[{""time"":1339013609,""comment_sentiment"":0.0,""r...","[{""time"":1385394486,""comment_sentiment"":-0.024...",96
1,barce,0.174968,14.072513,-1.649803,20.732394,1472,50,13,"[{""time"":1326303831,""comment_sentiment"":0.1479...","[{""time"":1426269657,""comment_sentiment"":0.0,""r...",71
2,josephpmay,0.129768,19.228468,-4.045566,7.247863,848,72,25,"[{""time"":1373721992,""comment_sentiment"":0.0,""r...","[{""time"":1364939435,""comment_sentiment"":-0.152...",117
3,opendomain,0.159312,20.135372,-1.655124,8.232759,955,83,16,"[{""time"":1337896054,""comment_sentiment"":0.5,""r...","[{""time"":1417001198,""comment_sentiment"":0.3,""r...",116
4,darrellsilver,0.265126,11.086797,-0.746875,22.205128,866,28,6,"[{""time"":1389388135,""comment_sentiment"":-0.063...","[{""time"":1374354465,""comment_sentiment"":0.2123...",39
5,nshankar,0.129017,4.436171,-0.694683,16.413793,476,20,5,"[{""time"":1366261218,""comment_sentiment"":0.15,""...","[{""time"":1359348181,""comment_sentiment"":0.5,""r...",29
6,adamrneary,0.043665,1.150998,-0.758011,18.555556,167,5,4,"[{""time"":1314204698,""comment_sentiment"":0.2105...","[{""time"":1314204698,""comment_sentiment"":0.2105...",9
7,Daniel_Newby,0.09096,21.286754,-6.91505,11.613924,1835,94,39,"[{""time"":1262413986,""comment_sentiment"":0.0845...","[{""time"":1276718795,""comment_sentiment"":-0.321...",158
8,Pravin,0.278283,0.834848,0.0,1.0,3,3,0,"[{""time"":1216792904,""comment_sentiment"":0.1666...","[{""time"":1216792904,""comment_sentiment"":0.1666...",3
9,b05us,0.195677,1.307396,-0.133333,12.0,72,5,1,"[{""time"":1257962656,""comment_sentiment"":0.1263...","[{""time"":1257962656,""comment_sentiment"":0.1263...",6


## The Graveyard - Ideas that didn't.

In [None]:
# Nope
dsr3 = ds2
dsr3['cleaned_comment'] = dsr3.text.apply(lambda x: noURLS(noHTML(encode_decode(x)))).compute()

In [None]:
# OTHER

dsr3 = dd.from_pandas(ds2, npartitions=2000)

finalDF = dsr2
def fin (daskDataframe):
    daskDataframe['comment_sentiment'] = daskDataframe.text.apply(lambda x: get_sentiment(noURLS(noHTML(encode_decode(x)))))
    daskDataframe['cleaned_comment'] = daskDataframe.text.apply(lambda x: noURLS(noHTML(encode_decode(x))))
    return finalDF

with ProgressBar():
    res = fin(dsr2).compute()

In [None]:

# Select subdf for the selected author. 
subdf = ds2[ds2['by'].values == x]

# Commentor
commentor = x

# Create the first output, a float indicating commentor's mean sentiment score. 
commentor_sentiment = subdf['comment_sentiment'].mean() 

# Upvotes Mean
commentor_upvotes_mean = subdf['ranking'].mean() 

# Upvotes Total
commentor_upvotes_total = subdf['ranking'].sum()

# Total Happiness
commentor_total_happyness = subdf[subdf['comment_sentiment'] > 0.0].comment_sentiment.sum() 

# Total Saltiness
commentor_total_saltiness = subdf[subdf['comment_sentiment'] < 0.0].comment_sentiment.sum() 

# Third output, total number of commments
total_comments = len(subdf.index)

# Total salty comments
qty_salty_comments = (subdf.comment_sentiment < 0.0).sum()

# Total non-salty comments
qty_non_salty_comments = (subdf.comment_sentiment > 0.0).sum()

# Create the second output, a list of the commentor's saltiest comments. 
salty_comments = subdf[['time','comment_sentiment','ranking','cleaned_comment', 'stitle']][0:9].to_json(orient='records')

# Ten most positive comments
sweet_comments = subdf[['time','comment_sentiment','ranking','cleaned_comment', 'stitle']].tail(10).to_json(orient='records')

outputDF = pd.DataFrame.from_records([{ 'commentor': commentor, 
                                        'commentor_sentiment': commentor_sentiment, 
                                        'commentor_upvotes_mean': commentor_upvotes_mean,
                                        'commentor_upvotes_total': commentor_upvotes_total,
                                        'commentor_total_happyness': commentor_total_happyness,
                                        'commentor_total_saltiness': commentor_total_saltiness,
                                        'total_comments': total_comments,
                                        'qty_salty_comments': qty_salty_comments, 
                                        'qty_non_salty_comments': qty_non_salty_comments,
                                        'salty_comments': salty_comments, 
                                        'sweet_comments': sweet_comments} ])
delayed_results = pd.concat((delayed_results, outputDF))

In [None]:
# How many Descendants per story? (Descending)

StoriesPerUser = """
SELECT descendants, COUNT(*) as cnt
FROM `bigquery-public-data.hacker_news.stories`
GROUP BY descendants
ORDER BY descendants DESC
"""

display(querytodf(StoriesPerUser).head(10))

In [None]:
'''%%time

list_names=commentorList

delayed_results = testingDF
for x in list_names:

    # Select subdf for the selected author. 
    subdf = ds2[ds2['by'].values == x]

    # Generate the sentiment analysis for each of the commentor's comments. 
    subdf['comment_sentiment'] = subdf.text.apply(lambda x: get_sentiment(noURLS(noHTML(encode_decode(x)))))
    subdf['cleaned_comment'] = subdf.text.apply(lambda x: noURLS(noHTML(encode_decode(x))))
    subdf = subdf.sort_values(by=['comment_sentiment'],ascending=True)

    # Commentor
    commentor = x

    # Create the first output, a float indicating commentor's mean sentiment score. 
    commentor_sentiment = subdf['comment_sentiment'].mean() 

    # Upvotes Mean
    commentor_upvotes_mean = subdf['ranking'].mean() 

    # Upvotes Total
    commentor_upvotes_total = subdf['ranking'].sum()

    # Total Happiness
    commentor_total_happyness = subdf[subdf['comment_sentiment'] > 0.0].comment_sentiment.sum() 

    # Total Saltiness
    commentor_total_saltiness = subdf[subdf['comment_sentiment'] < 0.0].comment_sentiment.sum() 

    # Third output, total number of commments
    total_comments = len(subdf.index)

    # Total salty comments
    qty_salty_comments = (subdf.comment_sentiment < 0.0).sum()

    # Total non-salty comments
    qty_non_salty_comments = (subdf.comment_sentiment > 0.0).sum()

    # Create the second output, a list of the commentor's saltiest comments. 
    salty_comments = subdf[['time','comment_sentiment','ranking','cleaned_comment', 'stitle']][0:9].to_json(orient='records')

    # Ten most positive comments
    sweet_comments = subdf[['time','comment_sentiment','ranking','cleaned_comment', 'stitle']].tail(10).to_json(orient='records')

    outputDF = pd.DataFrame.from_records([{ 'commentor': commentor, 
                                            'commentor_sentiment': commentor_sentiment, 
                                            'commentor_upvotes_mean': commentor_upvotes_mean,
                                            'commentor_upvotes_total': commentor_upvotes_total,
                                            'commentor_total_happyness': commentor_total_happyness,
                                            'commentor_total_saltiness': commentor_total_saltiness,
                                            'total_comments': total_comments,
                                            'qty_salty_comments': qty_salty_comments, 
                                            'qty_non_salty_comments': qty_non_salty_comments,
                                            'salty_comments': salty_comments, 
                                            'sweet_comments': sweet_comments} ])
    delayed_results = pd.concat((delayed_results, outputDF))

    
results = dask.compute(delayed_results)'''

In [None]:
from dask.distributed import Client, LocalCluster
cluster = LocalCluster()
client = Client(cluster)
cluster

In [None]:
https://myinstance.notebook.us-east-1.sagemaker.aws/notebooks/image_classify.ipynb
And URL of accessing Dask Dashboard will be:

https://myinstance.notebook.us-east-1.sagemaker.aws/proxy/8787/