# DS577 Final Project - Spark Part

**Note:** The results below were produced on a cluster. Additional configuration may be needed to reproduce them locally.

**Acknowledgments:** Thanks to the Department of Computer Science, Rutgers University, for providing the computing resources.

In [1]:
sc = spark.sparkContext

Starting Spark application


ID,YARN Application ID,Kind,State,Spark UI,Driver log,Current session?
517,application_1542400534304_0382,pyspark,idle,Link,Link,✔


SparkSession available as 'spark'.


In [2]:
import json
import pandas as pd

## Bitcoin

In [20]:
raw_lines = sc.textFile('twitter-raw/bitcoin.raw')

### Most Authoritative Account

In [21]:
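def map_func(line):
    # Extract [user id, display name, follower count] from one raw tweet line.
    # Lines that are not valid JSON or lack the expected 'user' fields yield None.
    try:
        tweet = json.loads(line)
        user = tweet['user']
        return [user['id_str'], user['name'], user['followers_count']]
    except (ValueError, KeyError, TypeError):
        return None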
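The parser can be exercised without a cluster. The sketch below applies the same logic as `map_func` to a few hand-made lines and then drops the `None` results, mirroring the `.filter(...)` step that follows. The function name `parse_user` and the sample lines are hypothetical; they contain only the fields the parser reads.

```python
import json


def parse_user(line):
    """Return [user id, name, follower count], or None for unusable lines."""
    try:
        tweet = json.loads(line)
        user = tweet['user']
        return [user['id_str'], user['name'], user['followers_count']]
    except (ValueError, KeyError, TypeError):
        # ValueError: not JSON; KeyError: missing field;
        # TypeError: JSON that is not an object (e.g. a list or null).
        return None


# Hypothetical sample lines: one valid tweet, one truncated JSON line,
# and one object without a 'user' field.
sample_lines = [
    '{"user": {"id_str": "1", "name": "Alice", "followers_count": 10}}',
    '{"user": {"id_str": "2", "name": "Bob"',
    '{"limit": {"track": 5}}',
]

parsed = [parse_user(line) for line in sample_lines]
records = [r for r in parsed if r is not None]
print(records)  # [['1', 'Alice', 10]]
```

Catching only these three exceptions keeps genuine programming errors visible instead of silently turning them into dropped records, which a bare `except:` would do.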

In [14]:
records = (
    raw_lines
        .map(map_func)
        .filter(lambda x: x is not None)
)

In [16]:
df = records.toDF()
pandas_df = df.toPandas()

In [23]:
pandas_df.sort_values(by='_3', ascending=False).drop_duplicates(subset='_1').head(10)

              _1                _2        _3
13993    1652541  Reuters Top News  19991612
56352   91478624            Forbes  14996057
6347    37034483              NDTV  10995618
699    124172948        La Patilla   6828592
4650    14173315          NBC News   6260067
1555    15872418      ABS-CBN News   5918143
26265   20751449         The Hindu   5005645
49843   34713362         Bloomberg   5002254
53620     624413       MarketWatch   3599891
46235   20402945              CNBC   3008531
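Sorting by follower count and then calling `drop_duplicates(subset='_1')` keeps, for each user id, the row with the largest follower count, because pandas keeps the first occurrence by default. The same ranking can be sketched in plain Python by taking the maximum per user and then the N largest; `top_accounts` and the sample records are hypothetical.

```python
import heapq


def top_accounts(records, n=10):
    """Rank users by their highest observed follower count.

    records: iterable of [user_id, name, followers_count] lists, as produced
    by map_func. A user appears once per tweet and the follower count can
    change between tweets, so keep the maximum per user id.
    """
    best = {}
    for user_id, name, followers in records:
        if user_id not in best or followers > best[user_id][2]:
            best[user_id] = (user_id, name, followers)
    # heapq.nlargest returns the n entries with the largest follower counts.
    return heapq.nlargest(n, best.values(), key=lambda row: row[2])


sample = [
    ['1', 'Alice', 10],
    ['2', 'Bob', 50],
    ['1', 'Alice', 12],  # same user, later tweet with more followers
    ['3', 'Carol', 30],
]
print(top_accounts(sample, n=2))  # [('2', 'Bob', 50), ('3', 'Carol', 30)]
```

On the cluster, a comparable result could likely be obtained without collecting every record to the driver with `toPandas()`, for example `records.map(lambda r: (r[0], r)).reduceByKey(lambda a, b: a if a[2] >= b[2] else b).values().takeOrdered(10, key=lambda r: -r[2])`. This is a suggested alternative, not what the notebook ran.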

### Most Retweets

In [22]:
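def map_rt(line):
    # For retweets, return [original tweet id, text, retweet count] of the
    # retweeted (original) status; return None for non-retweets and bad lines.
    try:
        tweet = json.loads(line)
        if 'retweeted_status' not in tweet:
            return None
        origin_tweet = tweet['retweeted_status']
        return [origin_tweet['id_str'], origin_tweet['text'], origin_tweet['retweet_count']]
    except (ValueError, KeyError, TypeError):
        return None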
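Tweet objects from Twitter's v1.1 API can carry a shortened `text` for long tweets, with the complete text in `extended_tweet.full_text`. Assuming the raw files were collected in that format (the notebook does not say), a variant of `map_rt` could prefer the full text when it is present and fall back to `text` otherwise. `map_rt_full` and the sample tweet are hypothetical.

```python
import json


def map_rt_full(line):
    """Return [original id, text, retweet count] for retweets, else None.

    Assumption: long tweets carry their untruncated text in
    extended_tweet.full_text; fall back to 'text' when that field is absent.
    """
    try:
        tweet = json.loads(line)
        if 'retweeted_status' not in tweet:
            return None
        origin = tweet['retweeted_status']
        text = origin.get('extended_tweet', {}).get('full_text', origin['text'])
        return [origin['id_str'], text, origin['retweet_count']]
    except (ValueError, KeyError, TypeError, AttributeError):
        return None


retweet = json.dumps({
    'retweeted_status': {
        'id_str': '42',
        'text': 'A long tweet that was cut off\u2026',
        'extended_tweet': {'full_text': 'A long tweet that was cut off, now complete.'},
        'retweet_count': 7,
    }
})
print(map_rt_full(retweet))
# ['42', 'A long tweet that was cut off, now complete.', 7]
print(map_rt_full('{"text": "not a retweet"}'))  # None
```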

In [23]:
retweets = (
    raw_lines
        .map(map_rt)
        .filter(lambda x: x is not None)
)

In [29]:
pandas_df = retweets.toDF().toPandas()
sorted_table = pandas_df.sort_values(by='_3', ascending=False).drop_duplicates(subset='_1')
sorted_table.head(10)

                        _1  ...       _3
18426  1050944089053519872  ...    13363
4398   1054520588734058496  ...     8951
2316    935900326007328768  ...     7042
10549  1030187498109067264  ...     5812
15059  1045274485954433024  ...     5031
23474  1038367786349854721  ...     3813
19101  1052259344685965318  ...     3431
15253  1022431002084487168  ...     3269
11499  1060224184176050176  ...     3196
22577  1064141922183335936  ...     2863

[10 rows x 3 columns]

In [47]:
sorted_table.head(10)['_2']

18426    UNIQUE SELLING POINT(USP)\nUse your Alpha-X re...
4398     @vicentes @Grimezsz Wanna buy some Bitcoin? 😉😉...
2316     When I predicted Bitcoin at $500,000 by the en...
10549    AIRDROP IS RUNNING!\nGet your free DOS Token n...
15059    Connecty is the first #blockchain platform ded...
23474    Trakx is announcing the https://t.co/uhz62eKx2...
19101    ICO IS LIVE! Go to https://t.co/gS1qemNkRx and...
15253    SOCIALREMIT Airdrop has started!\nFREE up to 2...
11499    If I Fail No Nut November I Will Give Away One...
22577    About time someone spoke truth to power.\n\nLo...
Name: _2, dtype: object

### Language

In [13]:
file_ = sc.textFile('twitter-raw/lang-code')
dict_ = file_.map(lambda x: (x.split(' - ')[1].rstrip('\n'), x.split(' - ')[0]))
lang_code = dict_.collectAsMap()
lang_code['und'] = 'undefined'
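Judging from the split, each line of `twitter-raw/lang-code` appears to have the form `Language - code` (for example `English - en`); the file itself is not shown, so that format is an assumption. The mapping can be built and checked in plain Python; `parse_lang_codes` is hypothetical and splits on the last ` - ` so that a language name containing that separator would not shift the code.

```python
def parse_lang_codes(lines):
    """Build {code: language name} from lines shaped like 'English - en'.

    Assumption: the code is the text after the last ' - ' separator.
    """
    lang_code = {}
    for line in lines:
        name, sep, code = line.rstrip('\n').rpartition(' - ')
        if sep:  # skip blank or malformed lines
            lang_code[code.strip()] = name.strip()
    # Tweets whose language was not determined are tagged 'und'.
    lang_code['und'] = 'undefined'
    return lang_code


codes = parse_lang_codes(['English - en\n', 'Japanese - ja\n', ''])
print(codes.get('ja', 'unknown'))  # Japanese
print(codes.get('xx', 'unknown'))  # unknown
```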

In [14]:
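def map_lang(line):
    # Emit (language code, 1) for each parseable tweet; None otherwise.
    try:
        tweet = json.loads(line)
        return (tweet['lang'], 1)
    except (ValueError, KeyError, TypeError):
        return None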

In [17]:
from operator import add
langs = (
    raw_lines
        .map(map_lang)
        .filter(lambda x: x is not None)
        .reduceByKey(add)
)
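`reduceByKey(add)` sums the 1s emitted for each language code, giving a tweet count per language. A plain-Python equivalent on a small in-memory sample, using `collections.Counter`, is sketched below; `count_languages` and the sample lines are hypothetical.

```python
import json
from collections import Counter


def count_languages(lines):
    """Count tweets per 'lang' code, skipping lines that cannot be parsed."""
    counts = Counter()
    for line in lines:
        try:
            counts[json.loads(line)['lang']] += 1
        except (ValueError, KeyError, TypeError):
            continue  # same effect as filtering out the None records
    return counts


sample_lines = [
    '{"lang": "en"}',
    '{"lang": "ja"}',
    '{"lang": "en"}',
    'not json',
]
counts = count_languages(sample_lines)
print(counts.most_common(2))  # [('en', 2), ('ja', 1)]
```

Because the reduced RDD has only one row per language, converting it with `toPandas()` is cheap; on Spark, `langs.takeOrdered(10, key=lambda kv: -kv[1])` would be another way to fetch the top ten pairs directly.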

In [21]:
lang_table = langs.toDF().toPandas()
lang_table['Language'] = lang_table['_1'].apply(lambda x: lang_code.get(x, 'unknown'))

In [48]:
lang_table.sort_values(by='_2', ascending=False).head(10)

     _1     _2    Language
15   en  44801     English
1   und   3018   undefined
2    ja   2270    Japanese
3    es   1609     Spanish
27   fr   1171      French
32   tr    640     Turkish
26   in    621  Indonesian
4    de    432      German
16   pt    324  Portuguese
0    it    316     Italian

## Thanksgiving

In [54]:
raw_lines = sc.textFile('twitter-raw/tweets.raw')
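The Thanksgiving cells repeat the Bitcoin analysis on a second file, and each analysis parses every JSON line again. One way to avoid both is to parse each line once and compute all three summaries from the parsed objects. The plain-Python `summarize` below is a hypothetical sketch of that structure, not code the notebook ran; on Spark, the analogous step would be mapping `json.loads` over the file once and calling `.cache()` on the resulting RDD before running the three aggregations.

```python
import json
from collections import Counter


def summarize(lines, n=10):
    """Parse each raw line once and derive all three per-dataset summaries.

    Returns (top accounts, top retweeted tweets, top languages), each ranked
    by count and cut to n entries. Parsed objects are assumed to contain the
    nested fields they advertise; non-JSON lines are skipped.
    """
    followers = {}     # user id -> (max follower count seen, name)
    retweets = {}      # original tweet id -> (max retweet count seen, text)
    langs = Counter()  # language code -> number of tweets
    for line in lines:
        try:
            tweet = json.loads(line)
        except ValueError:
            continue  # skip lines that are not valid JSON
        if not isinstance(tweet, dict):
            continue
        if 'user' in tweet:
            user = tweet['user']
            prev = followers.get(user['id_str'])
            if prev is None or user['followers_count'] > prev[0]:
                followers[user['id_str']] = (user['followers_count'], user['name'])
        if 'retweeted_status' in tweet:
            rt = tweet['retweeted_status']
            prev = retweets.get(rt['id_str'])
            if prev is None or rt['retweet_count'] > prev[0]:
                retweets[rt['id_str']] = (rt['retweet_count'], rt['text'])
        if 'lang' in tweet:
            langs[tweet['lang']] += 1

    def top(table):
        # Rank (id, (count, label)) pairs by count, largest first.
        return sorted(table.items(), key=lambda kv: kv[1][0], reverse=True)[:n]

    return top(followers), top(retweets), langs.most_common(n)


def make_line(uid, name, followers, lang, rt=None):
    """Build one hypothetical raw tweet line for the example below."""
    obj = {'user': {'id_str': uid, 'name': name, 'followers_count': followers},
           'lang': lang}
    if rt is not None:
        obj['retweeted_status'] = {'id_str': rt[0], 'text': rt[1], 'retweet_count': rt[2]}
    return json.dumps(obj)


lines = [
    make_line('1', 'A', 5, 'en'),
    make_line('2', 'B', 9, 'en', rt=('77', 'hi', 3)),
    make_line('1', 'A', 6, 'ja', rt=('77', 'hi', 4)),
    'not json',
]
accounts, top_rts, top_langs = summarize(lines, n=2)
print(accounts)   # [('2', (9, 'B')), ('1', (6, 'A'))]
print(top_rts)    # [('77', (4, 'hi'))]
print(top_langs)  # [('en', 2), ('ja', 1)]
```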

### Most Authoritative Account

In [4]:
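def map_func(line):
    # Extract [user id, display name, follower count] from one raw tweet line.
    # Lines that are not valid JSON or lack the expected 'user' fields yield None.
    try:
        tweet = json.loads(line)
        user = tweet['user']
        return [user['id_str'], user['name'], user['followers_count']]
    except (ValueError, KeyError, TypeError):
        return None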

In [5]:
records = (
    raw_lines
        .map(map_func)
        .filter(lambda x: x is not None)
)

In [6]:
df = records.toDF()
pandas_df = df.toPandas()

In [7]:
pandas_df.sort_values(by='_3', ascending=False).drop_duplicates(subset='_1').head(10)

               _1                       _2        _3
305394     807095       The New York Times  42341389
223134     759251                      CNN  40775502
97700   169686021                       ye  28642036
103971     742143         BBC News (World)  24087709
274303   17471979      National Geographic  22352000
270730    1652541         Reuters Top News  19992916
176711    3108351  The Wall Street Journal  16149026
303933   28785486                 ABC News  13964238
247033  136361303           Vogue Magazine  13549559
210918    2467791      The Washington Post  13070373

### Most Retweets

In [55]:
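def map_rt(line):
    # For retweets, return [original tweet id, text, retweet count] of the
    # retweeted (original) status; return None for non-retweets and bad lines.
    try:
        tweet = json.loads(line)
        if 'retweeted_status' not in tweet:
            return None
        origin_tweet = tweet['retweeted_status']
        return [origin_tweet['id_str'], origin_tweet['text'], origin_tweet['retweet_count']]
    except (ValueError, KeyError, TypeError):
        return None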

In [56]:
retweets = (
    raw_lines
        .map(map_rt)
        .filter(lambda x: x is not None)
)

In [57]:
pandas_df = retweets.toDF().toPandas()
sorted_table = pandas_df.sort_values(by='_3', ascending=False).drop_duplicates(subset='_1')
sorted_table.head(10)

                         _1   ...        _3
66695    933707825821421568   ...    208932
183153   931342169788395520   ...    143189
202431  1063550295408549889   ...    106868
195627   928496752889679872   ...     98105
101770   933712342319288320   ...     90144
75233    933919161226424320   ...     80388
111853  1017252290493845506   ...     77569
112916  1053685710828552192   ...     69519
193555   405778664039534592   ...     68009
42001    798656192071372801   ...     64668

[10 rows x 3 columns]

In [12]:
# Retweets content
sorted_table.head(10)['_2']

66695     From the Obama family to yours, we wish you a ...
183153    Just a reminder last year on Thanksgiving that...
202431    Both of you need to be in prison https://t.co/...
195627    *knocks on door*\nMcConaughey: “Do you have a ...
101770    Happy thanksgiving to this woman only https://...
75233     Happy Thanksgiving everyone. Hope you're havin...
111853    *Thanksgiving dinner 2080*\nMe (has dementia):...
112916    Thanksgiving 5 weeks aways yall got yall outfi...
193555    Happy Thanksgiving to all--even the haters and...
42001     You're supposed to bake these ? We bust em up ...
Name: _2, dtype: object

### Language

In [15]:
file_ = sc.textFile('twitter-raw/lang-code')
dict_ = file_.map(lambda x: (x.split(' - ')[1].rstrip('\n'), x.split(' - ')[0]))
lang_code = dict_.collectAsMap()
lang_code['und'] = 'undefined'

In [16]:
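def map_lang(line):
    # Emit (language code, 1) for each parseable tweet; None otherwise.
    try:
        tweet = json.loads(line)
        return (tweet['lang'], 1)
    except (ValueError, KeyError, TypeError):
        return None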

In [17]:
from operator import add
langs = (
    raw_lines
        .map(map_lang)
        .filter(lambda x: x is not None)
        .reduceByKey(add)
)

In [18]:
lang_table = langs.toDF().toPandas()
lang_table['Language'] = lang_table['_1'].apply(lambda x: lang_code.get(x, 'unknown'))

In [19]:
lang_table.sort_values(by='_2', ascending=False).head(10)

     _1      _2    Language
23   en  313184     English
0   und    4987   undefined
41   ja    3152    Japanese
42   es    1833     Spanish
29   fr     672      French
5    tl     377     Tagalog
1    pt     317  Portuguese
22   de     245      German
14   in     199  Indonesian
6    ht     160     Haitian

&copy; 2018, Tai Yang, Ruixuan Song