In [0]:
from pyspark.sql.functions import *


In [0]:
positive  = ["gagner", "pepite", "victoire", "espoir", "exceptionnel","arracher", "grand", "fort", "bons", "au dessus", "succès", "triomphe", "consécration", "sacre", "avantage", "connexion", "lien vert", "incroyable", "felicitations", "record", "allezlesbleus", "miracle", "heureux", "plaisir", "monstrueux", "poulet", "courage", "déclic", "interessant", "honneur", "confiance", "solide", "chance", "s'impose", "bien", "tempete", "vitesse", "missile", "back", "sourire"]

negative = ["nul", "défaite", "perdu", "echec", "malheureusement","perdre", "echouer", "miserable", "triste", "branlée", "minables", "dessous", "fiasco", "raclée", "fessée", "rater", "louper", "manquer", "pire", "malade", "boycott", "vol", "catastrophique", "disqualifié", "inattendu", "irrespectueux", "fausse", "clown", "pourri", "merde", "bouse", "catastrophe", "blaireaux", "bouffon", "honte", "plot", "raciste", "immigré", "maghrebin","purge", "problématique", "hideuse", "siffle", "athletico", "atletico", "déteste", "faible", "immonde", "degueulasse", "perfide", "stress", "hue", "sifflement", "sifflet", "guez", "vide", "dechet", "degout" ]



In [0]:
@udf
def compute(tweet):
    """
    Simple list checking and point system
    Note: UDF use might be suboptimal here but it's ok for a project of this size
    """
    score = 0
    for x in tweet.split():
        if x in positive:
            score +=1
        elif x in negative:
            score -= 1
        else:
            continue
    return score



In [0]:
df = spark.read.format("delta").load("/batch_location")

In [0]:
df_stream = spark.read.format("delta").load("/stream_output/output/") 

In [0]:
%sql

OPTIMIZE "dbfs:/stream_output/output"

In [0]:
merged_df = df.unionByName(df_stream.dropna()).dropDuplicates()
merged_df.count()

Out[7]: 598459

In [0]:
merged_df.write.option("path","/stream_output/raw").mode("overwrite").saveAsTable("raw_data")

In [0]:
#OPTIMIZED BY SPARK
test_df = (merged_df
          .withColumn("words", explode(split(col("tweet")," "))) 
          .withColumn("score", when(col("words").isin(positive), 1).when(col("words").isin(negative), -1).otherwise(0)) #sentiment analysis
          .withColumn("date", to_date(col("created_at")))
          .groupBy("date")
          .sum("score")
          .orderBy(desc("date"))
          )
display(test_df)

date,sum(score)
2022-12-21,770
2022-12-20,1808
2022-12-19,3843
2022-12-18,3937
2022-12-17,1165
2022-12-16,977
2022-12-15,2405
2022-12-14,7251
2022-12-13,951
2022-12-12,600


Output can only be rendered in Databricks

In [0]:
tweet_count_daily = (merged_df
 .withColumn("date", to_date(col("created_at")))
 .groupBy("date")
 .count()
 .orderBy("count")
 )
display(tweet_count_daily)

date,count
2022-11-25,3
2022-11-26,9
2022-11-24,15
2022-11-27,19
2022-11-28,23
2022-11-23,26
2022-11-29,42
2022-12-07,52
2022-12-06,57
2022-12-02,205


In [0]:
computed_daily = (merged_df
           .withColumn("score", compute("tweet"))
           .withColumn("date", to_date(col("created_at")))
           .groupBy("date")
           .agg({"score":"sum"}).alias("total_score")
           .orderBy(desc("date"))
           )
display(computed_daily)

date,sum(score)
2022-12-18,379.0
2022-12-17,1177.0
2022-12-16,976.0
2022-12-15,2413.0
2022-12-14,7274.0
2022-12-13,958.0
2022-12-12,602.0
2022-12-11,1818.0
2022-12-10,3566.0
2022-12-09,83.0


Output can only be rendered in Databricks

In [0]:
computed_daily_described = (merged_df
           .withColumn("score", compute("tweet"))
           .withColumn("negative", expr('CASE WHEN score<0 THEN score ELSE 0 END'))
           .withColumn("positive", expr('CASE WHEN score>0 THEN score ELSE 0 END'))
           .withColumn("date", to_date(col("created_at")))
           .groupBy("date")
           .agg(sum("score").alias("total_score"), 
                sum("positive").alias("total_positive"),
                sum("negative").alias("total_negative"),
                round((sum("positive")/count("*"))*100, 2).alias("percentage_pos"))
           .orderBy(desc("date"))
           )
display(computed_daily_described)

