In [3]:
import json
from pandas.io.json import json_normalize
import pyspark
import pandas as pd
import numpy as np
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType, IntegerType, DoubleType
import functions as fct
from pyspark.ml.feature import HashingTF, IDF, Tokenizer
from pyspark.ml import Pipeline
from pyspark.ml.classification import NaiveBayes
from pyspark.sql import functions as F
from pyspark.mllib.util import MLUtils
from pyspark.mllib.evaluation import MulticlassMetrics
from pyspark.sql import SQLContext
# SQL entry point used for every read in this notebook.
# NOTE(review): `sc` is not defined in any visible cell; presumably it is the
# SparkContext injected by the pyspark shell/kernel — confirm before running
# this on a plain Python kernel.
sqlContext = SQLContext(sc)

In [4]:
# Read every JSON file matching the glob into one DataFrame, then project the
# nested `_source` fields that the rest of the notebook works with.
df = sqlContext.read.json('one_data_each_month/*.json')
source_fields = ["main", "sentiment", "lang", "tags", "date_found"]
interest = df.select(*["_source." + field for field in source_fields])

In [3]:
# Number of rows in `interest` (displayed as the cell output).
interest.count()

11294

# Text preprocessing 

In [5]:
# Wrap the text-cleaning helpers from `functions` as string-returning Spark UDFs.
first_clean_udf = udf(fct.first_clean, StringType())
remove_features_udf = udf(fct.remove_features, StringType())
strip_accents_udf = udf(fct.strip_accents, StringType())
stops_udf = udf(fct.remove_stops, StringType())

# Every step below rewrites the `main` text column in place.
main_col = F.col('main')

# Step 1 - Instagram noise (per the original note: # tags, @mentions, 'http://' urls).
interest1 = interest.withColumn('main', first_clean_udf(main_col))

# Step 2 - accents.
interest2 = interest1.withColumn('main', strip_accents_udf(main_col))

# Step 3 - punctuation. Per the original note, '.', '!' and '?' are kept because
# they can carry emotion ('!!!' for excitement, '...' for annoyance).
interest3 = interest2.withColumn('main', remove_features_udf(main_col))

# Step 4 - English stopwords (original note: all in lower case).
interest4 = interest3.withColumn('main', stops_udf(main_col))

# Step 5 - drop rows whose cleaned text is the empty string.
# NOTE(review): the original note described this as removing texts shorter than
# 8 characters; the filter itself only tests for '' — any length rule would have
# to live inside the `functions` helpers. TODO confirm.
interest5 = interest4.where(main_col != '')

# Persist the cleaned rows and reload them from the parquet copy.
interest5.write.format("parquet").mode('overwrite').save("afterprocessing.parquet")
after_process = sqlContext.read.parquet("afterprocessing.parquet")

# Separation

# with sentiment value [ML training]

In [6]:
# Labelled subset: rows whose `sentiment` is not null.
mlinterest = after_process.dropna(subset=["sentiment"])

# Map the sentiment value to an integer `label` through fct.sentiment_values
# (per the original note the codes are 0, 1 and 2).
sent_value = udf(fct.sentiment_values, IntegerType())
MLinterest = mlinterest.withColumn('label', sent_value(F.col('sentiment')))

# Restrict to rows tagged as English.
MLINTEREST = MLinterest.where(F.col('lang') == "en")

# The ML pipeline is fed a double-typed label column.
MLINTEREST1 = MLINTEREST.withColumn("label", F.col('label').cast(DoubleType()))

In [7]:
# TF-IDF text classifier: split `main` into words, hash the words into raw
# term-frequency vectors, rescale them with IDF, then apply Naive Bayes
# (created with its default parameters).
tokenizer = Tokenizer(inputCol="main", outputCol="words")
hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures")
idf = IDF(inputCol="rawFeatures", outputCol="features")
nb = NaiveBayes()

stages = [tokenizer, hashingTF, idf, nb]
pipeline = Pipeline(stages=stages)

# Fit all stages on the labelled English rows.
model = pipeline.fit(MLINTEREST1)

# Accuracy of the model

In [8]:
#MLINTEREST1.show(5)

