## Getting and saving the cleaned twitter data
* We extract the necessary columns of the Twitter data from the .json files by running the code on the cluster and saving the resulting dataframes as parquet.
* The code that extracts the data is in `cleaned_twitter.py`.

In [22]:
import pandas as pd
import os
from tqdm import tqdm
from pyspark import SparkConf, HiveContext
import pyspark.sql
from pyspark.sql.types import *
from pyspark.sql import SQLContext
from pyspark.sql.functions import col
import pyspark.sql.functions as func
from pyspark.sql.functions import udf
import pickle


# Spark setup.
# NOTE(review): `conf` is built here but no visible cell passes it to a context,
# because the SparkContext line below is commented out.
conf = SparkConf().setAppName("ADA-gcl")
#sc = SparkContext(conf=conf)
# `sc` is not defined or imported anywhere in this notebook; presumably it is
# injected by the PySpark kernel/shell — confirm before running on a fresh kernel.
sqlContext = HiveContext(sc)
#sqlContext.setConf("spark.sql.parquet.compression.codec", "snappy")


## Load the parquet data

In [7]:
# NOTE(review): base_path is an absolute file:// URI on one machine, while the
# directory listing below uses a path relative to the current working directory.
# Presumably the notebook must be started from the ADA-Project root for the two
# to point at the same folder — confirm.
base_path = 'file:///home/kirtan/Academics/EPFL/sem1/ADA/ADA-Project/twitter_data/cleaned/'
# Every entry in the directory is later read as a parquet part.
files = os.listdir('twitter_data/cleaned')

In [8]:
df = sqlContext.read.parquet(base_path+files[0])

In [9]:
df_twitter = df

In [10]:
# Append each remaining parquet part to the frame seeded from files[0].
for part_name in tqdm(files[1:]):
    df_twitter = df_twitter.unionAll(sqlContext.read.parquet(base_path + part_name))
   
    



In [6]:
# Number of tweets
df_twitter.count()

10828070

In [7]:
df_twitter.show()

+-------------------+--------+---------+--------------------+--------------------+--------------------+
|                 id|language|sentiment|            location|                main|                date|
+-------------------+--------+---------+--------------------+--------------------+--------------------+
|1467622550934000011|      en| POSITIVE|            Schenkon| @beknuesel Fast :-)|2016-07-04T08:51:23Z|
|1467661901184400101|      en| NEGATIVE|            Svizzera|@sebdivo @ildivoo...|2016-07-04T17:05:55Z|
|1467664145156400055|      en|  NEUTRAL|           Neuchâtel|Need you by my side.|2016-07-04T17:30:19Z|
|1467656305452100106|      en| POSITIVE|                null|One Nation. One L...|2016-07-04T16:16:34Z|
|1467672681723400019|      en| POSITIVE|         Orlando, FL|Happy Independenc...|2016-07-04T22:05:47Z|
|1467663376923900019|      en|  NEUTRAL|         Switzerland|@Moreterm Full of...|2016-07-04T17:21:37Z|
|1467670928189100298|      en|  NEUTRAL|         Switzerland|@An

In [11]:
# Keeping only the major languages
t1 = df_twitter.where(col("language").isin({"en", "de", "fr"}))

In [12]:
# Removing those which do not have a sentiment.
# The predicate is built with col() so it resolves against t1 itself rather
# than against `df`, which only holds the first parquet file.
# Rows whose sentiment is a SQL NULL are dropped too: the comparison evaluates
# to NULL for them and filter() keeps only rows where the predicate is true.
t1 = t1.filter(col("sentiment") != 'null')


In [8]:
t1.count()

8719750

In [9]:
# See main languages
t1.groupBy(['language', 'sentiment']).count().sort(col("language").desc()).collect()

