In [1]:
# heavylifting functions are abstracted and modularized in utils package
from utils.fetch_tweet import UserTweetsFetcher, SearchTweetsFetcher
from utils.analyze_sentiment import SentimentAnalyzer
from pprint import pprint
from datetime import datetime

In [2]:
# if not using docker, run this customized initializer first
from utils.spark_initializer import SparkInitializer
SparkInitializer.init_spark()

In [3]:
# run without any fuss if using official jupyter/spark docker base image
from pyspark.sql import functions
from pyspark.sql.functions import from_json, col, udf
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, DateType, MapType, StringType, FloatType

In [4]:
spark = SparkSession.builder.appName('TradeWar').getOrCreate()

### retrieve data source

In [5]:
topic = ['china', 'trade']
media = ['marketwatch', 'wsj', 'ft', 'business', 'theeconomist', 'cnbc', 'cnn']

In [6]:
# standardize the datetime format from different API
def date_str_reformat(s, format_codes):
    dt = datetime.strptime(s, format_codes)
    date_obj = dt.date()
    return date_obj

In [7]:
media_tweets_fetcher = SearchTweetsFetcher(media, topic)

In [8]:
media_tweets = media_tweets_fetcher.fetch_search_tweets()

In [9]:
media_tweet_dateformat = '%a %b %d %H:%M:%S %z %Y'
for md in media_tweets:
    md['created_at'] = date_str_reformat(md['created_at'], media_tweet_dateformat)

In [10]:
# retrieved json format tweets
pprint(media_tweets[0])