In [26]:
# Score the labelled rows with the fitted pipeline. The label column is renamed
# first so it travels alongside the model's `prediction` column as `true_label`.
# NOTE(review): `z` is built from the same frame the pipeline was fitted on, so
# anything computed from `pred` is a training-set figure, not a held-out one.
z = MLINTEREST1.withColumnRenamed(existing='label', new='true_label')
pred = model.transform(z)

In [39]:
# RDD of (prediction, true_label) rows; this is what gets handed to MulticlassMetrics.
wesh=pred.select('prediction', 'true_label').rdd

In [40]:
# Multiclass evaluation over the (prediction, true_label) pairs built from `pred`.
metrics = MulticlassMetrics(wesh)

In [41]:
# Overall figures (no per-label argument is passed to any of the three calls).
# NOTE(review): the recorded output shows the three values are identical —
# presumably because the label-less forms all reduce to overall accuracy;
# confirm against the MulticlassMetrics documentation for the Spark version used.
precision, recall, f1Score = (
    metrics.precision(),
    metrics.recall(),
    metrics.fMeasure(),
)

print("Summary Stats")
print("Precision = %s" % (precision,))
print("Recall = %s" % (recall,))
print("F1 Score = %s" % (f1Score,))

Summary Stats
Precision = 0.9481446241674596
Recall = 0.9481446241674596
F1 Score = 0.9481446241674596


# without sentiment value [ML Prediction]

In [10]:
# Unlabelled subset: rows whose `sentiment` is null.
ml_topredict = after_process.where(F.col('sentiment').isNull())

# Overwrite `lang` with whatever fct.check_english returns for the cleaned text
# (per the original note this is Langid-based English detection).
check_english_udf = udf(fct.check_english, StringType())
ml_topredict1 = ml_topredict.withColumn("lang", check_english_udf(F.col("main")))

# Only rows detected as English are scored.
ml_topredict2 = ml_topredict1.where(F.col("lang") == "en")

# Apply the fitted pipeline, expose its output under the name `label`, and keep
# the same four columns as the labelled set so the two can be stacked later.
prediction = (
    model.transform(ml_topredict2)
    .withColumnRenamed('prediction', 'label')
    .select('main', 'label', 'date_found', 'tags')
)

In [11]:
# Labelled rows, projected to the same four columns (same order) as `prediction`;
# unionAll stacks by position, so the matching order matters.
final0 = MLINTEREST1.select('main', 'label', 'date_found', 'tags')

# String-returning UDF around fct.mois_num
# (presumably maps the found date to a month value — see `functions`; TODO confirm).
mois_num_udf = udf(fct.mois_num, StringType())

# Predicted rows first, then labelled rows, with `date_found` rewritten by the UDF.
final = (
    prediction.unionAll(final0)
    .withColumn('date_found', mois_num_udf(F.col('date_found')))
)

# Persist the merged result and reload it from the parquet copy.
final.write.format("parquet").mode('overwrite').save("final.parquet")
load_final = sqlContext.read.parquet("final.parquet")

# Localisation based on hashtags

In [6]:
# Build the lookup table of Swiss municipalities and their cantons (per the
# original note; the construction itself lives in fct.create_city_id).
final_df = fct.create_city_id()
# Display the first rows together with the table's (rows, columns) shape.
final_df.head(), final_df.shape

(  canton    id  len_name          name
 0     GR  3501      10.0    alvaschein
 1     SG  3403      12.0  ganterschwil
 2     GR  3523       6.0        wiesen
 3     GR  3522       7.0       filisur
 4     GR  3521       6.0        bergun, (5750, 4))

In [108]:
def _canton_from_site(site):
    """Return the second comma-separated field of `site` minus its last character."""
    return site.split(',')[1][:-1]


def _city_from_site(site):
    """Return the first comma-separated field of `site` minus its first character."""
    return site.split(',')[0][1:]


# NOTE(review): both parsers raise if `site` is None or contains no comma; the
# exact string format is produced by fct.localise (presumably "(city,canton)")
# — confirm against `functions`.
canton_udf = udf(_canton_from_site, StringType())
city_udf = udf(_city_from_site, StringType())
localise_udf = udf(lambda tags: fct.localise(tags, final_df), StringType())

