In [1]:
import numpy as np
import pandas as pd
import pyspark as ps
import matplotlib.pyplot as plt

## Analysis of example set to me more exploratory

Here we preform an exploration of the partial Spinn3r dataset provided.

We first find how to explore the data using one sampel json file:

In [None]:
EXAMPLE_PATH = 'swiss-tweet/example.json'

In [None]:
example = pd.read_json(EXAMPLE_PATH)
example.columns

We note that the data we are interessted in is contained in the _source variable, in object format:

In [None]:
example.dtypes

In [None]:
example.head()

The object format is a nested json that we could not find how to extract dirrectly using the read json funtion provided. We thus use a json normaliter contained in the pandas libary to extract it:

In [None]:
from pandas.io.json import json_normalize
import json

We set max dispaly columns to a larger number in able to see all columns:

In [None]:
pd.set_option('display.max_columns', 58)
example1 = json_normalize(example._source[0])
print(example1.columns)
print(len(example1.columns))

In [None]:
print(example1)

From the spinner documentation we see that:

The values we are most interessted in:
 - main: content of tweet
 - sentiment: overal sentiment of tweet
 - author_gender: gender of author
 - source_location: to know where was posted
 - lang: to get the language of the tweet
 - source_created: to know when it was made


This brings us the the first things we need to be careful about:
 - what does the sentiment realy say about the tweet?
 - is the location accurate?

Additionaly, after some trying around we find that the inbuilt read_json function was not ideal for or pourposed. We thus open the file using 'open' and transform it into a dataframe using json_normalize. 

In [None]:
with open(EXAMPLE_PATH
         ) as data_file:    
    data = json.load(data_file)

In [None]:
#example of how data looks like, for first tweet
data[0]['_source']

In [None]:
twitter_df = json_normalize(data)

In [None]:
#nicer naming
twitter_df.columns = [ column.replace('_source.','') for column in twitter_df.columns]
print(twitter_df.columns)
print(twitter_df.dtypes)

In [None]:
twitter_df.head()

In [None]:
language_freq = twitter_df['lang'].value_counts()
language_freq

In [None]:
gender_freq = twitter_df['author_gender'].value_counts()
gender_freq

In [None]:
sentiment_freq = twitter_df['sentiment'].value_counts()
sentiment_freq

In [None]:
negative_tweets = twitter_df[twitter_df['sentiment'] == 'NEGATIVE']['main'] 
negative_tweets.map(lambda x: ':(' in x).value_counts() ##so it's just the smileys

In [None]:
negative_tweets

In [None]:
positive_tweets = twitter_df[twitter_df['sentiment'] == 'POSITIVE']['main'] 
positive_tweets

In [None]:
positive_tweets.map(lambda x: (':-)' in x) or (':)' in x) or (';)' in x)).value_counts() #also just smileys basicaly

In [None]:
plt.hist(twitter_df.main_length, bins=50)
plt.show()

We can see right away that the location names depend on the language of the user.

In [None]:
twitter_df['source_location'].head()

In [None]:
twitter_df['source_location'].value_counts()

We also dirrectly see that often, the location is not very accurate, giving us only 'switzerland'.
This will not be enough for our analysis as we want to see if we can find a difference between the röstigraben.
But we could still work with this by using language as an inicator instead.

In [None]:
twitter_df['source_spam_probability'].value_counts() #none of them are suspected to be spam

In [None]:
twitter_df['tags'].dropna()

We see a münchen in there -> munich -> this indicated that geolocation may be off

In [None]:
twitter_df['published'] = pd.to_datetime(twitter_df['published'])

In [None]:
twitter_df['published'].map(lambda x: (x.day, x.month)).value_counts() #all published on same day, same month

In [None]:
twitter_df['source_user_interactions'].value_counts()

In [None]:
#twitter_df['source_following'].value_counts() just to see, not that usefull as values, make graph maybe if useable.

In [None]:
#twitter_df['source_favorites'].value_counts()

In [None]:
#twitter_df['source_followers'].value_counts()

In [None]:
twitter_df['source_likes'].value_counts() #no facebook likes

In [None]:
twitter_df.drop(twitter_df['lang'] == 'und')

In [None]:
twitter_df.iloc[-11:, 3:]['main']

don't forget to remove all links
check for swiss german! (some do get into dutch but not that many)

In [None]:
#remove all languages we don't care about
mask = (twitter_df['lang'] != 'de') & (twitter_df['lang'] != 'fr') & (twitter_df['lang'] != 'en')
twitter_df.drop(twitter_df[mask].index, inplace=True)
twitter_df.reset_index(drop=True, inplace=True)

