In [1]:
import pyspark, pickle
from pyspark import SparkContext
from pyspark.sql.functions import countDistinct
from pyspark.storagelevel import StorageLevel
import pandas as pd
import numpy as np

In [None]:
conf = pyspark.SparkConf().setAppName("App")
conf = (conf.setMaster('local[*]')
        .set('spark.executor.memory', '67G')
        .set('spark.driver.memory', '67G')
        .set('spark.driver.maxResultSize', '67G'))

sc = pyspark.SparkContext(conf=conf)
sq = pyspark.sql.SQLContext(sc)

# init the spark session
spark = pyspark.sql.SparkSession.builder.getOrCreate()

## Check schema of dataframe made from tweets

In [3]:
!ls ~/notebooks

1_tweet_api_pull-JON.ipynb		7_create_wordclouds-JON.ipynb
2_preprocessNEW.ipynb			8_sentiment_analysis-JON.ipynb
2_preprocess_tweets-JON.ipynb		9_research_user_mrlukeyluke-JON.ipynb
3_model_climate_acceptance-JON.ipynb	archive
4_predict_climate_acceptance-JON.ipynb	between_central.csv
5_network_analysis-JON.ipynb		files
5_NETWORK_NEW.ipynb			model_list.p
6_topic_modeling-JON.ipynb		spark-warehouse
6_TOPICMODEL_NEW.ipynb			tim_notebooks


In [5]:
df = spark.read.json('files/trump2.json')
df.printSchema()

root
 |-- _id: struct (nullable = true)
 |    |-- $oid: string (nullable = true)
 |-- contributors: string (nullable = true)
 |-- coordinates: struct (nullable = true)
 |    |-- coordinates: array (nullable = true)
 |    |    |-- element: double (containsNull = true)
 |    |-- type: string (nullable = true)
 |-- created_at: string (nullable = true)
 |-- display_text_range: array (nullable = true)
 |    |-- element: long (containsNull = true)
 |-- entities: struct (nullable = true)
 |    |-- hashtags: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |-- element: long (containsNull = true)
 |    |    |    |-- text: string (nullable = true)
 |    |-- media: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- additional_media_info: struct (nullable = true)
 |    |    |    |    |-- description: string (nullable = true)
 |    |    |    |    |-- em

## Function to read in tweet files, extract necessary data, and save