# `site` holds the city/canton string found from the tags; per the original
# note the helper yields -1 when no location is found.
localize = load_final.withColumn('site', localise_udf(F.col('tags')))

# Split `site` into separate `canton` and `city` columns.
localize1 = localize.withColumn('canton', canton_udf(F.col('site')))
localize1 = localize1.withColumn('city', city_udf(F.col('site')))

# Drop the rows for which no location was found.
localize2 = localize1.where(F.col('canton') != '-1')

# Persist the localised rows and reload them from the parquet copy.
localize2.write.format("parquet").mode('overwrite').save("final_localization.parquet")
load_final_local = sqlContext.read.parquet("final_localization.parquet")

## Visualization pre-processing

Operations on dataframes for the final visualization results

#### Stats on the sentiments

In [None]:
# Pie-chart data: number of posts per sentiment label (positive/negative/neutral).
# `show()` only prints and returns None, so the grouped DataFrame is bound first
# and displayed afterwards; `pnn_pie` stays usable for plotting.
pnn_pie = load_final.groupBy('label').count()
pnn_pie.show()

In [None]:
# Positive/negative/neutral counts per `date_found` value: a contingency table
# with one row per label and one column per distinct `date_found`.
pnn_per_month = load_final.stat.crosstab('label', 'date_found')

In [None]:
def _count_tags(frame):
    """Return an RDD of (tag, occurrences) pairs built from `frame`'s `tags` column.

    Each row's `tags` value is flattened into individual tags, which are then
    counted with a map / reduceByKey pass.
    """
    return (
        # Go through `.rdd` explicitly: DataFrame.flatMap is not available on
        # every Spark version, RDD.flatMap is.
        frame.select('tags').rdd
        # `row[0]` is the row's tags value; `or []` skips rows where it is null.
        .flatMap(lambda row: row[0] or [])
        .map(lambda tag: (tag, 1))
        .reduceByKey(lambda x, y: x + y)
    )


# Tag study: occurrences of every tag over all posts (toDF names the columns _1, _2).
a = _count_tags(load_final)
tag_count = a.toDF()
# Summary statistics about the tag counts.
stats_tags = tag_count.describe()

# The 1000 most used tags for each sentiment label (happy/negative/neutral).
# The original loop referenced an undefined name (`text1`) and rebound a single
# variable on each pass; results are now kept per label.
sentiment_tags_by_label = {}
for i in [0, 1, 2]:
    label_counts = _count_tags(load_final.filter(load_final.label == i))
    sentiment_tags_by_label[i] = label_counts.toDF().sort(F.desc("_2")).limit(1000)

# Kept for compatibility: the value the original loop would have ended on.
sentiment_tags = sentiment_tags_by_label[2]

#### Stats on the geo-location

In [7]:
# Data for the canton map.

# MARKERS: one `site` x `date_found` contingency table per sentiment label.
# The original loop rebound a single variable, so the tables for labels 0 and 1
# were computed and then lost; they are now kept in a dict keyed by label.
per_canton_month_by_label = {}
for i in [0, 1, 2]:
    per_canton_month_by_label[i] = (
        load_final_local
        .filter(load_final_local['label'] == i)
        .crosstab('site', 'date_found')
    )
# Kept for compatibility: the table the original loop ended on (label 2).
# To persist these, each label needs its own output path.
per_canton_month = per_canton_month_by_label[2]

# CHOROPLETH MAP: label-0 count and total count per site, side by side.
nb_hapinsta = load_final_local.filter(load_final_local["label"] == 0).groupBy('site').count()
nb_insta = load_final_local.groupBy('site').count().withColumnRenamed('count', 'total')
# NOTE(review): no join type is given, so this is presumably an inner join and
# sites without any label-0 row are absent from `final_insta` — confirm that is
# the intended behaviour for the map.
final_insta = nb_hapinsta.join(nb_insta, on='site')

In [129]:
# For the city map: number of rows per `site`.
# NOTE(review): `load_final_local_city` is not defined in any visible cell, so
# this raises NameError on a fresh kernel. It presumably came from a deleted
# cell (possibly a variant of `load_final_local`) — restore or replace it.
city_map = load_final_local_city.groupBy('site').count()