In [None]:
twitter_df[twitter_df['sentiment'] == 'NEGATIVE']['lang'].value_counts()

In [None]:
twitter_df[twitter_df['main'].map(lambda x: 'therap' in x) ]['main'] ##issue of adds

In [None]:
twitter_df[twitter_df['main'].map(lambda x: 'suic' in x) ]['main'] #issue of news

In [None]:
twitter_df.iloc[2105, :]['main'] #example depression tweet that was not recognized

In [None]:
twitter_df.iloc[14146, :]['main']

In [None]:
twitter_df.iloc[11539, :]['main'] #again, neutral when this is negative

In [None]:
twitter_df.iloc[1128, :]['main'] #we should remove if news

# Applying it to Spark

In [22]:
from pyspark.sql import *
from pyspark import SparkContext, SQLContext
from pyspark.sql.functions import regexp_replace, col, lower, explode

In [3]:
from pyspark.sql import SparkSession

spark = SparkSession \
    .builder \
    .appName("Python Spark SQL basic example") \
    .config("spark.some.config.option", "some-value") \
    .getOrCreate()

In [118]:
df = spark.read.json('swiss-tweet/')
# Displays the content of the DataFrame to stdout

In [37]:
# df.dtypes # we only care about the inner nested types in ._source

In [119]:
df = df.select('_source.*') #remove nesting

In [120]:
#columns we care about
columns = ['main', 'published', 'source_spam_probability', 'source_location', 'tags', 'lang', 'sentiment',
                   'author_gender', 'source_followers', 'source_following']

In [121]:
df = df[columns]

In [129]:
mask = (df.lang == 'en') | (df.lang == 'fr') | (df.lang == 'de')
df = df.filter(mask)  #gets tweets with right language
df = df.filter(df.sentiment != 'POSITIVE') #remove positive sentient tweets
df = df.filter(df.source_spam_probability < 0.5) #remove spam
df = df.withColumn('main', lower(df.main)) #get lower case

In [123]:
df = df.withColumn('main', regexp_replace(col('main'), 'pic.twitter\S+', ' ')) #remove picture urls

In [124]:
df = df.filter(~(df.main.contains('http'))).filter(~(df.main.contains('.com'))).filter(~(df.main.contains('.ch'))).filter(~(df.main.contains('www')))

In [109]:
df.filter(df.main.isin('lol'))

In [139]:
df.select('main').map(dict_filter)

AttributeError: 'DataFrame' object has no attribute 'map'

In [125]:
df.select('main').head(15)

[Row(main='en espérant que 2016 soit meilleur que 2015 quand même'),
 Row(main='nice bmw ...he kills ant s!!!  '),
 Row(main='@madmenna ich hab nichts dagegen, wenn mir andere beim saufen zuschauen, solange sie pro minute 5.- zahlen. @deinwoelfchen'),
 Row(main='@ietsist dank je. heel veel lieve nieuwjaarswensen terug aan jullie allemaal. net terug na een leuk feest in belgië zonder internet.'),
 Row(main='happy new jear!!!! i am back from 2 weeks cuba! let s go into the 2016!  '),
 Row(main='@megadriver16 bonne année à toi je prends beaucoup de plaisir à suivre tes vidéos je suis un grand fande tour de france merci continue ainsi'),
 Row(main='@sweaggygirl oh la la'),
 Row(main="putain j'ai maaaal au crane la"),
 Row(main="@claudiocolu alla fine gn simm #b'ccat ️"),
 Row(main='so, das ist ein kleiner teil von bangkok  '),
 Row(main="gister was de trein reis van belgie naar nederland wel echt de allergrootste ramp ooit. :')"),
 Row(main='beauty is around  '),
 Row(main='neujahrsmorgen,

In [107]:
df.write.json('reduced_tweets') #write dataframe to folder

In [113]:
df.coalesce(1).write.json('reduced_tweets2') #write just one file instead of multiple! :D

In [131]:
tweets = df.toPandas() #turns it into pandas df

In [132]:
tweets.lang.value_counts() #not that many german/germanic tweets, lots of spanish etc.

en    3755
fr    3026
de    1950
Name: lang, dtype: int64

In [137]:
def dict_filter(word):
    found = False
    for w in ['lol', 'depri', 'na']:
        if( w in word):
            found = True
    return found

In [61]:
# f = df.select(regexp_replace(col('main'), 'http\S+|www.\S+', ' ').alias('filtered'))

In [None]:
# f = f.select(regexp_replace(col('filtered'), '.*[\r\n]', ' ').alias('filtered'))

In [None]:
# main = df.select('_source.main').withColumn("main", regexp_replace('main', "http\S+|www.\S+", " "))