In [3]:
def extract_tweet_data(tweets_folder, save_as):
    """
    tweets_folder: a folder with .json files containing tweets
    save_as: filename to save processed tweets
    This function loads the tweets, extracts the necessary data, and saves a new parquet file with that data.
    """

    df = spark.read.json(tweets_folder)
    df.registerTempTable("df")

    # Extract tweets, nested retweets, and nested quoted tweets into separate flattened dataframes
    flat_tweets = spark.sql("""
        select user.created_at as account_created_at,
        
        id_str as tweet_id, coordinates.coordinates[0] as lon, coordinates.coordinates[1] as lat, created_at,
            in_reply_to_status_id_str as in_reply_to_tweet_id, in_reply_to_screen_name, source, user.description,
            user.followers_count, user.friends_count, user.id_str as user_id, user.screen_name as screen_name,
            user.location, user.name, user.statuses_count, user.time_zone, user.utc_offset, retweet_count, 
            retweeted_status.id_str as retweet_id, retweeted_status.user.screen_name as retweeted_screen_name,
            quoted_status_id_str as quote_id, quoted_status.user.screen_name as quoted_screen_name, entities.hashtags,
            entities.user_mentions,
                case when retweeted_status is not null then retweeted_status.text
                else text end as text

        from df
    """)

    flat_RTs = spark.sql("""
        select user.created_at as account_created_at,
        
        id_str as tweet_id, coordinates.coordinates[0] as lon, coordinates.coordinates[1] as lat, created_at,
            in_reply_to_status_id_str as in_reply_to_tweet_id, in_reply_to_screen_name, source, user.description,
            user.followers_count, user.friends_count, user.id_str as user_id, user.screen_name as screen_name,
            user.location, user.name, user.statuses_count, user.time_zone, user.utc_offset, retweet_count, 
            null as retweet_id, null as retweeted_screen_name, quoted_status_id_str as quote_id,
            quoted_status.user.screen_name as quoted_screen_name, entities.hashtags, entities.user_mentions, text
        from 
            (select retweeted_status.*
             from df
             where retweeted_status is not null
            ) as sub
    """)

    flat_QTs = spark.sql("""
        select user.created_at as account_created_at,
        
        id_str as tweet_id, coordinates.coordinates[0] as lon, coordinates.coordinates[1] as lat, created_at,
            in_reply_to_status_id_str as in_reply_to_tweet_id, in_reply_to_screen_name, source, user.description,
            user.followers_count, user.friends_count, user.id_str as user_id, user.screen_name as screen_name,
            user.location, user.name, user.statuses_count, user.time_zone, user.utc_offset, retweet_count, 
            null as retweet_id, null as retweeted_screen_name, quoted_status_id_str as quote_id,
            null as quoted_screen_name, entities.hashtags, entities.user_mentions, text
        from 
            (select quoted_status.*
             from df
             where quoted_status is not null
            ) as sub
    """)

    flat_tweets.registerTempTable('flat_tweets')
    flat_RTs.registerTempTable('flat_RTs')
    flat_QTs.registerTempTable('flat_QTs')
    
    # Combine the above and keep distinct tweet_ids
    undistinct_tweets = spark.sql("""
        select *
        from flat_tweets
        union all
        select *
        from flat_RTs
        union all
        select *
        from flat_QTs

    """)
    undistinct_tweets.registerTempTable('undistinct_tweets')
    
    distinct_tweets = spark.sql("""
        select *
        from
            (select *,
                row_number() over (partition by tweet_id order by created_at) as n_repeats
            from undistinct_tweets) as sub
        where n_repeats = 1
    """)
    distinct_tweets.persist(StorageLevel.MEMORY_AND_DISK)
    distinct_tweets.registerTempTable('distinct_tweets')
    
    distinct_tweets.write.parquet(save_as)
    print('Extracted and saved!')

## Extract and save data

In [5]:
extract_tweet_data('files/trump2.json',
                   'files/trump2b.parquet')

Extracted and saved!


## Load simplified data and union distinct tweet_ids

In [9]:
trump_tweets = spark.read.parquet('files/trump2b.parquet')

In [10]:
trump_tweets.registerTempTable('trump_tweets')

In [11]:
all_tweets = trump_tweets
all_tweets.persist(StorageLevel.MEMORY_AND_DISK)
all_tweets.registerTempTable('all_tweets')

In [12]:
all_tweets.count()

3363952

## Do some checks on the data

In [13]:
all_tweets.columns

['account_created_at',
 'tweet_id',
 'lon',
 'lat',
 'created_at',
 'in_reply_to_tweet_id',
 'in_reply_to_screen_name',
 'source',
 'description',
 'followers_count',
 'friends_count',
 'user_id',
 'screen_name',
 'location',
 'name',
 'statuses_count',
 'time_zone',
 'utc_offset',
 'retweet_count',
 'retweet_id',
 'retweeted_screen_name',
 'quote_id',
 'quoted_screen_name',
 'hashtags',
 'user_mentions',
 'text',
 'n_repeats']

In [16]:
# View some of the data
(all_tweets.filter('lon is not null')
          .select('tweet_id', 'screen_name', 'lon', 'lat', 'text')
          .show())

+------------------+---------------+-------------+-----------+--------------------+
|          tweet_id|    screen_name|          lon|        lat|                text|
+------------------+---------------+-------------+-----------+--------------------+
|974674127587102727|       rob_blue| -86.89117484|40.41784362|@USMC_Razorback @...|
|974718622181412864|        Stikine|   -122.16553|   52.14843|@realDonaldTrump ...|
|976507732290359296|    apattyquote|  -92.3750398| 34.7634995|@realDonaldTrump ...|
|975188531860586496| translatorbali|  106.6537692|  -6.242585|Happy 39th Birthd...|
|975064337709662208| watters_nrgsuk|   -4.1416316|  55.860401|@realDonaldTrump ...|
|977610740793331713|          ItsGQ|     -77.0367|    38.8951|There’s more peop...|
|629554738766479360|realDonaldTrump|  -73.9751351| 40.7624514|.@FrankLuntz is a...|
|975354953832255488|      HaHaNoNWO| -89.92451929| 35.1893897|@realDonaldTrump ...|
|976850728856809479|          Kiddi| -21.89541254|64.12966563|@realDonaldTru