date,total_score,total_positive,total_negative,percentage_pos
2022-12-21,752.0,1605.0,-853.0,11.54
2022-12-20,1801.0,3660.0,-1859.0,11.48
2022-12-19,3836.0,8418.0,-4582.0,13.14
2022-12-18,3953.0,11064.0,-7111.0,9.71
2022-12-17,1177.0,1801.0,-624.0,12.01
2022-12-16,976.0,1498.0,-522.0,10.22
2022-12-15,2413.0,3727.0,-1314.0,11.62
2022-12-14,7273.0,11693.0,-4420.0,10.38
2022-12-13,957.0,1494.0,-537.0,10.62
2022-12-12,602.0,899.0,-297.0,11.23


In [0]:
computed_hourly = (merged_df
           .withColumn("score", compute("tweet"))
           .withColumn("hour", hour(col("created_at")))
           .groupBy("hour")
           .agg(sum("score").alias("total_score"))
           .orderBy("hour")
           )
display(computed_hourly)

hour,total_score
0,179.0
1,92.0
2,78.0
3,34.0
4,39.0
5,77.0
6,96.0
7,176.0
8,232.0
9,271.0


Output can only be rendered in Databricks

In [0]:
score_day_hour = (merged_df
                 .withColumn("date", to_date(col("created_at")))
                 .withColumn("hour", hour(col("created_at")))
                 .withColumn("score", compute("tweet"))
                 .groupBy("hour")
                 .pivot("date")
                 .agg(sum("score").alias("total_score"))
                 .orderBy("hour")
                 .fillna(0)
                 .drop("hour")
                 )
display(score_day_hour)

2022-11-23,2022-11-24,2022-11-25,2022-11-26,2022-11-27,2022-11-28,2022-11-29,2022-11-30,2022-12-01,2022-12-02,2022-12-03,2022-12-04,2022-12-05,2022-12-06,2022-12-07,2022-12-08,2022-12-09,2022-12-10,2022-12-11,2022-12-12,2022-12-13,2022-12-14,2022-12-15
0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-1.0,0.0,0.0,65.0,0.0,0.0,0.0,0.0,2.0,58.0,5.0,14.0,39.0,60.0
0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,28.0,0.0,0.0,0.0,0.0,0.0,44.0,8.0,2.0,10.0,13.0
0.0,0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,1.0,0.0,21.0,0.0,0.0,0.0,0.0,0.0,35.0,5.0,6.0,9.0,34.0
0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,5.0,0.0,0.0,0.0,0.0,0.0,21.0,1.0,3.0,5.0,12.0
0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,2.0,0.0,0.0,6.0,0.0,0.0,0.0,0.0,2.0,18.0,1.0,4.0,6.0,24.0
0.0,0.0,0.0,0.0,0.0,0.0,1.0,0.0,0.0,0.0,0.0,0.0,28.0,0.0,0.0,0.0,0.0,0.0,32.0,1.0,5.0,10.0,38.0
0.0,0.0,0.0,0.0,0.0,0.0,0.0,0.0,-1.0,0.0,0.0,-1.0,28.0,1.0,0.0,0.0,0.0,1.0,24.0,14.0,0.0,30.0,59.0
0.0,0.0,1.0,0.0,0.0,-1.0,0.0,0.0,-1.0,1.0,0.0,1.0,54.0,0.0,0.0,2.0,0.0,-2.0,62.0,12.0,8.0,39.0,88.0
0.0,1.0,0.0,0.0,0.0,1.0,0.0,-2.0,-3.0,2.0,1.0,2.0,76.0,0.0,0.0,0.0,-1.0,3.0,91.0,13.0,21.0,48.0,64.0
0.0,0.0,0.0,0.0,0.0,1.0,0.0,2.0,-1.0,0.0,0.0,3.0,95.0,0.0,0.0,1.0,0.0,0.0,135.0,21.0,17.0,46.0,0.0


In [0]:
score_day_hour.write.option("path","/power_bi/score_day_hour").mode("overwrite").saveAsTable("score_day_hour")

In [0]:
tweet_count_hourly = (df
 .withColumn("hour", hour(col("created_at")))
 .groupBy("hour")
 .count()
 .orderBy("hour")
 )
display(tweet_count_hourly)

hour,count
0,495
1,343
2,202
3,148
4,148
5,250
6,417
7,631
8,817
9,965


In [0]:
score_hour_count = computed_hourly.join(tweet_count_hourly, "hour").orderBy("hour")
display(score_hour_count)

hour,total_score,count
0,64.0,495
1,28.0,343
2,23.0,202
3,5.0,148
4,8.0,148
5,30.0,250
6,26.0,417
7,53.0,631
8,78.0,817
9,98.0,965