[Row(language='fr', sentiment='NEUTRAL', count=2018350),
 Row(language='fr', sentiment='POSITIVE', count=53314),
 Row(language='fr', sentiment='NEGATIVE', count=8254),
 Row(language='en', sentiment='POSITIVE', count=1423976),
 Row(language='en', sentiment='NEGATIVE', count=676615),
 Row(language='en', sentiment='NEUTRAL', count=2333867),
 Row(language='de', sentiment='NEGATIVE', count=4804),
 Row(language='de', sentiment='NEUTRAL', count=2102071),
 Row(language='de', sentiment='POSITIVE', count=98499)]

In [13]:
t1.show()

+-------------------+--------+---------+--------------------+--------------------+--------------------+
|                 id|language|sentiment|            location|                main|                date|
+-------------------+--------+---------+--------------------+--------------------+--------------------+
|1467622550934000011|      en| POSITIVE|            Schenkon| @beknuesel Fast :-)|2016-07-04T08:51:23Z|
|1467661901184400101|      en| NEGATIVE|            Svizzera|@sebdivo @ildivoo...|2016-07-04T17:05:55Z|
|1467664145156400055|      en|  NEUTRAL|           Neuchâtel|Need you by my side.|2016-07-04T17:30:19Z|
|1467656305452100106|      en| POSITIVE|                null|One Nation. One L...|2016-07-04T16:16:34Z|
|1467672681723400019|      en| POSITIVE|         Orlando, FL|Happy Independenc...|2016-07-04T22:05:47Z|
|1467663376923900019|      en|  NEUTRAL|         Switzerland|@Moreterm Full of...|2016-07-04T17:21:37Z|
|1467670928189100298|      en|  NEUTRAL|         Switzerland|@An

### Combining with the gender data

In [13]:
base_path1 = 'file:///home/kirtan/Academics/EPFL/sem1/ADA/ADA-Project/twitter_data/gender/'
files1 = os.listdir('twitter_data/gender/')

In [14]:
df1 = sqlContext.read.parquet(base_path1 + files1[0])

In [15]:
df_twitter1 = df1

In [16]:
# Append each remaining parquet part to the frame seeded from files1[0].
for part_name in tqdm(files1[1:]):
    df_twitter1 = df_twitter1.unionAll(sqlContext.read.parquet(base_path1 + part_name))
   
    



In [17]:
df_twitter1 = df_twitter1.dropDuplicates()

In [15]:
df_twitter1.count()

10557432

In [15]:
df_twitter1.agg(func.countDistinct('id')).show()

+---------+
|count(id)|
+---------+
| 10557432|
+---------+



In [18]:
main_df = t1.join(df_twitter1, t1.id == df_twitter1.id, 'inner').drop(df_twitter1.id)

In [17]:
main_df.count()

8719750

In [19]:
main_df = main_df.dropDuplicates().drop(main_df.canton)

In [44]:
main_df.count()

8500030

### Adding the canton data

In [20]:
base_path2 = 'file:///home/kirtan/Academics/EPFL/sem1/ADA/ADA-Project/twitter_data/canton/'
files2 = os.listdir('twitter_data/canton')

In [21]:
df2 = sqlContext.read.parquet(base_path2 + files2[0])

In [22]:
df_twitter2 = df2

In [23]:
# Append each remaining parquet part to the frame seeded from files2[0].
for part_name in tqdm(files2[1:]):
    df_twitter2 = df_twitter2.unionAll(sqlContext.read.parquet(base_path2 + part_name))
   
    



In [24]:
df_twitter2 = df_twitter2.dropDuplicates()

In [25]:
final = main_df.join(df_twitter2, main_df.id == df_twitter2.id, 'left').drop(df_twitter2.id)

In [35]:
final.count()

8500030

In [26]:
final.groupBy(['language', 'sentiment']).count().sort(col("language").desc()).collect()