In [18]:
# Number of distinct users: 663,916
(all_tweets.agg(countDistinct(all_tweets.user_id))
          .show())

+-----------------------+
|count(DISTINCT user_id)|
+-----------------------+
|                 663916|
+-----------------------+



In [19]:
# Most retweeted users
spark.sql("""
    select screen_name, sum(retweet_count) as total_rts
    from all_tweets
    where retweet_id is null
    group by screen_name
    order by total_rts desc
""").show()

+---------------+---------+
|    screen_name|total_rts|
+---------------+---------+
|realDonaldTrump|  6848941|
|    BarackObama|  2673046|
| HillaryClinton|   790548|
|    jimmykimmel|   708287|
|         funder|   671508|
|        tedlieu|   614553|
|   krassenstein|   550223|
|     SenSanders|   462445|
|        rihanna|   430354|
|          Comey|   421396|
|        POTUS44|   347955|
|   kylegriffin1|   317820|
| RealJamesWoods|   312624|
|      EdKrassen|   296122|
|    colesprouse|   281987|
|   SethAbramson|   281671|
|        FoxNews|   246257|
|  RepAdamSchiff|   238105|
|Fire_Sister_Bee|   234297|
|      TomFitton|   221728|
+---------------+---------+
only showing top 20 rows



In [20]:
# Users with most followers - this isn't that informative, because these followers aren't
# necessarily found in our data
spark.sql("""
    select screen_name, max(followers_count) as n_followers
    from all_tweets
    group by screen_name
    order by n_followers desc
""").show()

+---------------+-----------+
|    screen_name|n_followers|
+---------------+-----------+
|      katyperry|  108963807|
|   justinbieber|  105671694|
|    BarackObama|  101129213|
|        rihanna|   86614734|
|       ladygaga|   77645853|
|   TheEllenShow|   77312893|
|        Twitter|   62510556|
|  KimKardashian|   59262874|
|       ddlovato|   55513564|
|         cnnbrk|   54517839|
|realDonaldTrump|   49540870|
|        nytimes|   41550011|
|   narendramodi|   41384479|
|      KingJames|   41101418|
|     MileyCyrus|   40048130|
|            CNN|   39872507|
|    BBCBreaking|   37539864|
|   SportsCenter|   35595957|
|           espn|   34040157|
|     wizkhalifa|   33298483|
+---------------+-----------+
only showing top 20 rows



## Read a bunch of tweets from each of my search categories to make sure they are generally relevant

Jon - this section isn't really relevant to me, but because of my massive amount of tweets, it's fine!

In [42]:
# # 'climatechange' is all about climate change

# spark.sql("""
#     select screen_name, created_at, text
#     from all_tweets
#     where text like '%climatechange%'
# """).take(50)

In [43]:
# # '#climate' is all about climate change

# spark.sql("""
#     select screen_name, created_at, text
#     from all_tweets
#     where text like '%#climate %'
# """).take(50)

In [44]:
# # 'climate' and combinations of certain other words are all about climate change

# spark.sql("""
#     select screen_name, created_at, text
#     from all_tweets
#     where text like '%climate%' and text rlike 'science|scientists?|alarmists?|change|realists?|denial|deniers?' 
# """).take(50)

In [45]:
# # 'globalwarming' and 'global' or 'warming' are all about climate change

# spark.sql("""
#     select screen_name, created_at, text
#     from all_tweets
#     where (text like '%globalwarming%') or (text like '%global%' and text like '%warming%') 
# """).take(50)

