## Reading Tweets from msca-bdp-tweets bucket

### Using PySpark kernel

In [2]:
import os
import subprocess
from pyspark.sql.functions import *
from pyspark.sql.types import *
import datetime
import pytz

In [3]:
datetime.datetime.now(pytz.timezone('US/Central')).strftime("%a, %d %B %Y %H:%M:%S")

'Sat, 13 March 2021 13:50:32'

In [4]:
spark.conf.set("spark.sql.repl.eagerEval.enabled",True)

In [49]:
from google.cloud import storage

def list_blobs(bucket_name, folder_name):
    gcs_client = storage.Client()
    bucket = gcs_client.bucket(bucket_name)
    blobs = list(bucket.list_blobs(prefix=folder_name))
    
    file_list = [blob.name for blob in blobs]
    return file_list

    #for blob in blobs:
        #print(blob.name + '\t' + str(blob.size))

In [50]:
list_blobs('msca-bdp-project', 'Tweets')[:10]

['Tweets/tweets201706221015.json',
 'Tweets/tweets201706221115.json',
 'Tweets/tweets201706221215.json',
 'Tweets/tweets201706221315.json',
 'Tweets/tweets201706221415.json',
 'Tweets/tweets201706221515.json',
 'Tweets/tweets201706221615.json',
 'Tweets/tweets201706221715.json',
 'Tweets/tweets201706221815.json',
 'Tweets/tweets201706221915.json']

In [8]:
def copy_blob(
    bucket_name, blob_name, destination_bucket_name, destination_blob_name
):
    """Copies a blob from one bucket to another with a new name."""
    # bucket_name = "your-bucket-name"
    # blob_name = "your-object-name"
    # destination_bucket_name = "destination-bucket-name"
    # destination_blob_name = "destination-object-name"

    storage_client = storage.Client()

    source_bucket = storage_client.bucket(bucket_name)
    source_blob = source_bucket.blob(blob_name)
    destination_bucket = storage_client.bucket(destination_bucket_name)

    blob_copy = source_bucket.copy_blob(
        source_blob, destination_bucket, destination_blob_name
    )

    print(
        "Blob {} in bucket {} copied to blob {} in bucket {}.".format(
            source_blob.name,
            source_bucket.name,
            blob_copy.name,
            destination_bucket.name,
        )
    )

In [35]:
directory = 'gs://msca-bdp-project/Tweets/'
#file = 'tweets201706221015.json'
file = '^tweets201706221'
path = directory + file
print(path)

gs://msca-bdp-project/Tweets/^tweets201706221


In [52]:
bucket = 'gs://msca-bdp-project/'

files = [bucket + file for file in list_blobs('msca-bdp-project', 'Tweets')[:10]]
print(files)

['gs://msca-bdp-project/Tweets/tweets201706221015.json', 'gs://msca-bdp-project/Tweets/tweets201706221115.json', 'gs://msca-bdp-project/Tweets/tweets201706221215.json', 'gs://msca-bdp-project/Tweets/tweets201706221315.json', 'gs://msca-bdp-project/Tweets/tweets201706221415.json', 'gs://msca-bdp-project/Tweets/tweets201706221515.json', 'gs://msca-bdp-project/Tweets/tweets201706221615.json', 'gs://msca-bdp-project/Tweets/tweets201706221715.json', 'gs://msca-bdp-project/Tweets/tweets201706221815.json', 'gs://msca-bdp-project/Tweets/tweets201706221915.json']


In [53]:
%time tweets_df = spark.read.json(files)

CPU times: user 6.91 ms, sys: 544 µs, total: 7.45 ms
Wall time: 14.2 s


In [54]:
tweets_df.count()

149689

In [19]:
tweets_df.limit(5)