[Row(language='fr', sentiment='NEUTRAL', count=1963897),
 Row(language='fr', sentiment='POSITIVE', count=51940),
 Row(language='fr', sentiment='NEGATIVE', count=8040),
 Row(language='en', sentiment='POSITIVE', count=1387348),
 Row(language='en', sentiment='NEGATIVE', count=657896),
 Row(language='en', sentiment='NEUTRAL', count=2279445),
 Row(language='de', sentiment='NEGATIVE', count=4679),
 Row(language='de', sentiment='NEUTRAL', count=2050716),
 Row(language='de', sentiment='POSITIVE', count=96069)]

In [26]:
# Keep every English tweet; for the other languages keep only the tweets
# labelled POSITIVE or NEGATIVE.
final = final.filter(
    col('sentiment').isin(['POSITIVE', 'NEGATIVE']) | (col('language') == 'en')
)

In [24]:
final.show()

+-------------------+--------+---------+-----------+--------------------+--------------------+-------+------+
|                 id|language|sentiment|   location|                main|                date| gender|canton|
+-------------------+--------+---------+-----------+--------------------+--------------------+-------+------+
|1451607480000009678|      en|  NEUTRAL|Switzerland|I really need to ...|2016-01-01T00:17:25Z| FEMALE|  null|
|1451612913000008971|      de| POSITIVE|      Basel|SteffiShy: Hier k...|2016-01-01T01:48:09Z| FEMALE|  null|
|1451613258000000293|      en|  NEUTRAL|Switzerland|@RealHarryHudson ...|2016-01-01T01:53:50Z|UNKNOWN|  null|
|1451619487000012173|      en|  NEUTRAL|Switzerland|#nowplaying Kid C...|2016-01-01T03:37:43Z|UNKNOWN|  null|
|1451620127000015104|      en|  NEUTRAL|       Bern|Could Everton be ...|2016-01-01T03:48:18Z|UNKNOWN|  null|
|1451621497000009805|      en|  NEUTRAL|     Geneva|@jayashree_64 Tha...|2016-01-01T04:10:52Z|UNKNOWN|  null|
|145162452

In [37]:
final.groupBy(['language', 'sentiment']).count().sort(col("language").desc()).collect()

[Row(language='fr', sentiment='POSITIVE', count=51940),
 Row(language='fr', sentiment='NEGATIVE', count=8040),
 Row(language='en', sentiment='POSITIVE', count=1387348),
 Row(language='en', sentiment='NEGATIVE', count=657896),
 Row(language='en', sentiment='NEUTRAL', count=2279445),
 Row(language='de', sentiment='NEGATIVE', count=4679),
 Row(language='de', sentiment='POSITIVE', count=96069)]

In [25]:
final.count()

4485417

In [26]:
# Divide by the live row count rather than a number copied from an earlier
# output, so the ratio stays correct if the upstream filters or data change.
print("Proportion with a canton:", final.filter(final.canton != 'null').count()/final.count())

Proportion with a canton: 0.37345312598583363


In [2]:
# Reload a previously saved copy of `final` from disk.
# NOTE(review): no visible cell writes final_twitter.parquet (only
# final_twitter1/2/3 are written further down); presumably it was saved by a
# cell that is no longer in the notebook — confirm.
final = sqlContext.read.parquet('file:///home/kirtan/Academics/EPFL/sem1/ADA/ADA-Project/twitter_data/final_twitter.parquet')

In [3]:
# Load the pickled location -> canton dictionary.
# The context manager closes the file handle once the load is done.
# NOTE(review): pickle.load can execute arbitrary code from the file; only load
# a small_dict.p that this project produced itself.
with open("small_dict.p", "rb") as fh:
    dict_map = pickle.load(fh)

In [30]:
len(loc)

17449

### Build the location → canton dictionary

In [31]:
location = df_twitter.select(df_twitter.id, df_twitter.location)
canton = df_twitter2.select(df_twitter2.id, df_twitter2.canton)

In [32]:
df_dict = location.join(canton, location.id == canton.id, 'inner').drop(canton.id).drop(location.id)