In [46]:
# # 'agw' grabs lots of irrelevant stuff (mostly in Japanese...)
# # I changed my API query to search specifically for '#agw', which appears to be much more effective

# spark.sql("""
#     select screen_name, created_at, text
#     from all_tweets
#     where text like '%#agw%'
# """).take(50)

## Remove bots

In [24]:
# View retweet/tweet ratios for users. 

spark.sql("""
select screen_name, n_rt, n_orig, n_rt/(n_rt + n_orig)*100 as rt_tw_ratio
from
    (select screen_name, 
        sum(case when type = 'rt' then n else 0 end) as n_rt,
        sum(case when type = 'orig' then n else 0 end) as n_orig
    from
        (select screen_name, type, count(*) as n
        from
            (select screen_name,
                case when retweet_id is null then 'orig'
                else 'rt' end as type
            from all_tweets) as sub
        group by screen_name, type) as sub2
    group by screen_name) as sub3
order by rt_tw_ratio DESC, n_rt ASC
""").show()

+---------------+----+------+-----------+
|    screen_name|n_rt|n_orig|rt_tw_ratio|
+---------------+----+------+-----------+
|        LushoBP|   1|     0|      100.0|
|  auntie_karen7|   1|     0|      100.0|
|  BrownRanger97|   1|     0|      100.0|
|   tlantermason|   1|     0|      100.0|
|Linda_Thomas_RN|   1|     0|      100.0|
|    Gam3r4l1f31|   1|     0|      100.0|
|     My140Chars|   1|     0|      100.0|
|  calkjohnetta2|   1|     0|      100.0|
|   karenrigatti|   1|     0|      100.0|
|   ShannonP1120|   1|     0|      100.0|
|  ibrakeforjake|   1|     0|      100.0|
|   blacksignori|   1|     0|      100.0|
|     abella0417|   1|     0|      100.0|
|y6oDCXl6RcxgjY1|   1|     0|      100.0|
|      pazcowen3|   1|     0|      100.0|
|        qarloff|   1|     0|      100.0|
|         dab4jc|   1|     0|      100.0|
|          42Kmi|   1|     0|      100.0|
|  dissentingops|   1|     0|      100.0|
|  jackburton414|   1|     0|      100.0|
+---------------+----+------+-----

In [25]:
#Remove users that have retweet/tweet ratios greater than 97%, and that have more than 10 retweets.
# They are either bots or curated lists who don't engage in conversation.

bot_list = spark.sql("""
    SELECT screen_name
    FROM
        (SELECT screen_name, 
            sum(case when type = 'rt' then n else 0 end) as n_rt,
            sum(case when type = 'orig' then n else 0 end) as n_orig
        FROM
            (SELECT screen_name, type, count(*) as n
            FROM
                (SELECT screen_name,
                    case when retweet_id is null then 'orig'
                    else 'rt' end as type
                FROM all_tweets) as sub
            group by screen_name, type) as sub2
        group by screen_name
        having n_rt/(n_rt + n_orig)*100 > 90 and n_rt > 10) bots
""")

bot_list.registerTempTable('bot_list')
bot_list.persist(StorageLevel.MEMORY_AND_DISK)

DataFrame[screen_name: string]

In [26]:
bot_list.count()

14846

In [133]:
# filtered out the bots criteria above, and they have to have at least 2/3 as many followers as friends

all_tweets = spark.sql("""
    select account_created_at,t.screen_name as screen_name, tweet_id, lon, lat, created_at, in_reply_to_tweet_id, in_reply_to_screen_name, source, description,
        followers_count, friends_count, user_id, location, name, statuses_count, time_zone, utc_offset, retweet_count,
        retweet_id, retweeted_screen_name, quote_id, quoted_screen_name, hashtags, user_mentions, text
    from all_tweets t
    left join bot_list b
    on b.screen_name = t.screen_name
    where b.screen_name is null
    and followers_count > 75
    having (t.followers_count / t.friends_count) > .66
""")

In [136]:
all_tweets.count() # down from 3363952

1331125

In [28]:
print(all_tweets.columns)

