## Setup

In [1]:
!pip install elasticsearch-dsl

[33mYou are using pip version 9.0.1, however version 10.0.1 is available.
You should consider upgrading via the 'pip install --upgrade pip' command.[0m


In [2]:
import os
import pyspark
from pyspark.sql import SQLContext

# Add the elasticsearch-hadoop jar
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/elasticsearch-hadoop-6.2.2.jar pyspark-shell'
conf = pyspark.SparkConf()

# Point to the master.
conf.setMaster("spark://tweetsets.library.gwu.edu:7101")
import os
import pyspark

# Add the elasticsearch-hadoop jar
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars /home/jovyan/elasticsearch-hadoop-6.2.2.jar pyspark-shell'
conf = pyspark.SparkConf()

# Point to the master.
conf.setMaster("spark://tweetsets.library.gwu.edu:7101")
conf.setAppName("beltway-fox")
conf.set("spark.driver.bindAddress", "0.0.0.0")
# Don't hog all of the cores.
conf.set("spark.cores.max", "3")
# Specify a port for the block manager (which runs as part of the worker). The range 7003-7028 is set 
# to be open in the Spark worker container.
conf.set("spark.blockManager.port", "7003")

# create the context
sc = pyspark.SparkContext(conf=conf)
sqlContext = SQLContext(sc)

# Configure for ElasticSearch cluster and index.
es_conf = {"es.nodes": "tweetsets.library.gwu.edu",
           "es.port": "9200",
           "es.resource": "tweets-c2c0c8/doc",
           "es.read.field.as.array.include": "hashtags,text,urls,mention_screen_names"}

## Screen names, phrases, and terms

In [3]:
# Mentions any of these accounts
screen_names = ("FoxNews",
                "foxandfriends",
                "IngrahamAngle",
                "Shepnewsteam",
                "TeamCavuto",
                "DanaPerino",
                "FoxNewsSunday",
                "HeatherChilders",
                "AmericaNewsroom",
                "JillianMele",
                "SchmittNYC",
                "OutnumberedFNC",
                "ainsleyearhardt",
                "kilmeade",
                "SteveDoocy",
                "BillHemmer",
                "SandraSmithFox",
                "JonScottFNC",
                "HARRISFAULKNER",
                "TheFive",
                "dailybriefing",
                "GregGutfeldShow",
                "SpecialReport",
                "BretBaier",
                "JesseBWatters",
                "TheJuanWilliams",
                "kimguilfoyle",
                "TheStoryFNC",
                "marthamaccallum",
                "TuckerCarlson",
                "seanhannity",
                "FoxNewsTonight",
                "foxnewspolitics",
                "foxnewsnight",
                "ShannonBream",
                "HowardKurtz",
                "HuntsmanAbby",
                "ffweekend",
                "FoxNewsSunday",
                "MediaBuzzFNC",
                "SundayFutures",
                "mariabartiromo",
                "WattersWorld")

# or uses any of these phrases
phrases = ("Fox News",
           "Fox and Friends",
           "Fox Friends",
           "Heather Childers",
           "Jillian Mele",
           "Ainsley Earhardt",
           "Steve Doocy",
           "Bill Hemmer",
           "Sandra Smith",
           "Jon Scott",
           "Harris Faulkner",
           "Greg Gutfeld",
           "Bret Baier",
           "Jesse Watters",
           "Juan Williams",
           "Kim Guilfoyle",
           "Tucker Carlson"
           "Shannon Bream",
           "Howard Kurtz",
           "Abby Huntsman",
           "Media Buzz",
           "Sunday Futures",
           "Maria Bartiromo"
           
)

# or any of these terms.
terms = ("Ingraham",
         "Cavuto",
         "Perino",
         "Hannity")

## Query

In [4]:
from elasticsearch_dsl import Search
from elasticsearch_dsl.query import Q
import json

s = Search()
q = Q('terms', mention_screen_names=screen_names)
for term in terms:
    q = q | Q("match", text=term)
for phrase in phrases:
    q = q | Q("match_phrase", text=phrase)
# After June 1, 2017
q = q & Q("range", created_at={"gte": "2017-06-01", "lt":"now", "format": "yyyy-MM-dd"})

s = s.filter(q)
es_conf['es.query'] = json.dumps(s.query.to_dict())

tweets_df = sqlContext.read.format("org.elasticsearch.spark.sql").options(**es_conf).load()
tweets_df.count()

33745

## Export

In [5]:
import csv
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def clean_text(text):
    return (' '.join(text)).replace('\n', ' ')
 
clean_text_udf=udf(clean_text, StringType())

csv_df = tweets_df.select("tweet_id", "user_screen_name", "created_at", "tweet_type", clean_text_udf('text').alias('text'))
csv_df.toPandas().to_csv('fox_tweets.csv', quoting=csv.QUOTE_NONNUMERIC, index=False, header=True)