In [29]:
df_dict1 = df_dict.filter(df_dict.canton != 'null')

In [35]:
lst = df_dict1.collect()

In [37]:
loc = {}

In [38]:
# Record the canton seen for each location; when a location occurs more than
# once, the last collected row wins.
loc.update((entry.location, entry.canton) for entry in lst)

In [33]:
# Keep only the location column of the rows whose canton is the string 'null'.
# NOTE(review): `== 'null'` matches only the string placeholder; rows whose
# canton is a real SQL NULL would not be selected here — confirm how missing
# cantons are encoded in the canton parquet files.
df_dict_ = df_dict.filter(df_dict.canton == 'null').drop(df_dict.canton)

In [34]:
lst1 = df_dict_.collect()

In [35]:
# Location string of every collected row, in collection order.
locs = [entry.location for entry in lst1]

In [36]:
locs_uniq = set(locs)

In [37]:
# Locations that already have a canton in `loc` take that canton ...
dict_map = dict((name, loc[name]) for name in locs_uniq if name in loc)
# ... and every remaining location gets the 'null' placeholder.
for name in locs_uniq:
    dict_map.setdefault(name, 'null')

In [38]:
# Persist the location -> canton dictionary.
# The context manager flushes and closes the file as soon as the dump finishes,
# instead of leaving an open write handle behind.
with open("small_dict.p", "wb") as fh:
    pickle.dump(dict_map, fh)

In [4]:
def map_location(value):
    """Look up the canton for a location string.

    Reads the module-level ``dict_map``. Returns the mapped value when
    ``value`` is a key of ``dict_map``, otherwise the string 'null'.
    """
    return dict_map.get(value, 'null')
udfmap = udf(map_location, StringType())

In [5]:
final_loc = final.withColumn('canton_', udfmap("location"))

In [6]:
f = final_loc.drop(final_loc.canton)

In [22]:
# Divide by the live row count rather than a number copied from an earlier
# output, so the ratio stays correct if the reloaded data changes.
print("Proportion with a canton:", final_loc.filter(final_loc.canton_ != 'null').count()/final_loc.count())

Proportion with a canton: 0.9803710558015007


In [25]:
len(set(dict_map.values()))

33

In [7]:
# canton_ values to exclude from the final data: the 'null' placeholder plus
# names that are presumably regions outside Switzerland — confirm the list is
# complete against the distinct values of dict_map.
bad = ['Baden-Württemberg', 'Bavaria', 'Kocaeli', 'Lower Silesian Voivodeship', 'Maha Sarakham', 'Rio de Janeiro', 'São Paulo', 'null']

In [8]:
f = f.drop(f.location)

In [29]:
f.show()

+-------------------+--------+---------+--------------------+--------------------+-------+----------+
|                 id|language|sentiment|                main|                date| gender|   canton_|
+-------------------+--------+---------+--------------------+--------------------+-------+----------+
|1451606716000000246|      en|  NEUTRAL|Gregory Larsen - ...|2016-01-01T00:04:33Z|UNKNOWN|    Zurich|
|1451606971000001136|      en|  NEUTRAL|@killMOOSEkill @W...|2016-01-01T00:04:02Z|UNKNOWN|      Vaud|
|1451611676000005396|      en|  NEUTRAL|Leave charging 20...|2016-01-01T01:27:19Z|UNKNOWN|      Vaud|
|1451612158000004904|      en|  NEUTRAL|Happy new year fr...|2016-01-01T01:35:30Z|UNKNOWN|    Glarus|
|1451612584000003207|      en|  NEUTRAL|@Yanruliu1 Accord...|2016-01-01T01:42:39Z|UNKNOWN|    Zurich|
|1451625427000008147|      en|  NEUTRAL|AccentForex | “Ca...|2016-01-01T05:16:46Z|UNKNOWN|      Vaud|
|1451637836000007370|      en|  NEUTRAL|@malcolm_fox2 Che...|2016-01-01T08:43:22Z|