{'contributors': None,
 'coordinates': None,
 'created_at': datetime.date(2019, 12, 24),
 'entities': {'hashtags': [],
              'symbols': [],
              'urls': [{'display_url': 'cnb.cx/395MOSZ',
                        'expanded_url': 'https://cnb.cx/395MOSZ',
                        'indices': [76, 99],
                        'url': 'https://t.co/xvuHoZFORY'}],
              'user_mentions': []},
 'favorite_count': 17,
 'favorited': False,
 'geo': None,
 'id': 1209315177163902977,
 'id_str': '1209315177163902977',
 'in_reply_to_screen_name': None,
 'in_reply_to_status_id': None,
 'in_reply_to_status_id_str': None,
 'in_reply_to_user_id': None,
 'in_reply_to_user_id_str': None,
 'is_quote_status': False,
 'lang': 'en',
 'metadata': {'iso_language_code': 'en', 'result_type': 'recent'},
 'place': None,
 'possibly_sensitive': False,
 'retweet_count': 8,
 'retweeted': False,
 'source': '<a href="http://www.socialflow.com" rel="nofollow">SocialFlow</a>',
 'text': 'The US-China tr

### manipulate data using Spark

In [11]:
# as shown in the sample tweet, user is in nested json. MapType is the best choice to StructType it
media_tweet_schema = StructType([StructField('created_at', DateType(), True),
                                 StructField('user', MapType(StringType(), StringType()), True),
                                 StructField('text', StringType(), True),
                                 ])

In [12]:
media_df_origin = spark.createDataFrame(media_tweets, schema=media_tweet_schema)

In [13]:
media_df_origin.show(10)

+----------+--------------------+--------------------+
|created_at|                user|                text|
+----------+--------------------+--------------------+
|2019-12-24|[utc_offset ->, f...|The US-China trad...|
|2019-12-24|[utc_offset ->, f...|RT @CNNBusiness: ...|
|2019-12-23|[utc_offset ->, f...|Asia stocks set t...|
|2019-12-23|[utc_offset ->, f...|The latest tariff...|
|2019-12-23|[utc_offset ->, f...|After a volatile ...|
|2019-12-23|[utc_offset ->, f...|China will reduce...|
|2019-12-23|[utc_offset ->, f...|The latest tariff...|
|2019-12-23|[utc_offset ->, f...|Caught in Trump’s...|
|2019-12-23|[utc_offset ->, f...|China, South Kore...|
|2019-12-23|[utc_offset ->, f...|China says it wil...|
+----------+--------------------+--------------------+
only showing top 10 rows



In [14]:
media_df_origin.printSchema()

root
 |-- created_at: date (nullable = true)
 |-- user: map (nullable = true)
 |    |-- key: string
 |    |-- value: string (valueContainsNull = true)
 |-- text: string (nullable = true)



In [15]:
media_df_origin.select(col('user')['name']).show(10, False)

+-----------------------+
|user[name]             |
+-----------------------+
|CNBC                   |
|CNN                    |
|CNBC                   |
|The Wall Street Journal|
|CNBC                   |
|CNN                    |
|The Wall Street Journal|
|Bloomberg              |
|MarketWatch            |
|The Wall Street Journal|
+-----------------------+
only showing top 10 rows



In [16]:
media_df_name = media_df_origin.withColumn('name', col('user')['name'])

In [17]:
media_df_name = media_df_name.select(col('name'), col('text'), col('created_at'))

In [18]:
media_df_name.show(10)

+--------------------+--------------------+----------+
|                name|                text|created_at|
+--------------------+--------------------+----------+
|                CNBC|The US-China trad...|2019-12-24|
|                 CNN|RT @CNNBusiness: ...|2019-12-24|
|                CNBC|Asia stocks set t...|2019-12-23|
|The Wall Street J...|The latest tariff...|2019-12-23|
|                CNBC|After a volatile ...|2019-12-23|
|                 CNN|China will reduce...|2019-12-23|
|The Wall Street J...|The latest tariff...|2019-12-23|
|           Bloomberg|Caught in Trump’s...|2019-12-23|
|         MarketWatch|China, South Kore...|2019-12-23|
|The Wall Street J...|China says it wil...|2019-12-23|
+--------------------+--------------------+----------+
only showing top 10 rows



In [19]:
# using customized sentiment analyzer class
sa = SentimentAnalyzer()

In [20]:
# transform a customized function into Spark User-Defined-Function
udf_sentscore = udf(sa.sentiment_score, FloatType())

In [21]:
# get sentiment score of the tweet
media_df_sentiment = media_df_name.withColumn('sentiment_score', udf_sentscore(col('text')))

In [22]:
media_df_sentiment.show(10)

+--------------------+--------------------+----------+---------------+
|                name|                text|created_at|sentiment_score|
+--------------------+--------------------+----------+---------------+
|                CNBC|The US-China trad...|2019-12-24|        -0.3818|
|                 CNN|RT @CNNBusiness: ...|2019-12-24|         0.2263|
|                CNBC|Asia stocks set t...|2019-12-23|         0.0258|
|The Wall Street J...|The latest tariff...|2019-12-23|         0.1655|
|                CNBC|After a volatile ...|2019-12-23|         0.3182|
|                 CNN|China will reduce...|2019-12-23|         0.4019|
|The Wall Street J...|The latest tariff...|2019-12-23|         0.1655|
|           Bloomberg|Caught in Trump’s...|2019-12-23|            0.0|
|         MarketWatch|China, South Kore...|2019-12-23|        -0.2732|
|The Wall Street J...|China says it wil...|2019-12-23|        -0.2732|
+--------------------+--------------------+----------+---------------+
only s

### what do mainstream media feel about the trade war ?

In [23]:
media_df_sentiment.groupby('name').mean('sentiment_score').show()

+--------------------+--------------------+
|                name|avg(sentiment_score)|
+--------------------+--------------------+
|The Wall Street J...|-0.13022727248343555|
|         MarketWatch| 0.13023636557839133|
|           Bloomberg|   0.174869699572975|
|                 CNN|  0.1751666640241941|
|                CNBC| 0.04709729743567673|
|       The Economist| 0.23946666717529297|
+--------------------+--------------------+



### The Trump Factor

In [24]:
trump_tweets_fetcher = UserTweetsFetcher('realDonaldTrump')

In [25]:
trump_tweets = trump_tweets_fetcher.fetch_user_tweets()

In [26]:
trump_tweet_dateformat = '%Y-%m-%d %H:%M:%S'
for td in trump_tweets:
    td['created_at'] = date_str_reformat(td['created_at'], trump_tweet_dateformat)

In [27]:
pprint(trump_tweets[0])

{'created_at': datetime.date(2019, 12, 24),
 'text': 'RT @EpochTimes: Over 400 people packed the meeting room, filled the '
         'lobby, and spilled into the parking lot in rural Buckingham County, '
         '#Vi…'}


In [28]:
trump_tweet_schema = StructType([StructField('created_at', DateType(), True),
                                 StructField('text', StringType(), True),
                                 ])

In [29]:
trump_df_origin = spark.createDataFrame(trump_tweets, schema=trump_tweet_schema)

In [30]:
trump_df_origin.show(10)

+----------+--------------------+
|created_at|                text|
+----------+--------------------+
|2019-12-24|RT @EpochTimes: O...|
|2019-12-24|“The real victims...|
|2019-12-24|What the Democrat...|
|2019-12-24|RT @HuckabeeOnTBN...|
|2019-12-24|STOCK MARKET CLOS...|
|2019-12-23|NASDAQ UP 72.2% S...|
|2019-12-23|RT @LindseyGraham...|
|2019-12-23|RT @LindseyGraham...|
|2019-12-23|RT @LindseyGraham...|
|2019-12-23|RT @LindseyGraham...|
+----------+--------------------+
only showing top 10 rows



In [31]:
trump_df_sentiment = trump_df_origin.withColumn('sentiment_score', udf_sentscore(col('text')))

#### Is Mr. Trump happy ?

In [32]:
trump_df_sentiment.select(functions.mean(col('sentiment_score'))).show()

+--------------------+
|avg(sentiment_score)|
+--------------------+
| 0.06351356711893824|
+--------------------+



### Join the singals together ?

In [33]:
media_groupby_date = media_df_sentiment.groupby('created_at').mean('sentiment_score') \
    .select(col('created_at'), col('avg(sentiment_score)').alias('media_avg_sentiment'))
media_groupby_date.show()

+----------+--------------------+
|created_at| media_avg_sentiment|
+----------+--------------------+
|2019-12-18|0.023206668595472973|
|2019-12-22| 0.23946666717529297|
|2019-12-23| 0.01673999838531017|
|2019-12-16| 0.18378437764476985|
|2019-12-20| 0.11665000170469284|
|2019-12-19| 0.05554444011714724|
|2019-12-17| 0.01582307769702031|
|2019-12-21|-0.01857499592006...|
|2019-12-24|-0.07774999737739563|
+----------+--------------------+



In [34]:
trump_groupby_date = trump_df_sentiment.groupby('created_at').mean('sentiment_score') \
    .select(col('created_at'), col('avg(sentiment_score)').alias('trump_avg_sentiment'))
trump_groupby_date.show()

+----------+--------------------+
|created_at| trump_avg_sentiment|
+----------+--------------------+
|2019-12-18|0.026277779291073482|
|2019-12-22|-0.18884999677538872|
|2019-12-23|-0.13394615498299783|
|2019-12-20| 0.26934749642387035|
|2019-12-19|0.006705063053324253|
|2019-12-21|  0.2852347810631213|
|2019-12-24|-0.14087999388575553|
+----------+--------------------+



In [35]:
sentiment_groupby_date = media_groupby_date.join(trump_groupby_date, on=['created_at'], how='inner')
sentiment_groupby_date.sort(col('created_at')).show()

+----------+--------------------+--------------------+
|created_at| media_avg_sentiment| trump_avg_sentiment|
+----------+--------------------+--------------------+
|2019-12-18|0.023206668595472973|0.026277779291073482|
|2019-12-19| 0.05554444011714724|0.006705063053324253|
|2019-12-20| 0.11665000170469284| 0.26934749642387035|
|2019-12-21|-0.01857499592006...|  0.2852347810631213|
|2019-12-22| 0.23946666717529297|-0.18884999677538872|
|2019-12-23| 0.01673999838531017|-0.13394615498299783|
|2019-12-24|-0.07774999737739563|-0.14087999388575553|
+----------+--------------------+--------------------+

