# Spark Case Study

### Chase Middleman, Steven Weld, Daniel Jimenez, Zohaib Khawaja

Using the [data](https://s3.us-east-2.amazonaws.com/jgartner-test-data/twitter/zippedData.zip) provided to us by Twitter, we first cleaned the data by removing unnecessary columns.  We went through Twitter's data dictionary object by object to determine which fields would be useful for our analysis.  We then separated the tweets into three groups: tweets supporting Macron, tweets supporting Le Pen, and tweets supporting neither; the third group was excluded from further analysis.  For each tweet we looked at specific entities, most importantly hashtags, mentions/tags, and whether the tweet had been retweeted or favorited.  Finally, we made a scatterplot of the approximate locations of tweets sent in support of either candidate, hoping to find correlations between home location and support.

For the sake of this project's scope, we assume that any mention of a candidate in a tweet is an acknowledgement of support for that candidate.  It could be said that there is no such thing as bad publicity.

In [4]:
import json
import pprint
import pyspark as ps
import pandas as pd
import numpy as np
pd.options.display.max_colwidth = 200

In [5]:
spark = ps.sql.SparkSession.builder \
            .master("local[4]") \
            .appName("df case study") \
            .getOrCreate()

# Use the context that belongs to the active session (ps.SparkContext is only the class)
sc = spark.sparkContext
tweets_df = spark.read.json('./data/french_tweets.json').sample(False, 0.02)

## Reducing the data to only the necessary columns

In [6]:
# Keep only the requested columns, copy the needed nested fields into
# top-level columns, then drop the original nested structs
def CleanUp(df, columns):
    return (df[columns].withColumn('user_id', df['user.id'])
            .withColumn('user_screen_name', df['user.screen_name'])
            .withColumn('place_type', df['place.place_type'])
            .withColumn('place_coordinates', df['place.bounding_box.coordinates'])
            .withColumn('hashtags', df['entities.hashtags.text'])
            .withColumn('mentions_id', df['entities.user_mentions.id'])
            .drop('user')
            .drop('place')
            .drop('entities')
           )

cleaned = CleanUp(tweets_df, ['entities', 'filter_level', 'id', 'in_reply_to_user_id', 
    'lang', 'place', 'possibly_sensitive', 'text', 'timestamp_ms', 'user'])
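
To make the flattening concrete, here is a minimal plain-Python sketch of what the dotted column paths in `CleanUp` select from a single tweet record. The helper `flatten_tweet` and the sample tweet are hypothetical and exist only for illustration; only the field names come from the cell above. When Spark selects a field through an array of structs (as in `entities.hashtags.text`), it returns an array of that field's values, which the list comprehensions below imitate. One simplification: a missing `entities` struct yields empty lists here, whereas Spark would yield null.

```python
def flatten_tweet(tweet):
    """Illustrative only: mimic CleanUp's nested-field extraction on one dict."""
    user = tweet.get("user") or {}
    place = tweet.get("place") or {}
    entities = tweet.get("entities") or {}
    bounding_box = place.get("bounding_box") or {}
    return {
        "id": tweet.get("id"),
        "text": tweet.get("text"),
        "user_id": user.get("id"),
        "user_screen_name": user.get("screen_name"),
        "place_type": place.get("place_type"),
        "place_coordinates": bounding_box.get("coordinates"),
        # An array of structs becomes an array of the selected field
        "hashtags": [h["text"] for h in entities.get("hashtags", [])],
        "mentions_id": [m["id"] for m in entities.get("user_mentions", [])],
    }


# Made-up record that follows the nesting used in the notebook
sample_tweet = {
    "id": 1,
    "text": "example",
    "user": {"id": 42, "screen_name": "someone"},
    "place": None,
    "entities": {
        "hashtags": [{"text": "Presidentielle2017"}],
        "user_mentions": [{"id": 1976143068}],
    },
}

flat = flatten_tweet(sample_tweet)
print(flat["hashtags"], flat["mentions_id"], flat["place_coordinates"])
```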

In [7]:
cleaned.createOrReplaceTempView("dataset")

## Dataframe Schema

In [9]:
cleaned.printSchema()

root
 |-- filter_level: string (nullable = true)
 |-- id: long (nullable = true)
 |-- in_reply_to_user_id: long (nullable = true)
 |-- lang: string (nullable = true)
 |-- possibly_sensitive: boolean (nullable = true)
 |-- text: string (nullable = true)
 |-- timestamp_ms: string (nullable = true)
 |-- user_id: long (nullable = true)
 |-- user_screen_name: string (nullable = true)
 |-- place_type: string (nullable = true)
 |-- place_coordinates: array (nullable = true)
 |    |-- element: array (containsNull = true)
 |    |    |-- element: array (containsNull = true)
 |    |    |    |-- element: double (containsNull = true)
 |-- hashtags: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- mentions_id: array (nullable = true)
 |    |-- element: long (containsNull = true)



## Determining whether a tweet is in support of Macron or Le Pen

#### If a tweet mentions either candidate, replies to either candidate, or has a hashtag containing either candidate's name, we count it as support for that candidate

In [13]:
import pyspark.sql.functions as F
from pyspark.sql.functions import udf
from pyspark.sql.types import IntegerType

In [14]:
# Candidate public Twitter IDs
le_pen_id = 217749896
macron_id = 1976143068

le_pen_support = -1
macron_support = 1

# Check whether a candidate's ID appears in the tweet's mentions.
# Returns -1 for Le Pen, 1 for Macron, or 0 for neither/both.
def GetSupport(col):
    # Rows without any mention data can arrive as None; treat them as neutral
    if col is None:
        return 0
    if le_pen_id in col and macron_id in col:
        return 0
    elif le_pen_id in col:
        return le_pen_support
    elif macron_id in col:
        return macron_support
    else:
        return 0

udf_func = udf(GetSupport, IntegerType())
cleaned = cleaned.withColumn('supports', udf_func(F.col('mentions_id')))
# Despite its name, no_tags keeps only the tweets that have at least one hashtag
no_tags = cleaned.withColumn('amount_hashtags', F.size(F.col('hashtags'))).filter(F.col('amount_hashtags') > 0)
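
The cell above only inspects `mentions_id`. As a rough sketch of how the full rule stated in this section (mention, reply, or hashtag) might be combined for a single tweet, consider the plain-Python function below. The name `classify_support` and the hashtag keywords `'lepen'` and `'macron'` are assumptions made for illustration and are not part of the original notebook; real hashtags would likely need a broader keyword list.

```python
LE_PEN_ID = 217749896
MACRON_ID = 1976143068

# Assumed keyword spellings, matched case-insensitively as substrings
LE_PEN_KEYWORDS = ("lepen",)
MACRON_KEYWORDS = ("macron",)


def classify_support(mentions_id, in_reply_to_user_id, hashtags):
    """Return -1 for Le Pen, 1 for Macron, 0 for neither or both."""
    mentions = set(mentions_id or [])
    tags = [tag.lower() for tag in (hashtags or [])]

    le_pen = (
        LE_PEN_ID in mentions
        or in_reply_to_user_id == LE_PEN_ID
        or any(key in tag for tag in tags for key in LE_PEN_KEYWORDS)
    )
    macron = (
        MACRON_ID in mentions
        or in_reply_to_user_id == MACRON_ID
        or any(key in tag for tag in tags for key in MACRON_KEYWORDS)
    )

    # Both or neither candidate referenced: treat the tweet as neutral
    if le_pen == macron:
        return 0
    return -1 if le_pen else 1


print(classify_support([MACRON_ID], None, []))                        # 1
print(classify_support(None, LE_PEN_ID, None))                        # -1
print(classify_support([], None, ["MacronPresident", "LePen2017"]))   # 0
```

Wrapping such a function in a `udf` with `IntegerType()` and passing the three columns would follow the same pattern as `GetSupport`.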

In [18]:
import numpy as np  # imported in this cell so it runs even after a kernel restart

def coords_estimate(entry):
    '''
    params: entry: the nested bounding-box coordinates (place_coordinates) of one row
    returns: an array of length 2 with the average x and y coordinates of the bounding box.
    '''
    coords = np.array(entry[0])
    return np.mean(coords, axis=0)

pd_df = cleaned.toPandas()
pd_df['coord_estimates'] = pd_df['place_coordinates'].dropna().apply(coords_estimate)
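
As a sanity check of the averaging step in `coords_estimate`, here is a small worked example in plain Python (no NumPy, so it runs without extra dependencies). The helper `bbox_center` and the bounding box values are made up for illustration. The nesting matches the `place_coordinates` schema above: one outer list holding a ring of corner points, each assumed to be a `[longitude, latitude]` pair.

```python
def bbox_center(place_coordinates):
    """Average the corner points of the first ring of a bounding box."""
    ring = place_coordinates[0]
    n = len(ring)
    mean_x = sum(point[0] for point in ring) / n
    mean_y = sum(point[1] for point in ring) / n
    return [mean_x, mean_y]


# Hypothetical box with four corners
box = [[[2.0, 48.0], [2.0, 49.0], [3.0, 49.0], [3.0, 48.0]]]
center = bbox_center(box)
print(center)  # [2.5, 48.5]
```

The center of a bounding box is only a coarse location estimate: a box for a whole country and a box for a single city both collapse to one point.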