# Emotional contagion through social networks
ref: https://www.pnas.org/content/111/24/8788

In [5]:
from IPython.core.display import display, HTML
display(HTML("<style>.container { width:100% !important; }</style>"))

In [6]:
import findspark
findspark.init('/Users/sdargude/Playground/Spark/spark-2.4.5-bin-hadoop2.7')

In [7]:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import explode
from pyspark.sql.functions import split

## Initialize Spark Session.

In [8]:
spark = SparkSession.builder.appName("Twitter_").getOrCreate()

In [9]:
spark

## Load Delta Table.

In [10]:
spark.sparkContext.setLogLevel("ERROR")
tweets = spark.read.format("delta").load("deltatables/processed_new")
tweets.printSchema()
tweets.createOrReplaceTempView("tweettable")
tweets = spark.sql("select *, to_date(date) as DayofYear from tweettable")
tweets.createOrReplaceTempView("tweettable")
tweets.printSchema()

root
 |-- tweetid: long (nullable = true)
 |-- friendname: string (nullable = true)
 |-- profilename: string (nullable = true)
 |-- text: string (nullable = true)
 |-- date: string (nullable = true)
 |-- Sentiment: struct (nullable = true)
 |    |-- sentiment: integer (nullable = true)
 |    |-- psentiment: integer (nullable = true)
 |    |-- ngsentiment: integer (nullable = true)
 |    |-- nsentiment: integer (nullable = true)
 |    |-- nltk_sentiment: integer (nullable = true)
 |    |-- nltk_psentiment: integer (nullable = true)
 |    |-- nltk_ngsentiment: integer (nullable = true)
 |    |-- nltk_nsentiment: integer (nullable = true)

root
 |-- tweetid: long (nullable = true)
 |-- friendname: string (nullable = true)
 |-- profilename: string (nullable = true)
 |-- text: string (nullable = true)
 |-- date: string (nullable = true)
 |-- Sentiment: struct (nullable = true)
 |    |-- sentiment: integer (nullable = true)
 |    |-- psentiment: integer (nullable = true)
 |    |-- ngsentimen

## Create  a in memory table/View with flatten Sentiment attributes.

In [11]:
SQL  =    "select tweetid as TweetId, friendname as Friendname, \
           profilename as ProfileName, text as Text, DayofYear as Date, \
           Sentiment.nltk_sentiment as Nltk_Sentiment, \
           Sentiment.nltk_psentiment Nltk_Positive,\
           Sentiment.nltk_ngsentiment Nltk_Negative,\
           Sentiment.nltk_nsentiment Nltk_Neutral,\
           Sentiment.sentiment as Sentiment, \
           Sentiment.psentiment as Positive,\
           Sentiment.ngsentiment as Negative,\
           Sentiment.nsentiment as Neutral\
           from tweettable order by DayofYear desc"

df = spark.sql(SQL)
df.show()
df.createOrReplaceTempView("tweettable")

+-------------------+--------------+------------+--------------------+----------+--------------+-------------+-------------+------------+---------+--------+--------+-------+
|            TweetId|    Friendname| ProfileName|                Text|      Date|Nltk_Sentiment|Nltk_Positive|Nltk_Negative|Nltk_Neutral|Sentiment|Positive|Negative|Neutral|
+-------------------+--------------+------------+--------------------+----------+--------------+-------------+-------------+------------+---------+--------+--------+-------+
|1281386382078828544|     Tamanna22|EnayetSpeaks|Etni izzat kyu de...|2020-07-10|             1|            0|            0|           1|        2|       1|       0|      0|
|1281379357714333696|     urspessi1|EnayetSpeaks|      @vinodpdg Done|2020-07-10|             1|            0|            0|           1|        2|       1|       0|      0|
|1281386838943387648|    khanumarfa|EnayetSpeaks|RT @rohini_sgh: D...|2020-07-10|             1|            0|            0|      

In [12]:
df.printSchema()

root
 |-- TweetId: long (nullable = true)
 |-- Friendname: string (nullable = true)
 |-- ProfileName: string (nullable = true)
 |-- Text: string (nullable = true)
 |-- Date: date (nullable = true)
 |-- Nltk_Sentiment: integer (nullable = true)
 |-- Nltk_Positive: integer (nullable = true)
 |-- Nltk_Negative: integer (nullable = true)
 |-- Nltk_Neutral: integer (nullable = true)
 |-- Sentiment: integer (nullable = true)
 |-- Positive: integer (nullable = true)
 |-- Negative: integer (nullable = true)
 |-- Neutral: integer (nullable = true)



## Create in memory view for tweets sent by 'Friends'.

In [14]:
query = "Select Date,\
         ProfileName,Friendname,count(TweetId) as Total,\
         Sum(Nltk_Positive) as Nltk_Positive,\
         Sum(Nltk_Negative) as Nltk_Negative,\
         Sum(Nltk_Neutral) as Nltk_Neutral,\
         Sum(Positive) as Positive,\
         Sum(Negative) as Negative,\
         Sum(Neutral) as Neutral,\
         Avg(Sentiment) as Sent_Avg,\
         Avg(Nltk_Sentiment) as Nltk_Avg\
         from tweettable \
         where Friendname != ProfileName \
         group by ProfileName, Date,Friendname\
         order by Date Desc,ProfileName"