In [30]:
f.groupBy(['language', 'sentiment']).count().sort(col("language").desc()).collect()

[Row(language='fr', sentiment='POSITIVE', count=51940),
 Row(language='fr', sentiment='NEGATIVE', count=8040),
 Row(language='en', sentiment='POSITIVE', count=1387348),
 Row(language='en', sentiment='NEGATIVE', count=657896),
 Row(language='en', sentiment='NEUTRAL', count=2279445),
 Row(language='de', sentiment='NEGATIVE', count=4679),
 Row(language='de', sentiment='POSITIVE', count=96069)]

In [9]:
import time

In [10]:
def time_of_day(iso_time):
    """Classify a timestamp as "Day" or "Night" from its hour field.

    Parameters
    ----------
    iso_time : str or None
        Timestamp formatted as "%Y-%m-%dT%H:%M:%SZ",
        e.g. "2016-07-04T08:51:23Z".

    Returns
    -------
    str or None
        "Day" when 6 <= hour < 18, "Night" otherwise. None when ``iso_time``
        is None, so a missing date yields a null instead of failing the UDF.

    Raises
    ------
    ValueError
        If ``iso_time`` is a string that does not match the expected format.
    """
    if iso_time is None:
        return None
    # The hour is used exactly as written in the string; no timezone
    # conversion is applied.
    # NOTE(review): the trailing "Z" suggests UTC, not Swiss local time — confirm.
    hour = time.strptime(iso_time, "%Y-%m-%dT%H:%M:%SZ").tm_hour
    return "Day" if 6 <= hour < 18 else "Night"

In [11]:
def month(iso_time):
    """Extract the month number from a timestamp string.

    Parameters
    ----------
    iso_time : str or None
        Timestamp formatted as "%Y-%m-%dT%H:%M:%SZ".

    Returns
    -------
    int or None
        Month in the range 1-12 (``tm_mon``). None when ``iso_time`` is None,
        so a missing date yields a null instead of failing the UDF.

    Raises
    ------
    ValueError
        If ``iso_time`` is a string that does not match the expected format.
    """
    if iso_time is None:
        return None
    return time.strptime(iso_time, "%Y-%m-%dT%H:%M:%SZ").tm_mon

In [12]:
def sentiment_map(sentiment):
    """Translate a sentiment label into its numeric code.

    'NEGATIVE' -> -1, 'NEUTRAL' -> 0, 'POSITIVE' -> 1.
    Any other value (including None) maps to 2.
    """
    codes = {'NEGATIVE': -1, 'POSITIVE': 1, 'NEUTRAL': 0}
    return codes.get(sentiment, 2)

In [13]:
# Wrap the date helpers as Spark UDFs so they can be applied to the date column.
udfTOD = udf(time_of_day, StringType())
# NOTE(review): month() returns an int (tm_mon) but the UDF is declared as
# StringType, so the `month` column ends up as a string (see the schema printed
# further down). Switching to IntegerType would change the saved schema.
udfMonth = udf(month, StringType())

In [14]:
udfSenti = udf(sentiment_map, IntegerType())

In [15]:
f1 = f.withColumn('time_period', udfTOD("date"))
f2 = f1.withColumn('month', udfMonth("date"))

In [16]:
f3 = f2.withColumn('sentiment_', udfSenti("sentiment"))

In [41]:
f3.show()