['account_created_at', 'tweet_id', 'lon', 'lat', 'created_at', 'in_reply_to_tweet_id', 'in_reply_to_screen_name', 'source', 'description', 'followers_count', 'friends_count', 'user_id', 'screen_name', 'location', 'name', 'statuses_count', 'time_zone', 'utc_offset', 'retweet_count', 'retweet_id', 'retweeted_screen_name', 'quote_id', 'quoted_screen_name', 'hashtags', 'user_mentions', 'text', 'n_repeats']


## Save tweets

In [31]:
try:
    all_tweets.write.parquet('files/processed_tweets2b.parquet')
    
except Exception as e:
    print("Does the file already exist? See error:")
    print(e)

Does the file already exist? See error:
'path file:/home/ubuntu/notebooks/files/processed_tweets2b.parquet already exists.;'


## Extract the messy nested objects (i.e. hashtags and user_mentions) from the tweets  
## Save as separate tables

### Hashtags

In [32]:
all_tweets = spark.read.parquet('files/processed_tweets2b.parquet')

In [33]:
all_tweets.registerTempTable('all_tweets')

In [34]:
all_tweets.count()

1331125

In [35]:
hashtags = spark.sql("""
    select distinct tweet_id, lower(hashtags.text) as hashtag
    from
        (select tweet_id, explode(hashtags) as hashtags
        from all_tweets) as sub
""")

In [36]:
hashtags.show()

+------------------+-----------------+
|          tweet_id|          hashtag|
+------------------+-----------------+
|974364366605815809|             全国两会|
|974619151812694016|     midterms2018|
|975168743411154944|             maga|
|976128796444188674|      dosomething|
|976206388501168128|        deepstate|
|976493709150834688|6monthsaftermaria|
|977676811340599296|          omnibus|
|974590026343501824|     realitycheck|
|974684723619430400|        deepstate|
|975111635005333504|              doj|
|975184406255054848|            qanon|
|975799479088635910|             eeuu|
|975922940633145344|      immigration|
|976887319411093504|      donaldtrump|
|976894893363814401|       rosenstein|
|976899446658273281|   vetotheomnibus|
|977654612785184774| marchforourlives|
|974649155376336897|       qanon8chan|
|975093781547626496|            trump|
|975107684688113664|thegreatawakening|
+------------------+-----------------+
only showing top 20 rows



In [38]:
try:
    hashtags.write.parquet('files/hashtags_all2.parquet')

except Exception as e:
    print("Does the file already exist? See error:")
    print(e)

Does the file already exist? See error:
'path file:/home/ubuntu/notebooks/files/hashtags_all2.parquet already exists.;'


### Mentions

In [39]:
mentions = spark.sql("""
    select distinct tweet_id, mentions.screen_name as mention_screen_name
    from
        (select tweet_id, explode(user_mentions) as mentions
        from all_tweets) as sub
""")

In [40]:
mentions.show()

+------------------+-------------------+
|          tweet_id|mention_screen_name|
+------------------+-------------------+
|954895035983679488|    polishprincessh|
|974569292455972864|    realDonaldTrump|
|974599816188694528|    realDonaldTrump|
|974638075358924800|    TyEducatingLibs|
|974680978420633600|    AmericaFirstPol|
|974695059810549760|            NBCNews|
|974715764522483712|    realDonaldTrump|
|974732170236481537|       StacyLStiles|
|974738148004687872|        vrijdenkend|
|974750631477968897|          BigDonTee|
|974810705823182848|           SebGorka|
|974816514187685888|           SebGorka|
|974821789183500288|        Edible3Ball|
|975074807107768320|    realDonaldTrump|
|975087154882863106|         AWAKEALERT|
|975101863447154690|    realDonaldTrump|
|975108027303915520|    TheGreatFeather|
|975118718652084224|    realDonaldTrump|
|975136057057468416|                 VP|
|975144575957422081|              POTUS|
+------------------+-------------------+
only showing top

In [14]:
mentions.write.parquet('/home/ubuntu/notebooks/mentions_all2.parquet')