left_df = spark.sql(query)
left_df.show()
left_df.createOrReplaceTempView("timeline")

+----------+---------------+--------------+-----+-------------+-------------+------------+--------+--------+-------+------------------+------------------+
|      Date|    ProfileName|    Friendname|Total|Nltk_Positive|Nltk_Negative|Nltk_Neutral|Positive|Negative|Neutral|          Sent_Avg|          Nltk_Avg|
+----------+---------------+--------------+-----+-------------+-------------+------------+--------+--------+-------+------------------+------------------+
|2020-07-10|      AskAnshul|         earth|    2|            0|            0|           2|       0|       0|      2|               1.0|               1.0|
|2020-07-10|      AskAnshul|    AartiTikoo|    2|            0|            0|           2|       0|       2|      0|               0.0|               1.0|
|2020-07-10|      AskAnshul|Captain_Mani72|    4|            0|            0|           4|       4|       0|      0|               2.0|               1.0|
|2020-07-10|      AskAnshul|      telegram|    4|            0|       

## Create in memory view for tweet  sent by 'Profile'

In [15]:
query = "Select Date,\
         ProfileName,Friendname,count(TweetId) as Total,\
         Sum(Nltk_Positive) as Nltk_Positive,\
         Sum(Nltk_Negative) as Nltk_Negative,\
         Sum(Nltk_Neutral) as Nltk_Neutral,\
         Sum(Positive) as Positive,\
         Sum(Negative) as Negative,\
         Sum(Neutral) as Neutral,\
         Avg(Sentiment) as Sent_Avg,\
         Avg(Nltk_Sentiment) as Nltk_Avg\
         from tweettable \
         where Friendname = ProfileName \
         group by ProfileName, Date,Friendname\
         order by Date Desc,ProfileName"
right_df = spark.sql(query)
right_df.show()
right_df.createOrReplaceTempView("profiletweet")

+----------+---------------+---------------+-----+-------------+-------------+------------+--------+--------+-------+--------+--------+
|      Date|    ProfileName|     Friendname|Total|Nltk_Positive|Nltk_Negative|Nltk_Neutral|Positive|Negative|Neutral|Sent_Avg|Nltk_Avg|
+----------+---------------+---------------+-----+-------------+-------------+------------+--------+--------+-------+--------+--------+
|2020-07-09|      AskAnshul|      AskAnshul|   10|            0|            0|          10|       2|       4|      4|     0.8|     1.0|
|2020-07-09|   EnayetSpeaks|   EnayetSpeaks|   50|            0|            4|          46|      18|       2|     30|    1.32|    0.92|
|2020-07-09|realDonaldTrump|realDonaldTrump|   20|            0|            0|          20|       7|       4|      9|    1.15|     1.0|
|2020-07-08|      AskAnshul|      AskAnshul|    8|            0|            0|           8|       2|       2|      4|     1.0|     1.0|
|2020-07-08|realDonaldTrump|realDonaldTrump|    

## Aggreage a in-memory view for tweets sent by 'Profile' group by Profilename

In [16]:
query = "Select Date, ProfileName, sum(Total) as Total, \
        sum(Nltk_Positive) as Nltk_Positive , sum(Nltk_Negative) as Nltk_Negative, \
        sum(Nltk_Neutral) as Nltk_Neutral, sum(Positive) as Positive , \
        sum(Negative) as Negative, sum(Neutral) as Neutral, \
        sum(Sent_Avg) as Sent_Avg, sum(Nltk_Avg) as Nltk_Avg\
        from profiletweet group by ProfileName,Date"
agg_profile_df = spark.sql(query)
agg_profile_df.show()
agg_profile_df.createOrReplaceTempView("aggprofile")

+----------+---------------+-----+-------------+-------------+------------+--------+--------+-------+--------+--------+
|      Date|    ProfileName|Total|Nltk_Positive|Nltk_Negative|Nltk_Neutral|Positive|Negative|Neutral|Sent_Avg|Nltk_Avg|
+----------+---------------+-----+-------------+-------------+------------+--------+--------+-------+--------+--------+
|2020-07-09|      AskAnshul|   10|            0|            0|          10|       2|       4|      4|     0.8|     1.0|
|2020-07-09|   EnayetSpeaks|   50|            0|            4|          46|      18|       2|     30|    1.32|    0.92|
|2020-07-09|realDonaldTrump|   20|            0|            0|          20|       7|       4|      9|    1.15|     1.0|
|2020-07-08|      AskAnshul|    8|            0|            0|           8|       2|       2|      4|     1.0|     1.0|
|2020-07-08|realDonaldTrump|    5|            0|            0|           5|       1|       1|      3|     1.0|     1.0|
|2020-07-07|      AskAnshul|    8|      