+-------------------+--------+---------+--------------------+--------------------+-------+----------+-----------+-----+----------+
|                 id|language|sentiment|                main|                date| gender|   canton_|time_period|month|sentiment_|
+-------------------+--------+---------+--------------------+--------------------+-------+----------+-----------+-----+----------+
|1451606716000000246|      en|  NEUTRAL|Gregory Larsen - ...|2016-01-01T00:04:33Z|UNKNOWN|    Zurich|      Night|    1|         0|
|1451606971000001136|      en|  NEUTRAL|@killMOOSEkill @W...|2016-01-01T00:04:02Z|UNKNOWN|      Vaud|      Night|    1|         0|
|1451611676000005396|      en|  NEUTRAL|Leave charging 20...|2016-01-01T01:27:19Z|UNKNOWN|      Vaud|      Night|    1|         0|
|1451612158000004904|      en|  NEUTRAL|Happy new year fr...|2016-01-01T01:35:30Z|UNKNOWN|    Glarus|      Night|    1|         0|
|1451612584000003207|      en|  NEUTRAL|@Yanruliu1 Accord...|2016-01-01T01:42:39Z|U

In [17]:
f4 = f3.drop(f3.sentiment).drop(f3.date)

In [18]:
# Drop every row whose canton_ is one of the excluded values listed in `bad`.
f4 = f4.filter(~f4.canton_.isin(bad))

In [24]:
f4.show()

+-------------------+--------+--------------------+-------+----------+-----------+-----+----------+---+
|                 id|language|                main| gender|   canton_|time_period|month|sentiment_|tmp|
+-------------------+--------+--------------------+-------+----------+-----------+-----+----------+---+
|1451606716000000246|      en|Gregory Larsen - ...|UNKNOWN|    Zurich|      Night|    1|         0|  1|
|1451606971000001136|      en|@killMOOSEkill @W...|UNKNOWN|      Vaud|      Night|    1|         0|  1|
|1451611676000005396|      en|Leave charging 20...|UNKNOWN|      Vaud|      Night|    1|         0|  1|
|1451612158000004904|      en|Happy new year fr...|UNKNOWN|    Glarus|      Night|    1|         0|  1|
|1451612584000003207|      en|@Yanruliu1 Accord...|UNKNOWN|    Zurich|      Night|    1|         0|  1|
|1451625427000008147|      en|AccentForex | “Ca...|UNKNOWN|      Vaud|      Night|    1|         0|  1|
|1451637836000007370|      en|@malcolm_fox2 Che...|UNKNOWN|     

In [19]:
# NOTE(review): this import sits mid-notebook; the other imports live in the
# first code cell.
from pyspark.sql.functions import lit
# Add a constant column holding 1 on every row (a per-row counter).
f4 = f4.withColumn('tmp', lit(1))

In [26]:
f5 = f4.drop(f4.id).drop(f4.main)

In [30]:
f5.show()

+--------+-------+----------+-----------+-----+----------+---+
|language| gender|   canton_|time_period|month|sentiment_|tmp|
+--------+-------+----------+-----------+-----+----------+---+
|      en|UNKNOWN|    Zurich|      Night|    1|         0|  1|
|      en|UNKNOWN|      Vaud|      Night|    1|         0|  1|
|      en|UNKNOWN|      Vaud|      Night|    1|         0|  1|
|      en|UNKNOWN|    Glarus|      Night|    1|         0|  1|
|      en|UNKNOWN|    Zurich|      Night|    1|         0|  1|
|      en|UNKNOWN|      Vaud|      Night|    1|         0|  1|
|      en|UNKNOWN|      Vaud|        Day|    1|         0|  1|
|      en|UNKNOWN|      Vaud|        Day|    1|         0|  1|
|      en|UNKNOWN|    Zurich|        Day|    1|         0|  1|
|      en|   MALE|    Geneva|        Day|    1|         0|  1|
|      en|UNKNOWN|Basel-City|        Day|    1|         0|  1|
|      en|UNKNOWN|      Vaud|        Day|    1|         0|  1|
|      en| FEMALE|      Vaud|        Day|    1|        

In [28]:
f4.write.parquet('file:///home/kirtan/Academics/EPFL/sem1/ADA/ADA-Project/twitter_data/final_twitter1.parquet')