contributors,coordinates,created_at,display_text_range,entities,extended_entities,extended_tweet,favorite_count,favorited,filter_level,geo,id,id_str,in_reply_to_screen_name,in_reply_to_status_id,in_reply_to_status_id_str,in_reply_to_user_id,in_reply_to_user_id_str,is_quote_status,lang,limit,place,possibly_sensitive,quoted_status,quoted_status_id,quoted_status_id_str,retweet_count,retweeted,retweeted_status,source,text,timestamp_ms,truncated,user,withheld_in_countries
,,Thu Jun 22 14:15:...,,"[[],, [], [[stanf...",,,0,False,low,,877892686513975296,877892686513975296,,,,,,False,en,,,False,,,,0,False,"[,, Thu Jun 22 14...","<a href=""http://t...",RT @ArkansasBlog:...,1498140901579,False,"[false, Sun Jun 1...",
,,Thu Jun 22 14:15:...,"[0, 140]","[[],, [], [[twitt...",,"[[0, 129], [[[[11...",0,False,low,,877892684756566016,877892684756566016,,,,,,False,en,,,False,,,,0,False,,"<a href=""https://...",IL Healthcare Tru...,1498140901160,True,"[false, Fri Jun 2...",
,,Thu Jun 22 14:15:...,"[0, 140]","[[],, [], [[twitt...",,"[[0, 129], [[[[11...",0,False,low,,877892686744698882,877892686744698882,,,,,,False,en,,,False,,,,0,False,,"<a href=""https://...",IL Healthcare Tru...,1498140901634,True,"[false, Thu Feb 0...",
,,Thu Jun 22 14:15:...,"[0, 140]","[[],, [], [[twitt...",,"[[0, 129], [[[[11...",0,False,low,,877892689118715904,877892689118715904,,,,,,False,en,,,False,,,,0,False,,"<a href=""https://...",IL Healthcare Tru...,1498140902200,True,"[false, Tue Feb 1...",
,,Thu Jun 22 14:15:...,"[0, 112]","[[[[91, 103], ISM...",[[[pic.twitter.co...,,0,False,low,,877892690242936834,877892690242936834,,,,,,False,en,,,False,,,,0,False,,"<a href=""http://b...",Day 1 of Chicago ...,1498140902468,False,"[false, Thu May 1...",


In [14]:
tweets_df.select('entities.urls.display_url')

display_url
[stanford.io/2sW8...
[twitter.com/i/we...
[twitter.com/i/we...
[twitter.com/i/we...
[]
[]
[kansascity.com/s...
[]
[bit.ly/2sX3cCU]
[]


In [55]:
tweets_df = tweets_df.\
select('created_at', 'id_str', 'in_reply_to_status_id_str', 'in_reply_to_user_id_str',
       'retweet_count', 'favorite_count', col('retweeted_status.text').alias('original_tweet_text'),
       col('retweeted_status.entities.hashtags.text').alias('original_tweet_has')'text',
       col('entities.hashtags.text').alias('hashtags'), 'entities.urls.display_url', 'place.place_type',
       col('place.full_name').alias('place_name'), 'user').\
cache()

In [56]:
tweets_df.limit(5)

created_at,id_str,in_reply_to_status_id_str,in_reply_to_user_id_str,retweet_count,favorite_count,retweeted_status,text,hashtags,display_url,place_type,place_name,user
Thu Jun 22 19:16:...,877968438756593664,,,0,0,"[,, Thu Jun 22 19...",RT @racopeman: Ha...,[],[],,,"[false, Thu Oct 0..."
Thu Jun 22 19:16:...,877968440228585472,,,0,0,,PII of 1 million ...,[],[csoonline.com/ar...,,,"[false, Wed Dec 0..."
Thu Jun 22 19:16:...,877968444800380928,,,0,0,"[,, Thu Jun 22 17...",RT @_TheJoshuaMoo...,[],[],,,"[false, Mon Feb 0..."
Thu Jun 22 19:16:...,877968444792086528,,,0,0,,#DaBears #BearsTa...,"[DaBears, BearsTa...",[ow.ly/4T5Y50cl5V8],,,"[false, Mon Feb 0..."
Thu Jun 22 19:16:...,877968445324771328,,,0,0,"[,, Thu Jun 22 18...",RT @usatodayhss: ...,[],[usat.ly/2sZ3Mk8],,,"[false, Mon May 0..."


In [57]:
tweets_df.groupby("place_type").agg(count('id_str').alias('num_tweets'))

place_type,num_tweets
,143255
city,4319
country,59
admin,428
neighborhood,17
poi,86


In [61]:
tweets_df.filter(array_contains(col('hashtags'), 'UChicago'))

created_at,id_str,in_reply_to_status_id_str,in_reply_to_user_id_str,retweet_count,favorite_count,retweeted_status,text,hashtags,display_url,place_type,place_name,user
Thu Jun 22 19:16:...,877968482746454016,,,0,0,,#UChicago “The Re...,[UChicago],[ow.ly/ANPo50cl62n],,,"[false, Sun Aug 1..."
Thu Jun 22 20:29:...,877986917500952576,,,0,0,,My #UChicago team...,"[UChicago, alumni...",[jobopportunities...,,,"[false, Thu Jun 1..."


In [60]:
# search text column
tweets_df.filter(lower(tweets_df.text).contains("uchicago")).select('hashtags', 'text')

hashtags,text
[],Non-UChicago peop...
[],the four uchicago...
[],RT @HdxAcademy: W...
"[PaulSereno, hist...",Great visit from ...
[thursdaythought],#thursdaythought:...
"[PaulSereno, hist...",RT @MORPaleo: Gre...
[],@AriDavidPaul Cur...
"[PaulSereno, hist...",RT @MORPaleo: Gre...
[],Spending an eveni...
[],@cblatts I though...


In [64]:
# search place column
tweets_df.filter(lower(tweets_df.place_name).contains("University of Chicago")).select('place_name', 'hashtags', 'text')

place_name,hashtags,text


In [24]:
users = tweets_df.select('user.*')

In [25]:
users.limit(3)

contributors_enabled,created_at,default_profile,default_profile_image,description,favourites_count,follow_request_sent,followers_count,following,friends_count,geo_enabled,id,id_str,is_translator,lang,listed_count,location,name,notifications,profile_background_color,profile_background_image_url,profile_background_image_url_https,profile_background_tile,profile_banner_url,profile_image_url,profile_image_url_https,profile_link_color,profile_sidebar_border_color,profile_sidebar_fill_color,profile_text_color,profile_use_background_image,protected,screen_name,statuses_count,time_zone,url,utc_offset,verified
False,Sun Jun 13 02:13:...,False,False,Secular humanist ...,56574,,1554,,2577,True,155078285,155078285,False,en,116,"Pine Bluff, Arkansas",Lake Man,,000000,http://pbs.twimg....,https://pbs.twimg...,False,https://pbs.twimg...,http://pbs.twimg....,https://pbs.twimg...,91D2FA,FFFFFF,DDEEF6,333333,True,False,andyrosebrook,95959,Central Time (US ...,http://www.andyro...,-18000.0,False
False,Fri Jun 29 20:27:...,False,False,Whatever you can ...,167,,22,,87,False,7159992,7159992,False,en,2,Chicago,Annie Williams,,9AE4E8,http://abs.twimg....,https://abs.twimg...,False,https://pbs.twimg...,http://pbs.twimg....,https://pbs.twimg...,1B95E0,87BC44,E0FF92,0,True,False,annoir,417,Central Time (US ...,,-18000.0,False
False,Thu Feb 02 16:15:...,True,False,,111,,264,,231,False,827188692766834688,827188692766834688,False,en,18,,Indivisible9IL,,F5F8FA,,,False,https://pbs.twimg...,http://pbs.twimg....,https://pbs.twimg...,1DA1F2,C0DEED,DDEEF6,333333,True,False,indivisible9IL,342,,,,False


In [24]:
tweets_df.printSchema()

root
 |-- created_at: string (nullable = true)
 |-- id_str: string (nullable = true)
 |-- in_reply_to_status_id_str: string (nullable = true)
 |-- in_reply_to_user_id_str: string (nullable = true)
 |-- retweet_count: long (nullable = true)
 |-- favorite_count: long (nullable = true)
 |-- retweeted_status: struct (nullable = true)
 |    |-- contributors: string (nullable = true)
 |    |-- coordinates: struct (nullable = true)
 |    |    |-- coordinates: array (nullable = true)
 |    |    |    |-- element: double (containsNull = true)
 |    |    |-- type: string (nullable = true)
 |    |-- created_at: string (nullable = true)
 |    |-- display_text_range: array (nullable = true)
 |    |    |-- element: long (containsNull = true)
 |    |-- entities: struct (nullable = true)
 |    |    |-- hashtags: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- indices: array (nullable = true)
 |    |    |    |    |    |-- element: long (contains