## Aggregate  in-memory view for tweets sent by 'Friends' group by Profilename

In [17]:
query = "Select Date, ProfileName, sum(Total) as Total, count(Friendname) as Friendname, \
        sum(Nltk_Positive) as Nltk_Positive, sum(Nltk_Negative) as Nltk_Negative, \
        sum(Nltk_Neutral) as Nltk_Neutral, sum(Positive) as Positive, \
        sum(Negative) as Negative, sum(Neutral) as Neutral, \
        Avg(Sent_Avg) as Sent_Avg, Avg(Nltk_Avg) as Nltk_Avg \
        from timeline group by ProfileName,Date"
agg_timeline_df = spark.sql(query)
agg_timeline_df.show()
agg_timeline_df.createOrReplaceTempView("aggtimeline")

+----------+---------------+-----+----------+-------------+-------------+------------+--------+--------+-------+------------------+------------------+
|      Date|    ProfileName|Total|Friendname|Nltk_Positive|Nltk_Negative|Nltk_Neutral|Positive|Negative|Neutral|          Sent_Avg|          Nltk_Avg|
+----------+---------------+-----+----------+-------------+-------------+------------+--------+--------+-------+------------------+------------------+
|2020-07-10|      AskAnshul|   12|         4|            0|            0|          12|       8|       2|      2|              1.25|               1.0|
|2020-07-10|   EnayetSpeaks|   34|         4|            0|            0|          32|      18|       2|     14|1.3229166666666667|           1.03125|
|2020-07-10|realDonaldTrump|   27|        11|            0|            0|          24|       6|       8|     13|0.8136363636363636|1.0515151515151517|
|2020-07-09|      AskAnshul|  854|        40|            0|            6|         794|     272

## Join Friend aggregation and Profile aggregations.

In [19]:
query = "Select t.*, p.Total as P_Total,\
         p.Nltk_Positive as P_Nltk_Positive,\
         p.Nltk_Negative as P_Nltk_Negative,\
         p.Nltk_Neutral as P_Nltk_Neutral,\
         p.Positive as P_Positive,\
         p.Negative as P_Negative,\
         p.Neutral as P_Neutral,\
         p.Sent_Avg as P_Sent_Avg,\
         p.Nltk_Avg as P_Nltk_Avg\
         from aggtimeline as t\
         inner join aggprofile as p\
         on t.Date = p.Date and t.ProfileName = p.ProfileName"

all_agg_df = spark.sql(query)
all_agg_df.show()
all_agg_df.createOrReplaceTempView("aggall")

+----------+---------------+-----+----------+-------------+-------------+------------+--------+--------+-------+------------------+------------------+-------+---------------+---------------+--------------+----------+----------+---------+----------+----------+
|      Date|    ProfileName|Total|Friendname|Nltk_Positive|Nltk_Negative|Nltk_Neutral|Positive|Negative|Neutral|          Sent_Avg|          Nltk_Avg|P_Total|P_Nltk_Positive|P_Nltk_Negative|P_Nltk_Neutral|P_Positive|P_Negative|P_Neutral|P_Sent_Avg|P_Nltk_Avg|
+----------+---------------+-----+----------+-------------+-------------+------------+--------+--------+-------+------------------+------------------+-------+---------------+---------------+--------------+----------+----------+---------+----------+----------+
|2020-07-09|      AskAnshul|  854|        40|            0|            6|         794|     272|     184|    398|1.1364458195911111|1.0456313466917013|     10|              0|              0|            10|         2|    

# Check contingent Effect

In [20]:
query = "Select Date, ProfileName, Total,P_Total,\
         Sent_Avg,P_Sent_Avg,\
         Nltk_Avg,P_Nltk_Avg\
         from aggall order by ProfileName"
spark.sql(query).show()

+----------+---------------+-----+-------+------------------+----------+------------------+----------+
|      Date|    ProfileName|Total|P_Total|          Sent_Avg|P_Sent_Avg|          Nltk_Avg|P_Nltk_Avg|
+----------+---------------+-----+-------+------------------+----------+------------------+----------+
|2020-07-07|      AskAnshul|  220|      8| 1.167577895355673|       1.0|1.1015873015873014|      1.25|
|2020-07-09|      AskAnshul|  854|     10|1.1364458195911111|       0.8|1.0456313466917013|       1.0|
|2020-07-08|      AskAnshul|  352|      8|1.1358884766027626|       1.0| 1.027845804988662|       1.0|
|2020-07-06|      AskAnshul|  186|      8|1.1590025252525251|       1.0|1.0808964646464645|      1.25|
|2020-07-05|      AskAnshul|  170|      8|1.1491228070175439|      0.75|              1.05|       1.0|
|2020-07-04|      AskAnshul|  122|      8| 1.013020833333333|      1.75|         1.0515625|       1.0|
|2020-07-09|   EnayetSpeaks| 1638|     50|1.1380178939644578|      1.32|1