In [29]:
f5.write.parquet('file:///home/kirtan/Academics/EPFL/sem1/ADA/ADA-Project/twitter_data/final_twitter2.parquet')

In [35]:
f5 = f5.withColumn('tmp2', f5.sentiment_*f5.tmp)

In [36]:
f5.schema

StructType(List(StructField(language,StringType,true),StructField(gender,StringType,true),StructField(canton_,StringType,true),StructField(time_period,StringType,true),StructField(month,StringType,true),StructField(sentiment_,IntegerType,true),StructField(tmp,IntegerType,false),StructField(tmp2,IntegerType,true)))

In [48]:
# Mean sentiment code per (canton, month, language, gender, time of day).
# The built-in avg() replaces a hand-rolled ratio of two summed helper columns;
# the result is the same as long as sentiment_ has no nulls, which holds for
# the codes produced by sentiment_map (it always returns an int).
f6 = f5.groupBy('canton_', 'month', 'language', 'gender', 'time_period').agg(func.avg('sentiment_').alias('avg_sentiment'))

In [49]:
f6.show()

+----------------+-----+--------+-------+-----------+-------------------+
|         canton_|month|language| gender|time_period|      avg_sentiment|
+----------------+-----+--------+-------+-----------+-------------------+
|      Basel-City|    7|      en|   MALE|        Day|0.16316931982633864|
|          Geneva|    8|      en|   MALE|        Day|0.13686382393397525|
|         Thurgau|    8|      en|   MALE|        Day|0.10526315789473684|
|       Solothurn|    9|      en| FEMALE|      Night| 0.2608695652173913|
|          Schwyz|    5|      en|UNKNOWN|        Day|                0.5|
|Basel-Landschaft|    5|      en| FEMALE|        Day|                0.2|
|      Basel-City|    9|      fr|UNKNOWN|      Night|                1.0|
|          Glarus|    4|      en|   MALE|        Day|                1.0|
|      Basel-City|    1|      de|UNKNOWN|        Day| 0.8735042735042735|
|            Vaud|    4|      fr|   MALE|        Day| 0.8545454545454545|
|            Bern|    6|      de|   MA

In [50]:
# Rename canton_ to canton. withColumnRenamed keeps every other column and the
# column order unchanged, so the remaining columns need not be listed by hand.
f7 = f6.withColumnRenamed('canton_', 'canton')

In [51]:
f7.show()

+----------------+-----+--------+-------+-----------+-------------------+
|          canton|month|language| gender|time_period|      avg_sentiment|
+----------------+-----+--------+-------+-----------+-------------------+
|      Basel-City|    7|      en|   MALE|        Day|0.16316931982633864|
|          Geneva|    8|      en|   MALE|        Day|0.13686382393397525|
|         Thurgau|    8|      en|   MALE|        Day|0.10526315789473684|
|       Solothurn|    9|      en| FEMALE|      Night| 0.2608695652173913|
|          Schwyz|    5|      en|UNKNOWN|        Day|                0.5|
|Basel-Landschaft|    5|      en| FEMALE|        Day|                0.2|
|      Basel-City|    9|      fr|UNKNOWN|      Night|                1.0|
|          Glarus|    4|      en|   MALE|        Day|                1.0|
|      Basel-City|    1|      de|UNKNOWN|        Day| 0.8735042735042735|
|            Vaud|    4|      fr|   MALE|        Day| 0.8545454545454545|
|            Bern|    6|      de|   MA

In [52]:
f7.write.parquet('file:///home/kirtan/Academics/EPFL/sem1/ADA/ADA-Project/twitter_data/final_twitter3.parquet')

In [53]:
f7.count()

2448

In [54]:
fp = f7.toPandas()

In [62]:
# Export the aggregated table as CSV, relative to the current working directory.
# to_csv is called without index=..., so the pandas index is written as the
# first (unnamed) column of the file.
fp.to_csv('twitter_data/twitter.csv', sep=',', encoding='utf-8')